I used your logic and shortened it a little.
import pyspark.sql.functions as func

arrcols = ['col2', 'col3', 'col4']

(
    data_sdf
    # replace null arrays with empty arrays so they can be sized and padded like the others
    .selectExpr(*['coalesce({0}, array()) as {0}'.format(c) if c in arrcols else c for c in data_sdf.columns])
    # length of the longest array in each row
    .withColumn('max_size', func.greatest(*[func.size(c) for c in arrcols]))
    # pad every shorter array by repeating its last element up to max_size
    .selectExpr('col1',
                *['flatten(array({0}, array_repeat(element_at({0}, -1), max_size-size({0})))) as {0}'.format(c) for c in arrcols]
                )
    # zip the now equal-length arrays and explode the structs into rows
    .withColumn('arrzip', func.arrays_zip(*arrcols))
    .selectExpr('col1', 'inline(arrzip)')
    .orderBy('col1', 'col2')
    .show()
)
# +----+----+-----+----------+
# |col1|col2| col3|      col4|
# +----+----+-----+----------+
# |   1|id_1|  tim|     apple|
# |   1|id_2|steve|      pear|
# |   2|id_3|jenny|   avocado|
# |   2|id_4|jenny|   avocado|
# |   3|null|megan|strawberry|
# |   3|null|tommy|     apple|
# |   4|null| null|    banana|
# |   4|null| null|strawberry|
# +----+----+-----+----------+
approach steps
- fill nulls with empty arrays, and take the maximum size across all the array columns
- add elements to the arrays that are shorter than the others
  - I took the last element of the array and used array_repeat on it (similar to your approach)
  - the number of repetitions is the max size minus the size of the array being worked on (max_size-size({0}))
- with the aforementioned steps, each array column now has the same number of elements, which lets you zip them (arrays_zip) and explode them (using the inline() SQL function)
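To make the steps above concrete without a Spark session, here is a minimal pure-Python sketch that emulates them one row at a time. The helper name pad_and_explode and the sample rows are assumptions for illustration only; the rows are one possible input that is consistent with the output shown above, not necessarily the original data_sdf.

```python
def pad_and_explode(row, arrcols):
    """Emulate the Spark steps for a single row given as a dict (hypothetical helper)."""
    # step 1: treat null arrays as empty arrays, like coalesce(col, array())
    arrays = {c: (row[c] or []) for c in arrcols}
    # length of the longest array in this row, like greatest(size(...))
    max_size = max(len(a) for a in arrays.values())
    # step 2: pad shorter arrays by repeating their last element;
    # an empty array has no last element, so it is padded with None
    padded = {}
    for c, a in arrays.items():
        last = a[-1] if a else None
        padded[c] = a + [last] * (max_size - len(a))
    # step 3: zip the equal-length arrays and emit one tuple per position
    return [(row['col1'],) + vals for vals in zip(*(padded[c] for c in arrcols))]


arrcols = ['col2', 'col3', 'col4']

# assumed sample input (not taken from the original question)
rows = [
    {'col1': 1, 'col2': ['id_1', 'id_2'], 'col3': ['tim', 'steve'], 'col4': ['apple', 'pear']},
    {'col1': 2, 'col2': ['id_3', 'id_4'], 'col3': ['jenny'], 'col4': ['avocado']},
    {'col1': 3, 'col2': None, 'col3': ['megan', 'tommy'], 'col4': ['strawberry', 'apple']},
    {'col1': 4, 'col2': None, 'col3': None, 'col4': ['banana', 'strawberry']},
]

result = [out for r in rows for out in pad_and_explode(r, arrcols)]
for out in result:
    print(out)
```

With this assumed input, each printed tuple corresponds to one row of the table above, with None standing in for null.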
the list comprehension in the second selectExpr generates the following
['flatten(array({0}, array_repeat(element_at({0}, -1), max_size-size({0})))) as {0}'.format(c) for c in arrcols]
# ['flatten(array(col2, array_repeat(element_at(col2, -1), max_size-size(col2)))) as col2',
# 'flatten(array(col3, array_repeat(element_at(col3, -1), max_size-size(col3)))) as col3',
# 'flatten(array(col4, array_repeat(element_at(col4, -1), max_size-size(col4)))) as col4']
if it helps, here are the optimized logical plan and physical plan that spark generated
== Optimized Logical Plan ==
Generate inline(arrzip#363), [1], false, [col2#369, col3#370, col4#371]
+- Project [col1#0L, arrays_zip(flatten(array(coalesce(col2#1, []), array_repeat(element_at(coalesce(col2#1, []), -1, false), (greatest(size(coalesce(col2#1, []), true), size(coalesce(col3#2, []), true), size(coalesce(col4#3, []), true)) - size(coalesce(col2#1, []), true))))), flatten(array(coalesce(col3#2, []), array_repeat(element_at(coalesce(col3#2, []), -1, false), (greatest(size(coalesce(col2#1, []), true), size(coalesce(col3#2, []), true), size(coalesce(col4#3, []), true)) - size(coalesce(col3#2, []), true))))), flatten(array(coalesce(col4#3, []), array_repeat(element_at(coalesce(col4#3, []), -1, false), (greatest(size(coalesce(col2#1, []), true), size(coalesce(col3#2, []), true), size(coalesce(col4#3, []), true)) - size(coalesce(col4#3, []), true))))), col2, col3, col4) AS arrzip#363]
+- Filter (size(arrays_zip(flatten(array(coalesce(col2#1, []), array_repeat(element_at(coalesce(col2#1, []), -1, false), (greatest(size(coalesce(col2#1, []), true), size(coalesce(col3#2, []), true), size(coalesce(col4#3, []), true)) - size(coalesce(col2#1, []), true))))), flatten(array(coalesce(col3#2, []), array_repeat(element_at(coalesce(col3#2, []), -1, false), (greatest(size(coalesce(col2#1, []), true), size(coalesce(col3#2, []), true), size(coalesce(col4#3, []), true)) - size(coalesce(col3#2, []), true))))), flatten(array(coalesce(col4#3, []), array_repeat(element_at(coalesce(col4#3, []), -1, false), (greatest(size(coalesce(col2#1, []), true), size(coalesce(col3#2, []), true), size(coalesce(col4#3, []), true)) - size(coalesce(col4#3, []), true))))), col2, col3, col4), true) > 0)
+- LogicalRDD [col1#0L, col2#1, col3#2, col4#3], false
== Physical Plan ==
Generate inline(arrzip#363), [col1#0L], false, [col2#369, col3#370, col4#371]
+- *(1) Project [col1#0L, arrays_zip(flatten(array(coalesce(col2#1, []), array_repeat(element_at(coalesce(col2#1, []), -1, false), (greatest(size(coalesce(col2#1, []), true), size(coalesce(col3#2, []), true), size(coalesce(col4#3, []), true)) - size(coalesce(col2#1, []), true))))), flatten(array(coalesce(col3#2, []), array_repeat(element_at(coalesce(col3#2, []), -1, false), (greatest(size(coalesce(col2#1, []), true), size(coalesce(col3#2, []), true), size(coalesce(col4#3, []), true)) - size(coalesce(col3#2, []), true))))), flatten(array(coalesce(col4#3, []), array_repeat(element_at(coalesce(col4#3, []), -1, false), (greatest(size(coalesce(col2#1, []), true), size(coalesce(col3#2, []), true), size(coalesce(col4#3, []), true)) - size(coalesce(col4#3, []), true))))), col2, col3, col4) AS arrzip#363]
+- *(1) Filter (size(arrays_zip(flatten(array(coalesce(col2#1, []), array_repeat(element_at(coalesce(col2#1, []), -1, false), (greatest(size(coalesce(col2#1, []), true), size(coalesce(col3#2, []), true), size(coalesce(col4#3, []), true)) - size(coalesce(col2#1, []), true))))), flatten(array(coalesce(col3#2, []), array_repeat(element_at(coalesce(col3#2, []), -1, false), (greatest(size(coalesce(col2#1, []), true), size(coalesce(col3#2, []), true), size(coalesce(col4#3, []), true)) - size(coalesce(col3#2, []), true))))), flatten(array(coalesce(col4#3, []), array_repeat(element_at(coalesce(col4#3, []), -1, false), (greatest(size(coalesce(col2#1, []), true), size(coalesce(col3#2, []), true), size(coalesce(col4#3, []), true)) - size(coalesce(col4#3, []), true))))), col2, col3, col4), true) > 0)
+- *(1) Scan ExistingRDD[col1#0L,col2#1,col3#2,col4#3]
col2 has 3 elements and col3 has 2 elements?
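Going by the padding logic described in the approach steps, a row where col2 has 3 elements and col3 has 2 would get col3's last element repeated once. A small pure-Python sketch of that case follows; the helper pad_to and the values are hypothetical and only illustrate the rule, they are not Spark code.

```python
def pad_to(arr, size):
    """Pad arr to `size` by repeating its last element (hypothetical helper)."""
    arr = arr or []                      # null array behaves like an empty one
    last = arr[-1] if arr else None      # an empty array is padded with None
    return arr + [last] * (size - len(arr))


# made-up values: col2 has 3 elements, col3 has 2
col2 = ['a', 'b', 'c']
col3 = ['x', 'y']

max_size = max(len(col2), len(col3))
zipped = list(zip(pad_to(col2, max_size), pad_to(col3, max_size)))
print(zipped)  # [('a', 'x'), ('b', 'y'), ('c', 'y')]
```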