
I have a 27 GB gzipped CSV file that I am trying to read with Spark. Our biggest node has 30 GB of memory.

When I try to read the file, only one executor is loading the data (I am monitoring the memory and the network); the other 4 are idle.

After a while it crashes due to running out of memory.
Is there a way to read this file in parallel?

Dataset<Row> result = sparkSession.read()
                .option("header","true")
                .option("escape", "\"")
                .option("multiLine","true")
                .format("csv")
                .load("s3a://csv-bucket");

result.repartition(10)

spark_conf:
 spark.executor.memoryOverhead: "512"
 spark.executor.cores: "5"

driver:
  memory: 10G

executor:
  instances: 5
  memory: 30G
4 Comments

  • gz files are not splittable, hence the read occurs on one executor. Once you read the gz file into a dataframe, you can repartition it (a sketch of this pattern follows these comments). Commented Apr 22, 2020 at 16:29
  • @VamsiPrabhala is right... you have to repartition it to ensure a uniform distribution of the data; otherwise an OOM may occur or processing will be slow. Commented Apr 22, 2020 at 16:41
  • I tried to repartition the data; it fails before it gets to the repartition. I think the uncompressed data can't fit on a single machine. Commented Apr 22, 2020 at 17:19
  • You can try the solution given in this URL: stackoverflow.com/questions/46638901/… Commented Apr 22, 2020 at 18:41
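A minimal Scala sketch of that read-then-repartition pattern (the SparkSession variable spark, the object key and the partition count here are illustrative, not taken from the question):

  // The gzip codec is not splittable, so this read runs as a single task
  // on one executor, regardless of how many executors are available.
  val raw = spark.read
    .option("header", "true")
    .option("escape", "\"")
    .option("multiLine", "true")
    .csv("s3a://csv-bucket/data.csv.gz")   // illustrative object key

  // Once the data is in a DataFrame, spread it across partitions so the
  // following stages can run on all executors.
  val distributed = raw.repartition(25)
  println(distributed.rdd.getNumPartitions)   // 25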

1 Answer


You have to repartition the data when you are dealing with data this large.

In Spark, the unit of parallelism is the partition.

Dataset<Row> result = sparkSession.read()
                .option("header","true")
                .option("escape", "\"")
                .option("multiLine","true")
                .format("csv")
                .load("s3a://csv-bucket");



result.repartition(5 * 5 * 3), i.e. the number of executors (5) times the cores per executor (5) times a multiplier of 2-3, giving roughly 75 partitions, should work for you to ensure a uniform distribution of the data.
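If the job runs out of memory before the repartition even completes, a follow-up suggested in the comments on this answer is to pay the single-task gz read once, persist the repartitioned data in a splittable format such as Parquet, and continue processing from the Parquet files. A hedged sketch, reusing the result DataFrame from the read above (the output path is illustrative):

  // One-off conversion: the gz read still happens on one executor,
  // but everything downstream works on splittable Parquet in parallel.
  result.repartition(75)
    .write
    .parquet("s3a://csv-bucket/repartitioned/")   // illustrative output path

  val parquetDf = spark.read.parquet("s3a://csv-bucket/repartitioned/")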

Cross-check how many records there are per partition:

  import org.apache.spark.sql.functions.spark_partition_id
  yourcsvdataframe.groupBy(spark_partition_id()).count.show()

Example:

  // spark is an existing SparkSession; its implicits are needed for .toDS
  import spark.implicits._
  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions.spark_partition_id

  val mycsvdata =
    """
      |rank,freq,Infinitiv,Unreg,Trans,"Präsens_ich","Präsens_du","Präsens_er, sie, es","Präteritum_ich","Partizip II","Konjunktiv II_ich","Imperativ Singular","Imperativ Plural",Hilfsverb
      |3,3796784,sein,"","",bin,bist,ist,war,gewesen,"wäre",sei,seid,sein
      |8,1618550,haben,"","",habe,hast,hat,hatte,gehabt,"hätte",habe,habt,haben
      |10,1379496,einen,"","",eine,einst,eint,einte,geeint,einte,eine,eint,haben
      |12,948246,werden,"","",werde,wirst,wird,wurde,geworden,"würde",werde,werdet,sein
    """.stripMargin.lines.toList.toDS

  // The whole dataset lands in a single partition after the read
  val csvdf: DataFrame = spark.read
    .option("header", true)
    .csv(mycsvdata)

  csvdf.show(false)
  println("all 4 records are in a single partition (partition 0)")
  csvdf.groupBy(spark_partition_id()).count.show()

  println("now divide the data... 4 records become 2 per partition")
  csvdf.repartition(2).groupBy(spark_partition_id()).count.show()

Result :

 +----+-------+---------+-----+-----+-----------+----------+-------------------+--------------+-----------+-----------------+------------------+----------------+---------+
|rank|freq   |Infinitiv|Unreg|Trans|Präsens_ich|Präsens_du|Präsens_er, sie, es|Präteritum_ich|Partizip II|Konjunktiv II_ich|Imperativ Singular|Imperativ Plural|Hilfsverb|
+----+-------+---------+-----+-----+-----------+----------+-------------------+--------------+-----------+-----------------+------------------+----------------+---------+
|3   |3796784|sein     |null |null |bin        |bist      |ist                |war           |gewesen    |wäre             |sei               |seid            |sein     |
|8   |1618550|haben    |null |null |habe       |hast      |hat                |hatte         |gehabt     |hätte            |habe              |habt            |haben    |
|10  |1379496|einen    |null |null |eine       |einst     |eint               |einte         |geeint     |einte            |eine              |eint            |haben    |
|12  |948246 |werden   |null |null |werde      |wirst     |wird               |wurde         |geworden   |würde            |werde             |werdet          |sein     |
+----+-------+---------+-----+-----+-----------+----------+-------------------+--------------+-----------+-----------------+------------------+----------------+---------+

all 4 records are in a single partition (partition 0)
+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|                   0|    4|
+--------------------+-----+

now divide the data... 4 records become 2 per partition
+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|                   1|    2|
|                   0|    2|
+--------------------+-----+


4 Comments

I tried to repartition the data; it fails before it gets to the repartition. I think the uncompressed data can't fit on a single machine.
Since gz is not splittable, try csvdf.repartition(200).write.parquet("/path/to/repartitioned/part/file") and then process the Parquet files; you may also need to increase executor memory (see the configuration sketch after these comments).
I am getting this error: ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 123950 ms. It retries and then fails.
The real reason/cause is in the complete stack trace; this is just a symptom.
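For completeness, a hedged sketch of the kind of configuration changes hinted at in this thread (more executor memory overhead, a more forgiving heartbeat); the values are illustrative, not taken from the question:

spark_conf:
  spark.executor.memoryOverhead: "4096"      # the question used 512; off-heap headroom is often the first thing to raise
  spark.executor.cores: "5"
  spark.executor.heartbeatInterval: "60s"    # default is 10s; must stay well below spark.network.timeout
  spark.network.timeout: "600s"              # default is 120s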
