
I have the following DataFrame:

+----------+ 
|col       | 
+----------+ 
|[1, 4, 3] | 
|[1, 5, 11]| 
|[1, 3, 3] | 
|[1, 4, 3] | 
|[1, 6, 3] | 
|[1, 1, 3] | 
+----------+

What I want is:

+----------+ 
|col_new   | 
+----------+ 
|[3, -1]   | 
|[4, 6]    | 
|[2, 0]    | 
|[3, -1]   | 
|[5, -3]   | 
|[0, 2]    | 
+----------+

That is, the diff operator: arr[n+1] - arr[n].

And I don't know how I should do it.

I thought I should do it with a udf? I'm not really familiar with udfs, but here is what I tried:

from pyspark.sql.functions import col

def diff(a):
    return [a[ii+1]-a[ii] for ii in range(a.__len__()-1)]

function = udf(lambda c: diff(c))
df.withColumn("col_new",function(col("col"))).show(20,False)

But that didn't work, of course, since I need a list back... and I want to use the power of DataFrames. Does someone have a hint for me?

Best Boendal

  • Why didn't that work? I mean, what was the outcome? Also, you are missing the datatype in your udf: it should be function = udf(lambda c: diff(c), ArrayType(IntegerType())); without it, "col_new" will be null. And you can pass the column name directly: df.withColumn("col_new",function("col")).show(20,False) (remove the extra col). Commented Jan 15, 2018 at 15:59
  • Your code should work. Commented Jan 15, 2018 at 16:03

2 Answers


Your code is fine. The only mistake is in your import statement, which should be as follows:

from pyspark.sql import functions as F

def diff(a):
    return [a[ii+1]-a[ii] for ii in range(a.__len__()-1)]

function = F.udf(lambda c: diff(c))
df.withColumn("col_new",function(F.col("col"))).show(20,False)

And you should be all good.
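
As the comments point out, a udf without an explicit return type defaults to StringType, so col_new is not a real array column (depending on the Spark version, it shows up as a stringified list, or even as null). A minimal sketch declaring the return type, assuming the arrays hold integers:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType

def diff(a):
    return [a[ii + 1] - a[ii] for ii in range(len(a) - 1)]

# Declaring ArrayType(IntegerType()) makes col_new a proper array<int>
# column instead of an untyped string result.
function = F.udf(diff, ArrayType(IntegerType()))
df.withColumn("col_new", function(F.col("col"))).show(20, False)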

Updated

To expand on this: I would suggest avoiding udf functions as much as possible, since they require serialization and deserialization of the data, which reduces processing efficiency. Always prefer the built-in functions when you can.

Simply put, you can use the array and col functions as below to meet your requirement:

from pyspark.sql import functions as F
df.select(F.array([(F.col("col")[i+1]-F.col("col")[i]) for i in range(2)]).alias("col_new")).show(20,False)
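
Note that range(2) hard-codes arrays of exactly three elements. If you are on Spark 2.4 or later (which postdates this question), the built-in higher-order function transform handles arrays of any length without a udf; a sketch of that approach:

from pyspark.sql import functions as F

# slice(col, 2, size(col) - 1) drops the first element (slice is 1-based);
# transform then subtracts the element at the same 0-based index i in the
# original array, giving [col[1]-col[0], col[2]-col[1], ...].
df.select(
    F.expr("transform(slice(col, 2, size(col) - 1), (x, i) -> x - col[i])").alias("col_new")
).show(20, False)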


You could write a UDF that does what you need in plain Python, like this:

import pyspark.sql.functions as fun

def diff(array):
    res = []
    for i in range(0, len(array) - 1):
        res.append(array[i + 1] - array[i])
    return res

f = fun.udf(diff)

And this is how you apply it to your data:

d = sc.parallelize([[[1,4,3]], [[1,5,11]], [[1,3,3]]]).toDF(["col"])
d.show()
+----------+
|       col|
+----------+
| [1, 4, 3]|
|[1, 5, 11]|
| [1, 3, 3]|
+----------+

d.withColumn("new_col", f(d["col"])).drop("col").show()
+-------+
|new_col|
+-------+
|[3, -1]|
| [4, 6]|
| [2, 0]|
+-------+
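
The same caveat about the return type applies here: fun.udf(diff) with no declared type defaults to StringType, so new_col is not a true array column and won't support further array operations. A sketch with the type declared, assuming integer elements:

from pyspark.sql.types import ArrayType, IntegerType

# With the return type declared, new_col is array<int> rather than a string.
f = fun.udf(diff, ArrayType(IntegerType()))
d.withColumn("new_col", f(d["col"])).drop("col").show()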

