
I have a dataframe of the following structure:

A: Array[String]   | B: Array[String] | [ ... multiple other columns ...]
=========================================================================
[A, B, C, D]       | [1, 2, 3, 4]     | [ ... array with 4 elements ... ]
[E, F, G, H, I]    | [5, 6, 7, 8, 9]  | [ ... array with 5 elements ... ]
[J]                | [10]             | [ ... array with 1 element ...  ]

I want to write a UDF that

  1. zips the elements at the i-th position of each column in the DF,
  2. explodes the DF on each of these zipped tuples.

The resulting column should look like this:

ZippedAndExploded: Array[String]
=================================
[A, 1, ...]
[B, 2, ...]
[C, 3, ...]
[D, 4, ...]
[E, 5, ...]
[F, 6, ...]
[G, 7, ...]
[H, 8, ...]
[I, 9, ...]
[J, 10, ...]
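(For intuition: zipping the i-th elements of the per-row arrays is the same as transposing a list of lists in plain Scala; a minimal sketch of what I mean:)

val row = Seq(Seq("A", "B", "C", "D"), Seq("1", "2", "3", "4"))
row.transpose
// => List(List(A, 1), List(B, 2), List(C, 3), List(D, 4))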

At the moment I'm using multiple calls (one per column name; the list of column names is collected at runtime) to a UDF like this:

import org.apache.spark.sql.functions.{explode, udf}

val myudf6 = udf((xa: Seq[Seq[String]], xb: Seq[String]) => {
  xa.indices.map(i => {
    xa(i) :+ xb(i) // append the i-th element of the new column to the i-th tuple
  })
})

val allColumnNames = df.columns.filter(...)    

for (columnName <- allColumnNames) {
  // note: df must be a var, and the "zipped" column is assumed to exist already
  df = df.withColumn("zipped", myudf6(df("zipped"), df(columnName)))
}
df = df.withColumn("zipped", explode(df("zipped")))

Since the dataframe can have hundreds of columns, these iterative withColumn calls seem to take a long time.

Question(s): Is it possible to do this with one UDF and a single DF.withColumn(...) call?

Important: The UDF should zip a dynamic number of columns (read at runtime).

1 Comment

Any idea how to do it in PySpark?

2 Answers


Use a UDF that takes a variable number of columns as input. This can be done with an array of arrays (assuming the element types are the same). Since you then have an array of arrays, you can use transpose, which achieves the same result as zipping the lists together. The resulting array can then be exploded.

import org.apache.spark.sql.functions.{array, col, explode, udf}

val array_zip_udf = udf((cols: Seq[Seq[String]]) => {
  cols.transpose // zipping the i-th elements of all arrays == transposing the array of arrays
})

val allColumnNames = df.columns.filter(...).map(col)
val df2 = df.withColumn("exploded", explode(array_zip_udf(array(allColumnNames: _*))))

Note that in Spark 2.4+ it would be possible to use arrays_zip instead of a UDF:

val df2 = df.withColumn("exploded", explode(arrays_zip(allColumnNames: _*)))

3 Comments

Solved it using @Shaido's code (changing the last row to val df2 = df.withColumn("exploded", explode(array_zip_udf(array(columnNames.head, columnNames.tail: _*))))).
Thank you very much, you saved me so much time!
@D.Müller: Happy to help :) I fixed the answer a bit to avoid the head/tail part; this can be done by adding map(col) to the column names. (I forgot that array only accepts a Seq[Column] and not a Seq[String].)

If you know the number of values in the arrays and are sure it is fixed, the following can be one of the easier solutions:

select A[0], B[0]..... from your_table
union all
select A[1], B[1]..... from your_table
union all
select A[2], B[2]..... from your_table
union all
select A[3], B[3]..... from your_table
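If the fixed array length is only known at runtime, the same query can be generated programmatically; a sketch, where your_table and the length of 4 are placeholder assumptions:

val size = 4 // assumed fixed length of every array
val query = (0 until size)
  .map(i => s"SELECT A[$i], B[$i] FROM your_table")
  .mkString(" UNION ALL ")
val result = spark.sql(query)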

1 Comment

Thank you for the quick reply. I adapted my question, as the dataframe can contain arrays of different sizes (row-wise). I guess that this solution would not work on such data, right? Sorry for the belated change of the question.
