
I have a PySpark DataFrame with two columns (A and B, both of type double) whose values are either 0.0 or 1.0. I am trying to add a new column that is the sum of those two. I followed the examples in Pyspark: Pass multiple columns in UDF:

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType
sum_cols = F.udf(lambda x: x[0]+x[1], IntegerType())
df_with_sum = df.withColumn('SUM_COL',sum_cols(F.array('A','B')))
df_with_sum.select(['SUM_COL']).toPandas()

This shows a Series of NULLs instead of the results I expect.

I tried each of the following to see if there's an issue with data types:

sum_cols = F.udf(lambda x: x[0], IntegerType())
sum_cols = F.udf(lambda x: int(x[0]), IntegerType())

but I still got NULLs.

I tried removing the array:

sum_cols = F.udf(lambda x: x, IntegerType())
df_with_sum = df.withColumn('SUM_COL',sum_cols(df.A))

This works fine and shows 0 or 1.

I tried removing the UDF, but leaving the array:

df_with_sum = df.withColumn('SUM_COL', F.array('A','B'))

This works fine and shows a Series of arrays such as [0.0, 1.0].

So the array works fine and the UDF works fine; it is only when I pass an array to the UDF that things break down. What am I doing wrong?

  • what are the datatypes of columns A and B? can you check that and update the question? Commented Sep 26, 2018 at 4:17
  • @RameshMaharjan yes, updated! The type is double. Commented Sep 26, 2018 at 4:59
  • how 0/1 is double? is it 0/1 or 0.1? Commented Sep 26, 2018 at 5:08
  • @RameshMaharjan it is 0.0 or 1.0 for A and B, the output should be 0, 1 or 2 (depending on what the operation is, I showed several examples - in one of them I call int() within the UDF, and the UDF return is IntegerType, in another I don't use a UDF at all, so it is 0.0 or 1.0) Commented Sep 26, 2018 at 5:20
  • I am not understanding the form 0.0/1.0 it should be 0.0 if the datatype is double. if the value is 0.0/1.0 then the datatype should be StringType. isn't it so? Commented Sep 26, 2018 at 7:03

1 Answer


The problem is that your UDF returns a double (a Python float) while its declared return type is IntegerType. The values do not fit that type, and by default PySpark silently substitutes NULL when such a cast fails rather than raising an error:

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

df_with_doubles = spark.createDataFrame([(1.0, 1.0), (2.0, 2.0)], ['A', 'B'])
sum_cols = F.udf(lambda x: x[0] + x[1], IntegerType())
df_with_sum = df_with_doubles.withColumn('SUM_COL', sum_cols(F.array('A', 'B')))
df_with_sum.select(['SUM_COL']).toPandas()

You get:

  SUM_COL
0    None
1    None

However, if you do:

df_with_integers = spark.createDataFrame([(1,1), (2,2)], ['A', 'B'])
sum_cols = F.udf(lambda x: x[0]+x[1], IntegerType())
df_with_sum = df_with_integers.withColumn('SUM_COL',sum_cols(F.array('A','B')))
df_with_sum.select(['SUM_COL']).toPandas()

You get:

   SUM_COL
0        2
1        4

So either cast your columns to IntegerType beforehand (or cast them inside the UDF), or change the return type of the UDF to DoubleType.


2 Comments

Is there any way to get it to show logs or any hint that the problem is a casting failure vs. something else?
6 years later - I am struggling with the same issue. Can't I get Spark to fail fast and throw an error in these cases?
