
Error Message - org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 14.0 failed 4 times, most recent failure: Lost task 10.3 in stage 14.0 (TID 98) (10.139.64.15 executor 0): com.databricks.spark.safespark.UDFException: INVALID_ARGUMENT: [BROADCAST_VARIABLE_NOT_LOADED] Broadcast variable 17 not loaded.

Code -

from pyspark.sql.functions import udf, col

data = [("Ankit", "IN"),("Sagar", "USA")]
schema = "Name String, Country String"
df = spark.createDataFrame(data, schema)

dic = {"IN" : "India", "USA" : "United States of America"}
bc_dic = spark.sparkContext.broadcast(dic)

def lookup(code):
    return bc_dic.value[code]
u_lookup = udf(lookup)

df_final = df.withColumn("Country Name", u_lookup(col("Country")))
df_final.show()

2 Answers


The error message indicates that the broadcast variable was not loaded on the executor. You can try increasing the broadcast timeout by setting the spark.databricks.broadcastTimeout configuration to a higher value through the SparkSession.

from pyspark.sql.functions import udf, col

# Raise the broadcast timeout before running the job
spark.conf.set("spark.databricks.broadcastTimeout", "600s")

data = [("Ankit", "IN"), ("Sagar", "USA")]
schema = "Name String, Country String"
df = spark.createDataFrame(data, schema)

dic = {"IN": "India", "USA": "United States of America"}
bc_dic = spark.sparkContext.broadcast(dic)

def lookup(code):
    return bc_dic.value[code]

u_lookup = udf(lookup)

df_final = df.withColumn("Country Name", u_lookup(col("Country")))
df_final.show()

"""
+-----+-------+--------------------+
| Name|Country|        Country Name|
+-----+-------+--------------------+
|Ankit|     IN|               India|
|Sagar|    USA|United States of ...|
+-----+-------+--------------------+
"""

In the above code, I set the broadcast timeout to 600s (10 minutes) before triggering the job.
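If that property is not available on your runtime, open-source Spark exposes a similar knob, spark.sql.broadcastTimeout, whose value is a number of seconds; a minimal sketch, assuming the same SparkSession:

# Open-source Spark equivalent, value in seconds (an assumption about your
# runtime; spark.databricks.broadcastTimeout may not exist everywhere)
spark.conf.set("spark.sql.broadcastTimeout", "600")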

As an alternative, I also tried mapping over the underlying RDD instead of using a UDF:

- I define a function named lookup that takes a whole row as its argument; the first element is accessed with **row[0]** and the second with **row[1]**.

- Inside the function, the broadcast dictionary bc_dic looks up the country name associated with the row's country code **(row[1])**.

- I then switch to the RDD API with `df.rdd` and apply the map transformation with the lookup function, which rebuilds each row as (name, country name).

Below is the code:

    def lookup(row):
        return (row[0], bc_dic.value[row[1]])
    df_final = df.rdd.map(lookup).toDF(["Name", "Country Name"])
    df_final.show()



"""
+-----+--------------------+
| Name|        Country Name|
+-----+--------------------+
|Ankit|               India|
|Sagar|United States of ...|
+-----+--------------------+
"""



Problem

The error seems to be related to the broadcast variable not having been loaded. It may be caused by the size of the variable: if the size exceeds the value set in spark.sql.autoBroadcastJoinThreshold, it is not initialized.

Solution

As the default value of this setting is 10 MB, I would try a bigger value, say one order of magnitude larger.

from pyspark.sql.functions import udf

# Assuming spark is your SparkSession and sc its SparkContext
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100MB")

# Your existing code...
erp_bu = src_config.select("erp_code", "bu").collect()
erp_bu_dic = {}
for row in erp_bu:
    if row['erp_code']:
        erp_bu_dic[row['erp_code']] = row['bu']
broadcast_erp_bu_dic = sc.broadcast(erp_bu_dic)

def lookup(x):
    return broadcast_erp_bu_dic.value[x]

u_lookup = udf(lookup)
j06 = j06.withColumn('business_unit', u_lookup("entity_source_id"))
display(j06)

Test different values and find the most suitable for your use case.
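To confirm the new value actually took effect on the cluster, you can read the setting back with the standard conf API:

# Read the setting back to verify the change
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))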

Extra Ball

For a discussion of the Spark setting autoBroadcastJoinThreshold, see the accepted answer to: What is the maximum size for a broadcast object in Spark?

