
I have the same situation as stated in this question.

df = spark.createDataFrame(
    [(1, "xx", [10, 20], ["a", "b"], ["p", "q"]),
     (2, "yy", [30, 40], ["c", "d"], ["r", "s"]),
     (3, "zz",     None, ["f", "g"], ["e", "k"])],
    ["c1", "c2", "a1", "a2", "a3"])
df.show()
# +---+---+--------+------+------+
# | c1| c2|      a1|    a2|    a3|
# +---+---+--------+------+------+
# |  1| xx|[10, 20]|[a, b]|[p, q]|
# |  2| yy|[30, 40]|[c, d]|[r, s]|
# |  3| zz|    null|[f, g]|[e, k]|
# +---+---+--------+------+------+

I can't figure out how to explode it correctly in PySpark. How can I achieve this result?

+---+---+----+---+---+
| c1| c2|  a1| a2| a3|
+---+---+----+---+---+
|  1| xx|  10|  a|  p|
|  1| xx|  20|  b|  q|
|  2| yy|  30|  c|  r|
|  2| yy|  40|  d|  s|
|  3| zz|null|  f|  e|
|  3| zz|null|  g|  k|
+---+---+----+---+---+
  • The post has a working solution, as confirmed by the user who asked the question. Did you try it? With a little effort you should be able to convert the Scala code from that post to PySpark.
  • @AzharKhan - the accepted answer is not simple to translate for someone who is not familiar with Scala. I know some Scala, but that answer would still require quite a lot of searching to work out what the syntax means.

1 Answer


The following should work for a dynamic number of array columns.

Spark 3:

from pyspark.sql import functions as F

# collect the names of all array-typed columns dynamically
arr_cols = [c[0] for c in df.dtypes if c[1][:5] == "array"]

df = df.withColumn(
    "arr_of_struct",
    # coalesce turns a null array into [null] so the row is not dropped;
    # arrays_zip pads shorter arrays with null, and the alias keeps the
    # original column names as struct field names
    F.arrays_zip(*[F.coalesce(c, F.array(F.lit(None))).alias(c) for c in arr_cols])
).select(
    *[c for c in df.columns if c not in arr_cols],
    # inline explodes the array of structs into one row per struct
    F.expr("inline(arr_of_struct)")
)

df.show()
# +---+---+----+---+---+
# | c1| c2|  a1| a2| a3|
# +---+---+----+---+---+
# |  1| xx|  10|  a|  p|
# |  1| xx|  20|  b|  q|
# |  2| yy|  30|  c|  r|
# |  2| yy|  40|  d|  s|
# |  3| zz|null|  f|  e|
# |  3| zz|null|  g|  k|
# +---+---+----+---+---+
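
To see how the null array in a1 is handled, here is an optional check (assuming the original df from the question, plus F and arr_cols as defined above) that inspects the intermediate zipped column before inline flattens it:

df_zipped = df.withColumn(
    "arr_of_struct",
    F.arrays_zip(*[F.coalesce(c, F.array(F.lit(None))).alias(c) for c in arr_cols])
)
# coalesce turned the null a1 into [null], and arrays_zip pads shorter
# arrays with null, so row 3 still yields two structs
df_zipped.select("c1", "arr_of_struct").show(truncate=False)
# row 1 should look like [{10, a, p}, {20, b, q}]
# row 3 should look like [{null, f, e}, {null, g, k}]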

Spark 2:

from pyspark.sql import functions as F

# collect the names of all array-typed columns dynamically
arr_cols = [c[0] for c in df.dtypes if c[1][:5] == "array"]

df = df.withColumn(
    "my_struct",
    # coalesce turns a null array into [null] so the row survives;
    # explode produces one output row per zipped struct
    F.explode(F.arrays_zip(*[F.coalesce(c, F.array(F.lit(None))) for c in arr_cols]))
).select(
    *[c for c in df.columns if c not in arr_cols],
    # Spark 2.4 names the zipped struct fields "0", "1", ..., so pull
    # them out by position and restore the original column names
    *[F.col(f"my_struct.{i}").alias(c) for i, c in enumerate(arr_cols)]
)
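
Calling df.show() here should print the same table as the Spark 3 version above:

df.show()
# prints the same table as the Spark 3 output above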

2 Comments

@Quynh-MaiChu: if you have a list of columns to explode, you can first create a new column with arrays_zip and then use it in inline: withColumn('arr_of_structs', func.arrays_zip(*column_list)).selectExpr('c1', 'c2', 'inline(arr_of_structs)') (see the sketch after these comments)
Thank you, very helpful for someone who has little knowledge of PySpark :)
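
A minimal sketch of the approach from the first comment, assuming df is the original DataFrame from the question and that column_list (a name taken from the comment, not the answer) holds the array column names. Note that without the coalesce trick from the answer, rows whose array is null are dropped, since arrays_zip returns null when any input is null and inline produces no rows for a null array:

from pyspark.sql import functions as func

column_list = ["a1", "a2", "a3"]  # array columns to explode together
result = df.withColumn(
    "arr_of_structs", func.arrays_zip(*column_list)
).selectExpr("c1", "c2", "inline(arr_of_structs)")
result.show()
# rows 1 and 2 explode as expected; row 3 (null a1) is dropped here,
# so combine this with the coalesce trick from the answer to keep it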
