
I'm trying to work out a transformation on DataFrames; any help is much appreciated.

In Spark 2.3.1 (Scala), I have a DataFrame whose columns are arrays of strings and longs:

+------+---------+----------+---------+---------+
|userId|     varA|      varB|     varC|     varD|
+------+---------+----------+---------+---------+
|     1|[A, B, C]| [0, 2, 5]|[1, 2, 9]|[0, 0, 0]|
|     2|[X, Y, Z]|[1, 20, 5]|[9, 0, 6]|[1, 1, 1]|
+------+---------+----------+---------+---------+

I want my output to look like the DataFrame below.

+------+---+---+---+---+
|userId|  A|  B|  C|  D|
+------+---+---+---+---+
|     1|  A|  0|  1|  0|
|     1|  B|  2|  2|  0|
|     1|  C|  5|  9|  0|
|     2|  X|  1|  9|  1|
|     2|  Y| 20|  0|  1|
|     2|  Z|  5|  6|  1|
+------+---+---+---+---+

I tried doing this using explode, but I get a Cartesian product: each of the two explodes produces 3 rows per user, which combine into 3 × 3 = 9 rows per user. Is there a way to keep the record count at 6 rows instead of 18?

scala> val data = sc.parallelize(Seq("""{"userId": 1,"varA": ["A", "B", "C"], "varB": [0, 2, 5], "varC": [1, 2, 9], "varD": [0, 0, 0]}""","""{"userId": 2,"varA": ["X", "Y", "Z"], "varB": [1, 20, 5], "varC": [9, 0, 6], "varD": [1, 1, 1]}"""))
scala> val df = spark.read.json(data)
scala> df.show()
+------+---------+----------+---------+---------+
|userId|     varA|      varB|     varC|     varD|
+------+---------+----------+---------+---------+
|     1|[A, B, C]| [0, 2, 5]|[1, 2, 9]|[0, 0, 0]|
|     2|[X, Y, Z]|[1, 20, 5]|[9, 0, 6]|[1, 1, 1]|
+------+---------+----------+---------+---------+
scala>
scala> df.printSchema
root
 |-- userId: long (nullable = true)
 |-- varA: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- varB: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- varC: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- varD: array (nullable = true)
 |    |-- element: long (containsNull = true)
scala>
scala> val zip_str = udf((x: Seq[String], y: Seq[Long]) => x.zip(y))
zip_str: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,ArrayType(StructType(StructField(_1,StringType,true), StructField(_2,LongType,false)),true),Some(List(ArrayType(StringType,true), ArrayType(LongType,false))))

scala> val zip_long = udf((x: Seq[Long], y: Seq[Long]) => x.zip(y))
zip_long: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,ArrayType(StructType(StructField(_1,LongType,false), StructField(_2,LongType,false)),true),Some(List(ArrayType(LongType,false), ArrayType(LongType,false))))

scala> df.withColumn("zip_1", explode(zip_str($"varA", $"varB"))).withColumn("zip_2", explode(zip_long($"varC", $"varD"))).select($"userId", $"zip_1._1".alias("A"),$"zip_1._2".alias("B"),$"zip_2._1".alias("C"),$"zip_2._2".alias("D")).show()
+------+---+---+---+---+
|userId|  A|  B|  C|  D|
+------+---+---+---+---+
|     1|  A|  0|  1|  0|
|     1|  A|  0|  2|  0|
|     1|  A|  0|  9|  0|
|     1|  B|  2|  1|  0|
|     1|  B|  2|  2|  0|
|     1|  B|  2|  9|  0|
|     1|  C|  5|  1|  0|
|     1|  C|  5|  2|  0|
|     1|  C|  5|  9|  0|
|     2|  X|  1|  9|  1|
|     2|  X|  1|  0|  1|
|     2|  X|  1|  6|  1|
|     2|  Y| 20|  9|  1|
|     2|  Y| 20|  0|  1|
|     2|  Y| 20|  6|  1|
|     2|  Z|  5|  9|  1|
|     2|  Z|  5|  0|  1|
|     2|  Z|  5|  6|  1|
+------+---+---+---+---+
scala>

Reference used here:

https://intellipaat.com/community/17050/explode-transpose-multiple-columns-in-spark-sql-table

  • Try my answer; it should work in versions lower than 2.4, since it doesn't need the arrays_zip expression. Commented Jul 8, 2020 at 22:42

2 Answers


Something along the lines of combining posexplode and expr could work.

If we do the following:

df.select(
  col("userId"),
  posexplode("varA"),
  col("varB"),
  col("varC")
).withColumn(
  "varB", 
  expr("varB[pos]")
).withColumn(
  "varC", 
  expr("varC[pos]")
)

I am writing this from memory, so I am not 100% sure. I will run a test later and update with an edit once I verify.

EDIT

The above expression works, except one minor correction is needed. Updated expression:

df.select(col("userId"), posexplode(col("varA")), col("varB"), col("varC"), col("varD"))
  .withColumn("varB", expr("varB[pos]"))
  .withColumn("varC", expr("varC[pos]"))
  .withColumn("varD", expr("varD[pos]"))
  .show()

Output:

+------+---+---+----+----+----+
|userId|pos|col|varB|varC|varD|
+------+---+---+----+----+----+
|     1|  0|  A|   0|   1|   0|
|     1|  1|  B|   2|   2|   0|
|     1|  2|  C|   5|   9|   0|
|     2|  0|  X|   1|   9|   1|
|     2|  1|  Y|  20|   0|   1|
|     2|  2|  Z|   5|   6|   1|
+------+---+---+----+----+----+
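
For completeness, the pos column can be dropped and the columns aliased to match the headers asked for in the question; a minimal sketch along those lines (the aliases are my addition, not part of the original answer):

df.select(col("userId"), posexplode(col("varA")), col("varB"), col("varC"), col("varD"))
  .select(
    col("userId"),
    col("col").alias("A"),          // exploded varA element
    expr("varB[pos]").alias("B"),   // pick the element at the matching position
    expr("varC[pos]").alias("C"),
    expr("varD[pos]").alias("D"))
  .show()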

1 Comment

Thanks Shantanu & Milos, posexplode instead of explode works perfectly.

You don't need UDFs; this can be achieved using Spark SQL's arrays_zip and then explode:

 df.select('userId,explode(arrays_zip('varA,'varB,'varC,'varD)))
   .select("userId","col.varA","col.varB","col.varC","col.varD")
   .show

Output:

+------+----+----+----+----+
|userId|varA|varB|varC|varD|
+------+----+----+----+----+
|     1|   A|   0|   1|   0|
|     1|   B|   2|   2|   0|
|     1|   C|   5|   9|   0|
|     2|   X|   1|   9|   1|
|     2|   Y|  20|   0|   1|
|     2|   Z|   5|   6|   1|
+------+----+----+----+----+
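
As a side note, a variant of my own (not from the original answer): selecting col.* expands every field of the zipped struct at once, so the fields don't need to be listed one by one:

df.select('userId, explode(arrays_zip('varA, 'varB, 'varC, 'varD)))
  .select("userId", "col.*")
  .show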

2 Comments

Thanks Chlebek, I think arrays_zip is in Spark >= 2.4; I am using version 2.3.1 :(
I've found that arrays_zip can be replaced with a udf: stackoverflow.com/questions/61503929/…
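
For Spark 2.3, a minimal sketch of that udf-based replacement, reconstructed along the lines of the zip_str/zip_long udfs in the question (not the code from the linked post):

// zip all four arrays element-wise so a single explode yields one row per element
val zip4 = udf((a: Seq[String], b: Seq[Long], c: Seq[Long], d: Seq[Long]) =>
  a.indices.map(i => (a(i), b(i), c(i), d(i))))

df.withColumn("z", explode(zip4($"varA", $"varB", $"varC", $"varD")))
  .select($"userId",
    $"z._1".alias("A"), $"z._2".alias("B"),
    $"z._3".alias("C"), $"z._4".alias("D"))
  .show()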
