
I have the DataFrame below, and I am generating a sequence of numbers as a rowId column so that I can perform a join, but the code throws the error shown further down. What am I doing wrong?

fListVec: org.apache.spark.sql.DataFrame = [features: vector]
+-----------------------------------------------------------------------------+
|features                                                                     |
+-----------------------------------------------------------------------------+
|[2.5046410000000003,2.1487149999999997,1.0884870000000002,3.5877090000000003]|
|[0.9558040000000001,0.9843780000000002,0.545025,0.9979860000000002]          |
+-----------------------------------------------------------------------------+

Code:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

val fListrdd = fListVec.rdd
    .map{case Row(features: Vector) => features}
    .zipWithIndex()
    .toDF("features","rowId")    

fListrdd.createOrReplaceTempView("featuresTable")
val f = spark.sql("SELECT features, rowId from featuresTable")
f.show(false)

Output:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 206.0 failed 1 times, most recent failure: Lost task 0.0 in stage 206.0 (TID 1718, localhost, executor driver): scala.MatchError: [[2.5046410000000003,2.1487149999999997,1.0884870000000002,3.5877090000000003]] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
  at $$$$4896e3e877b134a87d9ee46b238e22$$$$$anonfun$1.apply(:193)
  at $$$$4896e3e877b134a87d9ee46b238e22$$$$$anonfun$1.apply(:193)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1762)
  at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
  at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
  at org.apache.spark.rdd.ZippedWithIndexRDD.<init>(ZippedWithIndexRDD.scala:50)
  at org.apache.spark.rdd.RDD$$anonfun$zipWithIndex$1.apply(RDD.scala:1293)
  at org.apache.spark.rdd.RDD$$anonfun$zipWithIndex$1.apply(RDD.scala:1293)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.zipWithIndex(RDD.scala:1292)
  ... 101 elided
Caused by: scala.MatchError: [[2.5046410000000003,2.1487149999999997,1.0884870000000002,3.5877090000000003]] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
  at $$$$4896e3e877b134a87d9ee46b238e22$$$$$anonfun$1.apply(:193)
  at $$$$4896e3e877b134a87d9ee46b238e22$$$$$anonfun$1.apply(:193)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1762)
  at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
  at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
  ... 3 more

Expected Output:

features                  | rowId
[2.5046410000000003,...]  | 0
[0.9558040000000001,...]  | 1

2 Answers


You have to add a map step in the middle that pins down the element types, so that the schema of the new DataFrame can be inferred:

import org.apache.spark.mllib.linalg.DenseVector  // or org.apache.spark.ml.linalg.DenseVector, depending on which API built the vectors

val fListrdd = fListVec.rdd
  .map{case Row(features) => features}
  .zipWithIndex()
  .map(x => (x._1.asInstanceOf[DenseVector], x._2.toInt))
  .toDF("features","rowId")

where the .map(x => (x._1.asInstanceOf[DenseVector], x._2.toInt)) line is the only addition.
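
Since the original goal was a join on rowId, a minimal usage sketch might look like the following. Note that otherDF and its label column are made-up placeholders; any DataFrame indexed the same way will do, and the same imports as above are assumed to be in scope.

// Hypothetical second DataFrame to align row-by-row (placeholder data).
val otherDF = Seq("a", "b").toDF("label")

// Index it the same way so both sides share an Int rowId column.
val otherWithId = otherDF.rdd
  .map{case Row(label: String) => label}
  .zipWithIndex()
  .map(x => (x._1, x._2.toInt))
  .toDF("label","rowId")

// The join the question was after.
fListrdd.join(otherWithId, "rowId").show(false)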

You can go one step further and create a Dataset. I personally recommend Datasets, as they are a type-safe and optimized form of DataFrames.

For that you would need a case class

case class features(features: DenseVector, rowId: Int)

and then just use the features case class in place of the tuple in the solution above, so that you can call the .toDS API and get a type-safe Dataset.

val fListDS = fListVec.rdd
  .map{case Row(features: DenseVector) => features}
  .zipWithIndex()
  .map(x => features(x._1.asInstanceOf[DenseVector], x._2.toInt))
  .toDS
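
A short usage sketch (assuming spark.implicits._ is in scope, as it is in the spark-shell): with the typed Dataset, access to the case-class fields is checked at compile time.

fListDS.filter(_.rowId == 0).show(false)   // typed filter on the rowId field
fListDS.map(_.features.size).show()        // e.g. vector length per row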



You're almost there - just need to specify the proper vector type DenseVector:

import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.sql.Row
import spark.implicits._  // brings in toDF and the $"..." column syntax (already in scope in the spark-shell)

val fList = Seq(
  (Seq(2.5046410000000003, 2.1487149999999997, 1.0884870000000002, 3.5877090000000003)),
  (Seq(0.9558040000000001, 0.9843780000000002, 0.545025, 0.9979860000000002))
).toDF("features")

def seqToVec = udf(
  (s: Seq[Double]) => new DenseVector(s.toArray)
)

val fListVec = fList.withColumn("features", seqToVec($"features"))
// fListVec: org.apache.spark.sql.DataFrame = [features: vector]

val fListrdd = fListVec.rdd.
  map{ case Row(features: DenseVector) => features }.
  zipWithIndex.
  toDF("features", "rowId")  

fListrdd.show
// +--------------------+-----+
// |            features|rowId|
// +--------------------+-----+
// |[2.50464100000000...|    0|
// |[0.95580400000000...|    1|
// +--------------------+-----+
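
One detail worth noting: zipWithIndex produces a Long, and this version keeps it as such (the first answer casts it to Int), so the schema should come out roughly like this:

fListrdd.printSchema
// root
//  |-- features: vector (nullable = true)
//  |-- rowId: long (nullable = false)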

2 Comments

This gave me an error, which got resolved when I added the map function that Ramesh specified. Thanks for taking the time to respond.
You shouldn't need the additional map since the first one already hints the vector type. I've expanded the answer to provide a complete example.
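
A plausible explanation for the original MatchError: the question annotated the pattern with org.apache.spark.ml.linalg.Vector, but if the column actually holds an old-style mllib vector (as in the example above), the typed pattern cannot match and the whole Row falls through as a scala.MatchError. A quick way to check which class is really stored, so the pattern can use the matching type:

fListVec.rdd.map{case Row(v) => v.getClass.getName}.first()
// res: String = org.apache.spark.mllib.linalg.DenseVector   (for the example built above)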
