
I am trying to apply a PySpark UDF to add a new column to a PySpark DataFrame inside a class. The UDF has to be a static method in order to be used inside the class. The dummy example in Case 1 below works fine.

The problem is that once the UDF is defined as a static method, I cannot use any instance variable inside it. Hence, Case 2 does not work.

Question: My question is not about why Case 2 fails. I would like to know whether there is any way for the PySpark UDF to access an instance variable in the example below. I am aware of the workaround where the UDF is defined inside the method that calls it (calculate_new_marks() in this example). I am looking for alternatives.


Case 1 (This works!): Static-method UDF that uses no instance variable. It simply adds 10 to every student's marks.

import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, IntegerType, StructType, StructField

spark = SparkSession.builder.getOrCreate()

class example():
    def __init__(self):
        self.students = [[f'student_{i}', np.random.randint(80)] for i in range(3)]
        self.increase = 10
    
    def create_spark_df(self):
        cSchema = StructType([StructField("Name", StringType()),
                              StructField("Marks", IntegerType())])
        return spark.createDataFrame(self.students, schema=cSchema)

    @staticmethod
    @udf(returnType=IntegerType())
    def add_ten_marks(marks):
        return marks + 10    
    
    def calculate_new_marks(self):
        df = self.create_spark_df()
        df = df.withColumn("New Marks", self.add_ten_marks(col("Marks")))
        return df

c = example()
c.calculate_new_marks().show()

+---------+-----+---------+
|     Name|Marks|New Marks|
+---------+-----+---------+
|student_0|    2|       12|
|student_1|   42|       52|
|student_2|   11|       21|
+---------+-----+---------+

Case 2 (This fails!): Static-method UDF that uses an instance variable (self.increase here).

import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, IntegerType, StructType, StructField

spark = SparkSession.builder.getOrCreate()

class example():
    def __init__(self):
        self.students = [[f'student_{i}', np.random.randint(80)] for i in range(3)]
        self.increase = 10
    
    def create_spark_df(self):
        cSchema = StructType([StructField("Name", StringType()),
                              StructField("Marks", IntegerType())])
        return spark.createDataFrame(self.students, schema=cSchema)

    @staticmethod
    @udf(returnType=IntegerType())
    def add_ten_marks(marks):
        return marks + self.increase  # <--- constant replaced by an instance variable. Problematic line!
    
    def calculate_new_marks(self):
        df = self.create_spark_df()
        df = df.withColumn("New Marks", self.add_ten_marks(col("Marks")))
        return df

c = example()
c.calculate_new_marks().show()

>>> PythonException: An exception was thrown from a UDF: 'NameError: name 'self' is not defined'
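For reference, the workaround I mentioned (defining the UDF inside the calling method) works because, Spark aside, this is ordinary Python closure capture. A minimal Spark-free sketch of that mechanism; the udf(...) wrapping is only indicated in a comment, and the class and method names here are illustrative:

```python
# Spark-free sketch of the closure workaround: the inner function
# captures self.increase from the enclosing method's scope.
class Example:
    def __init__(self):
        self.increase = 10

    def calculate_new_marks(self, marks_list):
        def add_marks(marks):
            # 'self' is visible here because add_marks is defined
            # inside the method, not as a @staticmethod.
            return marks + self.increase
        # In PySpark this is where you would wrap it:
        # add_marks_udf = udf(add_marks, IntegerType())
        return [add_marks(m) for m in marks_list]

print(Example().calculate_new_marks([2, 42, 11]))  # [12, 52, 21]
```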

1 Answer

A Spark UDF is meant to be self-contained: it is serialized and executed on the workers, not on the driver, which is why it has to be static here. So you can try the alternative below.

You could, for example, pass the variable increase to the UDF as an argument:


from pyspark.sql.functions import lit  # needed to pass the value as a literal Column

@staticmethod
@udf(returnType=IntegerType())
def add_ten_marks(marks, increase):
    return marks + increase

# inside calculate_new_marks():
df = df.withColumn("New Marks", self.add_ten_marks(col("Marks"), lit(self.increase)))
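Another alternative is a UDF factory: a plain method or function that builds the UDF with the value already baked in via a closure, so the serialized function never references self. A Spark-free sketch under that assumption; in real code the inner function would be wrapped with udf(..., returnType=IntegerType()):

```python
# Sketch of the factory pattern: make_add_marks builds a function
# with 'increase' bound at creation time, so no 'self' is needed
# inside the function that Spark would serialize.
def make_add_marks(increase):
    def add_marks(marks):
        return marks + increase  # 'increase' captured by closure
    return add_marks
    # in PySpark: return udf(add_marks, returnType=IntegerType())

add_ten = make_add_marks(10)
print(add_ten(42))  # 52
```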


1 Comment

This works perfectly fine. The lit() here is necessary; one can't just pass a plain Python variable. Thanks!
