
I get a "Task not serializable" exception when I run a Spark Scala program in which

  • the Spark RDD contains elements of a non-serializable type (a Java class)
  • the functions called come from a non-serializable class (again, a Java class)

My code looks something like this:

object Main {
    def main(args: Array[String]): Unit = {
        ...
        var rdd = sc.textFile(filename)
                    .map(line => new NotSerializableJClass(line)).cache()
        // rdd is RDD[NotSerializableJClass]
        ...
        val test = new NotSerializableJPredicate()
        rdd = rdd.filter(elem => test.test(elem))
        // throws "Task not serializable" on the predicate class
    }
}

I noticed that I can work around the second problem with

rdd = rdd.filter(elem => (new NotSerializableJPredicate()).test(elem))

but I still get the exception for the class of the objects in the RDD. I would also prefer to solve the second part differently, because I don't want to create a large number of predicate objects.

Can you help me? How can I move forward with non-serializable classes?

  • Is NotSerializableJClass a third party class or a class defined in your application ? Commented Apr 8, 2017 at 13:58

2 Answers


RDDs must be serializable, so you cannot create an RDD of a non-serializable class.

For your predicate, you can write it using mapPartitions:

rdd.mapPartitions { part =>
  val test = new NotSerializableJPredicate()
  part.filter(elem => test.test(elem))
}

mapPartitions runs its function once per partition, so it lets you instantiate non-serializable classes on the executors, but it only has to do so once per partition rather than once per record.
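You can see why this works without a Spark cluster: Spark ships closures to executors with Java serialization, so a closure that captures a non-serializable object fails, while a closure that constructs the object inside itself captures nothing and serializes fine. Below is a minimal, Spark-free sketch of that difference; `NonSerializablePredicate` is a hypothetical stand-in for `NotSerializableJPredicate` (assumes Scala 2.12+, where lambdas are serializable by default):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for NotSerializableJPredicate: note it does NOT extend Serializable.
class NonSerializablePredicate {
  def test(s: String): Boolean = s.nonEmpty
}

object ClosureCheck {
  // Try to Java-serialize an object, the way Spark must before shipping a task.
  def serializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val captured = new NonSerializablePredicate
    // Like rdd.filter(elem => test.test(elem)): the closure drags `captured` along.
    val capturingClosure: String => Boolean = s => captured.test(s)
    // Like the mapPartitions version: the predicate is built inside the closure,
    // so only the (empty) lambda itself needs to be serialized.
    val perPartitionClosure: Iterator[String] => Iterator[String] = { part =>
      val p = new NonSerializablePredicate
      part.filter(p.test)
    }
    println(s"capturing closure serializable:     ${serializable(capturingClosure)}")
    println(s"per-partition closure serializable: ${serializable(perPartitionClosure)}")
  }
}
```

The capturing closure fails exactly the way the `rdd.filter(elem => test.test(elem))` line in the question does; the per-partition one is what mapPartitions lets you write.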



Some general rules that have helped me avoid task serialization issues:

If you call a method of a class from inside a closure, Spark needs to serialize the entire class containing that method. Ways around this include:

a) Declare the method as a function-valued field in NotSerializableClass: instead of writing `def foo(x: Int) = { ... }`, use `val foo = (x: Int) => { ... }`. Spark then no longer needs to serialize the entire class.

b) Refactor the code to extract the relevant portions into a separate class; this may be the way to go in certain cases.

c) Mark fields of the class that are not actually needed for the job as `@transient` and make the class `Serializable`.
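Rule (a) can be checked outside Spark with plain Java serialization, since that is what Spark uses for closures. The sketch below uses a hypothetical `Tokenizer` class (my own example, not from the question) to show that a closure calling a `def` captures the enclosing instance, while a function-valued `val` extracted first ships on its own (assumes Scala 2.12+):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper class; deliberately NOT Serializable.
class Tokenizer {
  // Calling this method from a closure captures the whole Tokenizer instance.
  def tokenizeMethod(line: String): Array[String] = line.split(" ")
  // A function value that references no instance state captures nothing.
  val tokenizeFn: String => Array[String] = line => line.split(" ")
}

object SerializationRules {
  // True if `obj` survives Java serialization, as a Spark closure must.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val t = new Tokenizer
    // Closure calling the def drags `t` along -> not serializable.
    val viaMethod: String => Array[String] = s => t.tokenizeMethod(s)
    // Extracting the val first hands Spark just a Function1, with no Tokenizer attached.
    val fn = t.tokenizeFn
    println(s"closure calling def serializable: ${isSerializable(viaMethod)}")
    println(s"extracted function serializable:  ${isSerializable(fn)}")
  }
}
```

Note that you must evaluate `t.tokenizeFn` on the driver (as `val fn` does here) before using it in a transformation; referencing `t.tokenizeFn` inside the closure body would capture `t` again.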

