I'm trying to create a new column on a dataframe based on the values of some columns. It's returning null in all cases. Anyone know what's going wrong with this simple example?

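import pandas as pd
from pyspark.sql.functions import lit, udf
from pyspark.sql.types import StringType

df = pd.DataFrame([[0,1,0],[1,0,0],[1,1,1]],columns = ['Foo','Bar','Baz'])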

spark_df = spark.createDataFrame(df)

def get_profile():
    if 'Foo'==1:
        return 'Foo'
    elif 'Bar' == 1:
        return 'Bar'
    elif 'Baz' ==1 :
        return 'Baz'

spark_df = spark_df.withColumn('get_profile', lit(get_profile()))
spark_df.show()

   Foo  Bar  Baz get_profile
    0    1    0        None
    1    0    0        None
    1    1    1        None

I would expect that the get_profile column would be filled out for all rows.

I also tried this:

spark_udf = udf(get_profile,StringType())

spark_df = spark_df.withColumn('get_profile', spark_udf())
print(spark_df.toPandas())

to the same effect.

  • You are comparing strings with numbers. 'Foo' != 1, and the same holds for the other conditions. That's why you get None. A UDF expects columns as arguments, while get_profile takes zero arguments. Commented Sep 26, 2018 at 16:14
  • Go with the built-in when/otherwise functions instead of a udf. Commented Sep 26, 2018 at 16:15
  • I'd do it like this: spark_df.withColumn("get_profile", coalesce(*[when(col(c)==1, lit(c)) for c in spark_df.columns])) Commented Sep 26, 2018 at 16:24
  • Thanks - in reality the when/otherwise function isn't practical because there are many more comparisons that take place, this was just a simplified example. Commented Sep 26, 2018 at 18:28

1 Answer

The udf has no knowledge of the column names: inside get_profile, 'Foo' is just a string literal, not the value of the Foo column. Each condition in the if/elif block therefore compares a string with the integer 1 and evaluates to False, so the function returns None.
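
This can be checked without Spark. The following is a minimal plain-Python sketch of the function from the question; the name get_profile_original is only used here for illustration.

```python
def get_profile_original():
    # Each branch compares a string literal with an integer,
    # never the value of a DataFrame column.
    if 'Foo' == 1:
        return 'Foo'
    elif 'Bar' == 1:
        return 'Bar'
    elif 'Baz' == 1:
        return 'Baz'
    # No branch matched, so Python returns None implicitly.


result = get_profile_original()
print(result)  # prints None
```

In the first attempt, lit(get_profile()) also calls the Python function once, before Spark looks at any row, so the same None literal ends up in every row.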

You'd have to rewrite your udf to take in the columns you want to check:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def get_profile(foo, bar, baz):
    if foo == 1:
        return 'Foo'
    elif bar == 1:
        return 'Bar'
    elif baz == 1:
        return 'Baz'

spark_udf = udf(get_profile, StringType())
spark_df = spark_df.withColumn('get_profile', spark_udf('Foo', 'Bar', 'Baz'))
spark_df.show()
#+---+---+---+-----------+
#|Foo|Bar|Baz|get_profile|
#+---+---+---+-----------+
#|  0|  1|  0|        Bar|
#|  1|  0|  0|        Foo|
#|  1|  1|  1|        Foo|
#+---+---+---+-----------+

If you have a lot of columns and want to pass them all (in order):

spark_df = spark_df.withColumn('get_profile', spark_udf(*spark_df.columns))

More generally, you can unpack any ordered list of columns:

cols_to_pass_to_udf = ['Foo', 'Bar', 'Baz']
spark_df = spark_df.withColumn('get_profile', spark_udf(*cols_to_pass_to_udf))
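
With many columns, a function with one named parameter per column gets unwieldy. One possible approach, sketched below in plain Python, is a variadic function that pairs each incoming value with its column name. COLUMN_NAMES and get_profile_by_position are hypothetical names introduced for this sketch, and the column order is an assumption that must match the order in which the columns are passed.

```python
# Hypothetical column order; it must match the order of the values passed in.
COLUMN_NAMES = ['Foo', 'Bar', 'Baz']


def get_profile_by_position(*values):
    # Pair each value with its column name and return the first name
    # whose value equals 1, mirroring the if/elif chain above.
    for name, value in zip(COLUMN_NAMES, values):
        if value == 1:
            return name
    return None


# Plain-Python check against the three rows from the question.
rows = [(0, 1, 0), (1, 0, 0), (1, 1, 1)]
profiles = [get_profile_by_position(*row) for row in rows]
print(profiles)  # prints ['Bar', 'Foo', 'Foo']
```

A function like this should be usable the same way as above, wrapped with udf(get_profile_by_position, StringType()) and called with the column names unpacked in the same order as COLUMN_NAMES.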

But this particular operation does not require a udf. I would do it this way:

from pyspark.sql.functions import coalesce, when, col, lit

spark_df.withColumn(
    "get_profile",
    coalesce(*[when(col(c)==1, lit(c)) for c in spark_df.columns])
).show()
#+---+---+---+-----------+
#|Foo|Bar|Baz|get_profile|
#+---+---+---+-----------+
#|  0|  1|  0|        Bar|
#|  1|  0|  0|        Foo|
#|  1|  1|  1|        Foo|
#+---+---+---+-----------+

This works because pyspark.sql.functions.when() returns null by default if the condition evaluates to False and no otherwise is specified. The list comprehension builds one such when expression per column, and pyspark.sql.functions.coalesce then returns the first of them that is non-null.
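
The row-by-row behaviour can be modelled in plain Python. This is only a sketch of the semantics described above, not how Spark implements them; when_model and coalesce_model are hypothetical stand-ins for when and coalesce.

```python
def when_model(condition, value):
    # Like when(...) without otherwise: the value if the condition holds,
    # else None (standing in for null).
    return value if condition else None


def coalesce_model(*candidates):
    # Like coalesce: the first candidate that is not None, else None.
    for candidate in candidates:
        if candidate is not None:
            return candidate
    return None


columns = ['Foo', 'Bar', 'Baz']
rows = [
    {'Foo': 0, 'Bar': 1, 'Baz': 0},
    {'Foo': 1, 'Bar': 0, 'Baz': 0},
    {'Foo': 1, 'Bar': 1, 'Baz': 1},
]

profiles = [
    coalesce_model(*[when_model(row[c] == 1, c) for c in columns])
    for row in rows
]
print(profiles)  # prints ['Bar', 'Foo', 'Foo']
```

A row in which no column equals 1 yields None in this model, which corresponds to a null in the get_profile column.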

Note this is equivalent to the udf ONLY if the order of the columns is the same as the sequence that's evaluated in the get_profile function. To be more explicit, you should do:

spark_df.withColumn(
    "get_profile",
    coalesce(*[when(col(c)==1, lit(c)) for c in ['Foo', 'Bar', 'Baz']])
).show()

4 Comments

Got it - much appreciated. So this is a simplified version of the actual function. In reality, there are a lot more columns/conditions for assigning values, and the nested when structure wouldn't be practical. Is there a way to feed all columns into the UDF as an argument?
@flyingmeatball you can also do spark_udf(*spark_df.columns), but you have to make sure that the order of the columns is the same as the order of the arguments to your udf.
Also even if it's complicated, using a nested when is likely faster than using a udf.
Understood - it's more a readability/ability to program. I'm willing to trade a little speed for being able to certify I'm doing the calculations correctly.
