
Update:

I'm using Spark SQL 1.5.2, trying to read many Parquet files, then filter and aggregate the rows. There are ~35M rows stored in ~30 files in my HDFS, and it takes more than 10 minutes to process:

val logins_12 = sqlContext.read.parquet("events/2015/12/*/login")
val l_12 = logins_12
  .where("event_data.level >= 90")
  .select("pid", "timestamp", "event_data.level")
  .withColumn("event_date", to_date(logins_12("timestamp")))
  .drop("timestamp")
  .toDF("pid", "level", "event_date")
  .groupBy("pid", "event_date")
  .agg(Map("level" -> "max"))
  .toDF("pid", "event_date", "level")
l_12.first()

My Spark runs on a two-node cluster with 8 cores and 16 GB of RAM each, and the Scala console output makes me think the computation runs in just one thread:

scala> l_12.first()
[Stage 1:=======>                                               (50 + 1) / 368]

When I try count() instead of first(), it looks like two threads are doing the computation, which is still fewer than I expected, since there are ~30 files that could be processed in parallel:

scala> l_12.count()   
[Stage 4:=====>                                                  (34 + 2) / 368]

I'm starting the Spark shell with 14g for the executor and 4g for the driver in yarn-client mode:

./bin/spark-shell -Dspark.executor.memory=14g -Dspark.driver.memory=4g --master yarn-client

My default Spark config:

spark.executor.memory              2g
spark.logConf                      true
spark.eventLog.dir                 maprfs:///apps/spark
spark.eventLog.enabled             true
spark.sql.hive.metastore.sharedPrefixes  com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc,com.mapr.fs.shim.LibraryLoader,com.mapr.security.JNISecurity,com.mapr.fs.jni
spark.executor.extraClassPath
spark.yarn.historyServer.address  http://test-01:18080

The underlying RDDs have 368 and 200 partitions respectively:

scala> logins_12.rdd.partitions.size
res2: Int = 368
scala> l_12.rdd.partitions.size
res0: Int = 200

Is there a way to optimize this code? Thanks.

  • Since you use first, seeing only one active thread at some point is not particularly strange. Does the problem persist when you call count? Commented Feb 3, 2016 at 13:33
  • You are right: when I call count, it looks like it runs in two threads, which is still a little less than I expected, as there are 30 Parquet files that can be mapped in parallel, and there can be as many reducers as there are unique values of pid and date. I will also update my question. Commented Feb 3, 2016 at 14:29
  • You should also provide configuration details (cluster manager, spark.executor.cores, default parallelism). Also, please check the number of partitions (logins_12.rdd.partitions.size). Commented Feb 3, 2016 at 14:32
  • Right again, sorry for that. I've updated the question. Commented Feb 3, 2016 at 14:50

1 Answer


Both behaviors are more or less expected. Spark is rather lazy: not only does it not execute transformations until you trigger an action, it can also skip tasks that are not required for the output. Since first requires only a single element, it can compute only one partition. That is most likely why you see only one running thread at some point.
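You can see the same effect with a plain RDD (a minimal sketch, not from your job; the numbers are only illustrative):

scala> val rdd = sc.parallelize(1 to 1000000, 368)  // 368 partitions, as in your job
scala> rdd.first()  // take(1) under the hood: runs a job over just the first partition -> 1 task
scala> rdd.count()  // schedules one task per partition -> 368 tasks, but only as many
                    // run concurrently as there are executor cores available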

Regarding the second issue, it is most likely a matter of configuration. Assuming there is nothing wrong with the YARN configuration (I don't use YARN, but yarn.nodemanager.resource.cpu-vcores looks like a possible source of the problem), it is most likely down to Spark defaults. As you can read in the Configuration guide, spark.executor.cores on YARN is set to 1 by default. Two workers give two running threads.
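For example (a sketch; the exact values are assumptions for your two-node, 8-core, 16 GB machines, and YARN must actually have the vcores to grant), you could request more cores per executor when starting the shell:

./bin/spark-shell --master yarn-client \
  --num-executors 2 \
  --executor-cores 8 \
  --executor-memory 14g \
  --driver-memory 4g

With 2 executors at 8 cores each, you would get up to 16 concurrent tasks instead of 2. Note that 14g per executor may not fit on a 16 GB node once YARN's memory overhead is added, so you may need to lower --executor-memory.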
