
I am trying to run a fairly simple example that connects Spark to Cassandra and aggregates the data. The implementation uses the spark-cassandra-connector, Java, Spring and really not much else.

Here is the Spark configuration class I am wiring up via Spring:

@Configuration
@ComponentScan("test.spark.service")
@Import({CassandraConfig.class})
public class SparkConfig {

    @Autowired
    private String cassandraUrl;

    @Bean
    public SparkConf sparkConf() {
        SparkConf sparkConf = new SparkConf();

        // configure all the bells and whistles
        sparkConf
                .setMaster("spark://localhost:7077")
                .setAppName("DataAggregator")
                .set("spark.cassandra.connection.host", cassandraUrl);

        return sparkConf;
    }

    @Bean
    public JavaStreamingContext javaStreamingContext() {
        return new JavaStreamingContext(sparkConf(), new Duration(1000));
    }
}

Here is the service class that doesn't throw the exception:

@Service
public class SparkServiceImpl implements SparkService, Serializable {

    private static final Logger LOGGER = LoggerFactory.getLogger(SparkServiceImpl.class);

    @Autowired
    JavaStreamingContext javaStreamingContext;

    @Override
    public void process() {
        CassandraJavaRDD<CassandraRow> rdd = CassandraStreamingJavaUtil.javaFunctions(javaStreamingContext).cassandraTable("keyspace", "table");

    }
}

This seems to work and returns a CassandraJavaRDD.

As soon as I change the implementation to call groupBy with a Function, it fails with a "Task not serializable" exception:

@Service
public class SparkServiceImpl implements SparkService, Serializable {

    private static final Logger LOGGER = LoggerFactory.getLogger(SparkServiceImpl.class);

    @Autowired
    JavaStreamingContext javaStreamingContext;

    @Override
    public void process() {
        CassandraJavaRDD<CassandraRow> rdd = CassandraStreamingJavaUtil.javaFunctions(javaStreamingContext).cassandraTable("keyspace", "table");

        JavaPairRDD<Integer, Iterable<CassandraRow>> javaPairRDD = rdd.groupBy(new Function<CassandraRow, Integer>() {
            @Override
            public Integer call(CassandraRow row) throws Exception {
                return row.getInt("int_column");
            }
        });
    }
}

Here is the stack trace:

org.apache.spark.SparkException: Task not serializable

    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$3.apply(RDD.scala:694)
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$3.apply(RDD.scala:693)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.groupBy(RDD.scala:693)
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$1.apply(RDD.scala:665)
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$1.apply(RDD.scala:665)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.groupBy(RDD.scala:664)
    at org.apache.spark.api.java.JavaRDDLike$class.groupBy(JavaRDDLike.scala:242)
    at org.apache.spark.api.java.AbstractJavaRDDLike.groupBy(JavaRDDLike.scala:45)
    at test.spark.service.SparkServiceImpl.process(SparkServiceServiceImpl.java:56)
    at test.spark.service.SparkServiceTest.testProcess(SparkServiceTest.java:27)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.NotSerializableException: org.apache.spark.streaming.api.java.JavaStreamingContext
Serialization stack:
    - object not serializable (class: org.apache.spark.streaming.api.java.JavaStreamingContext, value: org.apache.spark.streaming.api.java.JavaStreamingContext@4538856f)
    - field (class: test.spark.service.SparkServiceImpl, name: javaStreamingContext, type: class org.apache.spark.streaming.api.java.JavaStreamingContext)
    - object (class test.spark.service.SparkServiceImpl, test.spark.service.SparkServiceImpl@7e34b127)
    - field (class: test.spark.service.SparkServiceImpl$1, name: this$0, type: class test.spark.service.SparkServiceImpl)
    - object (class test.spark.service.SparkServiceImpl$1, test.spark.service.SparkServiceImpl$1@536b71b4)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 52 more

In addition to this exception, if my service does not implement Serializable, it throws a different exception.

Here is the service:

@Service
public class SparkServiceImpl implements SparkService {

    private static final Logger LOGGER = LoggerFactory.getLogger(SparkServiceImpl.class);

    @Autowired
    JavaStreamingContext javaStreamingContext;

    @Override
    public void process() {
        CassandraJavaRDD<CassandraRow> rdd = CassandraStreamingJavaUtil.javaFunctions(javaStreamingContext).cassandraTable("keyspace", "table");

        JavaPairRDD<Integer, Iterable<CassandraRow>> javaPairRDD = rdd.groupBy(new Function<CassandraRow, Integer>() {
            @Override
            public Integer call(CassandraRow row) throws Exception {
                return row.getInt("int_column");
            }
        });
    }
}

Here is the exception:

org.apache.spark.SparkException: Task not serializable

    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$3.apply(RDD.scala:694)
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$3.apply(RDD.scala:693)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.groupBy(RDD.scala:693)
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$1.apply(RDD.scala:665)
    at org.apache.spark.rdd.RDD$$anonfun$groupBy$1.apply(RDD.scala:665)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.groupBy(RDD.scala:664)
    at org.apache.spark.api.java.JavaRDDLike$class.groupBy(JavaRDDLike.scala:242)
    at org.apache.spark.api.java.AbstractJavaRDDLike.groupBy(JavaRDDLike.scala:45)
    at test.spark.service.SparkServiceImpl.process(SparkServiceImpl.java:32)
    at test.spark.service.SparkServiceTest.testProcess(SparkServiceTest.java:27)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.NotSerializableException: test.spark.service.SparkServiceImpl
Serialization stack:
    - object not serializable (class: test.spark.service.SparkServiceImpl, value: test.spark.service.SparkServiceImpl@47b269c4)
    - field (class: test.spark.service.SparkServiceImpl$1, name: this$0, type: class test.spark.service.SparkServiceImpl)
    - object (class test.spark.service.SparkServiceImpl$1, test.spark.service.SparkServiceImpl$1@23ad71bf)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 52 more
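Both failures can be reproduced without Spark or Cassandra. The sketch below is a minimal plain-Java approximation under stated assumptions: KeyFunction, FakeStreamingContext, SerializableService and PlainService are hypothetical stand-ins for Spark's Function, JavaStreamingContext and the two SparkServiceImpl variants, and a plain ObjectOutputStream stands in for Spark's closure serializer (the JavaSerializer frames in the traces). In both cases the anonymous function drags its enclosing service into the serialized graph.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Plain-Java reproduction of both failures; every name here is a hypothetical stand-in.
class ClosureCaptureDemo {

    // Like org.apache.spark.api.java.function.Function, this interface extends Serializable.
    interface KeyFunction<T, R> extends Serializable {
        R call(T value) throws Exception;
    }

    // Stand-in for JavaStreamingContext: deliberately NOT Serializable.
    static class FakeStreamingContext {
    }

    // Case 1: the service is Serializable but holds the non-serializable context.
    static class SerializableService implements Serializable {
        FakeStreamingContext context = new FakeStreamingContext();
        int bucketSize = 10; // non-final field read by the function below

        KeyFunction<Integer, Integer> keyFunction() {
            // Anonymous inner class: it keeps a hidden reference to this service
            // (the this$0-style field in the serialization stack) because it reads bucketSize.
            return new KeyFunction<Integer, Integer>() {
                @Override
                public Integer call(Integer value) {
                    return value / bucketSize;
                }
            };
        }
    }

    // Case 2: the service itself does not implement Serializable.
    static class PlainService {
        int bucketSize = 10;

        KeyFunction<Integer, Integer> keyFunction() {
            return new KeyFunction<Integer, Integer>() {
                @Override
                public Integer call(Integer value) {
                    return value / bucketSize;
                }
            };
        }
    }

    // Java-serializes obj, roughly as Spark checks a closure before shipping a task.
    // Returns the class named by NotSerializableException, or null on success.
    static String serializationFailure(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Case 1 fails on the context, case 2 on the service, matching the two "Caused by" lines.
        System.out.println(serializationFailure(new SerializableService().keyFunction()));
        System.out.println(serializationFailure(new PlainService().keyFunction()));
    }
}
```

Neither function is itself the problem: serialization fails on whatever non-serializable object the hidden outer reference leads to.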
  • How are you starting the Spark job? It seems like you're attempting to serialize your SparkServiceImpl class, which holds the JavaStreamingContext. Commented Sep 25, 2016 at 6:17
  • see this ` - object not serializable (class: test.spark.service.SparkServiceImpl, value: test.spark.service.SparkServiceImpl@47b269c4) - field (class: test.spark.service.SparkServiceImpl$1, name: this$0, type: class test.spark.service.SparkServiceImpl) - object (class test.spark.service.SparkServiceImpl$1, test.spark.service.SparkServiceImpl$1@23ad71bf) - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function) - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$to` Commented Sep 25, 2016 at 6:45
  • The serialization debugger prints these, so you have to look at them. Commented Sep 25, 2016 at 6:46
  • The cause is that the class is not serializable, as @YuvalItzchakov already mentioned. Commented Sep 25, 2016 at 6:50
  • I am not trying to serialize either the service or the streaming context; this is the part that doesn't make sense to me. According to their documentation, this is how you do it via CassandraJavaUtil or CassandraStreamingJavaUtil. github.com/datastax/spark-cassandra-connector/blob/master/doc/… Commented Sep 25, 2016 at 12:28
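The this$0 entry in the serialization stack is the key to why the service gets serialized without being asked to: javac gives an anonymous inner class a synthetic field pointing at its enclosing instance. A minimal sketch that makes that field visible through reflection, assuming hypothetical names (HiddenOuterFieldDemo, Service, bucketSupplier, outerReferenceField) and using java.util.function.Supplier in place of Spark's Function:

```java
import java.lang.reflect.Field;
import java.util.function.Supplier;

// Hypothetical names throughout. Shows the hidden field javac adds to an anonymous inner class.
class HiddenOuterFieldDemo {

    static class Service {
        int bucketSize = 10;

        Supplier<Integer> bucketSupplier() {
            // Reads an instance field, so the anonymous class must keep the enclosing Service.
            return new Supplier<Integer>() {
                @Override
                public Integer get() {
                    return bucketSize;
                }
            };
        }
    }

    // Returns the name of the first declared field whose type is outerType, or null if none.
    static String outerReferenceField(Object inner, Class<?> outerType) {
        for (Field f : inner.getClass().getDeclaredFields()) {
            if (f.getType() == outerType) {
                return f.getName();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Object fn = new Service().bucketSupplier();
        // javac names this synthetic field this$N (this$0 when the outer class is top-level).
        System.out.println(outerReferenceField(fn, Service.class));
    }
}
```

Because SparkServiceImpl is a top-level class, the same field shows up as this$0 in the traces, and serializing the function follows it straight into the service.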

1 Answer


Quick fix:

Keep SparkServiceImpl implementing Serializable and add the transient keyword to the autowired JavaStreamingContext field:

@Autowired
private transient JavaStreamingContext javaStreamingContext;

Quick explanation of why:

This happens because the JavaStreamingContext is created on the driver, where it serves as the main entry point for Spark Streaming functionality.

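In your SparkService implementation, SparkServiceImpl, you declare transformations on RDDs. When you call a transformation such as groupBy, Spark checks on the driver that the function you pass can be serialized (the ClosureCleaner.ensureSerializable frame in the stack trace), because the tasks built from it are later sent to the workers, which is where they are actually executed. Since your Function is an anonymous inner class, it holds a hidden reference (this$0 in the serialization stack) to the enclosing SparkServiceImpl, so serializing the function also serializes the service and its JavaStreamingContext field.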
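The driver/worker split described above can be sketched in plain Java, assuming Java serialization as the wire format (the traces show Spark's JavaSerializer in that role; the network hop of a real cluster is omitted). ShipClosureDemo, KeyFunction, ModuloKey, ship and runOnWorker are hypothetical names: the "driver" turns a function into bytes and the "worker" rebuilds it and applies it to a row. ModuloKey is a static nested class, chosen here so it carries no hidden outer reference and ships cleanly.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical names throughout; plain Java serialization stands in for Spark's closure serializer.
class ShipClosureDemo {

    // Like Spark's Function, the functional interface extends Serializable.
    interface KeyFunction<T, R> extends Serializable {
        R call(T value) throws Exception;
    }

    // Static nested class: no hidden outer reference, only an int of state.
    static class ModuloKey implements KeyFunction<Integer, Integer> {
        private final int buckets;

        ModuloKey(int buckets) {
            this.buckets = buckets;
        }

        @Override
        public Integer call(Integer value) {
            return value % buckets;
        }
    }

    // "Driver" side: turn the function into bytes that could be sent to a worker.
    static byte[] ship(Object task) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(task);
        }
        return bytes.toByteArray();
    }

    // "Worker" side: rebuild the function from the bytes and apply it to one row.
    @SuppressWarnings("unchecked")
    static int runOnWorker(byte[] payload, int row) throws Exception {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            KeyFunction<Integer, Integer> fn = (KeyFunction<Integer, Integer>) in.readObject();
            return fn.call(row);
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] payload = ship(new ModuloKey(3));
        System.out.println(runOnWorker(payload, 10)); // prints 1 (10 % 3)
    }
}
```

Everything reachable from the shipped function has to survive this round trip, which is exactly where an unwanted reference to the service and its context becomes fatal.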

The workers need neither the SparkContext nor the JavaStreamingContext, so, as you said, it makes no sense to serialize the JavaStreamingContext.

The transient keyword tells Java serialization to skip the JavaStreamingContext field, which is fine for executing the Spark job here because the function that runs on the workers never uses it.
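The effect of the fix can be checked in plain Java. This is a minimal sketch assuming hypothetical stand-ins (FixedService for SparkServiceImpl, FakeStreamingContext for JavaStreamingContext, KeyFunction for Spark's Function, roundTrip for the driver-to-worker trip): the captured service now serializes, and on the deserialized copy the transient field is null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-ins for the classes in the question; no Spark required.
class TransientFieldDemo {

    // Like Spark's Function, the functional interface extends Serializable.
    interface KeyFunction<T, R> extends Serializable {
        R call(T value) throws Exception;
    }

    // Stand-in for JavaStreamingContext: deliberately NOT Serializable.
    static class FakeStreamingContext {
    }

    // Mirrors SparkServiceImpl after the fix: Serializable, with a transient context field.
    static class FixedService implements Serializable {
        transient FakeStreamingContext context = new FakeStreamingContext();
        int bucketSize = 10;

        KeyFunction<Integer, Integer> keyFunction() {
            // Still an anonymous inner class, so it still captures this service.
            return new KeyFunction<Integer, Integer>() {
                @Override
                public Integer call(Integer value) {
                    return value / bucketSize;
                }
            };
        }
    }

    // Serializes and immediately deserializes obj, mimicking a driver-to-worker trip.
    static Object roundTrip(Object obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        FixedService service = new FixedService();

        // The captured service now serializes because the context field is skipped.
        KeyFunction<Integer, Integer> shipped = (KeyFunction<Integer, Integer>) roundTrip(service.keyFunction());
        System.out.println(shipped.call(42)); // prints 4

        // On the deserialized copy the transient field is null, so worker-side code must not use it.
        FixedService copy = (FixedService) roundTrip(service);
        System.out.println(copy.context == null); // prints true
    }
}
```

Field initializers of a Serializable class do not run during deserialization, so the transient context stays null on the copy rather than being recreated.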
