 Caused by: java.io.NotSerializableException: org.apache.spark.streaming.api.java.JavaStreamingContext
Serialization stack:
        - object not serializable (class: org.apache.spark.streaming.api.java.JavaStreamingContext, value: org.apache.spark.streaming.api.java.JavaStreamingContext@694c3f17)
        - field (class: com.emc.network.sparkanalysis.logic.ProcessVideoStreamData$2, name: val$jssc, type: class org.apache.spark.streaming.api.java.JavaStreamingContext)
        - object (class com.emc.network.sparkanalysis.logic.ProcessVideoStreamData$2, com.emc.network.sparkanalysis.logic.ProcessVideoStreamData$2@45abf747)
        - field (class: com.emc.network.sparkanalysis.logic.ProcessVideoStreamData$2$1, name: this$1, type: class com.emc.network.sparkanalysis.logic.ProcessVideoStreamData$2)
        - object (class com.emc.network.sparkanalysis.logic.ProcessVideoStreamData$2$1, com.emc.network.sparkanalysis.logic.ProcessVideoStreamData$2$1@5b02d2b0)
        - field (class: org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1, name: f$3, type: interface org.apache.spark.api.java.function.FlatMapFunction)
        - object (class org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)

I am getting the error at this line:

JavaRDD<String> words = javarddPerIp.flatMap(new FlatMapFunction<String, String>()

Here is the surrounding code:

JavaPairDStream<String, String> ones = stream
                .mapToPair(new PairFunction<String, String, String>() {
                    @Override
                    public Tuple2<String, String> call(String s) {
                        String[] lineArr = SPACE.split(s);
                        return new Tuple2<String, String>(lineArr[0], s);
                    }
                });

        JavaPairDStream<String, Iterable<String>> ipmapping = ones.groupByKey();

        ipmapping
                .foreachRDD(new Function2<JavaPairRDD<String, Iterable<String>>, Time, Void>() {
                    JavaRDD<String> finalLogs = jssc.sparkContext().emptyRDD();

                    @Override
                    public Void call(JavaPairRDD<String, Iterable<String>> v1,Time v2) throws Exception {

                        JavaRDD<Iterable<String>> stringValues = v1.values();
                        List<Iterable<String>> stringList = stringValues.collect();

                        for (Iterable<String> it : stringList) {
                            List<String> rddlist = new ArrayList<String>();
                            Iterator<String> values = it.iterator();
                            while (values.hasNext()) {
                                rddlist.add(values.next());
                            }

                            JavaRDD<String> javarddPerIp = jssc.sparkContext().parallelize(rddlist);

                            final Long numberOfrows;
                            numberOfrows = javarddPerIp.count();

                            System.out.println("javarddPerIp.count()"+ javarddPerIp.count());

                            JavaRDD<String> words = javarddPerIp
                                    .flatMap(new FlatMapFunction<String, String>() {
                                        @Override
                                        public Iterable<String> call(String s) {
                                            String[] splitstring = SPACE
                                                    .split(s);
                                            Double emcbitrate = Double
                                                    .parseDouble(splitstring[20])
                                                    / Double.parseDouble(splitstring[30]);
                                            StringBuffer sf = new StringBuffer();
                                            sf.append(emcbitrate.toString())
                                                    .append(SPACE)
                                                    .append(splitstring[44])
                                                    .append(SPACE)
                                                    .append(splitstring[51]);
                                            return Arrays.asList(s + SPACE
                                                    + emcbitrate.toString());
                                        }
                                    });
  • Did you have a look at this post? Commented Jan 13, 2016 at 8:21
  • @Rami - I am using Maven; I don't have any separate dependency jar, I built the package using the mvn package command. Commented Jan 13, 2016 at 9:15
  • Where is defined the SPACE variable? Commented Jan 13, 2016 at 9:42
  • @mark91: private static final Pattern SPACE = Pattern.compile(" "); Commented Jan 13, 2016 at 9:57
  • Have you tried putting its declaration inside the call function? Commented Jan 13, 2016 at 10:26

1 Answer


The post below finally helped me solve this: remove the inner (anonymous) function, create a new standalone class, and provide the implementation there:

org.apache.spark.SparkException: Task not serializable
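To see why moving the function into its own class fixes the NotSerializableException, here is a minimal stdlib-only sketch (no Spark dependency; all class names here are hypothetical stand-ins). An anonymous inner class silently keeps a this$0 reference to its enclosing instance, so a non-serializable field such as the JavaStreamingContext gets dragged into the closure; a standalone static class carries no such hidden reference.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureDemo {

    // Serializable function interface, like Spark's FlatMapFunction.
    interface SerFunc extends Serializable {
        String call(String s);
    }

    // Stand-in for JavaStreamingContext: NOT Serializable.
    static class Context { }

    private final Context ctx = new Context();

    // Failing pattern: an anonymous inner class implicitly holds a
    // reference (this$0) to the enclosing ClosureDemo instance, so
    // serializing it tries to serialize the Context field too.
    SerFunc anonymous() {
        return new SerFunc() {
            @Override
            public String call(String s) { return s.trim(); }
        };
    }

    // The fix: a standalone static class with no outer-instance reference.
    static class TrimFunc implements SerFunc {
        @Override
        public String call(String s) { return s.trim(); }
    }

    // Returns true if the object survives Java serialization.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // NotSerializableException is an IOException
            return false;
        }
    }

    public static void main(String[] args) {
        ClosureDemo demo = new ClosureDemo();
        System.out.println("anonymous inner class: " + serializes(demo.anonymous())); // false
        System.out.println("standalone class:      " + serializes(new TrimFunc()));   // true
    }
}
```

The same reasoning applies to the code in the question: the FlatMapFunction defined inside foreachRDD captures the enclosing anonymous class, which in turn holds the jssc field, so the closure cannot be serialized for shipping to executors.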
