4

When i executor spark streaming application on yarn, i continued to receive the following error

Why the error happened and how to solve it ? Any suggestion will help, thank you~

15/05/07 11:11:50 INFO dstream.StateDStream: Marking RDD 2364 for time 1430968310000 ms for checkpointing
    15/05/07 11:11:50 INFO scheduler.JobScheduler: Added jobs for time 1430968310000 ms
    15/05/07 11:11:50 INFO scheduler.JobGenerator: Checkpointing graph for time 1430968310000 ms
    15/05/07 11:11:50 INFO streaming.DStreamGraph: Updating checkpoint data for time 1430968310000 ms
    15/05/07 11:11:50 INFO streaming.DStreamGraph: Updated checkpoint data for time 1430968310000 ms
    15/05/07 11:11:50 ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
    java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
            at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
            at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
            at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
            at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
            at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
            at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
            at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
            at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
            at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

The spark streaming application code as follow, i execute it in spark-shell

    import kafka.cluster.Cluster
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext._

val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  Some(0)
}

val ssc = new StreamingContext(sc,
  new Duration(5000))
ssc.checkpoint(".")

val lines = KafkaUtils.createStream(ssc, "10.1.10.21:2181", "kafka_spark_streaming", Map("hello_test" -> 3))

val uuidDstream = lines.transform(rdd => rdd.map(_._2)).map(x => (x, 1)).updateStateByKey[Int](updateFunc)
uuidDstream.count().print()

ssc.start()
ssc.awaitTermination()
1
  • 1
    We aren't mind readers. Could you post the code please? Commented May 7, 2015 at 3:54

1 Answer 1

7

The reference to val updateFunc used within the closure of updateStateByKey is pulling the rest of that instance into the closure and taking the StreamingContext with it.

Two options:

  • Quick fix: Declare streaming context transient => @transient val ssc= ... Also a good idea to annotate the dstream declarations as @transient as well.
  • A better fix: Put your functions in a separate object

Like this:

case object TransformFunctions {
    val updateFunc = ???
}
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.