
I want to be able to use a Scala function as a UDF in PySpark:

package com.test

import org.apache.spark.sql.functions.udf

object ScalaPySparkUDFs extends Serializable {
    def testFunction1(x: Int): Int = { x * 2 }
    def testUDFFunction1 = udf { x: Int => testFunction1(x) }
}

I can access testFunction1 in PySpark and have it return values:

functions = sc._jvm.com.test.ScalaPySparkUDFs 
functions.testFunction1(10)

What I want to be able to do is use this function as a UDF, ideally in a withColumn call:

row = Row("Value")
numbers = sc.parallelize([1,2,3,4]).map(row).toDF()
numbers.withColumn("Result", testUDFFunction1(numbers['Value']))

I think a promising approach is the one found here: Spark: How to map Python with Scala or Java User Defined Functions?

However, when I adapt the code found there to use testUDFFunction1 instead:

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column, _to_seq

def udf_test(col):
    sc = SparkContext._active_spark_context
    _f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1.apply
    return Column(_f(_to_seq(sc, [col], _to_java_column)))

I get:

 AttributeError: 'JavaMember' object has no attribute 'apply' 

I don't understand this because I believe testUDFFunction1 does have an apply method?

I do not want to use expressions of the type found here: Register UDF to SqlContext from Scala to use in PySpark

Any suggestions as to how to make this work would be appreciated!

2 Answers


I agree with @user6910411: you have to call the function first to get the UserDefinedFunction object, and then call apply on it. So your code will be:

UDF in Scala:

package com.test

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._

object ScalaPySparkUDFs {

    def testFunction1(x: Int): Int = { x * 2 }

    def getFun(): UserDefinedFunction = udf(testFunction1 _)
}

PySpark code:

from pyspark.sql.column import Column, _to_java_column, _to_seq

def test_udf(col):
    sc = spark.sparkContext
    _test_udf = sc._jvm.com.test.ScalaPySparkUDFs.getFun()
    return Column(_test_udf.apply(_to_seq(sc, [col], _to_java_column)))


row = Row("Value")
numbers = sc.parallelize([1,2,3,4]).map(row).toDF()
numbers.withColumn("Result", test_udf(numbers['Value']))
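The same wrapper extends to several columns by passing them all through _to_seq; a hedged sketch (not tested against a cluster), assuming a SparkSession named spark exists and that the Scala side exposes a UDF whose arity matches the number of columns:

```python
def scala_udf_multi(*cols):
    # Sketch: hand every column to the JVM UserDefinedFunction positionally.
    # Assumptions: a global SparkSession `spark`, and a Scala getFun() that
    # returns a UDF accepting len(cols) arguments.
    from pyspark.sql.column import Column, _to_java_column, _to_seq
    sc = spark.sparkContext
    _test_udf = sc._jvm.com.test.ScalaPySparkUDFs.getFun()
    return Column(_test_udf.apply(_to_seq(sc, list(cols), _to_java_column)))
```

Usage would then look like numbers.withColumn("Result", scala_udf_multi(numbers['A'], numbers['B'])).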

2 Comments

I want to pass multiple columns; is there any way I can do that?
When I am running this, I am getting TypeError: 'JavaPackage' object is not callable

The question you've linked uses a Scala object. A Scala object is a singleton, so you can use its apply method directly.

Here you use a nullary function which returns an object of the UserDefinedFunction class, so you have to call the function first:

_f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1() # Note () at the end
Column(_f.apply(_to_seq(sc, [col], _to_java_column)))
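To see why the original attempt raised AttributeError, here is a plain-Python analogy of the distinction, with no Spark needed; the class names below are hypothetical stand-ins for what py4j exposes:

```python
class UDFHandle:
    """Stand-in for the UserDefinedFunction returned by the Scala def."""
    def apply(self, col):
        return f"applied-to-{col}"

class JvmObject:
    """Stand-in for the py4j view of the Scala object ScalaPySparkUDFs."""
    def testUDFFunction1(self):
        # A nullary Scala def is exposed as a callable member: Python must
        # call it with () before the returned object's methods are visible.
        return UDFHandle()

functions = JvmObject()
# functions.testUDFFunction1.apply  -> AttributeError (method, not the UDF)
udf_obj = functions.testUDFFunction1()  # call first, as in the fix above
result = udf_obj.apply("Value")         # now apply is available
```

The broken code reached for .apply on the member itself, which is why py4j reported that the 'JavaMember' object has no such attribute.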

2 Comments

Is there something similar if I want to pass multiple columns?
@zero323 , would you please guide me for this problem: stackoverflow.com/q/75625201/6640504? Many thanks.
