
Given a DataFrame in which one column is a sequence of structs, generated by the following code:

val df = spark
  .range(10)
  .map((i) => (i % 2, util.Random.nextInt(10), util.Random.nextInt(10)))
  .toDF("a","b","c")
  .groupBy("a")
  .agg(collect_list(struct($"b",$"c")).as("my_list"))
df.printSchema
df.show(false)

Outputs

root
 |-- a: long (nullable = false)
 |-- my_list: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- b: integer (nullable = false)
 |    |    |-- c: integer (nullable = false)

+---+-----------------------------------+
|a  |my_list                            |
+---+-----------------------------------+
|0  |[[0,3], [9,5], [3,1], [4,2], [3,3]]|
|1  |[[1,7], [4,6], [5,9], [6,4], [3,9]]|
+---+-----------------------------------+

I need to run a function over each list of structs. The function's prototype is similar to the one below:

case class DataPoint(b: Int, c: Int)
def do_something_with_data(data: Seq[DataPoint]): Double = {
  // This is an example. I don't actually want the sum
  data.map(data_point => data_point.b + data_point.c).sum
}

I want to store the result of this function to another DataFrame column.

I tried to run

val my_udf = udf(do_something_with_data(_))
val df_with_result = df.withColumn("result", my_udf($"my_list"))
df_with_result.show(false)

and got

17/07/13 12:33:42 WARN TaskSetManager: Lost task 0.0 in stage 15.0 (TID 225, REDACTED, executor 0): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<struct<b:int,c:int>>) => double)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line27.$read$$iw$$iw$DataPoint
    at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$do_something_with_data$1.apply(<console>:29)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.do_something_with_data(<console>:29)
    at $line32.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:29)
    at $line32.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:29)

Is it possible to use a UDF like this without first casting my rows to a container struct with the DataFrame API?

Doing something like:

case class MyRow(a: Long, my_list: Seq[DataPoint])
df.as[MyRow].map(r => (r.a, r.my_list, do_something_with_data(r.my_list)))

using the Dataset API works, but I'd prefer to stick with the DataFrame API if possible.

1 Answer


You cannot use a case class as the input argument of your UDF (but you can return case classes from the UDF). To map over an array of structs, you can pass a Seq[Row] to your UDF:

import org.apache.spark.sql.Row

val my_udf = udf((data: Seq[Row]) => {
  // This is an example. I don't actually want the sum
  data.map { case Row(x: Int, y: Int) => x + y }.sum
})

df.withColumn("result", my_udf($"my_list")).show

+---+--------------------+------+
|  a|             my_list|result|
+---+--------------------+------+
|  0|[[0,3], [5,5], [3...|    41|
|  1|[[0,9], [4,9], [6...|    54|
+---+--------------------+------+
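
The parenthetical point above, that a UDF can return case classes even though it cannot accept them, is easy to miss. A minimal sketch of what that could look like, reusing the question's DataPoint class (swap_udf and swapped are hypothetical names, not from the answer):

val swap_udf = udf((data: Seq[Row]) =>
  // Returning a Seq of case class instances yields an array-of-struct column;
  // Spark derives the struct schema from the case class fields.
  data.map { case Row(b: Int, c: Int) => DataPoint(c, b) }
)

df.withColumn("swapped", swap_udf($"my_list")).printSchema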

2 Comments

What do you do if each row contains many fields, i.e. writing out case Row(field1: Type1,....) is impractical?
Anything here on a practical approach?
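
Neither comment got a reply here, but one practical option when the struct has many fields is to read only the fields you need by name via Row.getAs, instead of pattern matching on every column. A sketch, not from the accepted answer:

val my_udf = udf((data: Seq[Row]) =>
  // Look up struct fields by name; avoids spelling out every field in a Row(...) pattern.
  data.map(r => r.getAs[Int]("b") + r.getAs[Int]("c")).sum
)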
