
A Spark DataFrame contains a column of type Array[Double]. It throws a ClassCastException when I try to get it back in a map() function. The following Scala code generates the exception.

case class Dummy(x: Array[Double])
val df = sqlContext.createDataFrame(Seq(Dummy(Array(1, 2, 3))))
val s = df.map { r =>
  // fails at runtime: the value stored in the Row is not an Array[Double]
  val arr: Array[Double] = r.getAs[Array[Double]]("x")
  arr.sum
}
s.foreach(println)

The exception is

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [D
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:24)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:23)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Can somebody explain why this does not work, and what I should do instead? I am using Spark 1.5.1 and Scala 2.10.6.

Thanks

2 Answers


ArrayType is represented in a Row as a scala.collection.mutable.WrappedArray. You can extract it using, for example:

val arr: Seq[Double] = r.getAs[Seq[Double]]("x")

or

val i: Int = ???  // ??? stands in for the index of the column
val arr = r.getSeq[Double](i)

or even:

import scala.collection.mutable.WrappedArray

val arr: WrappedArray[Double] = r.getAs[WrappedArray[Double]]("x")
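
Putting it together, a minimal sketch of the fix for the code in the question, using the Seq-based getAs from above:

val s = df.map { r =>
  val arr: Seq[Double] = r.getAs[Seq[Double]]("x")
  arr.sum
}
s.foreach(println)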

If the DataFrame is relatively thin, pattern matching can be a better approach:

import org.apache.spark.sql.Row

df.rdd.map { case Row(x: Seq[Double]) => (x.toArray, x.sum) }

although you have to keep in mind that the type of the sequence is unchecked.
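
If you want to silence the resulting erasure warning, one option (my addition, not part of the original answer) is the @unchecked annotation on the type argument:

import org.apache.spark.sql.Row

df.rdd.map { case Row(x: Seq[Double @unchecked]) => (x.toArray, x.sum) }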

In Spark >= 1.6 you can also use the Dataset API as follows:

df.select("x").as[Seq[Double]].rdd

This approach can also be considered (assuming Spark 2.x, where toDF requires a SparkSession's implicits):

  val tuples = Seq(("Abhishek", "Sengupta", Seq("MATH", "PHYSICS")))
  val dF = tuples.toDF("firstName", "lastName", "subjects")

  case class StudentInfo(fName: String, lName: String, subjects: Seq[String])

  val students = dF
    .collect()
    .map(row => StudentInfo(row.getString(0), row.getString(1), row.getSeq(2)))

  students.foreach(println)
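
Note that collect() pulls the whole DataFrame to the driver, which is fine for small data. A sketch of the same conversion kept distributed (again assuming a SparkSession named spark in scope):

import spark.implicits._  // provides the Encoder derived for the case class

// map runs on the executors and yields a Dataset[StudentInfo]
val studentsDS = dF.map(row =>
  StudentInfo(row.getString(0), row.getString(1), row.getSeq[String](2)))

studentsDS.show()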
