
Using Spark with Scala, I am trying to extract an array of structs from a parquet file and write it out as CSV. A single field of the CSV can hold "multi-values" delimited by "#;", and the CSV itself is delimited by ",". What is the best way to accomplish this?

Schema

root
 |-- llamaEvent: struct (nullable = true)
 |    |-- activity: struct (nullable = true)
 |    |    |-- Animal: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- time: string (nullable = true)
 |    |    |    |    |-- status: string (nullable = true)
 |    |    |    |    |-- llamaType: string (nullable = true)

Example Input as json (the input will be parquet)

{
   "llamaEvent":{
      "activity":{
         "Animal":[
            {
               "time":"5-1-2020",
               "status":"Running",
               "llamaType":"red llama"
            },
            {
               "time":"6-2-2020",
               "status":"Sitting",
               "llamaType":"blue llama"
            }
         ]
      }
   }
}

Desired CSV Output

time,status,llamaType
5-1-2020#;6-2-2020,Running#;Sitting,red llama#;blue llama

Update: based on some trial and error, I believe a solution like this may be appropriate, depending on the use case. It takes a "shortcut" by grabbing the array field, casting it to a string, and then parsing out the extraneous characters.

df.select(col("llamaEvent.activity").getItem("Animal").getItem("time")).cast("String"))

Then you can perform whatever parsing you want afterwards, such as regexp_replace:

df.withColumn("time", regexp_replace(col("time"),",",";#"))

Several appropriate solutions using groupBy, explode, and aggregate were also proposed, as in the answers below.
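
For reference, here is a minimal explode/groupBy sketch of that idea (assuming a constant grouping key, since the schema above carries no event id; column names follow the schema):

import org.apache.spark.sql.functions._

val result = df
  .select(explode(col("llamaEvent.activity.Animal")).as("animal"))  // one row per array element
  .select("animal.time", "animal.status", "animal.llamaType")
  .withColumn("grp", lit(1))                                        // constant key to collapse all rows
  .groupBy("grp")
  .agg(
    concat_ws("#;", collect_list("time")).as("time"),               // collect_list does not guarantee order
    concat_ws("#;", collect_list("status")).as("status"),
    concat_ws("#;", collect_list("llamaType")).as("llamaType")
  )
  .drop("grp")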


2 Answers


One approach would be to flatten the array of animal attribute structs using the SQL function inline, aggregate the attributes via collect_list, and then concatenate them with the specified delimiter.

Given a DataFrame df with a schema similar to yours, the following transformations will generate the wanted dataset, dfResult:

val attribCSVs = List("time", "status", "llamaType").map(
    c => concat_ws("#;", collect_list(c)).as(c)
  )

val dfResult = df.
  select($"eventId", expr("inline(llamaEvent.activity.Animal)")).
  groupBy("eventId").agg(attribCSVs.head, attribCSVs.tail: _*)

Note that an event-identifying column eventId is added to the sample JSON data for the necessary groupBy aggregation.

Let's assemble some sample data:

val jsons = Seq(
    """{
        "eventId": 1,
        "llamaEvent":{
            "activity":{
                "Animal":[
                    {
                        "time":"5-1-2020",
                        "status":"Running",
                        "llamaType":"red llama"
                    },
                    {
                        "time":"6-2-2020",
                        "status":"Sitting",
                        "llamaType":"blue llama"
                    }
                ]
            }
        }
    }""",
    """{
        "eventId": 2,
        "llamaEvent":{
            "activity":{
                "Animal":[
                    {
                        "time":"5-2-2020",
                        "status":"Running",
                        "llamaType":"red llama"
                    },
                    {
                        "time":"6-3-2020",
                        "status":"Standing",
                        "llamaType":"blue llama"
                    }
                ]
            }
        }
    }"""
)

val df = spark.read.option("multiLine", true).json(jsons.toDS) 

df.show(false)
+-------+----------------------------------------------------------------------+
|eventId|llamaEvent                                                            |
+-------+----------------------------------------------------------------------+
|1      |{{[{red llama, Running, 5-1-2020}, {blue llama, Sitting, 6-2-2020}]}} |
|2      |{{[{red llama, Running, 5-2-2020}, {blue llama, Standing, 6-3-2020}]}}|
+-------+----------------------------------------------------------------------+

After applying the above transformations, dfResult should look like this:

dfResult.show(false)
+-------+------------------+-----------------+---------------------+
|eventId|time              |status           |llamaType            |
+-------+------------------+-----------------+---------------------+
|1      |5-1-2020#;6-2-2020|Running#;Sitting |red llama#;blue llama|
|2      |5-2-2020#;6-3-2020|Running#;Standing|red llama#;blue llama|
+-------+------------------+-----------------+---------------------+

Writing dfResult to a CSV file:

dfResult.write.option("header", true).csv("/path/to/csv")

/*
eventId,time,status,llamaType
1,5-1-2020#;6-2-2020,Running#;Sitting,red llama#;blue llama
2,5-2-2020#;6-3-2020,Running#;Standing,red llama#;blue llama
*/
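
Note that Spark writes the CSV as a directory of part files. If a single output file is needed and the result is small, coalescing to one partition before writing is a common option:

// Assumes the aggregated result comfortably fits in a single partition
dfResult.coalesce(1).write.option("header", true).csv("/path/to/csv")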

Here is a working solution for you, using PySpark:

# Assumes standard PySpark imports and that a_json holds the input JSON as a Python dict
from pyspark.sql import functions as F, types as T

# Wrap the JSON in a single-row DataFrame with one string column named "value"
df = spark.createDataFrame([str([a_json])], T.StringType())

# Parse the outer array and explode it into one row per JSON object
df = df.withColumn("col", F.from_json("value", T.ArrayType(T.StringType())))
df = df.withColumn("col", F.explode("col"))

# Parse each object as a map of strings and pull out llamaEvent
df = df.withColumn("col", F.from_json("col", T.MapType(T.StringType(), T.StringType())))
df = df.withColumn("llamaEvent", df.col.getItem("llamaEvent"))

# Explode llamaEvent into key/value pairs (key "activity" in column x, its JSON in column y)
df = df.withColumn("llamaEvent", F.from_json("llamaEvent", T.MapType(T.StringType(), T.StringType())))
df = df.select("*", F.explode("llamaEvent").alias("x", "y"))

# Same again one level down: explode activity into key "Animal" and its JSON array
df = df.withColumn("Activity", F.from_json("y", T.MapType(T.StringType(), T.StringType())))
df = df.select("*", F.explode("Activity").alias("x", "yy"))

# Explode the Animal array to one row per animal and parse each struct into a map
df = df.withColumn("final_col", F.from_json("yy", T.ArrayType(T.StringType())))
df = df.withColumn("final_col", F.explode("final_col"))
df = df.withColumn("final_col", F.from_json("final_col", T.MapType(T.StringType(), T.StringType())))

# Extract the attributes and add a constant key to group the rows back together
df = (df.withColumn("time", df.final_col.getItem("time"))
        .withColumn("status", df.final_col.getItem("status"))
        .withColumn("llamaType", df.final_col.getItem("llamaType"))
        .withColumn("agg_col", F.lit("1")))

# Collapse all rows into one, joining each attribute's values with "#;"
df_grp = df.groupby("agg_col").agg(
    F.concat_ws("#;", F.collect_list(df.time)).alias("time"),
    F.concat_ws("#;", F.collect_list(df.status)).alias("status"),
    F.concat_ws("#;", F.collect_list(df.llamaType)).alias("llamaType"),
)


df.show()
+--------------------+--------------------+--------------------+--------+--------------------+--------------------+------+--------------------+--------------------+--------+-------+----------+-------+
|               value|                 col|          llamaEvent|       x|                   y|            Activity|     x|                  yy|           final_col|    time| status| llamaType|agg_col|
+--------------------+--------------------+--------------------+--------+--------------------+--------------------+------+--------------------+--------------------+--------+-------+----------+-------+
|[{'llamaEvent': {...|[llamaEvent -> {"...|[activity -> {"An...|activity|{"Animal":[{"time...|[Animal -> [{"tim...|Animal|[{"time":"5-1-202...|[time -> 5-1-2020...|5-1-2020|Running| red llama|      1|
|[{'llamaEvent': {...|[llamaEvent -> {"...|[activity -> {"An...|activity|{"Animal":[{"time...|[Animal -> [{"tim...|Animal|[{"time":"5-1-202...|[time -> 6-2-2020...|6-2-2020|Sitting|blue llama|      1|
+--------------------+--------------------+--------------------+--------+--------------------+--------------------+------+--------------------+--------------------+--------+-------+----------+-------+
df_grp.show(truncate=False)
+-------+------------------+----------------+---------------------+
|agg_col|time              |status          |llamaType            |
+-------+------------------+----------------+---------------------+
|1      |5-1-2020#;6-2-2020|Running#;Sitting|red llama#;blue llama|
+-------+------------------+----------------+---------------------+
