import org.apache.spark.sql.RowFactory
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import spark.implicits._

// Read the file one line per row and attach an increasing id so the header
// line and the last line can be located later.
val data = spark.read
  .text(filePath)
  .toDF("val")
  .withColumn("id", monotonically_increasing_id())

val count = data.count()

// The header record has the form "H|*|COL1|*|COL2|*|...|##|".
val header = data.where("id==1").collect().map(s => s.getString(0)).apply(0)

val columns = header
  .replace("H|*|", "")
  .replace("|##|", "")
  .split("\\|\\*\\|")

val structSchema = StructType(columns.map(s => StructField(s, StringType, true)))

// Keep everything between the header and the last line, then rebuild records:
// records are separated by "|##|" and fields by "|*|".
var correctData = data.where('id > 1 && 'id < count - 1).select("val")
var dataString = correctData.collect().map(s => s.getString(0)).mkString("").replace("\\\n", "").replace("\\\r", "")
var dataArr = dataString.split("\\|\\#\\#\\|").map(s => {
  var arr = s.split("\\|\\*\\|")
  // Pad short records so every row has one value per column.
  while (arr.length < columns.length) arr = arr :+ ""
  RowFactory.create(arr: _*)
})
val finalDF = spark.createDataFrame(sc.parallelize(dataArr), structSchema)

display(finalDF)

This portion of code is giving an error:

Exception in thread "dispatcher-event-loop-0" java.lang.OutOfMemoryError: Java heap space

After hours of debugging, I found that mainly this part:

var dataArr = dataString.split("\\|\\#\\#\\|").map(s =>{ 
                                                          var arr = s.split("\\|\\*\\|")
                                                          while(arr.length < columns.length) arr = arr :+ ""
                                                          RowFactory.create(arr:_*)
                                                         })
    val finalDF = spark.createDataFrame(sc.parallelize(dataArr),structSchema)

is causing the error.

I changed that part to:

var dataArr = dataString.split("\\|\\#\\#\\|").map(s =>{
                                                          var arr = s.split("\\|\\*\\|")
                                                          while(arr.length < columns.length) arr = arr :+ ""
                                                          RowFactory.create(arr:_*)
                                                         }).toList
  val finalDF = sqlContext.createDataFrame(sc.makeRDD(dataArr),structSchema)

But the error remains the same. What should I change to avoid this?

When I run this code on a Databricks Spark cluster, the particular job gives this Spark driver error:

Job aborted due to stage failure: Serialized task 45:0 was 792585456 bytes, which exceeds max allowed: spark.rpc.message.maxSize (268435456 bytes).

I added this portion of code:

spark.conf.set("spark.rpc.message.maxSize",Int.MaxValue)

but it was of no use.
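From what I could find, spark.rpc.message.maxSize is specified in MB (capped at 2047) and is only read when the cluster starts, so setting it at runtime with spark.conf.set probably never takes effect; it would have to go into the cluster's Spark config, or at session creation time, roughly like the sketch below. And even then, the 792585456-byte task is about 756 MB, well above the 268435456-byte (256 MB) limit the cluster currently allows.

import org.apache.spark.sql.SparkSession

// Sketch only: RPC settings are read once, when the SparkContext is created,
// so spark.conf.set at runtime cannot change them. The value is in MB and
// must not exceed 2047.
val spark = SparkSession.builder()
  .config("spark.rpc.message.maxSize", "1024")
  .getOrCreate()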


1 Answer


My guess is that

var dataString = correctData.collect().map(s => s.getString(0)).mkString("").replace("\\\n","").replace("\\\r","")

is the problem, because you collect (almost) all of the data onto the driver, i.e. into a single JVM.

Maybe this line runs, but subsequent operations on dataString will exceed your memory limits. You should not collect your data! Instead, work with distributed data structures such as DataFrames or RDDs.

I think you could just omit the collect in the above line and keep the processing distributed.
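For example, here is a minimal sketch of a fully distributed version. It assumes the format implied by your code (a header record starting with H|*|, fields separated by |*|, records separated by |##|) and that your Spark version supports a multi-character lineSep on the text reader (recent versions do); the filters are placeholders you would adapt:

import org.apache.spark.sql.functions._
import spark.implicits._

// Read one record per "|##|" delimiter, so multi-line records arrive intact
// on the executors and nothing needs to be collected to the driver.
val raw = spark.read
  .option("lineSep", "|##|")
  .text(filePath)
  .withColumn("value", regexp_replace($"value", "[\\r\\n]", ""))
  .filter(length(trim($"value")) > 0)

// The header is a single small record, so collecting just that one row is fine.
val header  = raw.filter($"value".startsWith("H|*|")).head.getString(0)
val columns = header.stripPrefix("H|*|").split("\\|\\*\\|")

// Split each record into fields on the executors and name the columns.
// (The last record, which your code currently drops, would need to be
// filtered out here too if it is not real data.)
val finalDF = raw
  .filter(!$"value".startsWith("H|*|"))
  .select(split($"value", "\\|\\*\\|").as("arr"))
  .select(columns.zipWithIndex.map { case (name, i) =>
    $"arr".getItem(i).as(name)  // missing trailing fields become null, not ""
  }: _*)

display(finalDF)

This way the whole dataset stays on the executors, so neither the driver heap nor spark.rpc.message.maxSize becomes a bottleneck.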


1 Comment

The code fails if I remove collect. As I am pretty new to this domain, can you help me with this? What exactly should I do here?
