
I am trying to rewrite a UDF as a pandas UDF.

However, when it comes to a column of ArrayType, I am struggling to find the right solution.

I have a dataframe as below:

+-----------+--------------------+
|      genre|                 ids|
+-----------+--------------------+
|      Crime|[6, 22, 42, 47, 5...|
|    Romance|[3, 7, 11, 15, 17...|
|   Thriller|[6, 10, 16, 18, 2...|
|  Adventure|[2, 8, 10, 15, 29...|
|   Children|[1, 2, 8, 13, 34,...|
|      Drama|[4, 11, 14, 16, 1...|
|        War|[41, 110, 151, 15...|
|Documentary|[37, 77, 99, 108,...|
|    Fantasy|[2, 56, 60, 126, ...|
|    Mystery|[59, 113, 123, 16...|
+-----------+--------------------+

The following UDF works well:

import itertools
from pyspark.sql.functions import udf

# materialise the combinations iterator into a list so Spark can serialise it
pairs_udf = udf(lambda x: list(itertools.combinations(x, 2)), transformer.schema)
df = df.select("genre", pairs_udf("ids").alias("ids"))

The output is like:

+-----------+--------------------+
|      genre|                 ids|
+-----------+--------------------+
|      Crime|[[6, 22], [6, 42]...|
|    Romance|[[3, 7], [3, 11],...|
|   Thriller|[[6, 10], [6, 16]...|
|  Adventure|[[2, 8], [2, 10],...|
|   Children|[[1, 2], [1, 8], ...|
|      Drama|[[4, 11], [4, 14]...|
|        War|[[41, 110], [41, ...|
|Documentary|[[37, 77], [37, 9...|
|    Fantasy|[[2, 56], [2, 60]...|
|    Mystery|[[59, 113], [59, ...|
+-----------+--------------------+

However, what would be the equivalent when writing the function as a pandas UDF?

PS: I understand that, alternatively, I can use a cross join to achieve the same result.

But I am more curious about how a pandas UDF handles a column of ArrayType.

  • maybe something like lambda row: row.apply(lambda x: itertools.combinations(x, 2)) Commented Nov 24, 2020 at 19:22
  • Thanks @mck, that was one of my attempts. I now think the issue I have is more related to java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available. After a few Google searches, it seems to be related to Java 11 and Spark's Arrow support, which may belong in a separate question. Commented Nov 24, 2020 at 21:54

1 Answer


I am going to share my findings here.

There are three aspects to making a pandas UDF work for your project:

1. A pandas UDF, or more precisely Apache Arrow, does not support complex types as fully as a common UDF does (as of pyspark 3.0.1 and pyarrow 2.0.0).
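For the pairs example from the question, one way to structure the pandas UDF is to keep the per-row logic as a plain pandas function and wrap it afterwards. This is a sketch, not a definitive implementation: the nested ArrayType(ArrayType(IntegerType())) return type is an assumption that depends on your pyspark/pyarrow versions accepting nested arrays, and the wrapper is shown commented out for that reason.

```python
import itertools
import pandas as pd

# Per-row logic: each element of the input Series is an array of ids;
# produce the list of all 2-combinations for that row.
def pairs(ids: pd.Series) -> pd.Series:
    return ids.apply(lambda x: [list(p) for p in itertools.combinations(x, 2)])

# Hypothetical pandas UDF wrapper (the return type is an assumption and may
# not be supported on every pyspark/pyarrow combination):
# from pyspark.sql.functions import pandas_udf
# from pyspark.sql.types import ArrayType, IntegerType
# pairs_udf = pandas_udf(pairs, ArrayType(ArrayType(IntegerType())))
# df = df.select("genre", pairs_udf("ids").alias("ids"))
```

Keeping the logic in a plain function like `pairs` also lets you unit-test it with pandas alone, without spinning up a Spark session.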

2. If you are running Java 11, which is the default in (py)Spark 3, you need to add the following to your Spark config:

spark.driver.extraJavaOptions='-Dio.netty.tryReflectionSetAccessible=true'
spark.executor.extraJavaOptions='-Dio.netty.tryReflectionSetAccessible=true'

This resolves the java.lang.UnsupportedOperationException mentioned above.
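If you build the session from Python rather than via spark-defaults, the same two options can be passed when constructing the SparkSession. This is a config sketch; the app name is just a placeholder:

```python
from pyspark.sql import SparkSession

# Pass the Netty reflection flag to both driver and executors so that
# Arrow's direct buffer access works under Java 11.
spark = (
    SparkSession.builder
    .appName("pairs-example")  # placeholder name
    .config("spark.driver.extraJavaOptions",
            "-Dio.netty.tryReflectionSetAccessible=true")
    .config("spark.executor.extraJavaOptions",
            "-Dio.netty.tryReflectionSetAccessible=true")
    .getOrCreate()
)
```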

3. Make sure your virtual environment's Python path is set as your PYSPARK_PYTHON before the session starts, i.e.:

import os
os.environ['PYSPARK_PYTHON'] = './your/virtual/environment/path'
