
I am running PySpark code with a Java UDF in Databricks. I have an r6id.xlarge (32 GB) driver and r6id.4xlarge (128 GB) worker nodes. I am reading only one file, and my Java UDF just calls an open source X12 Java library to parse the file as a whole. Sample code is below; this works for files smaller than 100 MB.

from pyspark.sql import types as pst
from pyspark.sql.functions import expr

df = spark.read.format('text').option('wholetext', True).load("s3://xxxx/xxxxxxx")
spark.udf.registerJavaFunction("x12_parser", "com.abc", pst.StringType())
df.select(expr("x12_parser(value)"))  # 'value' is the column produced by the text source

Whenever I parse a big file (still just one file), I get java.lang.OutOfMemoryError: Requested array size exceeds VM limit. When I parse the same file locally, it works if I increase my heap size to 20g; otherwise I get the same error. But my worker node is far larger than that. (I am on Databricks, so I shouldn't need to configure executor memory, and setting -Xmx is not permitted.) I also tried calling my function directly, like below:

import boto3

# Read the whole file into a Python string on the driver
s3 = boto3.client('s3')
bucket_name = 'xxxx'
key = 'xxxxxxx'
response = s3.get_object(Bucket=bucket_name, Key=key)
contents = response['Body'].read().decode('utf-8')

# Call the parser directly in the driver JVM via Py4J
parser_class = spark._jvm.com.abc.x12_parser()
output = parser_class.call(contents)

This works fine even though my driver is four times smaller than the worker, without touching the Java heap size. I tried playing with some Spark settings like the network timeout and spark.executor.extraJavaOptions -Xms20g -XX:+UseCompressedOops, but none of them helped.

I can't explain why, with such a large worker, I can't process the same file that I can process on a much smaller driver or on my local machine.

2 Answers


The problem is that the Java UDF is executed on the executors, not the driver. Executors process data in parallel, and despite your workers' large total memory, the heap available to each executor may not be enough to handle such a large file. Furthermore, Spark's wholetext option loads the whole file as a single row, which makes the in-memory footprint even worse when the UDF operates on it.

Using boto3 works because it runs directly on the driver, but it won't give you distributed execution. I suggest you:

  • use a broadcast variable for the file content and then process it, e.g. val fileContent = spark.sparkContext.broadcast(content), then create a DataFrame from it and use that in your UDF (see the sketch after this list)

  • set spark.executor.memory and spark.executor.memoryOverhead appropriately, with 4-5 cores per executor
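
Roughly, the idea looks like this in a Scala cell. This is only a minimal sketch: it assumes the com.abc.x12_parser class and its call(String) method from your question, and on Databricks the executor sizing goes in the cluster's Spark config rather than in code.

import org.apache.spark.sql.functions.udf
import spark.implicits._

// Executor sizing (set in the cluster's Spark config on Databricks), e.g.:
//   spark.executor.memory 20g
//   spark.executor.memoryOverhead 4g
//   spark.executor.cores 4

// Read the whole file once, then broadcast it so executors receive it as a
// broadcast block instead of as a huge column value in the UDF input row.
val content = spark.read.option("wholetext", "true")
  .text("s3://xxxx/xxxxxxx")
  .head()
  .getString(0)
val fileContent = spark.sparkContext.broadcast(content)

// The UDF closes over the broadcast handle; the column it receives is tiny.
val parse = udf { (_: Int) => new com.abc.x12_parser().call(fileContent.value) }

Seq(1).toDF("id").select(parse($"id").as("parsed_json")).show(false)

Note this uses a Scala UDF rather than the registered Java UDF, because a Scala closure can capture the broadcast handle directly.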


2 Comments

Thanks for the reply. Actually I need the file to be processed as one unit, since it is an EDI file with logic that spans lines. The UDF reads the whole file, parses its internal logic, and outputs a human-readable JSON string. That's why I am reading it as a whole file.
You can still broadcast the content after reading it, no?

It seems like there is some internal UDF memory limitation. I stopped using the UDF and changed the code to do the map from the Java side instead.
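
A rough sketch of what that can look like (assuming the com.abc.x12_parser placeholder and its call(String) method from the question), with the parser invoked inside a Dataset map on the JVM instead of through a registered UDF:

import spark.implicits._

// Read each file as one whole-text row and parse it directly in a JVM map,
// bypassing the registered-UDF invocation path.
val raw = spark.read.option("wholetext", "true")
  .text("s3://xxxx/xxxxxxx")
  .as[String]

val parsed = raw.map(contents => new com.abc.x12_parser().call(contents))
parsed.show(false)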

