
I am receiving JSON strings from Kafka in Spark Streaming (Scala). The processing of each string takes some time, so I want to distribute the processing across X cluster nodes.

Currently I am just running tests on my laptop. So, for simplicity, let's assume that the processing I have to apply to each JSON string is just some normalization of fields:

  import play.api.libs.json.Json

  def normalize(json: String): String = {
    val parsedJson = Json.parse(json)
    val parsedRecord = (parsedJson \ "records")(0)
    val idField = parsedRecord \ "identifier"
    val titleField = parsedRecord \ "title"

    // '/' is replaced with '-' so the identifier can serve as a plain id
    val output = Json.obj(
      "id" -> Json.parse(idField.get.toString().replace('/', '-')),
      "publicationTitle" -> titleField.get
    )
    output.toString()
  }
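
For illustration, given an input shaped the way the field accesses above expect (the record structure and values here are made up), normalize would behave like this:

  // Hypothetical sample record: normalize only reads
  // records[0].identifier and records[0].title.
  val sample = """{"records":[{"identifier":"10.1000/xyz123","title":"Some Title"}]}"""
  println(normalize(sample))
  // prints: {"id":"10.1000-xyz123","publicationTitle":"Some Title"}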

This is my attempt to distribute the normalize operation over the "cluster" (each JSON string should be processed in its entirety; JSON strings cannot be split). How do I deal with the Task not serializable issue raised at the line val callRDD = JSONstrings.map(normalize(_))?

val conf = new SparkConf().setAppName("My Spark Job").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))

val topicMap = topic.split(",").map((_, numThreads)).toMap

val JSONstrings = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

val callRDD = JSONstrings.map(normalize(_))

ssc.start()
ssc.awaitTermination()

UPDATE

This is the complete code:

package org.consumer.kafka

import java.util.Properties
import java.util.concurrent._
import com.typesafe.config.ConfigFactory
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.utils.Logging
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import play.api.libs.json.{JsObject, JsString, JsValue, Json}
import scalaj.http.{Http, HttpResponse}

class KafkaJsonConsumer(val datasource: String,
                        val apiURL: String,
                        val zkQuorum: String,
                        val group: String,
                        val topic: String) extends Logging
{
  val delay = 1000
  val config = createConsumerConfig(zkQuorum, group)
  val consumer = Consumer.create(config)
  var executor: ExecutorService = null

  def shutdown() = {
    if (consumer != null)
      consumer.shutdown()
    if (executor != null)
      executor.shutdown()
  }

  def createConsumerConfig(zkQuorum: String, group: String): ConsumerConfig = {
    val props = new Properties()
    props.put("zookeeper.connect", zkQuorum)
    props.put("group.id", group)
    props.put("auto.offset.reset", "largest")
    props.put("zookeeper.session.timeout.ms", "2000")
    props.put("zookeeper.sync.time.ms", "200")
    props.put("auto.commit.interval.ms", "1000")
    new ConsumerConfig(props)
  }

  def run(numThreads: Int) = {
    val conf = new SparkConf()
                              .setAppName("TEST")
                              .setMaster("local[*]")
                              //.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("checkpoint")

    val topicMap = topic.split(",").map((_, numThreads)).toMap

    val rawdata = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    val parsed = rawdata.map(Json.parse(_))

    val result = parsed.map(record => {
      val parsedRecord = (record \ "records")(0)
      val idField = parsedRecord \ "identifier"
      val titleField = parsedRecord \ "title"
      val journalTitleField = parsedRecord \ "publicationName"
      Json.obj(
        "id" -> Json.parse(idField.get.toString().replace('/', '-')),
        "publicationTitle" -> titleField.get,
        "journalTitle" -> journalTitleField.get)
    })

    result.print()

    val callRDD = result.map(JsonUtils.normalize(_))

    callRDD.print()

    ssc.start()
    ssc.awaitTermination()
  }

  object JsonUtils {
    def normalize(json: JsValue): String = {
      (json \ "id").as[JsString].value
    }
  }

}

I launch the execution of this class KafkaJsonConsumer as follows:

package org.consumer

import org.consumer.kafka.KafkaJsonConsumer

object TestConsumer {

  def main(args: Array[String]) {

    if (args.length < 6) {
      System.exit(1)
    }

    val Array(datasource, apiURL, zkQuorum, group, topic, numThreads) = args

    val processor = new KafkaJsonConsumer(datasource, apiURL, zkQuorum, group, topic)
    processor.run(numThreads.toInt)

    //processor.shutdown()

  }

}

1 Answer

It looks like the normalize method is part of some class. In the line where you use it inside the map operation, Spark needs to serialize not only the method itself, but also the whole instance it belongs to. The easiest solution is to move normalize to a singleton object:

object JsonUtils {
  def normalize(json: String): String = ???
}

and invoke it like this:

val callRDD = JSONstrings.map(JsonUtils.normalize(_))
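
Applied to the complete code in the question, a minimal sketch could look like the following (the body mirrors the normalize from the top of the question). Note that the object must be top-level, or a companion object, and not nested inside KafkaJsonConsumer: an object declared inside a class belongs to each instance, so a closure referencing it still captures the whole enclosing instance.

package org.consumer.kafka

import play.api.libs.json.Json

// Top-level singleton: it holds no reference to KafkaJsonConsumer,
// so the map closure no longer drags the enclosing instance along.
object JsonUtils {
  def normalize(json: String): String = {
    val parsedRecord = (Json.parse(json) \ "records")(0)
    Json.obj(
      "id" -> Json.parse((parsedRecord \ "identifier").get.toString().replace('/', '-')),
      "publicationTitle" -> (parsedRecord \ "title").get
    ).toString()
  }
}

With that in place, the closure passed to map only carries the function call, not the KafkaJsonConsumer instance, which is what was triggering the java.io.NotSerializableException.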

11 Comments

It still says that the task is not serializable. Does it make sense to try Kryo?
No, this problem is not related to using or not using Kryo. Are you running your code in the spark-shell or in an application?
I'm running my application from IntelliJ, not the spark-shell. I use local[*] just for testing; once it works locally, my idea is to use YARN and distribute the calculations over different machines. I've already spent quite a lot of time on this issue, and at this point I have no idea how to deal with it.
Don't you think my issue relates to this post? http://stackoverflow.com/questions/28554141/how-to-let-spark-serialize-an-object-using-kryo?rq=1. In case it helps to pinpoint the issue, I now get this error: Caused by: java.io.NotSerializableException: org.consumer.kafka.KafkaJsonConsumer, where KafkaJsonConsumer is the class with all the mentioned code.
You should provide complete code, with classes as well.
