
Trying to scale a PySpark app on AWS EMR. I was able to get it to work for one day of data (around 8TB), but I keep running into (what I believe are) OOM errors when trying to test it on one week of data (around 50TB).

I set my Spark configs based on this article. Originally, I got a java.lang.OutOfMemoryError: Java heap space in the driver stdout. From browsing online, it seemed like my spark.driver.memory was too low, so I boosted that up quite a bit. Now I am running into a different OOM error. In the driver stderr, I see something like this:

    .saveAsTable('hive_tables.apf_intermediate_table')
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1622843305817_0001/container_1622843305817_0001_02_000001/pyspark.zip/pyspark/sql/readwriter.py", line 778, in saveAsTable
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1622843305817_0001/container_1622843305817_0001_02_000001/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1622843305817_0001/container_1622843305817_0001_02_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1622843305817_0001/container_1622843305817_0001_02_000001/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o12119.saveAsTable.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:174)
    at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:517)
    at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:217)
    at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:176)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:173)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:169)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:197)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:194)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:169)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:114)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:112)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
    at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
    at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:696)
    at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:494)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:473)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:429)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Task not serializable

and in the driver stdout, I see something like this:

21/06/04 21:57:58 INFO CodeGenerator: Code generated in 52.445036 ms
21/06/04 21:57:58 INFO CodeGenerator: Code generated in 10.271344 ms
21/06/04 21:57:59 INFO CodeGenerator: Code generated in 7.076469 ms
21/06/04 21:57:59 INFO CodeGenerator: Code generated in 6.111364 ms
21/06/04 21:57:59 INFO CodeGenerator: Code generated in 9.17915 ms
21/06/04 21:57:59 INFO CodeGenerator: Code generated in 50.537577 ms
21/06/04 21:57:59 INFO CodeGenerator: Code generated in 9.398937 ms
21/06/04 21:58:00 INFO CodeGenerator: Code generated in 44.204289 ms
21/06/04 21:58:00 INFO CodeGenerator: Code generated in 6.294884 ms
21/06/04 21:58:00 INFO CodeGenerator: Code generated in 8.570691 ms
21/06/04 21:58:02 INFO CodeGenerator: Code generated in 55.276023 ms
21/06/04 21:58:02 INFO CodeGenerator: Code generated in 10.988539 ms
21/06/04 21:58:07 INFO CodeGenerator: Code generated in 284.051432 ms
21/06/04 21:58:07 ERROR Utils: uncaught error in thread spark-listener-group-eventLog, stopping SparkContext
java.lang.OutOfMemoryError
    at java.lang.AbstractStringBuilder.hugeCapacity(AbstractStringBuilder.java:161)
    at java.lang.AbstractStringBuilder.newCapacity(AbstractStringBuilder.java:155)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:125)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:596)
    at java.lang.StringBuilder.append(StringBuilder.java:190)
    at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:351)
    at com.fasterxml.jackson.core.io.SegmentedStringWriter.getAndClear(SegmentedStringWriter.java:83)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2933)
    at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:107)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:138)
    at org.apache.spark.scheduler.EventLoggingListener.onOtherEvent(EventLoggingListener.scala:236)
    at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:80)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91)
    at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:92)
    at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:92)
    at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
    at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1347)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
21/06/04 21:58:07 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-eventLog
java.lang.OutOfMemoryError

Right after this, the physical plan is printed repeatedly in both logs.

I assume that, since the code has run successfully with less input data, this is still an OOM error and I can safely ignore the Task not serializable error. However, I have tried many different EMR / Spark configs and keep getting some variation of this error.

An example cluster set-up would be something like this:

Driver node: 1 M5.8Xlarge instance

Core nodes: 9 M5.8Xlarge instances (each with 32 GiB RAM, 128 vCores)

  • spark.dynamicAllocation.enabled=False
  • spark.executors.cores=5
  • spark.executor.memory=18g
  • spark.executor.memoryOverhead=2g
  • spark.driver.memory=44g
  • spark.driver.cores=5
  • spark.executor.instances=53
  • spark.default.parallelism=530
  • spark.sql.shuffle.partitions=200
  • spark.serializer=org.apache.spark.serializer.KryoSerializer
  • spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'"
  • spark.driver.extraJavaOptions="-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'"
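
For context, here is a minimal sketch of how these properties would look if set in code when constructing the SparkSession. This is illustrative only: the app name is made up, the standard property name is spark.executor.cores, and driver settings in particular normally have to be supplied at submit time (spark-submit --conf or the EMR configurations JSON) before the driver JVM starts:

    # Illustrative sketch only: in practice these settings are passed at submit time,
    # since spark.driver.* values are ignored once the driver JVM is already running.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("apf_auction_sim")  # hypothetical app name
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.cores", "5")
        .config("spark.executor.memory", "18g")
        .config("spark.executor.memoryOverhead", "2g")
        .config("spark.driver.memory", "44g")
        .config("spark.driver.cores", "5")
        .config("spark.executor.instances", "53")
        .config("spark.default.parallelism", "530")
        .config("spark.sql.shuffle.partitions", "200")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .enableHiveSupport()  # needed for saveAsTable into Hive-managed tables
        .getOrCreate()
    )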

Does anything stand out that could be causing this?

Spark 2.4.7

Relevant Code:

    # Imports needed by this snippet (the SparkSession `spark`, SDL_PATH, datekeys, deal,
    # vals, str_vals, DATEKEY, get_full_tier, etc. are defined elsewhere in the app).
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    df = spark.read.parquet(SDL_PATH.format(datekeys=datekeys))


    df = df.filter(F.col('request') == 1)\
            .filter(F.col('pricing_rule_applied') == 0)\
            .filter(F.col('deal_id') == 'company')\
            .filter(F.col('multivariate_test_data').isNull() | (F.col('multivariate_test_data')=='{"passthrough":true}'))\
            .select(F.col('platform_id'),
                    F.col('demand_id'),
                    F.col('publisher_id'),
                    'channel_id',
                    'market_id',
                    'deal_id',
                    F.col('auction_type'),
                    F.col('price_floor_gross'),
                    F.col('price_floor_net'),
                    F.col('ad_slots'),
                    F.col('country').alias('country_id'),
                    F.col('device_type'),
                    F.col('browser_id'),
                    F.col('request').alias('bid_request'),
                    F.when(F.col('bid'), 1).otherwise(0).alias('bid'),
                    F.col('max_bid').alias('max_bid'),
                    F.when(F.col('win'), 1).otherwise(0).alias('win'),
                    F.col('spend').alias('spend'),
                    F.col('above_floor'),
                    F.col('price_floor_adjusted'),
                    F.when((F.col('block_type') == "filter") | (F.col('block_type') == "ad_review") | (F.col('block_type') == "quarantine") | F.col('block_type').isNull(), 1).otherwise(0).alias('blocked'),
                    F.when((F.col('ad_timeout').isNull()) | (F.col('ad_timeout') == 0), 0).otherwise(1).alias('timeout'),
                    F.when((F.col('ad_opt_out').isNull()) | (F.col('ad_opt_out') == 0), 0).otherwise(1).alias('opt_out'))

    df = df.join(deal, on=['deal_id', 'publisher_id'],how='left')
    hashes = spark.read.table('market.server_hashes')\
                  .withColumn('fehash',F.substring(F.col('hash'), 1, 8))\
                  .select('fehash',F.substring(F.col('hostname'), 13, 5).alias('datacenter'))
    df = df.withColumn('fehash',F.lower(F.substring(F.col('market_id'), -12,8)))\
           .join(F.broadcast(hashes), on='fehash', how='left')
    # dfWriter_sql = df.repartition(128).write.format("orc").option("orc.compress", "snappy")
    # dfWriter_sql.mode('overwrite').saveAsTable('datascience.jp_auction_sim_sample_30p')
    good_ids = df.filter('(ad_slots = 1) and (bid=1)').groupBy('market_id').agg(F.sum('win').alias('wins')).filter('wins <=1')
    modified_floors = df.join(good_ids, on='market_id',how='inner').filter('(blocked=0) and (timeout=0) and (opt_out=0) and (bid=1)').cache()
    #modified_floors = modified_floors.filter(F.col('price_floor_gross') == F.col('price_floor_adjusted'))
    #print(modified_floors.count())
    window = Window.partitionBy('market_id').orderBy(F.col('tier').asc(),F.col('max_bid').desc(),F.col('demand_id').asc())
    check = 0
    feats = ['country_id','device_type','browser_id']
    features = ['publisher_id','channel_id','datacenter','country_id','device_type','browser_id']
    for perc_val, val in zip(vals, str_vals):
        # val = str(int(perc_val*100))
        print(val)
        if val == '114':
            val = '115'
        sim_df = modified_floors.withColumn('price_floor_modified', F.col('price_floor_adjusted')*perc_val)\
                                .withColumn('coeff', F.lit(perc_val))\
                                  .withColumn('row_number',F.row_number().over(window))\
                                  .withColumn('next_highest_temp',F.lead('max_bid', count=1, default=None).over(window))\
                                  .withColumn('next_highest', F.when(F.col('next_highest_temp').isNull(), F.when(F.col('max_bid') >= F.col('price_floor_modified'), F.col('price_floor_modified')).otherwise(F.lit(0))).otherwise(F.col('next_highest_temp')))\
                                  .withColumn('adj_win',
                                              F.when((F.col('row_number') == 1) & (F.col('max_bid') >= F.col('price_floor_modified')),
                                                     F.lit(1)).otherwise(F.lit(0)))\
                                  .withColumn('adj_spend',
                                              F.when(
                                                  F.col('adj_win') == 1,
                                                  F.when(
                                                      (F.col('auction_type') == 1) | (F.col('auction_type') == 3),
                                                      F.col('max_bid')
                                                  ).otherwise(
                                                      F.when(F.col('next_highest') >= F.col('price_floor_modified'),
                                                             F.col('next_highest')
                                                      ).otherwise(
                                                                F.col('price_floor_modified')
                                                      )
                                                  )
                                              ).otherwise(
                                                  F.lit(0)
                                              ))\
                                  .withColumn('adj_above_floor', F.when(F.col('max_bid') > F.col('price_floor_modified'), 1).otherwise(0))\
                                  .withColumn('adj_below_floor', F.when(F.col('max_bid') < F.col('price_floor_modified'), 1).otherwise(0))
        if check == 1:
            agg_df = agg_df.union(get_full_tier(sim_df))
        else:
            agg_df = get_full_tier(sim_df)
            check = 1
    agg_df = agg_df.withColumn('datekey',F.lit(int(DATEKEY))).cache()
    window = Window.partitionBy('hashkey').orderBy(F.col('estimated_platform_spend').desc(),F.col('coeff').desc())
    agg_df = agg_df.withColumn('hashkey',F.md5(
                                  F.concat_ws(
                                    '-',
                                    F.coalesce('channel_id',F.lit('')),
                                    F.coalesce('datacenter',F.lit('')),
                                    F.coalesce('country_id',F.lit('')),
                                    F.coalesce('device_type',F.lit('')),
                                    F.coalesce('browser_id',F.lit(''))
                                  ))).withColumn('optimum',(F.row_number().over(window) == 1)
                                     & (F.col('platform_spend_improvement_incremental') > 0)
                                     & (F.col('coeff') != 1.0))
    #agg_df.write.format("parquet").mode('overwrite').saveAsTable('datascience.apf_intermediate_table')
    affiliate = spark.read.table('tableau_cpc3.affiliate')
    browsers = spark.read.table('tableau_cpc3.device_browser')\
                         .select(F.col('device_browser_name').alias('browser_name'),F.col('device_browser_value').cast('integer').alias('browser_id'))
    regions = spark.sql('''SELECT region_id as country_id,
                                  CASE region_name WHEN 'Croatia (Hrvatska)' THEN 'Croatia'
                                      WHEN 'Great Britain (UK)' THEN 'United Kingdom'
                                      WHEN 'New Zealand (Aotearoa)' THEN 'New Zealand'
                                      WHEN 'Vatican City State (Holy See)' THEN 'Vatican City'
                                      WHEN 'Congo' THEN 'Congo (Brazzaville)'
                                      WHEN 'Palestinian Territory' THEN 'Palestinian Territories'
                                      WHEN 'S. Georgia and S. Sandwich Islands' THEN 'South Georgia and the South Sandwich Islands'
                                      ELSE region_name
                                  END AS country
                        FROM tableau_cpc3.region''')


    tier_check = 0
    to_write = agg_df.join(F.broadcast(affiliate).select(F.col('name').alias('publisher_name'), F.col('affiliate_id').alias('publisher_id')), on='publisher_id', how='left')
    to_write = to_write.join(F.broadcast(affiliate).select(F.col('name').alias('channel_name'), F.col('affiliate_id').alias('channel_id')), on='channel_id', how='left')
    to_write = to_write.join(F.broadcast(regions), on='country_id', how='left').withColumn('date',F.col('datekey').cast('string'))
    to_write = to_write.join(F.broadcast(browsers), on='browser_id', how='left')
    to_write = to_write.select('publisher_id',
                                'publisher_name',
                                'channel_id',
                                'channel_name',
                                'datacenter',
                                'country_id',
                                'country',
                                'device_type',
                                F.expr('''CASE device_type WHEN 0 THEN 'Unknown'
                                          WHEN 1 THEN 'Mobile'
                                          WHEN 2 THEN 'Desktop'
                                          WHEN 3 THEN 'CTV'
                                          WHEN 4 THEN 'Phone'
                                          WHEN 5 THEN 'Tablet'
                                          WHEN 6 THEN 'Connected Device'
                                          WHEN 7 THEN 'STB' ELSE 'Unknown' END''').alias('device_type_name'),
                                F.col('browser_id').cast('integer').alias('browser_id'),
                                'browser_name',
                                'coeff',
                                'auctions',
                                'bids_returned',
                                'channel_floor',
                                'unadjusted_platform_spend',
                                'unadjusted_count_win',
                                'unadjusted_count_above_floor',
                                'unadjusted_count_below_floor',
                                'estimated_platform_spend',
                                'estimated_count_win',
                                'estimated_count_above_floor',
                                'estimated_count_below_floor',
                                'competitive_rate',
                                'datekey',
                                'optimum',
                                'platform_spend_improvement_percent',
                                'platform_spend_improvement_incremental',
                                'date',
                                F.md5(
                                  F.concat_ws(
                                    '-',
                                    F.coalesce('channel_id',F.lit('')),
                                    F.coalesce('datacenter',F.lit('')),
                                    F.coalesce('country_id',F.lit('')),
                                    F.coalesce('device_type',F.lit('')),
                                    F.coalesce('browser_id',F.lit(''))
                                  )
                                ).alias('hashkey')).distinct()

    to_write.write\
        .format("parquet")\
        .partitionBy('coeff','datacenter')\
        .mode('overwrite')\
        .option("path", APF_INTERMEDIATE_TABLE_PATH)\
        .saveAsTable('hive_tables.apf_intermediate_table')
  • Can you post your Spark code? Commented Jun 7, 2021 at 16:43
  • @Srinivas I added the relevant code. The input S3 data has about 2,000 partition files per hour of data, by the way. Commented Jun 7, 2021 at 16:55
  • Are you able to run this anywhere else? Like on some local cluster? Commented Jun 7, 2021 at 19:42
  • java.lang.AbstractStringBuilder.hugeCapacity seems like you are using some huge strings. Is it possible some string is bigger than 2147483639 or ~ 2GB? Commented Jun 7, 2021 at 19:51
  • @SašaZejnilović The application currently runs successfully on premise; I'm working on transferring the code to run in the cloud. I don't think any one string would be bigger than 2GB, but if that's the case, is there a Spark config to increase that limit? The code already worked fully with 1 day of data, which leads me to believe it's just some memory configs that I need to update. Commented Jun 7, 2021 at 20:11

1 Answer


You are using a machine type that doesn't fit your configuration. You give each executor:

    spark.executors.cores=5
    spark.executor.memory=18g
    spark.executor.memoryOverhead=2g
    spark.executor.instances=53

In effect you are saying, "for each executor I want 20 GB of memory" (18 GB heap + 2 GB overhead), while your machines have 32 GB x 0.85 = 27.2 GB available, so you can place only one executor on a single machine with your current memory settings. That executor will use 5 cores, and 1 core is required for Spark, so per machine you leave 128 - 1 - 5 = 122 cores and 27.2 GB - 20 GB = 7.2 GB of RAM unused.

Effectively, with your settings, you have 9 nodes but can run only 9 executors (1 executor per node). I would recommend using machines with more memory and fewer cores.
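
To make the arithmetic explicit, here is a back-of-the-envelope sketch in Python using the figures above (the 0.85 usable-memory fraction and the single reserved core are rough rules of thumb, not exact YARN numbers):

    # Rough executor-sizing arithmetic based on the figures quoted above.
    node_ram_gb = 32            # RAM per core node, as stated in the question
    node_cores = 128            # vCores per core node, as stated in the question
    yarn_mem_fraction = 0.85    # rough fraction of RAM usable by YARN containers

    executor_mem_gb = 18 + 2    # spark.executor.memory + spark.executor.memoryOverhead
    executor_cores = 5

    usable_mem_gb = node_ram_gb * yarn_mem_fraction              # 32 * 0.85 = 27.2
    executors_by_memory = int(usable_mem_gb // executor_mem_gb)  # 27.2 // 20 -> 1
    executors_by_cores = (node_cores - 1) // executor_cores      # (128 - 1) // 5 -> 25

    # Memory is the binding constraint: only 1 executor fits per node, so
    # 9 core nodes can host about 9 executors, not the 53 requested.
    executors_per_node = min(executors_by_memory, executors_by_cores)
    print(executors_per_node)   # -> 1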
