
This question has been asked here for Scala, but that does not help me because I am working with the Java API. I have been throwing everything including the kitchen sink at it; this was my approach:

List<String> sourceClasses = new ArrayList<String>();
//Add elements
List<String> targetClasses = new ArrayList<String>();
//Add elements

dataset = dataset.withColumn("Transformer", callUDF(
    "Transformer",
    lit((String[])sourceClasses.toArray())
        .cast(DataTypes.createArrayType(DataTypes.StringType)),
    lit((String[])targetClasses.toArray())
        .cast(DataTypes.createArrayType(DataTypes.StringType))
));

And for my UDF declaration:

public class Transformer implements UDF2<Seq<String>, Seq<String>, String> {

    // @SuppressWarnings("deprecation")
    public String call(Seq<String> sourceClasses, Seq<String> targetClasses)
            throws Exception {
        // ...
    }
}
When I run the code, execution does not proceed past the UDF call, which is expected because I am not able to match up the types. Please help me in this regard.

EDIT

I tried the solution suggested by @Oli. However, I got the following exception:

org.apache.spark.SparkException: Failed to execute user defined function($anonfun$261: (array<string>, array<string>) => string)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.collection.immutable.Seq
at com.esrx.dqm.uuid.UUIDTransformerEngine$1.call(UUIDTransformerEngine.java:1)
at org.apache.spark.sql.UDFRegistration$$anonfun$261.apply(UDFRegistration.scala:774)
... 22 more

This line in particular seems to indicate the problem:

Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.collection.immutable.Seq
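
For reference, the cast fails because Spark hands the UDF a scala.collection.mutable.WrappedArray at runtime while my signature declares scala.collection.immutable.Seq. A declaration that matches the runtime type would presumably look like the following (an untested sketch on my part):

import org.apache.spark.sql.api.java.UDF2;
import scala.collection.mutable.WrappedArray;

public class Transformer implements UDF2<WrappedArray<String>, WrappedArray<String>, String> {

    public String call(WrappedArray<String> sourceClasses, WrappedArray<String> targetClasses)
            throws Exception {
        // Convert the WrappedArrays to Java collections here before applying the transformation.
        // ...
        return null; // placeholder
    }
}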

  • Could you add the stack trace and the schema of your dataset to your question, please? Commented Nov 25, 2019 at 9:40
  • @dnej, I created just a one-row dataframe following this: stackoverflow.com/questions/39967194/…. The execution just blocks at the UDF call; it does not terminate, nor does it throw an exception. Commented Nov 25, 2019 at 11:33

1 Answer


From what I understand of the type of your UDF, you are trying to create a UDF that takes two arrays as input and returns a string.

In Java, that's a bit painful but manageable.

Let's say that you want to join both arrays and link them with the word AND. You could define the UDF as follows:

UDF2<WrappedArray<String>, WrappedArray<String>, String> my_udf2 =
    new UDF2<WrappedArray<String>, WrappedArray<String>, String>() {
        public String call(WrappedArray<String> a1, WrappedArray<String> a2) throws Exception {
            // Convert the Scala WrappedArrays into Java collections before manipulating them.
            ArrayList<String> l1 = new ArrayList<>(JavaConverters
                .asJavaCollectionConverter(a1)
                .asJavaCollection());
            ArrayList<String> l2 = new ArrayList<>(JavaConverters
                .asJavaCollectionConverter(a2)
                .asJavaCollection());
            return l1.stream().collect(Collectors.joining(",")) +
                " AND " +
                l2.stream().collect(Collectors.joining(","));
        }
    };

Note that you need to use Scala's WrappedArray in the method signature and convert the arrays in the method body with JavaConverters in order to manipulate them in Java. Here are the required imports, just in case:

import java.util.ArrayList;
import java.util.stream.Collectors;
import scala.collection.JavaConverters;
import scala.collection.mutable.WrappedArray;
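
If you prefer working with a java.util.List view instead of copying into an ArrayList, JavaConverters also provides a Seq-to-List converter; a small sketch, assuming the Scala 2.11/2.12 JavaConverters API:

// Inside call(): view the WrappedArray as a java.util.List without an explicit copy.
java.util.List<String> l1 = JavaConverters.seqAsJavaListConverter(a1).asJava();
java.util.List<String> l2 = JavaConverters.seqAsJavaListConverter(a2).asJava();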

Then you can register the UDF and use it with Spark. To demonstrate it, I created a sample dataframe and two dummy arrays from the 'id' column. Note that it also works with the lit function, as you were trying to do in your question.

spark.udf().register("my_udf2", my_udf2, DataTypes.StringType);

String[] data = {"abcd", "efgh", "ijkl"};

spark.range(3)
    .withColumn("id", col("id").cast("string"))
    .withColumn("array", functions.array(col("id"), col("id")))
    .withColumn("string_of_arrays",
          functions.callUDF("my_udf2", col("array"), lit(data)))
    .show(false);

which yields:

+---+------+----------------------+
|id |array |string_of_arrays      |
+---+------+----------------------+
|0  |[0, 0]|0,0 AND abcd,efgh,ijkl|
|1  |[1, 1]|1,1 AND abcd,efgh,ijkl|
|2  |[2, 2]|2,2 AND abcd,efgh,ijkl|
+---+------+----------------------+
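
If your inputs live in a java.util.List rather than a Java array (as in the question), the list has to be turned into a typed array before passing it to lit; a short sketch, assuming lists of strings (sourceClasses is a placeholder name taken from the question):

import static org.apache.spark.sql.functions.lit;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Column;

List<String> sourceClasses = Arrays.asList("abcd", "efgh", "ijkl");
// toArray(new String[0]) yields a String[]; a bare toArray() returns Object[]
// and casting that to String[] fails at runtime.
Column sourceLit = lit(sourceClasses.toArray(new String[0]));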

In Spark >= 2.3, you could also do it like this:

UserDefinedFunction my_udf2 = udf(
    (WrappedArray<String> s1, WrappedArray<String> s2) -> "some_string",
    DataTypes.StringType
);

df.select(my_udf2.apply(col("a1"), col("a2"))).show(false);
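
In Java, the lambda above may need an explicit target type so that overload resolution picks the udf(UDF2, DataType) variant rather than one of the Scala function overloads; a sketch of a more explicit form (column names a1 and a2 are placeholders, as above):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.udf;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import scala.collection.mutable.WrappedArray;

// Casting the lambda to UDF2 makes the intended overload of udf unambiguous.
UserDefinedFunction my_udf2 = udf(
    (UDF2<WrappedArray<String>, WrappedArray<String>, String>)
        (s1, s2) -> s1.mkString(",") + " AND " + s2.mkString(","),
    DataTypes.StringType
);

df.select(my_udf2.apply(col("a1"), col("a2"))).show(false);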

11 Comments

  • Hi @Oli, can we pass actual Java arrays or lists as lit parameters into the UDF? That is actually my use case.
  • I was not sure so I just tried. It works. I edited the answer with an example ;)
  • I tried your solution, but got the exception whose stack trace I have mentioned in an edit to my post.
  • Strange... The code worked for me with Spark 2.2.1 :-/ Let me edit my post with something more robust.
  • I am using Spark 2.4.4.
