
I am trying to implement a simple if-else function, specifically with pandas_udf. Here is the code:

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd

@pandas_udf("string", PandasUDFType.SCALAR )
def seq_sum1(col1,col2):
  if col1 + col2 <= 6:
    v = "low"
  elif ((col1 + col2 > 6) & (col1 + col2 <=10)) :
    v = "medium"
  else:
    v = "High"
  return (v)

# Deploy 
df.select("*",seq_sum1('c1','c2').alias('new_col')).show(10)

This results in an error:

PythonException: An exception was thrown from a UDF: 'ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().', from <command-1220380192863042>, line 13. Full traceback below:

If I run the same code with @udf instead of @pandas_udf, it produces the expected results. With @pandas_udf, however, it fails.

I know this kind of functionality can be achieved by other means in Spark (CASE WHEN, etc.), so the point here is to understand how pandas_udf works when dealing with this kind of logic.

Thanks

2 Answers


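In a scalar pandas_udf, each argument is a pandas Series holding a whole batch of rows, and the function must return a pandas Series of the same length; it does not take and return single values the way @udf does. That is why the if statement in the question fails: col1 + col2 <= 6 produces a Series of booleans, and Python cannot reduce it to a single True or False.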
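The failure can be reproduced in plain pandas, without Spark. This is a minimal sketch that assumes a made-up batch of three rows (c1 = 1, 3, 5 and c2 = 2, 4, 6) standing in for what the UDF receives; it shows the ValueError from the question and a vectorized np.where version that returns one label per row.

```python
import numpy as np
import pandas as pd

# One batch as a scalar pandas_udf would receive it: each column is a pandas Series.
# The values are illustrative only.
col1 = pd.Series([1, 3, 5])
col2 = pd.Series([2, 4, 6])

total = col1 + col2  # element-wise sum: 3, 7, 11
mask = total <= 6    # a Series of booleans, not a single bool

error_message = None
try:
    if mask:  # `if` needs one True/False; pandas refuses to pick one for the whole Series
        pass
except ValueError as exc:
    error_message = str(exc)

print(error_message)  # The truth value of a Series is ambiguous. ...

# Vectorized version: decide every row at once and return a Series of equal length.
# The outer np.where only uses the inner result where total > 6,
# so total <= 10 there corresponds to "medium".
labels = pd.Series(np.where(mask, "low", np.where(total <= 10, "medium", "high")))
print(labels.tolist())  # ['low', 'medium', 'high']
```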

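import pandas as pd
import numpy as np
import pyspark.sql.functions as F

# Scalar pandas UDF: col1 and col2 arrive as pandas Series (one batch of rows),
# and the function must return a Series of the same length.
@F.pandas_udf("string", F.PandasUDFType.SCALAR)
def seq_sum1(col1, col2):
    total = col1 + col2  # element-wise sum, still a Series
    # np.where evaluates the condition for every row at once instead of using `if`
    return pd.Series(
        np.where(
            total <= 6, "low",
            np.where(
                (total > 6) & (total <= 10), "medium",
                "high"
            )
        )
    )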

df.select("*", seq_sum1('c1','c2').alias('new_col')).show()
+---+---+-------+
| c1| c2|new_col|
+---+---+-------+
|  1|  2|    low|
|  3|  4| medium|
|  5|  6|   high|
+---+---+-------+
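With more than two branches, nested np.where calls get hard to read. A possible alternative, assuming NumPy's np.select is acceptable in place of the nested np.where above, lists the conditions flatly; label_sums is a hypothetical name. The function is plain pandas so it can be checked without a Spark session. The commented lines sketch how, assuming Spark 3.0 or later, it could be wrapped with pandas_udf using Python type hints, which the Spark 3.x documentation recommends over PandasUDFType.

```python
import numpy as np
import pandas as pd


def label_sums(col1: pd.Series, col2: pd.Series) -> pd.Series:
    """Bucket col1 + col2 into low/medium/high, one label per input row."""
    total = col1 + col2
    conditions = [total <= 6, (total > 6) & (total <= 10)]
    choices = ["low", "medium"]
    # np.select takes, per row, the choice of the first condition that is True;
    # rows where no condition holds get the default
    labels = np.select(conditions, choices, default="high")
    # keep the incoming index so the output lines up with the input batch
    return pd.Series(labels, index=col1.index)


# Assumed Spark >= 3.0 registration (not executed here, so no Spark session is needed):
#   from pyspark.sql.functions import pandas_udf
#   seq_sum1 = pandas_udf(label_sums, "string")
#   df.select("*", seq_sum1("c1", "c2").alias("new_col")).show()

print(label_sums(pd.Series([1, 3, 5]), pd.Series([2, 4, 6])).tolist())
# ['low', 'medium', 'high']
```

Adding a fourth bucket then means appending one condition and one choice rather than adding another level of nesting.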

Comment:

Thank you! It opened my eyes. I ended up using a map function to make it work; I will post my code.

@mck provided the insight, and I ended up using a map function to solve it:

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("string", PandasUDFType.SCALAR)
def seq_sum(col1):

    # Per-value logic: called once for each element of the batch
    def main(x):
        if x < 6:
            v = "low"
        else:
            v = "high"
        return v

    # Apply main to every element of the batch and wrap the results in a pandas Series
    result = pd.Series(map(main, col1))

    return result

df.select("*",seq_sum('column_name').alias('new_col')).show(10)
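The map-based answer works, but it still calls main once per value in Python, so inside each batch it behaves much like a row-at-a-time loop. The sketch below compares it, on a plain pandas Series standing in for one batch (the values 2, 6, 9 are made up), with pandas' own Series.map and a vectorized np.where. All three give the same labels; the vectorized form avoids the per-element Python call, which is usually where pandas_udf gains over @udf.

```python
import numpy as np
import pandas as pd


def main(x):
    # Same per-value rule as the answer above: strictly below 6 is "low",
    # so a value of exactly 6 is "high" here (the question used <= 6)
    return "low" if x < 6 else "high"


batch = pd.Series([2, 6, 9])  # illustrative stand-in for one batch of a column

via_builtin_map = pd.Series(list(map(main, batch)))  # the answer's map approach
via_series_map = batch.map(main)                     # pandas method, keeps the batch index
via_where = pd.Series(np.where(batch < 6, "low", "high"), index=batch.index)  # vectorized

print(via_builtin_map.tolist())  # ['low', 'high', 'high']
print(via_series_map.tolist())   # ['low', 'high', 'high']
print(via_where.tolist())        # ['low', 'high', 'high']
```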
