
I'm trying to offload some computations from Python to Scala when using Apache Spark. I would like to use the class interface from Java to be able to use a persistent variable, like so (this is a nonsensical MWE based on my more complex use case):

package mwe

import org.apache.spark.sql.api.java.UDF1

class SomeFun extends UDF1[Int, Int] {
  private var prop: Int = 0

  override def call(input: Int): Int = {
    if (prop == 0) {
      prop = input
    }
    prop + input
  }
}

Now I'm attempting to use this class from within pyspark:

import pyspark
from pyspark.sql import SQLContext
from pyspark import SparkContext

conf = pyspark.SparkConf()
conf.set("spark.jars", "mwe.jar")
sc = SparkContext.getOrCreate(conf)

sqlContext = SQLContext.getOrCreate(sc)
sqlContext.registerJavaFunction("fun", "mwe.SomeFun")

df0 = sc.parallelize((i,) for i in range(6)).toDF(["num"])
df1 = df0.selectExpr("fun(num) + 3 as new_num")
df1.show()

I then get the following exception:

pyspark.sql.utils.AnalysisException: u"cannot resolve '(UDF:fun(num) + 3)' due to data type mismatch: differing types in '(UDF:fun(num) + 3)' (struct<> and int).; line 1 pos 0;\n'Project [(UDF:fun(num#0L) + 3) AS new_num#2]\n+- AnalysisBarrier\n      +- LogicalRDD [num#0L], false\n"

What is the correct way to implement this? Will I have to resort to Java itself for the class? I'd greatly appreciate hints!


Answer


The source of the exception is the use of incompatible types (the struct<> in the error message is the type Spark assigned to the UDF's result):

  • First of all, o.a.s.sql.api.java.UDF* objects require external Java types (not Scala types), so a UDF expecting integers should take a boxed Integer (java.lang.Integer), not Int.

    class SomeFun extends UDF1[Integer, Integer] {
      ...
      override def call(input: Integer): Integer = {
        ...
    
  • Unless you use legacy Python, the num column uses LongType, not IntegerType:

    df0.printSchema()
    root
     |-- num: long (nullable = true)
    

    So the actual signature should be:

    class SomeFun extends UDF1[java.lang.Long, java.lang.Long] {
      ...
      override def call(input: java.lang.Long): java.lang.Long = {
        ...
    

    or the data should be cast before applying the UDF:

    df0.selectExpr("fun(cast(num as integer)) + 3 as new_num")
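Putting both points together, a corrected version of the question's MWE might look like the sketch below. It keeps the mwe package and SomeFun class name so the existing registerJavaFunction("fun", "mwe.SomeFun") call is unchanged, and uses java.lang.Long to match the LongType num column. The null check is an addition of mine (an assumption about how SQL NULLs should be handled; the original code has none), and the mutable prop field is kept only to mirror the question, subject to the caveat below.

```scala
package mwe

import org.apache.spark.sql.api.java.UDF1

// num is a LongType column, so both the input and the result use the boxed
// Java type java.lang.Long rather than Scala's Int or Long.
class SomeFun extends UDF1[java.lang.Long, java.lang.Long] {
  // Mutable per-instance state, kept only to mirror the question's MWE.
  private var prop: Long = 0L

  override def call(input: java.lang.Long): java.lang.Long = {
    if (input == null) {
      // SQL NULLs arrive as null references; pass them through instead of
      // failing on unboxing (assumed behavior, not part of the original).
      null
    } else {
      val x: Long = input.longValue()
      if (prop == 0L) prop = x
      java.lang.Long.valueOf(prop + x)
    }
  }
}
```

With this class on the classpath, the original fun(num) + 3 expression should no longer hit the struct<> type mismatch, since the UDF's declared result type is now a Java type Spark knows how to map.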
    

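Finally, mutable state is not allowed in UDFs. It won't cause an exception, but the overall behavior will be non-deterministic, because the result depends on how rows are split across partitions and on the order in which each copy of the UDF sees them.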
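A Spark-free way to see the problem is the sketch below. StatefulAdder and PartitionDemo are hypothetical names that reproduce the question's prop logic, and the model is a simplification that assumes each partition is processed by its own fresh copy of the UDF (roughly what happens when Spark ships a deserialized copy of the function to each task).

```scala
// Hypothetical stand-in for the question's stateful UDF logic, without Spark.
final class StatefulAdder {
  private var prop: Long = 0L

  def call(input: Long): Long = {
    if (prop == 0L) prop = input // remembers the first non-zero value it sees
    prop + input
  }
}

object PartitionDemo {
  // Simplified model: every partition gets its own fresh instance, so the
  // "first value seen" depends on how the rows were partitioned.
  def run(partitions: Seq[Seq[Long]]): Seq[Long] =
    partitions.flatMap { rows =>
      val f = new StatefulAdder
      rows.map(f.call)
    }

  def main(args: Array[String]): Unit = {
    // Same rows as the question (0 to 5), split two different ways.
    println(run(Seq(Seq(0L, 1L, 2L, 3L, 4L, 5L))))   // one partition
    println(run(Seq(Seq(0L, 1L, 2L), Seq(3L, 4L, 5L)))) // two partitions
  }
}
```

With one partition the outputs are 0, 2, 3, 4, 5, 6; with two partitions they are 0, 2, 3, 6, 7, 8. The input data is identical, only the partitioning differs, which is exactly the kind of non-determinism the answer warns about.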


Comment

Thank you! I reproduced this with the MWE and will now have to adapt my other code, as well as see if there is a way around mutable state (determinism is not critical).
