
I am reading a fixed-width text file which I need to convert to CSV. My program works fine on my local machine, but when I run it on a cluster it throws a "Task not serializable" exception.

I tried to solve the same problem with both map and mapPartitions.

It works fine when using toLocalIterator on the RDD, but that doesn't work with large files (I have files of 8 GB).

Below is the code using mapPartitions which I recently tried:

// reading the source file and creating an RDD
// (sc, LOG, s3File, maxIndex, indexList and counter are fields of the enclosing class)

def main(args: Array[String]): Unit = {
    val inpData = sc.textFile(s3File)
    LOG.info(s"\n inpData >>>>>>>>>>>>>>> [${inpData.count()}]")

    // convert each fixed-width line of a partition into a Row
    val rowRDD = inpData.mapPartitions(iter => {
      val listOfRow = new ListBuffer[Row]
      while (iter.hasNext) {
        val line = iter.next()
        if (line.length() >= maxIndex) {
          listOfRow += getRow(line, indexList)
        } else {
          counter += 1   // count lines that are too short to parse
        }
      }
      listOfRow.toIterator
    })

    rowRDD.foreach(println)
}

case class StartEnd(startingPosition: Int, endingPosition: Int) extends Serializable

// slice a fixed-width line into columns using the start/end positions in inst
def getRow(x: String, inst: List[StartEnd]): Row = {
    val columnArray = new Array[String](inst.size)
    for (f <- inst.indices) {
      columnArray(f) = x.substring(inst(f).startingPosition, inst(f).endingPosition)
    }
    Row.fromSeq(columnArray)
}

// Note: for your reference, indexList is created using the StartEnd case class and looks like this after creation:

[List(StartEnd(0,4), StartEnd(4,10), StartEnd(7,12), StartEnd(10,14))]
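
For illustration, a list like that could be built from (start, end) offsets roughly like this (offsets taken from the example above; the real ones come from the fixed-width layout):

// build the column boundary list used by getRow
val indexList: List[StartEnd] =
  List((0, 4), (4, 10), (7, 12), (10, 14)).map { case (s, e) => StartEnd(s, e) }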

This program works fine on my local machine, but when I run it on a cluster (AWS) it throws the exception shown below.

>>>>>>>>Map(ResultantDF -> [], ExceptionString -> 
Exception occurred while applying the FileConversion transformation and the exception Message is :Task not serializable
Exception occurred while applying the FileConversion transformation and the exception Message is :Task not serializable)
[Driver] TRACE reflection.ReflectionInvoker$.invokeDTMethod - Exit

I am not able to understand what's wrong here, what is not serializable, and why it is throwing this exception.

Any help is appreciated. Thanks in advance!

  • Basically, an instance that contains the logger (probably LOG) gets serialized, causing the issue. Maybe StartEnd is an inner class or something; the exact reason can't be determined from the provided code. Commented May 19, 2019 at 18:34

1 Answer


You call the getRow method inside a Spark mapPartitions transformation. This forces Spark to send an instance of your main class to the workers, and that class contains LOG as a field. That logger does not seem to be serialization-friendly. You can:

a) move getRow (and LOG) to a separate object, which is the general way to solve such issues; see the sketch after this list

b) make LOG a lazy val

c) use another logging library
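
A minimal sketch of option (a), assuming a standalone helper object (the name FixedWidthParser is illustrative, not from the original code):

import org.apache.spark.sql.Row

// Methods on a top-level object are resolved statically, so calling
// FixedWidthParser.getRow inside mapPartitions no longer forces Spark to
// serialize the enclosing main class (and its LOG field) into the task closure.
object FixedWidthParser {
  case class StartEnd(startingPosition: Int, endingPosition: Int)

  def getRow(line: String, inst: List[StartEnd]): Row =
    Row.fromSeq(inst.map(se => line.substring(se.startingPosition, se.endingPosition)))
}

Inside mapPartitions you would then call FixedWidthParser.getRow(line, indexList). Note that indexList, maxIndex and counter would also have to be local vals (or passed in explicitly), since referencing them in the closure still captures the enclosing instance.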


1 Comment

I put the getRow function in a different object.
