Preamble
Based on my experience, whenever something can be implemented with PySpark built-in functions, it is preferable to a user-defined function.
One of the problems with UDFs is that their error messages are hard to decipher. For instance, in your case, I cannot tell why you get this error.
pyspark.sql.functions lets you do many things if you accept doing them in more steps. In terms of performance, however, it will be hard to beat, because these functions are optimized by experts. If what you want to do cannot be done with pyspark.sql.functions (that happens), I prefer using an rdd rather than a udf. rdds are more natural for applying Python functions. You lose performance with respect to a built-in DataFrame method, but you gain some flexibility.
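To make the contrast concrete on a toy operation (not your actual function), here is the same column transformation written with a built-in function and with a udf, assuming a DataFrame df with a numeric column x like the one built below. The built-in version stays in the JVM and benefits from Spark's optimizer, while the udf version ships every value to a Python worker:
import math
import pyspark.sql.functions as psf
from pyspark.sql.types import DoubleType

# built-in: optimized, no Python round-trip
df.withColumn('y', psf.exp(psf.col('x')))

# udf: the same math, but each value is serialized to and from a Python worker
exp_udf = psf.udf(lambda v: math.exp(v), DoubleType())
df.withColumn('y', exp_udf(psf.col('x')))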
Maybe an example regarding your problem could be instructive.
Python
Let's take your example based on numpy. You gave the Python implementation:
import numpy as np

def lognormal_skew_numpy(v):
    return (np.exp(v.std()) + 2) * np.sqrt(np.exp(v.std()) - 1)
It can be used to check that the other implementations are consistent:
print(lognormal_skew_numpy(np.array([1,3,5])))
print(lognormal_skew_numpy(np.array([5,6])))
# 14.448897615797454
# 2.938798148174726
A DataFrame API logic
Now, let's turn to Spark. I will use the following DataFrame:
df = spark.createDataFrame([(1, 'a'), (3, 'a'), (5, 'a'), (5,'b'), (6,'b')], ['x','period'])
df.show(2)
+---+------+
| x|period|
+---+------+
| 1| a|
| 3| a|
+---+------+
only showing top 2 rows
The skewness function only performs basic math operations, all of which are implemented in pyspark.sql.functions, so in this case it is not hard to write a function that does the same thing:
import pyspark.sql.functions as psf

def lognormal_skew(df, xvar='x'):
    df_agg = (df
              .groupBy('period')
              .agg(psf.stddev_pop(xvar).alias('sd'))
              )
    df_agg = df_agg.withColumn('skew', (psf.exp(psf.col('sd')) + 2) * psf.sqrt(psf.exp(psf.col('sd')) - 1))
    return df_agg
Note that pyspark.sql.functions offers several functions to compute a standard deviation. I use stddev_pop, which returns the population standard deviation rather than a sample estimator, matching numpy's default v.std() (with only 2 or 3 points, a sample estimator would be quite imprecise anyway).
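As a quick check of that choice (using the df defined above), the two variants differ, and only stddev_pop matches numpy's default v.std() with ddof=0:
(df.filter(psf.col('period') == 'a')
   .select(psf.stddev_pop('x').alias('sd_pop'),    # ~1.633, same as np.array([1, 3, 5]).std()
           psf.stddev_samp('x').alias('sd_samp'))  # 2.0, the sample estimator (ddof=1)
   .show())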
We can verify that this yields the desired output:
lognormal_skew(df).show(2)
+------+-----------------+------------------+
|period| sd| skew|
+------+-----------------+------------------+
| b| 0.5| 2.938798148174726|
| a|1.632993161855452|14.448897615797454|
+------+-----------------+------------------+
We managed to get the expected result with a pure DataFrame logic.
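If your downstream step needs the skewness attached to each original row (an assumption about your use case), you can join the aggregate back to df:
df_skew = lognormal_skew(df).select('period', 'skew')
df.join(df_skew, on='period', how='left').show()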
rdd
Let's arrange the data to have an rdd that looks like parallelized numpy arrays:
rdd = df.rdd
rdd = rdd.map(lambda l: (l[1], [l[0]])).reduceByKey(lambda x, y: x + y)
rdd.take(2)
[('b', [5, 6]), ('a', [1, 3, 5])]
Here we use reduceByKey to group the values into a list. With voluminous data, this step can blow up your executors' memory.
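A sketch of an alternative for this grouping step (rdd_alt is just an illustrative name, and it still materialises all values of a key in memory): groupByKey avoids building a new Python list at every merge:
rdd_alt = df.rdd.map(lambda l: (l[1], l[0])).groupByKey().mapValues(list)
rdd_alt.take(2)
# same structure as above, e.g. [('b', [5, 6]), ('a', [1, 3, 5])]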
Finally, you can easily apply your function in parallel on that structure:
rdd = rdd.map(lambda l: (l[0], np.array(l[1]))).map(lambda l: (l[0], lognormal_skew_numpy(l[1])))
rdd.take(2)
[('b', 2.938798148174726), ('a', 14.448897615797454)]
We have once again the same result. I see two flaws in this approach:
- It is less readable and portable. If you want to reuse the code with a different dataset, you will have to do more work.
- It is less efficient (speed and memory). The reduceByKey operation here is the main bottleneck.
However, you gain some flexibility. That's a trade-off.
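If you go the rdd route and still need a DataFrame at the end (an optional last step), you can convert the result back; mapValues(float) turns the numpy scalars into plain Python floats that createDataFrame accepts:
spark.createDataFrame(rdd.mapValues(float), ['period', 'skew']).show()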