0

I'm working with spark streaming and kafka, i got this error.

Exception in thread "streaming-start" java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object; at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream$$anonfun$start$1.apply(DirectKafkaInputDStream.scala:246) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream$$anonfun$start$1.apply(DirectKafkaInputDStream.scala:245) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:45) at scala.collection.SetLike$class.map(SetLike.scala:93) at scala.collection.mutable.AbstractSet.map(Set.scala:45) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:245) at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49) at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49) at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:145) at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:138) at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:975) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56) at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:972) at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165) at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514) at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 17/08/02 16:24:58 INFO StreamingContext: StreamingContext started

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>2.1.1</version>
        </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.0</version>
    </dependency>

MY CODE :

     SparkConf conf = new SparkConf().setAppName("Streaming").setMaster("local");


       JavaStreamingContext streamingContext = new JavaStreamingContext(conf, new Duration(1000));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "exastax");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("loglar");
        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );
stream.foreachRDD(rdd -> {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        rdd.foreachPartition(consumerRecords -> {
            OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
            System.out.println(
                    o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
        });
    });
        streamingContext.start();
        streamingContext.awaitTermination();
    }
}

im using Kafka_2.11-0.11.0.0 I tried searching for this issue, but i can't find relevent jar.Please help me to fix this.

1 Answer 1

3

You are mixing Scala 2.10 and Scala 2.11 code. Use either a Kafka-dependency with Scala 2.10, or Spark with Scala 2.11.

Sign up to request clarification or add additional context in comments.

3 Comments

I do not understand where I need to organize.. Is it code or maven?
@TalhaK. this maven dep: spark-streaming-kafka-0-10_2.11 has to be spark-streaming-kafka-0-10_2.10 like the other spark dependencies
Yes! it's run!! thnx a lot sir @maasg .But I can not write the values in Kafka. stream.foreachRDD how to print values ?

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.