
I am reading data (JSON as a String) from a Kafka queue and trying to parse the JSON string into a case class using the lift-json API.

Here is the code snippet:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
import kafka.serializer.StringDecoder

val sparkStreamingContext = new StreamingContext(sparkConf, Seconds(5))

val kafkaParam: Map[String, String] = Map(
  "bootstrap.servers" -> kafkaServer,
  "key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "zookeeper.connect" -> zookeeperUrl,
  "group.id" -> "demo-group")

import org.apache.spark.streaming.kafka._
import net.liftweb.json._

val topicSet = Map(kafkaTopic -> 1)
val streaming = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](sparkStreamingContext, kafkaParam, topicSet, StorageLevel.MEMORY_AND_DISK)

streaming.map { case (id, tweet) =>
  implicit val formats: Formats = DefaultFormats
  (id, parse(tweet).extract[Tweet])
}.print()

sparkStreamingContext.start()
sparkStreamingContext.awaitTermination()

and I am getting this exception:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 1.0 (TID 1) had a not serializable result: net.liftweb.json.DefaultFormats$
Serialization stack:
    - object not serializable (class: net.liftweb.json.DefaultFormats$, value: net.liftweb.json.DefaultFormats$@74a2fec)
    - field (class: Tweet, name: formats, type: interface net.liftweb.json.Formats)
    - object (class Tweet, Tweet(Akash24,Adele))
    - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
    - object (class scala.Tuple2, (1,Tweet(Akash24,Adele)))
    - element of array (index: 0)
    - array (class [Lscala.Tuple2;, size 11)

Can anyone help me fix this problem? Any help will be appreciated. Thanks.

  • Could you include the Tweet definition? (commented Apr 2, 2017 at 7:12)
  • @zero323 case class Tweet(user: String, tweet: String) (commented Apr 2, 2017 at 9:08)
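
The serialization stack above lists a formats field on Tweet, so the definition in the running code likely captures an implicit Formats value, unlike the plain case class quoted in the comment. A hypothetical reconstruction (not from the original post):

// Hypothetical reconstruction; the stack trace entry
// "field (class: Tweet, name: formats, type: interface net.liftweb.json.Formats)"
// implies a Formats value is stored as a field of Tweet
case class Tweet(user: String, tweet: String)(implicit val formats: Formats)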

1 Answer


From the logs, this looks like a simple class-not-serializable exception. To correct it, use the following code:

// Register Lift's DefaultFormats with Kryo before creating the context
sparkConf.registerKryoClasses(Array(classOf[DefaultFormats]))

val sparkStreamingContext = new StreamingContext(sparkConf, Seconds(5))

val kafkaParam: Map[String, String] = Map(
  "bootstrap.servers" -> kafkaServer,
  "key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "zookeeper.connect" -> zookeeperUrl,
  "group.id" -> "demo-group")

import org.apache.spark.streaming.kafka._
import net.liftweb.json._
val topicSet = Map(kafkaTopic -> 1)
val streaming = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](sparkStreamingContext, kafkaParam, topicSet, StorageLevel.MEMORY_AND_DISK)

streaming.map { case (id, tweet) =>
  implicit val formats: Formats = DefaultFormats
  (id, parse(tweet).extract[Tweet])
}.print()

sparkStreamingContext.start()
sparkStreamingContext.awaitTermination()

This makes the DefaultFormats class serializable with Kryo, so the Spark driver will be able to send the implicit val formats to all worker nodes.
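
For context, here is a minimal sketch of the SparkConf setup this assumes; the app name and master are placeholders, not from the original post. Note that registerKryoClasses itself also sets spark.serializer to KryoSerializer, so no separate serializer configuration should be needed:

import org.apache.spark.SparkConf
import net.liftweb.json.DefaultFormats

val sparkConf = new SparkConf()
  .setAppName("tweet-stream") // hypothetical app name
  .setMaster("local[2]")      // hypothetical master, for local testing

// Registers DefaultFormats with Kryo; under the hood this also sets
// spark.serializer to org.apache.spark.serializer.KryoSerializer
sparkConf.registerKryoClasses(Array(classOf[DefaultFormats]))

If registering the class is not desirable, the underlying cause can also be removed by defining Tweet so that it does not store a Formats field at all, since that field is what the stack trace shows failing to serialize.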
