I'm fairly new to Spark and Scala. I'm trying to call a function as a Spark UDF but I run into this error that I can't seem to resolve.
I understand that in Scala, Array and Seq aren't the same. WrappedArray is a subtype of Seq and there is implicit conversions between WrappedArray and Array but I'm not sure why that doesn't happen in the case of the UDF.
Any pointers to help me understand and resolve this is much appreciated.
Here's a snippet of the code
def filterMapKeysWithSet(m: Map[Int, Int], a: Array[Int]): Map[Int, Int] = {
val seqToArray = a.toArray
val s = seqToArray.toSet
m filterKeys s
}
val myUDF = udf((m: Map[Int, Int], a: Array[Int]) => filterMapKeysWithSet(m, a))
case class myType(id: Int, m: Map[Int, Int])
val mapRDD = Seq(myType(1, Map(1 -> 100, 2 -> 200)), myType(2, Map(1 -> 100, 2 -> 200)), myType(3, Map(3 -> 300, 4 -> 400)))
val mapDF = mapRDD.toDF
mapDF: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>]
root
|-- id: integer (nullable = false)
|-- m: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
case class myType2(id: Int, a: Array[Int])
val idRDD = Seq(myType2(1, Array(1,2,100,200)), myType2(2, Array(100,200)), myType2(3, Array(1,2)) )
val idDF = idRDD.toDF
idDF: org.apache.spark.sql.DataFrame = [id: int, a: array<int>]
root
|-- id: integer (nullable = false)
|-- a: array (nullable = true)
| |-- element: integer (containsNull = false)
import sqlContext.implicits._
/* Hive context is exposed as sqlContext */
val j = mapDF.join(idDF, idDF("id") === mapDF("id")).drop(idDF("id"))
val k = j.withColumn("filteredMap",myUDF(j("m"), j("a")))
k.show
Looking at the Dataframe "j" & "k", the map and array columns have the right data types.
j: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>]
root
|-- id: integer (nullable = false)
|-- m: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
|-- a: array (nullable = true)
| |-- element: integer (containsNull = false)
k: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>, filteredMap: map<int,int>]
root
|-- id: integer (nullable = false)
|-- m: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
|-- a: array (nullable = true)
| |-- element: integer (containsNull = false)
|-- filteredMap: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
However, an action on the Dataframe "k" that calls the UDF fails with the following error -
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 6, ip-100-74-42-194.ec2.internal): java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [I
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:60)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1865)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1865)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)