
I have a Spark DataFrame df with the following schema:

root
 |-- features: array (nullable = true)
 |    |-- element: double (containsNull = false)

I would like to create a new DataFrame where each row is a Vector of Doubles, and I expect to get the following schema:

root
     |-- features: vector (nullable = true)
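
For reference, a toy DataFrame with the input schema above can be built like this (a minimal sketch, assuming spark is an active SparkSession; the sample values are made up):

import spark.implicits._

// Hypothetical sample data: each row holds an array<double> column named "features".
val df = Seq(
  Array(1.0, 2.0, 3.0),
  Array(4.0, 5.0, 6.0)
).toDF("features")

df.printSchema() // matches the input schema shown above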

So far I have the following piece of code (influenced by this post: Converting Spark Dataframe(with WrappedArray) to RDD[labelPoint] in scala), but I fear something is wrong with it because it takes a very long time to compute even a reasonable number of rows. Also, if there are too many rows, the application crashes with a heap space exception.

val clustSet = df.rdd.map { r =>
  val arr = r.getAs[mutable.WrappedArray[Double]]("features")
  val features: Vector = Vectors.dense(arr.toArray)
  features
}.map(Tuple1(_)).toDF()

I suspect that the arr.toArray call is not good Spark practice in this case. Any clarification would be very helpful.

Thank you!

1 Answer


It's because .rdd has to deserialize objects from Spark's internal in-memory format, and that is very time-consuming.

It's fine to use .toArray: you are operating at the row level, not collecting everything to the driver node.

You can do this very easily with a UDF:

import org.apache.spark.ml.linalg._
import org.apache.spark.sql.functions.udf
import spark.implicits._ // enables the 'features column syntax below

// Build the ml Vector inside a UDF so the conversion stays in the DataFrame API
val convertUDF = udf((array: Seq[Double]) => {
  Vectors.dense(array.toArray)
})

val withVector = dataset
  .withColumn("features", convertUDF('features))

The code is from this answer: Convert ArrayType(FloatType,false) to VectorUTD

However, the author of that question didn't ask about the performance differences.
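
For the df from the question, a minimal usage sketch (assuming the imports above are in scope; clustSet is just an illustrative name) would be:

import org.apache.spark.sql.functions.col

// Replace the array<double> column with an ml Vector column of the same name.
val clustSet = df.withColumn("features", convertUDF(col("features")))

clustSet.printSchema()
// root
//  |-- features: vector (nullable = true)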


3 Comments

Thank you very much, that helped a lot, and I marked it as the answer. I can run more rows now and the timing is satisfactory. However, I still get an exception when I try 200,000 rows: org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 1. Would you have an insight about this? Thanks again.
I set the following in my code: val conf = new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max.mb", "256") and it worked! Thank you.
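
For readers hitting the same Kryo buffer overflow, a minimal sketch of that configuration (this swaps in spark.kryoserializer.buffer.max, the non-deprecated form of the spark.kryoserializer.buffer.max.mb key used in the comment above):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Enable Kryo and raise the maximum serialization buffer; 256m is the value
// from the comment above, so tune it to the size of your largest serialized object.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.max", "256m")

val spark = SparkSession.builder().config(conf).getOrCreate()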
