I'm trying to scale a PySpark app on AWS EMR. It works for one day of data (around 8 TB), but I keep running into (what I believe are) OOM errors when testing it on one week of data (around 50 TB).
I set my Spark configs based on this article. Originally I got a
java.lang.OutOfMemoryError: Java heap space in the driver stdout. From browsing online it seemed like my spark.driver.memory was too low, so I raised it quite a bit. Now I'm running into a different OOM error. In the driver stderr I see something like this:
.saveAsTable('hive_tables.apf_intermediate_table')
File "/mnt1/yarn/usercache/hadoop/appcache/application_1622843305817_0001/container_1622843305817_0001_02_000001/pyspark.zip/pyspark/sql/readwriter.py", line 778, in saveAsTable
File "/mnt1/yarn/usercache/hadoop/appcache/application_1622843305817_0001/container_1622843305817_0001_02_000001/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/mnt1/yarn/usercache/hadoop/appcache/application_1622843305817_0001/container_1622843305817_0001_02_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/mnt1/yarn/usercache/hadoop/appcache/application_1622843305817_0001/container_1622843305817_0001_02_000001/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o12119.saveAsTable.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:174)
at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:517)
at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:217)
at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:176)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:173)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:169)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:197)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:194)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:169)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:114)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:112)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:696)
at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:494)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:473)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:429)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Task not serializable
and in the driver stdout I see something like this:
21/06/04 21:57:58 INFO CodeGenerator: Code generated in 52.445036 ms
21/06/04 21:57:58 INFO CodeGenerator: Code generated in 10.271344 ms
21/06/04 21:57:59 INFO CodeGenerator: Code generated in 7.076469 ms
21/06/04 21:57:59 INFO CodeGenerator: Code generated in 6.111364 ms
21/06/04 21:57:59 INFO CodeGenerator: Code generated in 9.17915 ms
21/06/04 21:57:59 INFO CodeGenerator: Code generated in 50.537577 ms
21/06/04 21:57:59 INFO CodeGenerator: Code generated in 9.398937 ms
21/06/04 21:58:00 INFO CodeGenerator: Code generated in 44.204289 ms
21/06/04 21:58:00 INFO CodeGenerator: Code generated in 6.294884 ms
21/06/04 21:58:00 INFO CodeGenerator: Code generated in 8.570691 ms
21/06/04 21:58:02 INFO CodeGenerator: Code generated in 55.276023 ms
21/06/04 21:58:02 INFO CodeGenerator: Code generated in 10.988539 ms
21/06/04 21:58:07 INFO CodeGenerator: Code generated in 284.051432 ms
21/06/04 21:58:07 ERROR Utils: uncaught error in thread spark-listener-group-eventLog, stopping SparkContext
java.lang.OutOfMemoryError
at java.lang.AbstractStringBuilder.hugeCapacity(AbstractStringBuilder.java:161)
at java.lang.AbstractStringBuilder.newCapacity(AbstractStringBuilder.java:155)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:125)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:596)
at java.lang.StringBuilder.append(StringBuilder.java:190)
at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:351)
at com.fasterxml.jackson.core.io.SegmentedStringWriter.getAndClear(SegmentedStringWriter.java:83)
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2933)
at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:107)
at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:138)
at org.apache.spark.scheduler.EventLoggingListener.onOtherEvent(EventLoggingListener.scala:236)
at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:80)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:92)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:92)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1347)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
21/06/04 21:58:07 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-eventLog
java.lang.OutOfMemoryError
with the physical plan printed repeatedly right after in both logs.
Since the code runs successfully with less input data, I assume this is still an OOM error and that I can safely ignore the Task not serializable exception. However, I have tried many different EMR/Spark configs and keep getting some variation of this error.
An example cluster set-up would be something like this (the session-level equivalent is sketched after the config list):
Driver node: 1 m5.8xlarge instance
Core nodes: 9 m5.8xlarge instances (each with 128 GiB RAM and 32 vCores)
- spark.dynamicAllocation.enabled=False
- spark.executor.cores=5
- spark.executor.memory=18g
- spark.executor.memoryOverhead=2g
- spark.driver.memory=44g
- spark.driver.cores=5
- spark.executor.instances=53
- spark.default.parallelism=530
- spark.sql.shuffle.partitions=200
- spark.serializer=org.apache.spark.serializer.KryoSerializer
- spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'"
- spark.driver.extraJavaOptions="-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'"
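For completeness, the session-level equivalent of those settings is roughly the sketch below. In reality they are passed through the EMR configuration JSON / spark-submit rather than in code, and the app name here is just a placeholder:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('apf_floor_simulation')  # placeholder name
    .config('spark.dynamicAllocation.enabled', 'false')
    .config('spark.executor.cores', '5')
    .config('spark.executor.memory', '18g')
    .config('spark.executor.memoryOverhead', '2g')
    # driver memory/cores are really supplied at submit time; once the driver JVM
    # is running they can no longer be changed from inside the app
    .config('spark.driver.memory', '44g')
    .config('spark.driver.cores', '5')
    .config('spark.executor.instances', '53')
    .config('spark.default.parallelism', '530')
    .config('spark.sql.shuffle.partitions', '200')
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
    .enableHiveSupport()
    .getOrCreate()
)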
Does anything stand out that could be causing this?
Spark 2.4.7
Relevant Code:
# Imports implied by the snippet below
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.read.parquet(SDL_PATH.format(datekeys=datekeys))
df = df.filter(F.col('request') == 1)\
.filter(F.col('pricing_rule_applied') == 0)\
.filter(F.col('deal_id') == 'company')\
.filter(F.col('multivariate_test_data').isNull() | (F.col('multivariate_test_data')=='{"passthrough":true}'))\
.select(F.col('platform_id'),
F.col('demand_id'),
F.col('publisher_id'),
'channel_id',
'market_id',
'deal_id',
F.col('auction_type'),
F.col('price_floor_gross'),
F.col('price_floor_net'),
F.col('ad_slots'),
F.col('country').alias('country_id'),
F.col('device_type'),
F.col('browser_id'),
F.col('request').alias('bid_request'),
F.when(F.col('bid'), 1).otherwise(0).alias('bid'),
F.col('max_bid').alias('max_bid'),
F.when(F.col('win'), 1).otherwise(0).alias('win'),
F.col('spend').alias('spend'),
F.col('above_floor'),
F.col('price_floor_adjusted'),
F.when((F.col('block_type') == "filter") | (F.col('block_type') == "ad_review") | (F.col('block_type') == "quarantine") | F.col('block_type').isNull(), 1).otherwise(0).alias('blocked'),
F.when((F.col('ad_timeout').isNull()) | (F.col('ad_timeout') == 0), 0).otherwise(1).alias('timeout'),
F.when((F.col('ad_opt_out').isNull()) | (F.col('ad_opt_out') == 0), 0).otherwise(1).alias('opt_out'))
df = df.join(deal, on=['deal_id', 'publisher_id'],how='left')
hashes = spark.read.table('market.server_hashes')\
.withColumn('fehash',F.substring(F.col('hash'), 1, 8))\
.select('fehash',F.substring(F.col('hostname'), 13, 5).alias('datacenter'))
df = df.withColumn('fehash',F.lower(F.substring(F.col('market_id'), -12,8)))\
.join(F.broadcast(hashes), on='fehash', how='left')
# dfWriter_sql = df.repartition(128).write.format("orc").option("orc.compress", "snappy")
# dfWriter_sql.mode('overwrite').saveAsTable('datascience.jp_auction_sim_sample_30p')
good_ids = df.filter('(ad_slots = 1) and (bid=1)').groupBy('market_id').agg(F.sum('win').alias('wins')).filter('wins <=1')
modified_floors = df.join(good_ids, on='market_id',how='inner').filter('(blocked=0) and (timeout=0) and (opt_out=0) and (bid=1)').cache()
#modified_floors = modified_floors.filter(F.col('price_floor_gross') == F.col('price_floor_adjusted'))
#print(modified_floors.count())
window = Window.partitionBy('market_id').orderBy(F.col('tier').asc(),F.col('max_bid').desc(),F.col('demand_id').asc())
check = 0
feats = ['country_id','device_type','browser_id']
features = ['publisher_id','channel_id','datacenter','country_id','device_type','browser_id']
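# For each candidate floor multiplier perc_val: recompute the winner, spend and
# above/below-floor flags against the modified floor, then union the per-coefficient
# aggregates produced by get_full_tier().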
for perc_val, val in zip(vals, str_vals):
# val = str(int(perc_val*100))
print(val)
if val == '114':
val = '115'
sim_df = modified_floors.withColumn('price_floor_modified', F.col('price_floor_adjusted')*perc_val)\
.withColumn('coeff', F.lit(perc_val))\
.withColumn('row_number',F.row_number().over(window))\
.withColumn('next_highest_temp',F.lead('max_bid', count=1, default=None).over(window))\
.withColumn('next_highest', F.when(F.col('next_highest_temp').isNull(), F.when(F.col('max_bid') >= F.col('price_floor_modified'), F.col('price_floor_modified')).otherwise(F.lit(0))).otherwise(F.col('next_highest_temp')))\
.withColumn('adj_win',
F.when((F.col('row_number') == 1) & (F.col('max_bid') >= F.col('price_floor_modified')),
F.lit(1)).otherwise(F.lit(0)))\
.withColumn('adj_spend',
F.when(
F.col('adj_win') == 1,
F.when(
(F.col('auction_type') == 1) | (F.col('auction_type') == 3),
F.col('max_bid')
).otherwise(
F.when(F.col('next_highest') >= F.col('price_floor_modified'),
F.col('next_highest')
).otherwise(
F.col('price_floor_modified')
)
)
).otherwise(
F.lit(0)
))\
.withColumn('adj_above_floor', F.when(F.col('max_bid') > F.col('price_floor_modified'), 1).otherwise(0))\
.withColumn('adj_below_floor', F.when(F.col('max_bid') < F.col('price_floor_modified'), 1).otherwise(0))
if check == 1:
agg_df = agg_df.union(get_full_tier(sim_df))
else:
agg_df = get_full_tier(sim_df)
check = 1
agg_df = agg_df.withColumn('datekey',F.lit(int(DATEKEY))).cache()
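# Rank the coefficients within each hashkey (channel/datacenter/country/device/browser
# combination) by estimated spend; the top-ranked one with positive incremental
# improvement (and coeff != 1.0) is flagged as the optimum.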
window = Window.partitionBy('hashkey').orderBy(F.col('estimated_platform_spend').desc(),F.col('coeff').desc())
agg_df = agg_df.withColumn('hashkey',F.md5(
F.concat_ws(
'-',
F.coalesce('channel_id',F.lit('')),
F.coalesce('datacenter',F.lit('')),
F.coalesce('country_id',F.lit('')),
F.coalesce('device_type',F.lit('')),
F.coalesce('browser_id',F.lit(''))
))).withColumn('optimum',(F.row_number().over(window) == 1)
& (F.col('platform_spend_improvement_incremental') > 0)
& (F.col('coeff') != 1.0))
#agg_df.write.format("parquet").mode('overwrite').saveAsTable('datascience.apf_intermediate_table')
affiliate = spark.read.table('tableau_cpc3.affiliate')
browsers = spark.read.table('tableau_cpc3.device_browser')\
.select(F.col('device_browser_name').alias('browser_name'),F.col('device_browser_value').cast('integer').alias('browser_id'))
regions = spark.sql('''SELECT region_id as country_id,
CASE region_name WHEN 'Croatia (Hrvatska)' THEN 'Croatia'
WHEN 'Great Britain (UK)' THEN 'United Kingdom'
WHEN 'New Zealand (Aotearoa)' THEN 'New Zealand'
WHEN 'Vatican City State (Holy See)' THEN 'Vatican City'
WHEN 'Congo' THEN 'Congo (Brazzaville)'
WHEN 'Palestinian Territory' THEN 'Palestinian Territories'
WHEN 'S. Georgia and S. Sandwich Islands' THEN 'South Georgia and the South Sandwich Islands'
ELSE region_name
END AS country
FROM tableau_cpc3.region''')
tier_check = 0
to_write = agg_df.join(F.broadcast(affiliate).select(F.col('name').alias('publisher_name'), F.col('affiliate_id').alias('publisher_id')), on='publisher_id', how='left')
to_write = to_write.join(F.broadcast(affiliate).select(F.col('name').alias('channel_name'), F.col('affiliate_id').alias('channel_id')), on='channel_id', how='left')
to_write = to_write.join(F.broadcast(regions), on='country_id', how='left').withColumn('date',F.col('datekey').cast('string'))
to_write = to_write.join(F.broadcast(browsers), on='browser_id', how='left')
to_write = to_write.select('publisher_id',
'publisher_name',
'channel_id',
'channel_name',
'datacenter',
'country_id',
'country',
'device_type',
F.expr('''CASE device_type WHEN 0 THEN 'Unknown'
WHEN 1 THEN 'Mobile'
WHEN 2 THEN 'Desktop'
WHEN 3 THEN 'CTV'
WHEN 4 THEN 'Phone'
WHEN 5 THEN 'Tablet'
WHEN 6 THEN 'Connected Device'
WHEN 7 THEN 'STB' ELSE 'Unknown' END''').alias('device_type_name'),
F.col('browser_id').cast('integer').alias('browser_id'),
'browser_name',
'coeff',
'auctions',
'bids_returned',
'channel_floor',
'unadjusted_platform_spend',
'unadjusted_count_win',
'unadjusted_count_above_floor',
'unadjusted_count_below_floor',
'estimated_platform_spend',
'estimated_count_win',
'estimated_count_above_floor',
'estimated_count_below_floor',
'competitive_rate',
'datekey',
'optimum',
'platform_spend_improvement_percent',
'platform_spend_improvement_incremental',
'date',
F.md5(
F.concat_ws(
'-',
F.coalesce('channel_id',F.lit('')),
F.coalesce('datacenter',F.lit('')),
F.coalesce('country_id',F.lit('')),
F.coalesce('device_type',F.lit('')),
F.coalesce('browser_id',F.lit(''))
)
).alias('hashkey')).distinct()
to_write.write\
.format("parquet")\
.partitionBy('coeff','datacenter')\
.mode('overwrite')\
.option("path", APF_INTERMEDIATE_TABLE_PATH)\
.saveAsTable('hive_tables.apf_intermediate_table')
java.lang.AbstractStringBuilder.hugeCapacity seems like you are using some huge strings. Is it possible some string is bigger than 2147483639 (~2 GB)?
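If that is what is happening here, one quick check would be to rerun with event logging disabled, so the listener in that stack trace never has to serialize the giant plan/event into a single string. A minimal sketch, assuming you can live without the Spark History UI for the test run:

from pyspark.sql import SparkSession

# Test run only: with the event-log listener off, the driver never builds the huge JSON
# string seen in the stack trace. If the OOM disappears, the oversized serialized plan,
# not the data volume, is the likely culprit.
spark = (
    SparkSession.builder
    .config('spark.eventLog.enabled', 'false')
    .getOrCreate()
)

# If so, shrinking the plan should also help, e.g. by cutting the long union lineage
# built up in the multiplier loop (hypothetical checkpoint directory; agg_df refers to
# the DataFrame in the question's code):
spark.sparkContext.setCheckpointDir('hdfs:///tmp/apf_checkpoints')
agg_df = agg_df.checkpoint()  # place after the union loop, before the joins and the write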