
I am reading from Kafka with Spark Structured Streaming. The input Kafka message is in the following JSON format:

[
  {
    "customer": "Jim",
    "sex": "male",
    "country": "US"  
  },
  {
    "customer": "Pam",
    "sex": "female",
    "country": "US"
  } 
] 

I have defined the schema as follows to parse it:

import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

val schemaAsJson = ArrayType(StructType(Seq(
      StructField("customer", StringType, true),
      StructField("sex", StringType, true),
      StructField("country", StringType, true))), true)

My code looks like this:

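import org.apache.spark.sql.functions.from_json
import spark.implicits._ // enables the $"col" column syntax
df.select(from_json($"col", schemaAsJson) as "json")
  .select("json.customer","json.sex","json.country")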

The current output looks like this:

+--------------+----------------+----------------+
|      customer|             sex|         country|
+--------------+----------------+----------------+
|    [Jim, Pam]|  [male, female]|        [US, US]|
+--------------+----------------+----------------+

Expected output:

+--------------+----------------+----------------+
|      customer|             sex|         country|
+--------------+----------------+----------------+
|           Jim|            male|              US|
|           Pam|          female|              US|
+--------------+----------------+----------------+

How do I split an array of structs into individual rows, as shown above? Can someone please help?

1 Answer


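You need to explode the array column before selecting the struct fields. Selecting `json.customer` directly on an array of structs returns an array of all the `customer` values, which is why each column in your output holds a list.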

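import org.apache.spark.sql.functions.{col, explode_outer, from_json}
// Kafka delivers "value" as binary, so cast it to a string before parsing (use your own column name, e.g. "col")
df.select(explode_outer(from_json(col("value").cast("string"), schemaAsJson)) as "json")
  .select("json.customer","json.sex","json.country").show()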
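To try the answer without a Kafka broker, here is a minimal batch sketch. It assumes a local SparkSession, a string column named `value` that already holds the JSON array (as after casting Kafka's binary `value`), and a Spark version that has `explode_outer` (2.2 or later). The object `ExplodeCustomers`, its `run` helper, and the sample row are hypothetical names and data for illustration only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode_outer, from_json}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

object ExplodeCustomers {
  // Same schema as in the question: an array of customer structs
  val schemaAsJson = ArrayType(StructType(Seq(
    StructField("customer", StringType, true),
    StructField("sex", StringType, true),
    StructField("country", StringType, true))), true)

  // Hypothetical helper: returns one (customer, sex, country) tuple per array element
  def run(spark: SparkSession): Seq[(String, String, String)] = {
    import spark.implicits._
    // A single batch row standing in for one Kafka message, already a string
    val df = Seq(
      """[{"customer":"Jim","sex":"male","country":"US"},
        | {"customer":"Pam","sex":"female","country":"US"}]""".stripMargin
    ).toDF("value")

    df.select(explode_outer(from_json(col("value"), schemaAsJson)) as "json") // one row per struct
      .select("json.customer", "json.sex", "json.country")                    // flatten the struct
      .as[(String, String, String)]                                            // tuples map columns by position
      .collect()
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("explode-demo").getOrCreate()
    try run(spark).foreach(println)
    finally spark.stop()
  }
}
```

On a streaming DataFrame, `show()` and `collect()` are not supported (Spark raises an AnalysisException asking for `writeStream.start()`), so in the streaming job you would end the same transformation with something like `.writeStream.format("console").start()` instead.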

5 Comments

I tried it. I am getting this error: Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'json.customer' given input columns: [col];
Updated the answer; check now.
Sorry, that was a typo.
Thank you very much. It works as expected! My actual data has a slightly more complex nested structure: within the parent struct there is another array of structs, such as "previous employment": [ {"emp1Details":""}, {"emp2Details":""} ]. I have yet to try your solution on that, but will explode_outer still work for it?
Yes. Apply the explode function (or explode_outer) to each array column, one level at a time.
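For the nested case in the comments, a minimal sketch: explode the outer array first, then explode the nested array in a second `select` (Spark allows only one generator such as `explode` per select clause). The element type of "previous employment" is an assumption here: a hypothetical struct with a single `employer` string field, since the real payload was not shown. `explode_outer` keeps a customer whose nested array is empty, emitting a `null` employer, whereas plain `explode` would drop that customer. `ExplodeNested` and `run` are hypothetical names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode_outer, from_json}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

object ExplodeNested {
  // Hypothetical element type for "previous employment"; adjust to the real payload
  val employmentType = StructType(Seq(StructField("employer", StringType, true)))

  val schema = ArrayType(StructType(Seq(
    StructField("customer", StringType, true),
    StructField("previous employment", ArrayType(employmentType, true), true))), true)

  // Returns one (customer, employer) pair per customer per previous job
  def run(spark: SparkSession): Seq[(String, String)] = {
    import spark.implicits._
    val df = Seq(
      """[{"customer":"Jim","previous employment":[{"employer":"A"},{"employer":"B"}]},
        | {"customer":"Pam","previous employment":[]}]""".stripMargin
    ).toDF("value")

    df
      // First explode: one row per customer struct
      .select(explode_outer(from_json(col("value"), schema)) as "json")
      // Second explode: one row per element of the nested array
      .select(
        col("json").getField("customer") as "customer",
        explode_outer(col("json").getField("previous employment")) as "job")
      // getField avoids parsing the space in "previous employment" as part of a path
      .select(col("customer"), col("job").getField("employer") as "employer")
      .as[(String, String)]
      .collect()
      .toSeq
  }
}
```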
