The error message indicates that the broadcast variable could not be loaded. You can try increasing the broadcast timeout by setting the `spark.sql.broadcastTimeout` configuration (in seconds, default 300) to a higher value via the SparkSession.
```python
from pyspark.sql.functions import udf, col

# Raise the broadcast timeout to 600 seconds (default is 300)
spark.conf.set("spark.sql.broadcastTimeout", "600")

data = [("Ankit", "IN"), ("Sagar", "USA")]
schema = "Name String, Country String"
df = spark.createDataFrame(data, schema)

dic = {"IN": "India", "USA": "United States of America"}
bc_dic = spark.sparkContext.broadcast(dic)

def lookup(code):
    return bc_dic.value[code]

u_lookup = udf(lookup)

df_final = df.withColumn("Country Name", u_lookup(col("Country")))
df_final.show()
```

```
+-----+-------+--------------------+
| Name|Country|        Country Name|
+-----+-------+--------------------+
|Ankit|     IN|               India|
|Sagar|    USA|United States of ...|
+-----+-------+--------------------+
```
In the code above, I set the broadcast timeout to 600 seconds (10 minutes).

I have also tried an RDD-based approach:
- I define a function named `lookup` that takes a row as an argument; the first element is accessed with `row[0]` and the second with `row[1]`.
- Inside the function, the broadcast dictionary `bc_dic` is used to look up the country name for the code in `row[1]`.
- I convert the DataFrame with `df.rdd` and apply the `map` transformation with `lookup`, which transforms each row of the RDD.

Below is the code:
```python
def lookup(row):
    return (row[0], bc_dic.value[row[1]])

df_final = df.rdd.map(lookup).toDF(["Name", "Country Name"])
df_final.show()
```

```
+-----+--------------------+
| Name|        Country Name|
+-----+--------------------+
|Ankit|               India|
|Sagar|United States of ...|
+-----+--------------------+
```