
I'm very new to Scala and Spark, and I'm stuck on an issue that has me quite confused. Please give me some advice.

I'm making an RDD[MyEntityClass] from an RDD[Array[String]] using a lambda, but I hit an error saying a null value could not be parsed from String to Long. To investigate this, I extracted the lambda into a method so that I could set a breakpoint.

However, now I'm getting org.apache.spark.SparkException: Task not serializable and I can't find what's wrong. Below is my code snippet; please help if you can spot anything.

def makingData() : RDD[MyEntityClass] = {
  .
  .
  data.map(row => toMyEntityClass(row))
}

def toMyEntityClass(row : Array[String]) : MyEntityClass = {
  var id = row(0).toLong
  var name = row(1)
  var code = row(2).toLong
  var parentId = row(3).toLong
  var status = row(4)

  MyEntityClass(id, name, code, parentId, status)
}

===== updated question =====

I'm updating my question in response to your advice. MyEntityClass has already been a case class, as shown below.

case class MyEntityClass(id: Long, name: String, code: Long, parentId: Long, status: String)

===== appended stack trace =====

Task not serializable
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:314)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:313)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.map(RDD.scala:313)
at com.myproject.repository.MyRepositorySpec.getDummyData(MyRepositorySpec.scala:40)
at com.myproject.repository.MyRepositorySpec$$anonfun$3.apply(MyRepositorySpec.scala:66)
at com.myproject.repository.MyRepositorySpec$$anonfun$3.apply(MyRepositorySpec.scala:65)
at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1681)
at org.scalatest.Suite$class.withFixture(Suite.scala:1031)
at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1691)
at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1678)
at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1690)
at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1690)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:287)
at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1690)
at org.scalatest.FlatSpec.runTest(FlatSpec.scala:1691)
at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1748)
at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1748)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:394)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:382)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:382)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:371)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:408)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:382)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:382)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:377)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:459)
at org.scalatest.FlatSpecLike$class.runTests(FlatSpecLike.scala:1748)
at org.scalatest.FlatSpec.runTests(FlatSpec.scala:1691)
at org.scalatest.Suite$class.run(Suite.scala:1320)
at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1691)
at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1794)
at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1794)
at org.scalatest.SuperEngine.runImpl(Engine.scala:519)
at org.scalatest.FlatSpecLike$class.run(FlatSpecLike.scala:1794)
at org.scalatest.FlatSpec.run(FlatSpec.scala:1691)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:46)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1334)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
at org.scalatest.tools.Runner$.run(Runner.scala:850)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
Caused by: java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
Serialization stack:
- object not serializable (class: org.scalatest.Assertions$AssertionsHelper, value: org.scalatest.Assertions$AssertionsHelper@45e639ee)
- field (class: org.scalatest.FlatSpec, name: assertionsHelper, type: class org.scalatest.Assertions$AssertionsHelper)
- object (class com.myproject.repository.MyRepositorySpec, MyRepositorySpec)
- field (class: com.myproject.repository.MyRepositorySpec$$anonfun$getDummyData$1, name: $outer, type: class com.myproject.repository.MyRepositorySpec)
- object (class com.myproject.repository.MyRepositorySpec$$anonfun$getDummyData$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 61 more
  • Add with Serializable to MyEntityClass. Commented Mar 3, 2017 at 8:34
  • Although your code is quite incomplete, it's possible that Spark tries to serialize your entire class (i.e. the class where you defined your main method). This could happen if you make a closure over instance methods or variables, etc. Commented Mar 3, 2017 at 14:54

1 Answer


From the code given above, I understand that you want to convert RDD[Array[String]] to RDD[MyEntityClass].

We have two options here:

  • Make MyEntityClass a case class, which is Serializable by default. For example:

    case class MyEntityClass(id: Long, name: String, code: Long, parentId: Long, status: String)

  • Make MyEntityClass a normal class that extends Serializable, which also makes it eligible for serialization (a sketch follows this list). Note: in general this approach is used when the class needs more than 22 fields (the Product arity limit on case classes) and you are on a Scala version older than 2.11.
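A minimal sketch of that second option, assuming the same fields as the case class above (illustration only, not code from the original post):

// Plain-class variant: mark it Serializable explicitly so Spark can ship
// instances of it to executors.
class MyEntityClass(
    val id: Long,
    val name: String,
    val code: Long,
    val parentId: Long,
    val status: String
) extends Serializable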

EDIT: You confirmed that MyEntityClass is a case class and pasted the SerializationDebugger stack trace, which reveals that MyRepositorySpec is just a test class that extends FlatSpec and has makingData() and toMyEntityClass(). You are using your test class inside the closure, and that is the cause of this exception.

This is clearly evident from the error below:

Caused by: java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
Serialization stack:
- object not serializable (class: org.scalatest.Assertions$AssertionsHelper, value: org.scalatest.Assertions$AssertionsHelper@45e639ee)
- field (class: org.scalatest.FlatSpec, name: assertionsHelper, type: class org.scalatest.Assertions$AssertionsHelper)
- object (class com.myproject.repository.MyRepositorySpec, MyRepositorySpec)
- field (class: com.myproject.repository.MyRepositorySpec$$anonfun$getDummyData$1, name:

Solution: Make MyRepositorySpec Serializable.
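A minimal sketch of the alternative suggested in the comments below: move the row conversion out of the spec into a standalone object, so the map closure no longer captures the test class at all. The object name RowConversions is an assumption for illustration, not from the original post.

// Assumed helper name: keeping the conversion in a standalone object means the
// closure passed to data.map references only this object, never MyRepositorySpec
// (and thus never FlatSpec's non-serializable AssertionsHelper field).
object RowConversions extends Serializable {
  def toMyEntityClass(row: Array[String]): MyEntityClass =
    MyEntityClass(row(0).toLong, row(1), row(2).toLong, row(3).toLong, row(4))
}

// In the spec:
// def makingData(): RDD[MyEntityClass] = {
//   ...
//   data.map(RowConversions.toMyEntityClass)
// }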


6 Comments

Do you mean you are still getting Task not serializable? If so, I need the complete stack trace; there is also something called the SerializationDebugger which will show exactly where this exception is coming from. Please paste that in your question and ping me.
Yes, it has been a case class from the beginning. So I added the full stack trace as you suggested.
What about this class MyRepositorySpec: are you calling it somewhere outside the closure? Is it an object or a class? If so, where is the code snippet? You have to describe more, as the info is not sufficient. If MyRepositorySpec is not an object, make it an object and try. Still, I feel you should have given more details in the first place...
MyRepositorySpec is just a test class which extends FlatSpec and has makingData() and toMyEntityClass().
I think that's the cause: it is not serializable, which is clearly evident from the stack trace you gave. Make the data provider class, i.e. MyRepositorySpec, an object; it should work. If that's not directly possible, move it to another object and then call that.