You can pass the entire row as an additional argument with struct("*") (or struct(*) in SQL). Example:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (1, Option.empty[String], 20)
).toDF("id", "name", "age")

val myUDF = udf((name: String, row: Row) =>
  try {
    Some(name.toLowerCase())
  } catch {
    case e: Exception =>
      println(row.mkString(","))
      None
  }
)

df
  .select(myUDF($"name", struct("*")))
  .show()
You then see the content of the row (in this case 1,null,20) in the executor logs. As these logs live on remote machines, this can be frustrating.
More on debugging/exception handling: You can propagate the exception to the driver by re-throwing it with the string representation of the row as the message. Note that your job will fail if an exception occurs:
val myUDF = udf((name: String, row: Row) =>
  try {
    name.toLowerCase()
  } catch {
    case e: Exception =>
      throw new Exception("row : " + row.mkString(","), e)
  }
)
My preferred solution is to return an additional column from the UDF containing the error message; this also does not stop the Spark job in case of an error:
val myUDF = udf((name: String) => {
  val result: (Option[String], Option[String]) = try {
    (Option(name.toLowerCase()), None)
  } catch {
    case e: java.lang.Exception => (None, Option(e.toString()))
  }
  result
})
df
  .withColumn("tmp", myUDF($"name"))
  .withColumn("udf_result", $"tmp._1")
  .withColumn("error", $"tmp._2")
  .drop($"tmp")
  .show(false)
+---+----+---+----------+------------------------------+
|id |name|age|udf_result|error |
+---+----+---+----------+------------------------------+
|1 |null|20 |null |java.lang.NullPointerException|
+---+----+---+----------+------------------------------+
This way, there is no need to pass the entire row to the UDF, and you can simply filter your DataFrame with df.where($"error".isNotNull).
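The tuple-returning pattern is also plain Scala and can be unit-tested without Spark; a minimal sketch of the UDF body as a standalone function (safeLower is an illustrative name):

```scala
// The body of the error-column UDF as a plain function: returns the
// lowercased value plus no error on success, or None plus the
// exception's string form on failure.
def safeLower(name: String): (Option[String], Option[String]) =
  try {
    (Option(name.toLowerCase()), None)
  } catch {
    case e: Exception => (None, Option(e.toString))
  }
```

Keeping the logic in a named function like this lets you test the happy and error paths directly, and register the same function as a UDF afterwards.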