
I am trying to add an Array of values as a new column to the DataFrame.

For example, let's assume there is an Array(4, 5, 10) and a DataFrame:

+----------+-----+
|   name   | age |
+----------+-----+
|   John   | 32  |
| Elizabeth| 28  |
|   Eric   | 41  |
+----------+-----+

My requirement is to add the above array as a new column to the dataframe. My expected output is as follows:

+----------+-----+------+
|   name   | age | rank |
+----------+-----+------+
|   John   | 32  | 4    | 
| Elizabeth| 28  | 5    |
|   Eric   | 41  | 10   |
+----------+-----+------+

I am trying to see whether I can achieve this using an RDD and zipWithIndex:

df.rdd.zipWithIndex.map(_.swap).join(array_rdd.zipWithIndex.map(_.swap))

This results in something like:

(0,([John, 32],4))

I want to convert the above RDD back into the required DataFrame. How can I do this?

Are there any alternatives available for achieving the desired result other than using rdd and zipWithIndex? What is the best way to do it?

PS:

Context for better understanding:

I am using the Xpress optimization suite to solve a mathematical problem. Xpress takes its inputs as Arrays and also outputs the result as an Array. I receive the input as a DataFrame, extract columns as Arrays (using collect), and pass them to Xpress. Xpress outputs an Array[Double] as the solution. I want to add this solution back to the DataFrame as a column, where every value in the solution array corresponds to the DataFrame row at the same index, i.e. the value at index n of the output Array corresponds to the nth row of the DataFrame.

1 Answer

After the join, map each result to a flat tuple of the fields you want, then convert the resulting RDD back to a DataFrame.


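import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._  // enables .toDF on local Seqs and on RDDs of tuples

val originalDF = Seq(("John", 32), ("Elizabeth", 28), ("Eric", 41)).toDF("name", "age")

val rank = Array(4, 5, 10)

// convert to Seq first so the implicit .toDF conversion applies
val rankDF = rank.toSeq.toDF("rank")

// key both RDDs by their 0-based row index, then join on that index
val joined = originalDF.rdd.zipWithIndex.map(_.swap).join(rankDF.rdd.zipWithIndex.map(_.swap))

// unpack (index, (originalRow, rankRow)) into a flat tuple
val finalRDD = joined.map { case (k, v) => (k, v._1.getString(0), v._1.getInt(1), v._2.getInt(0)) }

val finalDF = finalRDD.toDF("id", "name", "age", "rank")

// an RDD join does not preserve order, so sort by the index before showing
finalDF.orderBy("id").show()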
/*
+---+---------+---+----+
| id|     name|age|rank|
+---+---------+---+----+
|  0|     John| 32|   4|
|  1|Elizabeth| 28|   5|
|  2|     Eric| 41|  10|
+---+---------+---+----+
*/
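A variation on the code above, sketched under the assumption that the DataFrame's current row order is the order the array was produced in: instead of turning the array into a second RDD and joining, it looks up `values(i)` directly inside the zipWithIndex map and rebuilds the DataFrame from the original schema, so it works for any schema without hardcoded `getString(0)`/`getInt(1)` calls and does not shuffle. The object and method names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object ZipArrayColumn {
  // Appends values(i) to the i-th row of df as a new Int column.
  // Assumes df's current row order matches the order of the array.
  def withArrayColumn(df: DataFrame, values: Array[Int], colName: String): DataFrame = {
    require(df.count() == values.length, "array length must match the row count")
    // zipWithIndex assigns consecutive 0-based positions (unlike monotonically_increasing_id);
    // values is captured in the closure and shipped to the executors.
    val rows = df.rdd.zipWithIndex.map { case (row, i) => Row.fromSeq(row.toSeq :+ values(i.toInt)) }
    // Extend the original schema with the new column instead of rebuilding it by hand.
    val schema = StructType(df.schema.fields :+ StructField(colName, IntegerType, nullable = false))
    df.sparkSession.createDataFrame(rows, schema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("zip-array-column").getOrCreate()
    import spark.implicits._
    val df = Seq(("John", 32), ("Elizabeth", 28), ("Eric", 41)).toDF("name", "age")
    withArrayColumn(df, Array(4, 5, 10), "rank").show()
    spark.stop()
  }
}
```

Capturing the array in the closure is fine for a small solver output; for a very large array a broadcast variable would be the more usual choice. For the Array[Double] that Xpress returns, swap `Array[Int]` and `IntegerType` for `Array[Double]` and `DoubleType`.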

The only alternative I can think of is the org.apache.spark.sql.functions.row_number() window function, which achieves essentially the same thing by adding an increasing, consecutive row number to the DataFrame.
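A minimal sketch of the row_number() approach, assuming the DataFrame's current row order is the one the array refers to. It orders the window by a helper column from monotonically_increasing_id() (which follows the current row order but has gaps), lets row_number() turn that into a consecutive index, and joins the array on it. The object, method, and helper column names (`_mid`, `_idx`) are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{monotonically_increasing_id, row_number}

object RowNumberIndex {
  // Adds values(i) to the i-th row of df via a row_number() index and a join.
  def withArrayColumn(df: DataFrame, values: Array[Int], colName: String): DataFrame = {
    val spark = df.sparkSession
    import spark.implicits._

    // monotonically_increasing_id() is increasing but not consecutive, so row_number()
    // over it yields consecutive 1-based numbers; subtract 1 to get a 0-based index.
    // With no partitionBy, Spark moves all rows into a single partition for this window.
    val w = Window.orderBy("_mid")
    val indexed = df
      .withColumn("_mid", monotonically_increasing_id())
      .withColumn("_idx", row_number().over(w) - 1)
      .drop("_mid")

    // Pair each array value with its position so both sides share the same index.
    val valuesDF = values.toSeq.zipWithIndex.map { case (v, i) => (i, v) }.toDF("_idx", colName)

    // Join on the index, restore the original order, then drop the helper column.
    indexed.join(valuesDF, "_idx").orderBy("_idx").drop("_idx")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("row-number-index").getOrCreate()
    import spark.implicits._
    val df = Seq(("John", 32), ("Elizabeth", 28), ("Eric", 41)).toDF("name", "age")
    withArrayColumn(df, Array(4, 5, 10), "rank").show()
    spark.stop()
  }
}
```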

The drawback is that, without a partitionBy, the window shuffles all of the data into a single partition, because the row numbers must be unique across the whole DataFrame. If your data is very large, this can lead to out-of-memory errors. (Note: this may not apply in your case, since you mentioned you are already doing a collect on the data and have not mentioned any memory issues.)
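Since the question already collects the data to the driver, one more option, sketched here under that assumption, is to skip distributed indexing entirely: collect the rows once, use those same rows both to build the solver inputs and to rebuild the DataFrame, and pair row n with solution value n using `Array.zip`. Reusing one collected array is what keeps the two sides aligned. The object and method names are hypothetical, and the fixed array stands in for the Xpress output.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object DriverSideZip {
  // Rebuilds a DataFrame from already-collected rows plus one value per row.
  // Only suitable when the data is small enough to collect, as in the question.
  def fromCollected(spark: SparkSession, rows: Array[Row], schema: StructType,
                    values: Array[Int], colName: String): DataFrame = {
    require(rows.length == values.length, s"expected ${rows.length} values, got ${values.length}")
    // Array.zip pairs elements by position, so row n gets values(n).
    val newRows = rows.zip(values).map { case (row, v) => Row.fromSeq(row.toSeq :+ v) }
    val newSchema = StructType(schema.fields :+ StructField(colName, IntegerType, nullable = false))
    spark.createDataFrame(spark.sparkContext.parallelize(newRows.toSeq), newSchema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("driver-side-zip").getOrCreate()
    import spark.implicits._
    val df = Seq(("John", 32), ("Elizabeth", 28), ("Eric", 41)).toDF("name", "age")

    val rows = df.collect()              // collect once and reuse
    val ages = rows.map(_.getInt(1))     // e.g. a solver input column
    val solution = Array(4, 5, 10)       // placeholder for the solver's output array

    fromCollected(spark, rows, df.schema, solution, "rank").show()
    spark.stop()
  }
}
```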

Converting to an RDD and using zipWithIndex is an acceptable solution, but converting a DataFrame to an RDD is generally discouraged for performance reasons: RDD operations do not get the Catalyst query optimizations, and each row has to be converted out of Spark's internal format into JVM objects.

Comments

Missing something in your code sample ;)
The docs say "The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive." So the ID of a row in the original DF may not match the ID of its corresponding row in the Array DF, and the inner join may not produce the desired output DataFrame.
@SaiRameshGuptaPasumarthi I've updated the answer to use rdd and zipWithIndex.
@GaëlJ nothing caught my eye when I rechecked, apart from Sai's comment above. I've fixed my answer to account for that now. Is there anything else I'm missing?
@m_vemuri can you explain whether there is any difference between map() on an RDD and map{}?
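On the last question: in Scala, `map(...)` and `map {...}` call the same method. A single argument may be passed in braces instead of parentheses, and braces are what let you write a pattern-matching anonymous function (`{ case (k, v) => ... }`), as the answer does to destructure the `(index, (row, rankRow))` tuples. A small plain-Scala illustration (no Spark needed; the object name is hypothetical):

```scala
object MapBraces {
  val pairs = List((0L, "John"), (1L, "Eric"))

  // Parentheses: an ordinary argument list holding one function literal.
  val withParens = pairs.map(p => p._2.length)

  // Braces: the single argument is a pattern-matching anonymous function,
  // which destructures each tuple into its parts.
  val withBraces = pairs.map { case (_, name) => name.length }

  def main(args: Array[String]): Unit =
    println(withParens == withBraces) // prints true
}
```

RDD.map behaves the same way, since the difference is Scala syntax, not a different API.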
