I am trying to apply a PySpark UDF inside a class to add a new column to a PySpark DataFrame. The Spark UDF has to be a static method in order to be used inside a class. The dummy example in Case 1 below works fine.
The problem is that once the UDF is defined as a static method, I cannot use any instance variable inside it. Hence, Case 2 does not work.
Question:
My question is not about why Case 2 fails. I would like to know whether there is any way to make the PySpark UDF access an instance variable in the example below. I am aware of this workaround, where the UDF is defined inside the method that calls it (calculate_new_marks() in this example). I am looking for alternatives.
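For reference, the workaround mentioned above (defining the UDF inside the calling method) could look roughly like the sketch below. The names `Example`, `make_add_marks`, and the `df` parameter are hypothetical and not part of my actual code. The idea is to copy the instance variable into a local variable so that the function wrapped by the UDF closes over a plain integer rather than `self`. The PySpark imports are placed inside the method only so that the plain-Python part can be tried without a Spark installation.

```python
class Example:
    """Hypothetical variant of the class in this question."""

    def __init__(self, increase=10):
        self.increase = increase

    def make_add_marks(self):
        # Copy the attribute into a local variable so the closure
        # captures a plain int instead of the whole instance.
        increase = self.increase

        def add_marks(marks):
            return marks + increase

        return add_marks

    def calculate_new_marks(self, df):
        # Imported lazily; `df` is assumed to be a DataFrame with an
        # integer "Marks" column, as in the cases below.
        from pyspark.sql.functions import col, udf
        from pyspark.sql.types import IntegerType

        # The UDF is created inside the method, where `self` is in scope.
        add_marks_udf = udf(self.make_add_marks(), returnType=IntegerType())
        return df.withColumn("New Marks", add_marks_udf(col("Marks")))


# The wrapped function can be checked on its own, without Spark.
add_five = Example(increase=5).make_add_marks()
result = add_five(10)
```

This is the pattern I would like to avoid, since the UDF has to be rebuilt inside every method that needs it.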
Case 1 (This works!): Static Method UDF without using any instance variable. It simply adds 10 to every student's marks.
import numpy as np
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, IntegerType, StructType, StructField

# `spark` is assumed to be an existing SparkSession (e.g. in a notebook or the PySpark shell).

class example():
    def __init__(self):
        self.students = [[f'student_{i}', np.random.randint(80)] for i in range(3)]
        self.increase = 10

    def create_spark_df(self):
        cSchema = StructType([StructField("Name", StringType()),
                              StructField("Marks", IntegerType())])
        return spark.createDataFrame(self.students, schema=cSchema)

    @staticmethod
    @udf(returnType=IntegerType())
    def add_ten_marks(marks):
        return marks + 10

    def calculate_new_marks(self):
        df = self.create_spark_df()
        df = df.withColumn("New Marks", self.add_ten_marks(col("Marks")))
        return df

c = example()
c.calculate_new_marks().show()
+---------+-----+---------+
|     Name|Marks|New Marks|
+---------+-----+---------+
|student_0|    2|       12|
|student_1|   42|       52|
|student_2|   11|       21|
+---------+-----+---------+
Case 2 (This fails!): Static Method UDF using an instance variable (self.increase here).
import numpy as np
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, IntegerType, StructType, StructField

# `spark` is assumed to be an existing SparkSession (e.g. in a notebook or the PySpark shell).

class example():
    def __init__(self):
        self.students = [[f'student_{i}', np.random.randint(80)] for i in range(3)]
        self.increase = 10

    def create_spark_df(self):
        cSchema = StructType([StructField("Name", StringType()),
                              StructField("Marks", IntegerType())])
        return spark.createDataFrame(self.students, schema=cSchema)

    @staticmethod
    @udf(returnType=IntegerType())
    def add_ten_marks(marks):
        return marks + self.increase  # <--- constant replaced by instance variable. Problematic Line!!!

    def calculate_new_marks(self):
        df = self.create_spark_df()
        df = df.withColumn("New Marks", self.add_ten_marks(col("Marks")))
        return df

c = example()
c.calculate_new_marks().show()
>>> PythonException: An exception was thrown from a UDF: 'NameError: name 'self' is not defined'
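As far as I understand, the NameError is ordinary Python scoping rather than anything Spark-specific: a static method receives no `self` argument, so the name is looked up as a global and not found. A minimal sketch without Spark, using a hypothetical `Demo` class, shows the same message:

```python
class Demo:
    """Hypothetical stand-in for the class above, with no Spark involved."""

    def __init__(self):
        self.increase = 10

    @staticmethod
    def add_marks(marks):
        # No `self` parameter here, so `self` is resolved as a global name.
        return marks + self.increase


try:
    Demo().add_marks(1)
except NameError as exc:
    message = str(exc)

print(message)
```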