I have streaming JSON data whose structure can be described by the case class below:

case class Hello(A: String, B: Array[Map[String, String]])

Sample data looks like this:

|  A    | B                                        |
|-------|------------------------------------------|
|  ABC  |  [{C:1, D:1}, {C:2, D:4}]                |
|  XYZ  |  [{C:3, D:6}, {C:9, D:11}, {C:5, D:12}]  |

I want to transform it into:

|   A   |  C  |  D   |
|-------|-----|------|
|  ABC  |  1  |  1   |
|  ABC  |  2  |  4   |
|  XYZ  |  3  |  6   |
|  XYZ  |  9  |  11  |
|  XYZ  |  5  |  12  | 
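The reshaping is essentially a flatMap: each Hello record yields one output row per map in B. Below is a minimal sketch on plain Scala collections (no Spark), assuming every map is keyed by C and D; a missing key comes out as None here rather than failing. With spark.implicits._ in scope, the same lambda should carry over to flatMap on a Dataset[Hello].

```scala
// Mirrors the question's case class; plain Scala collections only, no Spark involved
case class Hello(A: String, B: Array[Map[String, String]])

val input = Seq(
  Hello("ABC", Array(Map("C" -> "1", "D" -> "1"), Map("C" -> "2", "D" -> "4"))),
  Hello("XYZ", Array(Map("C" -> "3", "D" -> "6"), Map("C" -> "9", "D" -> "11"), Map("C" -> "5", "D" -> "12")))
)

// One output row per (A, map) pair; Map.get returns None for a missing key
val output: Seq[(String, Option[String], Option[String])] =
  input.flatMap(h => h.B.toSeq.map(m => (h.A, m.get("C"), m.get("D"))))

output.foreach(println)
```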

Any help will be appreciated.


2 Answers


As the question evolved, I have left the original answer as it was; this one addresses the final version of the question.

Importantly, it now handles input of the following form, where B holds several maps per row:

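// Assumes spark-shell, or a SparkSession in scope named spark
import spark.implicits._
import org.apache.spark.sql.functions._

val df0 = Seq(
  ("ABC", List(Map("C" -> "1", "D" -> "2"), Map("C" -> "3", "D" -> "4"))),
  ("XYZ", List(Map("C" -> "44", "D" -> "55"), Map("C" -> "188", "D" -> "199"), Map("C" -> "88", "D" -> "99")))
).toDF("A", "B")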

The input could also be laid out with one map per row, as below, but the script would then need a (trivial) modification:

val df0 = Seq(
  ("ABC", List(Map("C" -> "1",  "D" -> "2"))),
  ("ABC", List(Map("C" -> "44", "D" -> "55"))),
  ("XYZ", List(Map("C" -> "11", "D" -> "22")))
).toDF("A", "B")

To produce the requested format:

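// One row per map in B
val df1 = df0.select($"A", explode($"B")).toDF("A", "Bn")

// Tag each map with a unique id so its C and D values stay together after the pivot
val df2 = df1.withColumn("SeqNum", monotonically_increasing_id())

// Exploding a map yields one (key, value) row per entry, renamed here to B and C
val df3 = df2.select($"A", explode($"Bn"), $"SeqNum").toDF("A", "B", "C", "SeqNum")

// Combine the id and A into a single grouping key
val df4 = df3.withColumn("dummy", concat($"SeqNum", lit("||"), $"A"))

// Turn the map keys (C, D) into columns, one row per original map
val df5 = df4.select($"dummy", $"B", $"C").groupBy("dummy").pivot("B").agg(first($"C"))

// Recover A from the grouping key and drop the helper column
val df6 = df5.withColumn("A", substring_index(col("dummy"), "||", -1)).drop("dummy")

df6.show(false)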

returns:

+---+---+---+
|C  |D  |A  |
+---+---+---+
|3  |4  |ABC|
|1  |2  |ABC|
|88 |99 |XYZ|
|188|199|XYZ|
|44 |55 |XYZ|
+---+---+---+

You can reorder the columns afterwards if needed, for example with df6.select("A", "C", "D").
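A possible simplification of the script above, sketched assuming a local SparkSession: group on A and SeqNum together instead of concatenating them into the dummy string and splitting it back out with substring_index. The exploded map entries keep Spark's default key and value column names here.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Assumption: a local SparkSession; in spark-shell, use the existing `spark` instead
val spark = SparkSession.builder().master("local[*]").appName("explode-pivot").getOrCreate()
import spark.implicits._

val df0 = Seq(
  ("ABC", List(Map("C" -> "1", "D" -> "2"), Map("C" -> "3", "D" -> "4"))),
  ("XYZ", List(Map("C" -> "44", "D" -> "55"), Map("C" -> "188", "D" -> "199"), Map("C" -> "88", "D" -> "99")))
).toDF("A", "B")

val tagged = df0
  .select($"A", explode($"B").as("Bn"))            // one row per map
  .withColumn("SeqNum", monotonically_increasing_id())
  .select($"A", $"SeqNum", explode($"Bn"))         // map entries become "key" and "value"

val result = tagged
  .groupBy("A", "SeqNum")                          // composite key, no string concatenation
  .pivot("key")
  .agg(first($"value"))
  .drop("SeqNum")
  .select("A", "C", "D")

result.show(false)
```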


1 Comment

@thebluephanton Thanks, that worked perfectly. Thanks again for your patience.

Not sure if this is the best approach, but it can be done in a two-step process. Leaving your case class aside:

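import org.apache.spark.sql.functions._
import spark.implicits._  // assumes spark-shell, or a SparkSession in scope named spark
// case class Hello(A: String, B: Array[Map[String, String]])
val df = Seq(
  ("ABC", List(Map("C" -> "1",  "D" -> "2"))),
  ("XYZ", List(Map("C" -> "11", "D" -> "22")))
).toDF("A", "B")

// One row per map in B
val df2 = df.select($"A", explode($"B")).toDF("A", "Bn")

// One row per map entry, as (A, key, value)
val df3 = df2.select($"A", explode($"Bn")).toDF("A", "B", "C")

// Pivot the keys into columns; first() keeps one value per key, so this assumes one map per A
val df4 = df3.select($"A", $"B", $"C").groupBy("A").pivot("B").agg(first($"C"))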

returns:

+---+---+---+
|  A|  C|  D|
+---+---+---+
|XYZ| 11| 22|
|ABC|  1|  2|
+---+---+---+
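Because this groups by A alone and first() keeps a single value per key, it yields only one row per A when B holds several maps. If the keys C and D are known in advance, a sketch that skips the pivot altogether reads them straight from each exploded map with getItem. It assumes a local SparkSession and uses the question's sample values; a key missing from a map produces null rather than an error.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Assumption: a local SparkSession; in spark-shell, use the existing `spark` instead
val spark = SparkSession.builder().master("local[*]").appName("explode-getitem").getOrCreate()
import spark.implicits._

// Several maps per A, which a groupBy("A") pivot would collapse into one row per A
val df = Seq(
  ("ABC", List(Map("C" -> "1", "D" -> "1"), Map("C" -> "2", "D" -> "4"))),
  ("XYZ", List(Map("C" -> "3", "D" -> "6"), Map("C" -> "9", "D" -> "11"), Map("C" -> "5", "D" -> "12")))
).toDF("A", "B")

val flat = df
  .select($"A", explode($"B").as("m"))                                  // one row per map
  .select($"A", $"m".getItem("C").as("C"), $"m".getItem("D").as("D"))   // look up the known keys

flat.show(false)
```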
