
I am trying to define a pandas udf that would compute the skew of a lognormal distribution per period.

I currently have done the following:

@pandas_udf("double", PandasUDFType.GROUPED_AGG)  
def lognormal_skew(v):
  return (np.exp(v.std()) + 2) * np.sqrt(np.exp(v.std()) - 1)

my_df.groupBy('period').agg(lognormal_skew(my_df['my_columns'])).show()

However, I get an error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3047.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3047.0 (TID 208812, 10.139.64.8, executor 82): org.apache.spark.api.python.PythonException: Traceback (most recent call last):

My guess is that this has something to do with numpy, since if I try to define a skew as follows:

@pandas_udf("double", PandasUDFType.GROUPED_AGG)  
def skew(v):
  return v.skew()

my_df.groupBy('period').agg(skew(my_df['my_columns'])).show()

It outputs a DataFrame and it does not error.

  • Your code with numpy works fine. Ensure you are using PyArrow version 0.14.1 or lower. Commented Apr 26, 2020 at 0:48
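
For context on that comment: with Spark 2.3.x/2.4.x, pandas UDFs are known to break when PyArrow 0.15+ is installed unless a compatibility setting is enabled. A minimal sketch for checking the version (the environment variable is the workaround described in the Spark 2.4.x docs; verify it applies to your Spark version):

import pyarrow
print(pyarrow.__version__)  # the version the Python workers actually see

# Assumption: you are on Spark 2.3.x/2.4.x. For PyArrow >= 0.15.0 the Spark docs
# describe a compatibility setting; put it in conf/spark-env.sh (or the cluster
# environment) so the old Arrow IPC format is used:
#   ARROW_PRE_0_15_IPC_FORMAT=1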

1 Answer


Preamble

In my experience, whenever something can be implemented with pyspark built-in functions, it is preferable to a user-defined function.

One of the problems with udfs is that their error messages are hard to decipher. For instance, in your case, I don't know why you get this error.

pyspark.sql.functions lets you do many things if you accept doing them in more steps. In terms of performance, these functions will be hard to beat, because they are optimized by experts. If what you want to do cannot be done with pyspark.sql.functions (it happens), I prefer using an rdd over a udf. rdds are more natural for applying arbitrary Python functions. You lose performance with respect to a built-in DataFrame method, but you gain some flexibility.

Maybe an example regarding your problem could be instructive.

Python

Let's take your numpy-based example. You gave the Python implementation:

import numpy as np

def lognormal_skew_numpy(v):
    # skew of a lognormal distribution, computed from the standard deviation of v
    return (np.exp(v.std()) + 2) * np.sqrt(np.exp(v.std()) - 1)

It can be used to check that other implementations are consistent:

print(lognormal_skew_numpy(np.array([1,3,5])))
print(lognormal_skew_numpy(np.array([5,6])))
# 14.448897615797454
# 2.938798148174726

DataFrame API logic

Now, let's turn to Spark. I will use the following DataFrame:

df = spark.createDataFrame([(1, 'a'), (3, 'a'), (5, 'a'), (5,'b'), (6,'b')], ['x','period'])
df.show(2)

+---+------+
|  x|period|
+---+------+
|  1|     a|
|  3|     a|
+---+------+
only showing top 2 rows

The skewness function only performs basic math operations. They are all available in pyspark.sql.functions, so in this case it is not very hard to create a function that does the same thing:

import pyspark.sql.functions as psf

def lognormal_skew(df, xvar = 'x'):
    df_agg = (df
              .groupBy('period')
              .agg(psf.stddev_pop(xvar).alias('sd'))
             )
    df_agg = df_agg.withColumn('skew', (psf.exp(psf.col('sd')) + 2)*psf.sqrt(psf.exp('sd') - 1))
    return df_agg

Note that there exist different functions to compute the standard deviation in psf: I use stddev_pop, which reports the population-level standard deviation, not a sample estimator (with 2 or 3 points, the precision of a sample estimator would be quite poor).
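
For comparison, a quick sketch (reusing df and psf from above) of the two flavours side by side; the values in the comments are easy to verify by hand:

# stddev_pop divides by n (population); stddev_samp, the default stddev, divides by n - 1 (sample)
(df
 .groupBy('period')
 .agg(psf.stddev_pop('x').alias('sd_pop'),
      psf.stddev_samp('x').alias('sd_samp'))
 .show())
# period a: sd_pop ≈ 1.633, sd_samp = 2.0
# period b: sd_pop = 0.5,   sd_samp ≈ 0.707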

We can check that lognormal_skew yields the desired output:

lognormal_skew(df).show(2)
+------+-----------------+------------------+
|period|               sd|              skew|
+------+-----------------+------------------+
|     b|              0.5| 2.938798148174726|
|     a|1.632993161855452|14.448897615797454|
+------+-----------------+------------------+

We managed to get the expected result with pure DataFrame logic.
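
As a side note, if you do not need the intermediate sd column, the same computation can be collapsed into a single agg call; a sketch equivalent to the function above:

sd = psf.stddev_pop('x')
(df
 .groupBy('period')
 .agg(((psf.exp(sd) + 2) * psf.sqrt(psf.exp(sd) - 1)).alias('skew'))
 .show())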

rdd

Let's arrange the data to have an rdd that looks like parallelized numpy arrays:

rdd = df.rdd
rdd = rdd.map(lambda row: (row['period'], [row['x']])).reduceByKey(lambda x, y: x + y)
rdd.take(2)
[('b', [5, 6]), ('a', [1, 3, 5])]

Here we use reduceByKey to group the values into a list. At this step, with voluminous data, you might make your RAM explode.
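
The same grouping can also be written with groupByKey, which is the more idiomatic way to collect every value of a key (the memory caveat still applies); a sketch:

# groupByKey gathers all values per key; mapValues(list) materializes them as Python lists
rdd_grouped = df.rdd.map(lambda row: (row['period'], row['x'])).groupByKey().mapValues(list)
rdd_grouped.take(2)
# e.g. [('b', [5, 6]), ('a', [1, 3, 5])]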

Finally, you can easily apply your function in parallel to that structure:

rdd = rdd.map(lambda l: (l[0], np.array(l[1]))).map(lambda l: (l[0], lognormal_skew_numpy(l[1])))
rdd.take(2)
[('b', 2.938798148174726), ('a', 14.448897615797454)]

Once again, we get the same result. I see two flaws in this approach:

  • It is less readable and less portable. If you want to reuse the code with a different dataset, you will have to do more work.
  • It is less efficient (speed and memory). The reduceByKey operation here is the main bottleneck.

However, you gain some flexibility. That's a trade-off.
