I'm very new to scala and spark. Now I'm having an issue that makes me very confused. Please give me an advice.
I'm making RDD[myEntityClass] from RDD[Array[String]] using lambda. But I faced an error which says there is null value to parse String to Long. To investigate this I implemented a method which makes me able to use breakpoint.
However now I'm getting org.apache.spark.SparkException: Task not serializable and I can't find what's wrong. Below is my code snippet please help me if you can find anything.
def makingData() : RDD[MyEntityClass] = {
.
.
data.map(row => toMyEntityClass(row))
}
def toMyEntityClass(row : Array[String]) : MyEntityClass = {
var id = row(0).toLong
var name = row(1)
var code = row(2).toLong
var parentId = row(3).toLong
var status = row(4)
MyEntityClass(id, name, code, parentId, status)
}
===== updated question =====
I'm updating my question to respond your advices. I've already had MyEntityClass as case class like below.
case class MyEntityClass(id: Long, name: String, code: Long, parentId: Long, status: String)
===== appended stack trace =====
Task not serializable
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:314)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:313)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.map(RDD.scala:313)
at com.myproject.repository.MyRepositorySpec.getDummyData(MyRepositorySpec.scala:40)
at com.myproject.repository.MyRepositorySpec$$anonfun$3.apply(MyRepositorySpec.scala:66)
at com.myproject.repository.MyRepositorySpec$$anonfun$3.apply(MyRepositorySpec.scala:65)
at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1681)
at org.scalatest.Suite$class.withFixture(Suite.scala:1031)
at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1691)
at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1678)
at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1690)
at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1690)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:287)
at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1690)
at org.scalatest.FlatSpec.runTest(FlatSpec.scala:1691)
at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1748)
at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1748)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:394)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:382)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:382)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:371)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:408)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:382)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:382)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:377)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:459)
at org.scalatest.FlatSpecLike$class.runTests(FlatSpecLike.scala:1748)
at org.scalatest.FlatSpec.runTests(FlatSpec.scala:1691)
at org.scalatest.Suite$class.run(Suite.scala:1320)
at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1691)
at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1794)
at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1794)
at org.scalatest.SuperEngine.runImpl(Engine.scala:519)
at org.scalatest.FlatSpecLike$class.run(FlatSpecLike.scala:1794)
at org.scalatest.FlatSpec.run(FlatSpec.scala:1691)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:46)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1334)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
at org.scalatest.tools.Runner$.run(Runner.scala:850)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
Caused by: java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
Serialization stack:
- object not serializable (class: org.scalatest.Assertions$AssertionsHelper, value: org.scalatest.Assertions$AssertionsHelper@45e639ee)
- field (class: org.scalatest.FlatSpec, name: assertionsHelper, type: class org.scalatest.Assertions$AssertionsHelper)
- object (class com.myproject.repository.MyRepositorySpec, MyRepositorySpec)
- field (class: com.myproject.repository.MyRepositorySpec$$anonfun$getDummyData$1, name: $outer, type: class com.myproject.repository.MyRepositorySpec)
- object (class com.myproject.repository.MyRepositorySpec$$anonfun$getDummyData$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 61 more
with Serializableto MyEntityClass