
I use my own Spark UDF functions in Spark SQL expressions (SQL language, not via the Spark API). In case of a failure inside my UDF, I'd like to access the whole row with all its columns and expose this information (for example via a custom exception or logs) for better error handling.

Right now I don't know how to access the row columns inside my UDF, or even how to pass all columns to it via SQL. Any suggestions?

1 Answer

You can pass the entire row to the UDF as an additional argument using struct("*") (or struct(*) in SQL). Example:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{struct, udf}
import spark.implicits._

val df = Seq(
  (1, Option.empty[String], 20)
).toDF("id", "name", "age")

val myUDF = udf((name: String, row: Row) =>
  try {
    Some(name.toLowerCase())
  } catch {
    case e: Exception =>
      // log the failing row, then return None so the job continues
      println(row.mkString(","))
      None
  }
)

df
  .select(myUDF($"name", struct("*")))
  .show()

You will then see the content of the row (in this case 1,null,20) in the logs. Since executor logs live on remote machines, digging through them can be tedious.
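Since the question is about SQL expressions rather than the DataFrame API, here is a minimal sketch of how the same pattern could look from Spark SQL. The UDF name my_udf and the view name people are illustrative, not from the original answer:

import org.apache.spark.sql.Row

// Register the UDF so it is callable from SQL; the Row parameter
// receives the struct built by struct(*)
spark.udf.register("my_udf", (name: String, row: Row) =>
  try Option(name.toLowerCase())
  catch {
    case e: Exception =>
      println(row.mkString(","))  // log the failing row
      None
  }
)

df.createOrReplaceTempView("people")

// struct(*) packs all columns of the current row into one struct argument
spark.sql("SELECT my_udf(name, struct(*)) AS lowered FROM people").show()

This is the struct(*) usage the asker confirmed works in SQL expressions.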

More on debugging/exception handling: you can propagate the exception to the driver by re-throwing it with the string representation of the row as the message. Note that your job will fail if an exception occurs:

val myUDF = udf((name: String, row: Row) =>
  try {
    name.toLowerCase()
  } catch {
    case e: Exception => throw new Exception("row: " + row.mkString(","), e)
  }
)

My preferred solution is to return an additional column from the UDF containing the error message; this way, the Spark job does not stop when an error occurs:

val myUDF = udf((name: String) => {
  // returns (result, error message) as a two-field struct
  val result: (Option[String], Option[String]) = try {
    (Option(name.toLowerCase()), None)
  } catch {
    case e: Exception => (None, Option(e.toString))
  }
  result
})

df
  .withColumn("tmp", myUDF($"name"))
  .withColumn("udf_result", $"tmp._1")
  .withColumn("error", $"tmp._2")
  .drop($"tmp")
  .show(false)

+---+----+---+----------+------------------------------+
|id |name|age|udf_result|error                         |
+---+----+---+----------+------------------------------+
|1  |null|20 |null      |java.lang.NullPointerException|
+---+----+---+----------+------------------------------+

This way there is no need to pass the entire row to the UDF, and you can simply filter your DataFrame for failed rows with df.where($"error".isNotNull).


1 Comment

Thanks! struct(*) works like a charm in SQL expressions!
