
Based on the discussion in How to use Scala UDF in PySpark?, I am able to execute a Scala UDF from PySpark for primitive types, but now I want to call a Scala UDF from PySpark that accepts a Map[String, String].

package com.test

import org.apache.spark.sql.functions.udf

object ScalaPySparkUDFs extends Serializable {
    def testFunction1(x: Int): Int = { x * 2 }
    def testFunction2(x: Map[String, String]): String = {
        // use the Map key and value pairs; placeholder body for illustration
        x.map { case (k, v) => s"$k=$v" }.mkString(",")
    }
    def testUDFFunction1 = udf { x: Int => testFunction1(x) }
    def testUDFFunction2 = udf { x: Map[String, String] => testFunction2(x) }
}
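
For context, the compiled Scala object has to be on the JVM classpath of the PySpark session before sc._jvm can resolve it. A minimal sketch, assuming the object is packaged into a jar at a hypothetical path:

from pyspark.sql import SparkSession

# The jar name and path are assumptions; point spark.jars at wherever the
# compiled com.test.ScalaPySparkUDFs object actually lives.
spark = (SparkSession.builder
         .appName("scala-udf-from-pyspark")
         .config("spark.jars", "/path/to/scala-pyspark-udfs.jar")
         .getOrCreate())
sc = spark.sparkContext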

Calling the first UDF (testUDFFunction1) from PySpark works fine:

_f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1()
Column(_f.apply(_to_seq(sc, [col], _to_java_column)))
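
For reference, here is a minimal sketch of how that call can be wrapped in a reusable Python helper; the imports are the usual private helpers from pyspark.sql.column, and the DataFrame and column names are illustrative assumptions:

from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql.functions import col

def test_udf_1(column):
    # Resolve the Scala UDF through the Py4J gateway and apply it to one column
    _f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1()
    return Column(_f.apply(_to_seq(sc, [column], _to_java_column)))

# Illustrative usage; "SOMECOLUMN" is an assumed column name
df = df.withColumn("doubled", test_udf_1(col("SOMECOLUMN")))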

But I am not sure how to execute testUDFFunction2 from PySpark:

_f2 = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction2() 
Column(_f2.apply(_to_seq(sc, [lit("KEY"), col("FIRSTCOLUMN"), lit("KEY2"), col("SECONDCOLUMN")], _to_java_column)))

This fails and generates the exception below:

Py4JJavaError: An error occurred while calling o430.apply.
: java.lang.ClassCastException: sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction2$$Lambda$3693/1231805146 cannot be cast to scala.Function4
    at org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(ScalaUDF.scala:241)
    at org.apache.spark.sql.expressions.SparkUserDefinedFunction.createScalaUDF(UserDefinedFunction.scala:113)
    at org.apache.spark.sql.expressions.SparkUserDefinedFunction.apply(UserDefinedFunction.scala:101)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:748)

I can easily do this from Scala:

val output = input.withColumn("result", testUDFFunction2(map(
      lit("KEY1"), col("FIRSTCOLUMN"),
      lit("KEY2"), col("SECONDCOLUMN")
    )))

But I want to convert that code to PySpark, and I am not able to find good documentation. As mentioned in https://spark.apache.org/docs/3.2.1/api/java/org/apache/spark/sql/expressions/UserDefinedFunction.html, there are only two apply methods, and both accept a list of Column arguments. Any recommendations on how I can proceed?

I tried the create_map function from pyspark.sql.functions, but I couldn't get it to work with Column types.

1 Answer


I can see the problem with how you are calling the function: by passing four separate columns, Spark tries to cast the UDF's lambda to scala.Function4, while testUDFFunction2 only wraps a function of a single Map argument.

You need to change the following lines:

_f2 = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction2() 
Column(_f2.apply(_to_seq(sc, [lit("KEY"), col("FIRSTCOLUMN"), lit("KEY2"), col("SECONDCOLUMN")], _to_java_column)))

Just as the function can be called using the map function in Scala, there is an equivalent function, create_map, in PySpark. The only thing you need to do is:

from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql.functions import create_map, col, lit

_f2 = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction2()
Column(_f2.apply(_to_seq(sc, [create_map(lit("KEY"), col("FIRSTCOLUMN"), lit("KEY2"), col("SECONDCOLUMN"))], _to_java_column)))

That way, you will be able to call the function and resolve the ClassCastException.
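
For reference, here is a minimal sketch that wraps this in a helper and mirrors the Scala withColumn example; the DataFrame, column, and key names are just illustrative, taken from the question:

from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql.functions import create_map, col, lit

def test_udf_2(*pairs):
    # create_map folds alternating key/value columns into a single MapType column,
    # which is what the Map[String, String] parameter on the Scala side expects
    _f2 = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction2()
    return Column(_f2.apply(_to_seq(sc, [create_map(*pairs)], _to_java_column)))

output = input_df.withColumn("result", test_udf_2(
    lit("KEY1"), col("FIRSTCOLUMN"),
    lit("KEY2"), col("SECONDCOLUMN"),
))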


1 Comment

Thank you very much, I am able to call the UDF using the create_map function, which I missed earlier.
