I am trying to read a JSON string from Kafka using the Spark Streaming library. The code connects to the Kafka broker but fails while decoding the message. The code is based on:

https://github.com/killrweather/killrweather/blob/master/killrweather-examples/src/main/scala/com/datastax/killrweather/KafkaStreamingJson.scala

val kStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kParams, kTopic).map(_._2) // keep only the message value
println("Starting to read from kafka topic:" + topicStr)
kStream.foreachRDD { rdd =>
  if (rdd.toLocalIterator.nonEmpty) {
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // Infer the schema from the JSON strings and expose them as a temp table
    sqlContext.read.json(rdd).registerTempTable("mytable")
    if (firstTime) {
      sqlContext.sql("SELECT * FROM mytable").printSchema()
    }
    val df = sqlContext.sql(selectStr)
    df.collect.foreach(println)
    df.rdd.saveAsTextFile(fileName)
    mergeFiles(fileName, firstTime)
    firstTime = false
    println(rdd.name)
  }
}

java.lang.NoSuchMethodError: kafka.message.MessageAndMetadata.<init>(Ljava/lang/String;ILkafka/message/Message;JLkafka/serializer/Decoder;Lkafka/serializer/Decoder;)V
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:222)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)

  • How did you run the job? It seems like Kafka isn't available at runtime. Commented May 30, 2016 at 6:07
  • Kafka is available and the connection is made. I tested the negative case by switching to a random Kafka broker. The exception comes from the line if (rdd.toLocalIterator.nonEmpty) { Commented May 30, 2016 at 20:15

1 Answer

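The problem was the version of the Kafka jars in use; switching to 0.9.0.0 fixed the issue. The NoSuchMethodError refers to a constructor of kafka.message.MessageAndMetadata, which, as far as I can tell, was introduced in 0.8.2.0, so older Kafka jars do not have it.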
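
One way to confirm this kind of jar mismatch is to ask the JVM which jar a class was actually loaded from and which constructors it exposes. The following is only a minimal sketch, assuming it runs with the same classpath as the Spark job; JarLocator and describe are hypothetical names used for illustration, not part of Spark or Kafka.

```scala
// Hypothetical helper (not part of Spark or Kafka): reports where a class
// was loaded from and lists its public constructors.
object JarLocator {
  def describe(className: String): String = {
    // Throws ClassNotFoundException if the class is not on the classpath
    val cls = Class.forName(className)
    // getCodeSource is null for classes loaded by the bootstrap class loader
    val source = Option(cls.getProtectionDomain.getCodeSource)
      .map(_.getLocation.toString)
      .getOrElse("<bootstrap or unknown>")
    val ctors = cls.getConstructors.map(_.toString).mkString("\n  ")
    s"$className loaded from $source\n  $ctors"
  }

  def main(args: Array[String]): Unit = {
    // Class name taken from the stack trace in the question
    println(describe("kafka.message.MessageAndMetadata"))
  }
}
```

If the reported jar is not the Kafka version you expected, or none of the listed constructors takes (String, int, Message, long, Decoder, Decoder), the classpath is picking up a Kafka jar that does not match the one Spark was built against.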
