I have a Dataset with the following schema:

 root
 |-- collectorId: string (nullable = true)
 |-- generatedAt: long (nullable = true)
 |-- managedNeId: string (nullable = true)
 |-- neAlert: struct (nullable = true)
 |    |-- advisory: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- equipmentType: string (nullable = true)
 |    |    |    |-- headlineName: string (nullable = true)
 |    |-- fieldNotice: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- caveat: string (nullable = true)
 |    |    |    |-- distributionCode: string (nullable = true)
 |    |-- hwEoX: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- bulletinName: string (nullable = true)
 |    |    |    |-- equipmentType: string (nullable = true)
 |    |-- swEoX: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- bulletinHeadline: string (nullable = true)
 |    |    |    |-- equipmentType: string (nullable = true)
 |-- partyId: string (nullable = true)
 |-- recordType: string (nullable = true)
 |-- sourceNeId: string (nullable = true)
 |-- sourcePartyId: string (nullable = true)
 |-- sourceSubPartyId: string (nullable = true)
 |-- wfid: string (nullable = true)

I want to get the fields inside each "element". To do this, I used explode on each array to flatten it.

Dataset<Row> alert = spark.read().option("multiLine", true).option("mode", "PERMISSIVE").json("C:\\Users\\LearningAndDevelopment\\\\merge\\data1\\sample.json");

Seq<String> droppedColumns = scala.collection.JavaConversions.asScalaBuffer(Arrays.asList("neAlert"));

Dataset<Row> alertjson = alert
    .withColumn("exploded_advisory", explode(col("neAlert.advisory")))
    .withColumn("exploded_fn", explode(col("neAlert.fieldNotice")))
    .withColumn("exploded_swEoX", explode(col("neAlert.swEoX")))
    .withColumn("exploded_hwEox", explode(col("neAlert.hwEoX")))
    .drop(droppedColumns);

alertjson.printSchema();

The resulting schema is as below

root
 |-- collectorId: string (nullable = true)
 |-- generatedAt: long (nullable = true)
 |-- managedNeId: string (nullable = true)
 |-- partyId: string (nullable = true)
 |-- recordType: string (nullable = true)
 |-- sourceNeId: string (nullable = true)
 |-- sourcePartyId: string (nullable = true)
 |-- sourceSubPartyId: string (nullable = true)
 |-- wfid: string (nullable = true)
 |-- exploded_advisory: struct (nullable = true)
 |    |-- equipmentType: string (nullable = true)
 |    |-- headlineName: string (nullable = true)
 |-- exploded_fn: struct (nullable = true)
 |    |-- caveat: string (nullable = true)
 |    |-- distributionCode: string (nullable = true)
 |-- exploded_swEoX: struct (nullable = true)
 |    |-- bulletinHeadline: string (nullable = true)
 |    |-- equipmentType: string (nullable = true)
 |-- exploded_hwEox: struct (nullable = true)
 |    |-- bulletinName: string (nullable = true)
 |    |-- equipmentType: string (nullable = true)

But this approach produced duplicate records, flattened with the data from the first element of each JSON array. Each array can have many elements. How can I flatten the JSON arrays without losing data integrity?

  • Do all arrays have the same length? Commented Apr 14, 2018 at 9:05
  • Not necessarily. Commented Apr 14, 2018 at 9:35
  • What should happen in that case? Should the shorter arrays be filled with nulls to match the longest one? Commented Apr 14, 2018 at 9:38
  • Yes, that helps. Commented Apr 14, 2018 at 9:46
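
The null-padding behaviour agreed on in the comments above can be modelled in plain Java. This is only a sketch of the intended row layout, not Spark code: the class `ZipPad` and the method `zipWithNulls` are hypothetical names, and the sample values are made up. In Spark 2.4 and later, `arrays_zip` followed by a single `explode` should give a comparable layout, since `arrays_zip` fills the missing positions of shorter arrays with null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: models index-wise zipping of arrays of unequal length.
class ZipPad {
    // Row i holds the i-th element of every input list, or null where a list is shorter.
    static List<List<String>> zipWithNulls(List<List<String>> arrays) {
        int longest = 0;
        for (List<String> a : arrays) {
            longest = Math.max(longest, a.size());
        }
        List<List<String>> rows = new ArrayList<>();
        for (int i = 0; i < longest; i++) {
            List<String> row = new ArrayList<>();
            for (List<String> a : arrays) {
                row.add(i < a.size() ? a.get(i) : null);
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Made-up sample data standing in for neAlert.advisory and neAlert.fieldNotice.
        List<String> advisory = Arrays.asList("adv1", "adv2", "adv3");
        List<String> fieldNotice = Arrays.asList("fn1");
        // One output row per index of the longest array; short arrays are padded with null.
        for (List<String> row : zipWithNulls(Arrays.asList(advisory, fieldNotice))) {
            System.out.println(row);
        }
    }
}
```

With this layout, each source element appears exactly once, so nothing is duplicated and nothing is lost.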

1 Answer

You can select the nested arrays with the dot (.) operator first and then use explode on each nested field.

Dataset<Row> alertjson = alert
    .withColumn("exploded_advisory", explode(col("neAlert.advisory")))
    .withColumn("exploded_fn", explode(col("neAlert.fieldNotice")))
    .withColumn("exploded_swEoX", explode(col("neAlert.swEoX")))
    .withColumn("exploded_hwEox", explode(col("neAlert.hwEoX")));

If you want each field exploded independently, you have to explode them separately, which creates multiple DataFrames:

// for advisory
Dataset<Row> alertjson = alert
    .withColumn("exploded_advisory", explode(col("neAlert.advisory")));

// for fieldNotice
Dataset<Row> fieldNotice = alert
    .withColumn("exploded_fn", explode(col("neAlert.fieldNotice")));

Drop the columns you do not need and it should work.
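
To see why chaining explode calls on one DataFrame repeats data while separate DataFrames do not, it can help to count output rows for a single input record. The sketch below is plain Java, not Spark code; `ExplodeRowCount`, `chained` and `separate` are hypothetical names, and the array lengths are made up. It relies on the fact that explode emits one row per array element, so each additional explode on the same DataFrame multiplies the row count.

```java
// Hypothetical helper: counts output rows produced from ONE input record.
class ExplodeRowCount {
    // Chained explodes on one DataFrame: every element is paired with every
    // element of the other arrays, so the lengths multiply.
    // An empty array yields 0 rows, which matches explode dropping such records.
    static long chained(int... arrayLengths) {
        long rows = 1;
        for (int n : arrayLengths) {
            rows *= n;
        }
        return rows;
    }

    // One DataFrame per array: one row per element, no cross-pairing,
    // so the total across all DataFrames is the sum of the lengths.
    static long separate(int... arrayLengths) {
        long rows = 0;
        for (int n : arrayLengths) {
            rows += n;
        }
        return rows;
    }

    public static void main(String[] args) {
        // Made-up lengths for advisory, fieldNotice, swEoX, hwEoX.
        System.out.println("chained:  " + chained(3, 2, 2, 1));
        System.out.println("separate: " + separate(3, 2, 2, 1));
    }
}
```

The gap between the two counts is the repeated data: in the chained version each advisory element is copied once for every combination of the other arrays.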

1 Comment

Worked like a charm!!
