
I am new to Spark SQL and DataFrames. I have a DataFrame to which I need to add a new column based on the values of other columns. I have a nested IF formula from Excel that I need to implement (to populate the new column), which, translated into programmatic terms, looks something like this:

if(k =='yes')
{
  if(!(i==''))
  {
    if(diff(max_date, target_date) < 0)
    {
      if(j == '')
      {
        "pending" //the value of the column
      }
      else {
        "approved" //the value of the column
      }
    }
    else{
      "expired" //the value of the column
    }
  }
  else{
    "" //the value should be empty
  }
}
else{
  "" //the value should be empty
} 

i, j, k are three other columns in the DataFrame. I know we can use withColumn and when to add new columns based on other columns, but I am not sure how to achieve the above logic with that approach.

What would be an easy/efficient way to implement the above logic for adding the new column? Any help would be appreciated.

Thank you.

  • where do max_date and target_date come from? Commented Nov 22, 2017 at 23:53
  • ‘Max_date’ comes from the table that I get my dataframe from. ‘Target_date’ is one of the three columns, i,j,k. Commented Nov 23, 2017 at 0:35

1 Answer


First, let's simplify that if statement:

if(k == "yes" && i.nonEmpty)
  if(maxDate - targetDate < 0)
    if (j.isEmpty) "pending" 
    else "approved"
  else "expired"
else ""
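As a quick sanity check that this flattening still matches the original Excel branches, the same logic can be run as a plain Scala function against hand-picked values (all names and values here are purely illustrative):

```scala
// Pure Scala version of the simplified decision logic, testable outside Spark.
def state(i: String, j: String, k: String, maxDate: Long, targetDate: Long): String =
  if (k == "yes" && i.nonEmpty)
    if (maxDate - targetDate < 0)
      if (j.isEmpty) "pending" else "approved"
    else "expired"
  else ""

// Spot checks covering each branch of the original nested if:
assert(state("x", "",  "yes", 1L, 5L) == "pending")   // diff < 0, j empty
assert(state("x", "y", "yes", 1L, 5L) == "approved")  // diff < 0, j non-empty
assert(state("x", "y", "yes", 9L, 5L) == "expired")   // diff >= 0
assert(state("",  "y", "yes", 1L, 5L) == "")          // i empty
assert(state("x", "y", "no",  1L, 5L) == "")          // k not "yes"
```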

Now there are two main ways to accomplish this:

  1. Using a custom UDF
  2. Using Spark's built-in functions: coalesce, when, otherwise

Custom UDF

Due to the complexity of your conditions, option 2 will be rather tricky. Using a custom UDF should suit your needs.

import org.apache.spark.sql.functions.{udf, lit}
import spark.implicits._ // for the $"colName" column syntax

def getState(i: String, j: String, k: String, maxDate: Long, targetDate: Long): String =  
  if(k == "yes" && i.nonEmpty)
    if(maxDate - targetDate < 0)
      if (j.isEmpty) "pending" 
      else "approved"
    else "expired"
  else ""

val stateUdf = udf(getState _)
df.withColumn("state", stateUdf($"i",$"j",$"k",lit(0),lit(0)))

Just replace the two lit(0) placeholders with your date columns, and this should work for you.
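For instance, if the DataFrame carried the dates as actual columns, unix_timestamp could supply the Long arguments the UDF expects. A self-contained sketch with made-up sample rows; the column names max_date and target_date below are assumptions for illustration, not something your table necessarily has:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("stateUdf").getOrCreate()
import spark.implicits._

// Same decision logic as above, wrapped in a UDF.
def getState(i: String, j: String, k: String, maxDate: Long, targetDate: Long): String =
  if (k == "yes" && i.nonEmpty)
    if (maxDate - targetDate < 0)
      if (j.isEmpty) "pending" else "approved"
    else "expired"
  else ""

val stateUdf = udf(getState _)

// Sample rows; the date columns are timestamp strings here for brevity.
val df = Seq(
  ("x", "",  "yes", "2017-01-01 00:00:00", "2017-06-01 00:00:00"),
  ("x", "y", "yes", "2017-06-01 00:00:00", "2017-01-01 00:00:00"),
  ("",  "y", "yes", "2017-01-01 00:00:00", "2017-06-01 00:00:00")
).toDF("i", "j", "k", "max_date", "target_date")

// unix_timestamp converts the strings to epoch seconds (Long),
// matching the UDF's Long parameters.
val result = df.withColumn(
  "state",
  stateUdf($"i", $"j", $"k", unix_timestamp($"max_date"), unix_timestamp($"target_date"))
)
result.show()
```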

Using Spark's built-in functions

If you notice performance issues, you can switch to using coalesce, when, and otherwise, which would look something like this:

val isApproved = df.withColumn("state",
  when($"k" === "yes" && $"i" =!= "" &&
       (lit(max_date) - lit(target_date) < 0) && $"j" =!= "", "approved")
    .otherwise(null))
val isPending = isApproved.withColumn("state",
  coalesce($"state",
    when($"k" === "yes" && $"i" =!= "" &&
         (lit(max_date) - lit(target_date) < 0) && $"j" === "", "pending")
      .otherwise(null)))
val isExpired = isPending.withColumn("state",
  coalesce($"state",
    when($"k" === "yes" && $"i" =!= "" &&
         (lit(max_date) - lit(target_date) >= 0), "expired")
      .otherwise(null)))
val finalDf = isExpired.withColumn("state", coalesce($"state", lit("")))
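If you'd rather avoid the intermediate DataFrames, the same cascade can be collapsed into a single chained when expression. Spark evaluates the branches in order, so each later branch can assume the earlier conditions failed, mirroring the nested if. A self-contained sketch with made-up sample rows (dates simplified to Longs for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("chainedWhen").getOrCreate()
import spark.implicits._

// Sample rows; max_date/target_date are illustrative Long columns.
val df = Seq(
  ("x", "",  "yes", 1L, 5L),
  ("x", "y", "yes", 1L, 5L),
  ("x", "y", "yes", 9L, 5L),
  ("",  "y", "yes", 1L, 5L),
  ("x", "y", "no",  1L, 5L)
).toDF("i", "j", "k", "max_date", "target_date")

// Branches are tried top to bottom, so the guards get simpler as we go.
val finalDf = df.withColumn("state",
  when($"k" =!= "yes" || $"i" === "", "")                // outer else branches
    .when($"max_date" - $"target_date" >= 0, "expired")  // diff not negative
    .when($"j" === "", "pending")                        // diff negative, j empty
    .otherwise("approved"))                              // diff negative, j non-empty
finalDf.show()
```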

I've used custom UDFs in the past with large input sources without issues, and they can lead to much more readable code, especially in this case.


7 Comments

Thanks for the great answer! Will give this a try once I get the chance. Will let you know how it goes :)
Hello. In your answer using Spark's built-in functions, what does the finalDf statement mean? We're already putting null wherever the conditions are not met, right?
@Hemanth the coalesce function returns the first non null value. So if the current value of the state column is still null by the time all the previous conditions have been tried, then the value of the state column will be set to empty string.
I actually can substitute NULL for empty values. Also, the i, j values can be checked for NULL instead of empty values. Can I achieve the same with the below piece of code?
DF2 = DF.withColumn("state", when(col("k") === 1 && col("i") != "NULL" &&(ChronoUnit.DAYS.between(LocalDate.parse(maxPartitionDate.trim), LocalDate.parse(col("i").toString.trim)) < 0) && col("j") != "NULL", "approved").when(col("k") === 1 && col("i") != "NULL" && (ChronoUnit.DAYS.between(LocalDate.parse(maxPartitionDate.trim), LocalDate.parse(col("i").toString.trim)) < 0) && col("j") == "NULL", "pending") .when(col("k") === 1 && col("i") != "NULL" && (ChronoUnit.DAYS.between(LocalDate.parse(maxPartitionDate.trim), LocalDate.parse(col("i").toString.trim)) >= 0), "expired").otherwise(null))
