I am reading a fixed-width text file that I need to convert to CSV. My program works fine on my local machine, but when I run it on the cluster it throws a "Task not serializable" exception.
I tried to solve the problem with both map and mapPartitions.
It works when I use toLocalIterator on the RDD, but that doesn't work with large files (I have files of 8 GB).
Below is the code using mapPartitions that I tried most recently:
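```scala
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.Row

// Reading the source file and creating an RDD.
// sc, s3File, LOG, maxIndex, indexList and counter are defined outside main (not shown).
def main(): Unit = {
  val inpData = sc.textFile(s3File)
  LOG.info(s"\n inpData >>>>>>>>>>>>>>> [${inpData.count()}]")

  val rowRDD = inpData.mapPartitions(iter => {
    val listOfRow = new ListBuffer[Row]
    while (iter.hasNext) {
      val line = iter.next()
      if (line.length() >= maxIndex) {
        // Line is long enough: cut it into columns
        listOfRow += getRow(line, indexList)
      } else {
        // Line is too short: count it as skipped
        counter += 1
      }
    }
    listOfRow.toIterator
  })
  rowRDD.foreach(println)
}
```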
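A side note on the loop above: appending every parsed line to a ListBuffer holds an entire partition in memory before anything is returned, which can hurt with 8 GB inputs. Below is a minimal plain-Scala sketch of the same per-partition logic kept lazy with `filter` and `map` on the iterator. `PartitionParser` and `parsePartition` are hypothetical names, `StartEnd` mirrors the question's case class, and it returns `Array[String]` instead of Spark's `Row` so it runs without Spark. It also drops the `counter` increment: the Spark programming guide explains that mutating a driver-side variable inside a closure does not update the driver's copy on a cluster, and an accumulator (for example `sc.longAccumulator`, assuming Spark 2.x or later) is the usual replacement.

```scala
// StartEnd mirrors the case class from the question.
case class StartEnd(startingPosition: Int, endingPosition: Int)

// Hypothetical helper object; not part of the original code.
object PartitionParser {
  // Returns a lazy iterator: each line is parsed only when the consumer pulls it,
  // so a partition is never buffered as a whole (unlike a ListBuffer).
  // Lines shorter than maxIndex are skipped, as in the original loop.
  def parsePartition(lines: Iterator[String],
                     fields: List[StartEnd],
                     maxIndex: Int): Iterator[Array[String]] =
    lines
      .filter(_.length >= maxIndex)
      .map(line => fields.map(f => line.substring(f.startingPosition, f.endingPosition)).toArray)

  def main(args: Array[String]): Unit = {
    val fields = List(StartEnd(0, 4), StartEnd(4, 10), StartEnd(7, 12), StartEnd(10, 14))
    // Assumption: maxIndex is the largest ending position in the field list.
    val maxIndex = fields.map(_.endingPosition).max
    val rows = parsePartition(Iterator("ABCDEFGHIJKLMNOP", "short"), fields, maxIndex)
    rows.foreach(r => println(r.mkString("|")))
  }
}
```

Inside Spark it could be called as `inpData.mapPartitions(iter => PartitionParser.parsePartition(iter, fields, maxIndex))`, assuming `fields` and `maxIndex` are local `val`s so the closure does not reference the enclosing class.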
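```scala
// Fixed-width column boundaries, used as substring(startingPosition, endingPosition)
case class StartEnd(startingPosition: Int, endingPosition: Int) extends Serializable

// Cuts one fixed-width line into columns according to the StartEnd list
def getRow(x: String, inst: List[StartEnd]): Row = {
  val columnArray = new Array[String](inst.size)
  for (f <- inst.indices) {
    columnArray(f) = x.substring(inst(f).startingPosition, inst(f).endingPosition)
  }
  Row.fromSeq(columnArray)
}
```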
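Since the end goal is CSV rather than Spark `Row`s, one option is to turn each line straight into a CSV record. The sketch below is plain Scala under that assumption: `FixedWidthCsv`, `toCsvLine` and `escapeCsv` are hypothetical names, quoting follows RFC 4180 (a field containing a comma, double quote or line break is wrapped in quotes and embedded quotes are doubled), and fields are not trimmed, matching `getRow`.

```scala
// StartEnd mirrors the case class from the question.
case class StartEnd(startingPosition: Int, endingPosition: Int)

// Hypothetical helper object; not part of the original code.
object FixedWidthCsv {
  // Quotes a field only when needed, doubling embedded quotes (RFC 4180 style).
  def escapeCsv(field: String): String =
    if (field.exists(c => c == ',' || c == '"' || c == '\n' || c == '\r'))
      "\"" + field.replace("\"", "\"\"") + "\""
    else field

  // Cuts one fixed-width line into columns and joins them as one CSV record.
  def toCsvLine(line: String, fields: List[StartEnd]): String =
    fields
      .map(f => escapeCsv(line.substring(f.startingPosition, f.endingPosition)))
      .mkString(",")

  def main(args: Array[String]): Unit = {
    val fields = List(StartEnd(0, 4), StartEnd(4, 10), StartEnd(7, 12), StartEnd(10, 14))
    println(toCsvLine("ABCDEFGHIJKLMNOP", fields))
  }
}
```

Applied as `rdd.map(line => FixedWidthCsv.toCsvLine(line, fields))`, the resulting `RDD[String]` could be written with `saveAsTextFile`; this again assumes `fields` is a local `val`.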
Note: for reference, I create indexList from the StartEnd case class; after creation it looks like this:
[List(StartEnd(0,4), StartEnd(4,10), StartEnd(7,12), StartEnd(10,14))]
This program works fine on my local machine, but when I run it on the cluster (AWS) it throws the exception shown below:
```text
>>>>>>>>Map(ResultantDF -> [], ExceptionString ->
Exception occurred while applying the FileConversion transformation and the exception Message is :Task not serializable
Exception occurred while applying the FileConversion transformation and the exception Message is :Task not serializable)
[Driver] TRACE reflection.ReflectionInvoker$.invokeDTMethod - Exit
```
I don't understand what is wrong here: what exactly is not serializable, and why is the exception thrown?
Any help is appreciated. Thanks in advance!
LOG) gets serialized, causing the issue. Maybe StartEnd is an inner class or something similar; the exact reason can't be determined from the provided code.
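To illustrate the mechanism the comment above describes: Spark ships task closures to executors using Java serialization, and a closure that reads a field of its enclosing class (such as `maxIndex` or `LOG`) captures the whole instance. The plain-Scala sketch below reproduces this with `ObjectOutputStream`. `FakeLogger`, `Converter`, `LengthFilter` and `ClosureCheck` are hypothetical stand-ins, not code from the question, and the check only approximates what Spark does before submitting a task.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a driver-side logger such as LOG; deliberately not Serializable.
class FakeLogger {
  def info(msg: String): Unit = println(msg)
}

// Hypothetical job class holding a logger and a field that a closure reads.
class Converter(val maxIndex: Int) {
  val log = new FakeLogger

  // `maxIndex` here means `this.maxIndex`, so the lambda captures `this`;
  // serializing it drags in the whole Converter, including `log`.
  def badFilter: String => Boolean = line => line.length >= maxIndex

  // Handing only the needed value to a small serializable function object
  // keeps `this` (and `log`) out of what gets serialized.
  def goodFilter: String => Boolean = LengthFilter(maxIndex)
}

// Case classes are Serializable, so this function object holds only an Int.
final case class LengthFilter(minLength: Int) extends (String => Boolean) {
  def apply(line: String): Boolean = line.length >= minLength
}

object ClosureCheck {
  // Tries Java serialization and reports whether it succeeded.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val c = new Converter(14)
    println(s"badFilter serializable:  ${isSerializable(c.badFilter)}")
    println(s"goodFilter serializable: ${isSerializable(c.goodFilter)}")
  }
}
```

Copying fields into local `val`s before building the closure is another common way to avoid capturing the enclosing instance; the case-class form is used here because it does not depend on how a given Scala version compiles lambdas.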