
I'm trying to write a pandas UDF that takes two columns with integer values and, based on the difference between those values, returns an array of decimals whose length equals that difference.

Here's my attempt so far. I've tried a number of variations, but this is the general idea:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
import pandas as pd

@pandas_udf(ArrayType(DecimalType()), PandasUDFType.SCALAR)
def zero_pad(x, y):
  buffer = []
  
  for i in range(0, (x - y)):
    buffer.append(0.0)
  
  return buffer

Here's how I use it:

df = df.withColumn("zero_list", zero_pad(df.x, df.y))

The end result being df with a new column called "zero_list" being an ArrayType(DecimalType()) column that looks like [0.0, 0.0, 0.0, ...] the length of which is (df.x - df.y).

The error message is so generic it's almost not worth posting: simply "Job aborted due to stage failure", and it only traces back to the part of my code where I call df.show():

Py4JJavaError                             Traceback (most recent call last)
<command-103561> in <module>()
---> 33 df.orderBy("z").show(n=1000)

/databricks/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    350         """
    351         if isinstance(truncate, bool) and truncate:
--> 352             print(self._jdf.showString(n, 20, vertical))
    353         else:
    354             print(self._jdf.showString(n, int(truncate), vertical))

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

How do I create a pandas_udf that returns an array of variable length?

I'm doing all of this using Databricks with Spark 2.3.1.
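A minimal reproduction of the underlying failure, without Spark (this sketch assumes the UDF body receives pandas Series, which is what a scalar pandas UDF passes in; the sample values are illustrative):

```python
import pandas as pd

# A scalar pandas UDF receives whole pandas Series (one batch of rows),
# not individual integers, so the UDF body above effectively does this:
x = pd.Series([5, 8])
y = pd.Series([2, 4])

try:
    range(0, (x - y))  # x - y is a Series; range() needs an integer
except TypeError as e:
    print("TypeError:", e)
```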

3 Answers


This question is about a year old, but I ran into the same problem, and here is my solution with pandas_udf:

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

@pandas_udf(ArrayType(IntegerType()), PandasUDFType.SCALAR)
def zero_pad(xs, ys):
    # xs and ys arrive as pandas Series (one batch of rows),
    # so build one zero-list per row, not one list per call.
    buffer = []
    for idx, x in enumerate(xs):
        buffer.append([0] * int(x - ys[idx]))

    return pd.Series(buffer)

df = df.withColumn("zero_list", zero_pad(df.x, df.y))
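The per-batch logic can be sanity-checked with plain pandas, no Spark session required (the sample values here are illustrative):

```python
import pandas as pd

def zero_pad_batch(xs, ys):
    # Mirrors the UDF body: one zero-list per row, with length x - y.
    buffer = []
    for idx, x in enumerate(xs):
        buffer.append([0] * int(x - ys[idx]))
    return pd.Series(buffer)

out = zero_pad_batch(pd.Series([5, 8]), pd.Series([2, 4]))
print(out.tolist())  # [[0, 0, 0], [0, 0, 0, 0]]
```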

As a general case, this could be used:

from pyspark.sql import functions as F
import pandas as pd

@F.pandas_udf('array<string>')
def pudf(x: pd.Series, y: pd.Series) -> pd.Series:
    # Build one output list per input row so the UDF works for any batch size.
    return pd.Series([[a, b] for a, b in zip(x, y)])

df = spark.createDataFrame([('aa', 'bb')])
df.withColumn('out', pudf('_1', '_2')).show()
# +---+---+--------+
# | _1| _2|     out|
# +---+---+--------+
# | aa| bb|[aa, bb]|
# +---+---+--------+
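The row-by-row construction matters because a scalar pandas UDF can receive several rows in one batch, and the returned Series must have exactly one entry per input row. A quick pure-pandas check (values are illustrative):

```python
import pandas as pd

def combine_batch(x, y):
    # One output list per input row, so batches larger than one row work too.
    return pd.Series([[a, b] for a, b in zip(x, y)])

out = combine_batch(pd.Series(["aa", "cc"]), pd.Series(["bb", "dd"]))
print(out.tolist())  # [['aa', 'bb'], ['cc', 'dd']]
```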

The answer to the OP's question:

from pyspark.sql import functions as F
import pandas as pd

@F.pandas_udf('array<decimal>')
def zero_pad(xs: pd.Series, ys: pd.Series) -> pd.Series:
    # One zero-list per row, with length x - y for that row.
    return pd.Series([[0] * int(x - y) for x, y in zip(xs, ys)])

df = spark.createDataFrame([(5, 2), (8, 4)])
df = df.withColumn("zero_list", zero_pad('_1', '_2'))

df.show()
# +---+---+------------+
# | _1| _2|   zero_list|
# +---+---+------------+
# |  5|  2|   [0, 0, 0]|
# |  8|  4|[0, 0, 0, 0]|
# +---+---+------------+

df.printSchema()
# root
#  |-- _1: long (nullable = true)
#  |-- _2: long (nullable = true)
#  |-- zero_list: array (nullable = true)
#  |    |-- element: decimal(10,0) (containsNull = true)


I don't understand why you return a pandas Series from the function; it returns multiple rows for every single input.

>>> import pandas as pd
>>> def zero_pad(x, y):
...     buffer = []
...     for i in range(0, (x - y)):
...             buffer.append(0.0)
...     return pd.Series(buffer)
... 
>>> zero_pad(5,1)
0    0.0
1    0.0
2    0.0
3    0.0
dtype: float64

So you can't add a column from a result that has multiple rows.

On the other hand, you can't use a plain Python function directly in a withColumn statement; you have to wrap it with udf first. Please see my script below; I think the result is exactly what you are looking for. (Note that udf defaults to StringType, so zero_list below is a string rendering of the list; pass ArrayType(DoubleType()) as the second argument to udf if you want a true array column.)

>>> from pyspark.sql.functions import udf
>>> 
>>> data = sc.parallelize([
...     (2,1),
...     (8,1),
...     (5,2),
...     (6,4)])
>>> columns = ['x','y']
>>> df = spark.createDataFrame(data, columns)
>>> df.show()
+---+---+
|  x|  y|
+---+---+
|  2|  1|
|  8|  1|
|  5|  2|
|  6|  4|
+---+---+

>>> def zero_pad(x, y):
...     buffer = []
...     for i in range(0, (x - y)):
...             buffer.append(0.0)
...     return buffer
... 
>>> my_udf = udf(zero_pad)
>>> df = df.withColumn("zero_list", my_udf(df.x, df.y))
>>> df.show()
+---+---+--------------------+
|  x|  y|           zero_list|
+---+---+--------------------+
|  2|  1|               [0.0]|
|  8|  1|[0.0, 0.0, 0.0, 0...|
|  5|  2|     [0.0, 0.0, 0.0]|
|  6|  4|          [0.0, 0.0]|
+---+---+--------------------+

1 Comment

Your first statement is absolutely correct, and this is a solution I could use, but I need my solution to use pandas UDFs: databricks.com/blog/2017/10/30/… Otherwise I would accept your answer.
