11

Spark is throwing a ClassCastException when performing any operation on a WrappedArray.

Example:

I have a map output like the one below.

Output:

Map(1 -> WrappedArray(Pan4), 2 -> WrappedArray(Pan15), 3 -> WrappedArray(Pan16, Pan17, Pan18), 4 -> WrappedArray(Pan19, Pan1, Pan2, Pan3, Pan4, Pan5, Pan6))

When map.values is invoked, it prints the following output:

MapLike(WrappedArray(Pan4), WrappedArray(Pan15), WrappedArray(Pan16, Pan17, Pan18), WrappedArray(Pan19, Pan1, Pan2, Pan3, Pan4, Pan5, Pan6))

The exception is thrown when I invoke map.values.map(arr => arr) or map.values.foreach { value => println(value) }.

I am not able to perform any operation on the WrappedArray. I just need the number of elements in each WrappedArray.
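For context, here is a minimal standalone reproduction of the failing cast (an assumed setup, not my actual UDAF code): Spark hands the array column back as a Scala WrappedArray, and forcing it into java.util.ArrayList fails at runtime, matching the stack trace below.

import java.util
import scala.collection.mutable.WrappedArray

// Hypothetical stand-in for what buffer.getAs returns for an array column
val value: Any = WrappedArray.make[String](Array("Pan4"))

// This is the cast that fails, as in the stack trace below
val asList = value.asInstanceOf[util.ArrayList[String]]
// java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef
// cannot be cast to java.util.ArrayList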

Error StackTrace
------------------
java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to java.util.ArrayList
    at WindowTest$CustomMedian$$anonfun$1.apply(WindowTest.scala:176)
    at WindowTest$CustomMedian$$anonfun$1.apply(WindowTest.scala:176)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Map$Map4.foreach(Map.scala:181)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at WindowTest$CustomMedian.evaluate(WindowTest.scala:176)
    at org.apache.spark.sql.execution.aggregate.ScalaUDAF.eval(udaf.scala:446)
    at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$35.apply(AggregationIterator.scala:376)
    at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$35.apply(AggregationIterator.scala:368)
    at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:154)
    at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
2 Comments
  • Please provide your sample code that you are using! Commented Nov 23, 2016 at 12:53
  • @ShivanshSrivastava resolved the error by changing it to Sequence type... Commented Nov 24, 2016 at 4:32

3 Answers

24

I resolved the error by converting to Seq (sequence type).

Earlier:

val bufferMap: Map[Int, util.ArrayList[String]] = buffer.getAs[Map[Int, util.ArrayList[String]]](1)

Modified:

val bufferMap: Map[Int, Seq[String]] = buffer.getAs[Map[Int, Seq[String]]](1)
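
As a minimal sketch of how this can then be used to get the element counts the question asks for (assuming, as in the question, that the map sits at ordinal 1 of the buffer Row; sizesPerKey is just a hypothetical helper name):

import org.apache.spark.sql.Row

// Spark materialises the array column as a WrappedArray, which already
// implements Seq, so no cast is needed and .size works directly.
def sizesPerKey(buffer: Row): Map[Int, Int] = {
  val bufferMap: Map[Int, Seq[String]] = buffer.getAs[Map[Int, Seq[String]]](1)
  bufferMap.map { case (key, values) => key -> values.size }
}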

Comments

2

For those using Spark's Java API, encode the dataset into a typed object instead of working with Row and the getAs method.

Suppose this data set that has some random information about a machine:

+-----------+------------+------------+-----------+---------+--------------------+
|epoch      |     RValues|     SValues|    TValues|      ids|               codes|
+-----------+------------+------------+-----------+---------+--------------------+
| 1546297225| [-1.0, 5.0]|  [2.0, 6.0]| [3.0, 7.0]|   [2, 3]|[MRT0000020611, M...|
| 1546297226| [-1.0, 3.0]| [-6.0, 6.0]| [3.0, 4.0]|   [2, 3]|[MRT0000020611, M...|
| 1546297227| [-1.0, 4.0]|[-8.0, 10.0]| [3.0, 6.0]|   [2, 3]|[MRT0000020611, M...|
| 1546297228| [-1.0, 6.0]|[-8.0, 11.0]| [3.0, 5.0]|   [2, 3]|[MRT0000020611, M...|
+-----------+------------+------------+-----------+---------+--------------------+

Instead of a Dataset<Row>, create a Dataset<MachineLog> that matches this column definition, and define the MachineLog class accordingly. When doing a transformation, use .as(Encoders.bean(MachineLog.class)) to specify the encoder.

For example:

spark.createDataset(dataset.rdd(), Encoders.bean(MachineLog.class));

However, converting a Dataset to an RDD (as above) is not recommended; prefer the as method:

Dataset<MachineLog> mLog = spark.read().parquet("...").as(Encoders.bean(MachineLog.class));

It can also be used after a transformation.

Dataset<MachineLog> machineLogDataset = aDataset
                .join(
                        otherDataset,
                        functions.col("...").eqNullSafe("...")
                )
                .as(Encoders.bean(MachineLog.class));

Take into account that the MachineLog class must obey the Java-bean serialization rules (i.e., it must have an explicit no-argument constructor, plus getters and setters).

Comments

1

Try the below

map.values.array.foreach { value => println(value) }

array is a method on WrappedArray. It returns Array[T], where T is the type of the elements in the WrappedArray.
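
As a standalone illustration of what array gives you (not the asker's Spark code; as the comment below notes, this approach still failed inside the UDAF):

import scala.collection.mutable.WrappedArray

val wa: WrappedArray[String] = WrappedArray.make(Array("Pan16", "Pan17", "Pan18"))
val underlying: Array[String] = wa.array   // the backing Array[T]
println(underlying.length)                 // 3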

1 Comment

Doesn't work... It throws java.lang.ArrayStoreException: scala.collection.mutable.WrappedArray$ofRef at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
