I have some JSON that is in the following format:

{"items": ["234", "454", "434", "534"], "time": "1574290618029", "id": "A1", "user": "Bob"}
{"items": ["432", "123", "765"], "time": "1574200618021", "id": "B1", "user": "Tim"}
{"items": ["437"], "time": "1274600618121", "id": "B1", "user": "Joe"}

Each JSON file is read into a dataframe using

spark.read.json(path)

and I loop through the files, unioning them into a single dataframe.
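
Roughly, the loop looks something like this (the paths here are placeholders):

from functools import reduce

paths = ["data/file1.json", "data/file2.json"]  # placeholder paths
dfs = [spark.read.json(p) for p in paths]
# union() matches columns by position; unionByName() is safer if the
# inferred column order ever differs between files
df = reduce(lambda left, right: left.union(right), dfs)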

df.show()

shows something like this:

|items| time| id| user|
|["234", "454", "434", "534"]| 1574290618029| A1| Bob|
|["432", "123", "765"]| 1574200618021| B1| Tim|
|["437"]| 1274600618121| B1| Joe|

Doing a df.select(df.id, df.time, df.user, explode(df.items)).show() results in something very close to, but not quite, what I'm looking for:

|id| time| user| col|
|A1| 1574290618029| Bob| 234|
|A1| 1574290618029| Bob| 454|
|A1| 1574290618029| Bob| 434|
|A1| 1574290618029| Bob| 534|
|B1| 1574200618021| Tim| 432|
|B1| 1574200618021| Tim| 123|
|B1| 1574200618021| Tim| 765|
|B1| 1274600618121| Joe| 437|

What I actually need is the data in a format like this:

|id| time| user| item_num| col|
|A1| 1574290618029| Bob| item1| 234|
|A1| 1574290618029| Bob| item2| 454|
|A1| 1574290618029| Bob| item3| 434|
|A1| 1574290618029| Bob| item4| 534|
|B1| 1574200618021| Tim| item1| 432|
|B1| 1574200618021| Tim| item2| 123|
|B1| 1574200618021| Tim| item3| 765|
|B1| 1574200618021| Tim| item4| NA|
|B1| 1274600618121| Joe| item1| 437|
|B1| 1274600618121| Joe| item2| NA|
|B1| 1274600618121| Joe| item3| NA|
|B1| 1274600618121| Joe| item4| NA|

Is there a simple way to accomplish this using explode that I'm not seeing? I'm also very new to Spark, so please excuse me if there's a very obvious answer...

  • Will the array always contain a maximum of 4 items?
  • No, the array does not have a specified maximum.

1 Answer


A naive explode won't work in this case since you need to pad the array before exploding it to get the NA values.

To achieve this:

  1. First, find the maximum size of the items array across all rows.
  2. Compute a sequence from 1 to that maximum size.
  3. arrays_zip the sequence with items to get an array of structs pairing each index with its item. For indexes where no item is present, a null item is paired instead.
  4. Explode the zipped array from step 3.
  5. Extract item_num and col from each row to get the desired output.

Working Example


from pyspark.sql.functions import *

data = [{"items": ["234", "454", "434", "534"], "time": "1574290618029", "id": "A1", "user": "Bob"},
        {"items": ["432", "123", "765"], "time": "1574200618021", "id": "B1", "user": "Tim"},
        {"items": ["437"], "time": "1274600618121", "id": "B1", "user": "Joe"},
        {"items": [], "time": "1274600618121", "id": "C1", "user": "EmptyUser"},
        {"items": None, "time": "1274600618121", "id": "D1", "user": "NullUser"}]

df = spark.createDataFrame(data)

# Step 1: find the size of the longest items array
max_number_of_items = df.selectExpr("max(size(items)) as max_items").take(1)[0].max_items

# Steps 2-3: zip a 1..max index sequence with items; indexes past the end of a
# row's array are paired with a null item. coalesce() guards against null arrays.
padded_array_df = df.withColumn("items",
                                arrays_zip(sequence(lit(1), lit(max_number_of_items)).alias("index"),
                                           coalesce(col("items"), array([])).alias("items")))

# Steps 4-5: explode the zipped array, then pull the index and the item out of each struct
padded_array_df.select("id", "time", "user", explode("items").alias("items"))\
               .select("id", "time", "user",
                       concat(lit("item"), col("items").getField("index")).alias("item_num"),
                       col("items").getField("items").alias("col"))\
               .show()

Output

+---+-------------+---------+--------+----+
| id|         time|     user|item_num| col|
+---+-------------+---------+--------+----+
| A1|1574290618029|      Bob|   item1| 234|
| A1|1574290618029|      Bob|   item2| 454|
| A1|1574290618029|      Bob|   item3| 434|
| A1|1574290618029|      Bob|   item4| 534|
| B1|1574200618021|      Tim|   item1| 432|
| B1|1574200618021|      Tim|   item2| 123|
| B1|1574200618021|      Tim|   item3| 765|
| B1|1574200618021|      Tim|   item4|null|
| B1|1274600618121|      Joe|   item1| 437|
| B1|1274600618121|      Joe|   item2|null|
| B1|1274600618121|      Joe|   item3|null|
| B1|1274600618121|      Joe|   item4|null|
| C1|1274600618121|EmptyUser|   item1|null|
| C1|1274600618121|EmptyUser|   item2|null|
| C1|1274600618121|EmptyUser|   item3|null|
| C1|1274600618121|EmptyUser|   item4|null|
| D1|1274600618121| NullUser|   item1|null|
| D1|1274600618121| NullUser|   item2|null|
| D1|1274600618121| NullUser|   item3|null|
| D1|1274600618121| NullUser|   item4|null|
+---+-------------+---------+--------+----+
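
As an aside, here is a sketch of an alternative (not from the original answer) that skips the zip: explode a 1..max position sequence for every row and look each item up with element_at, which returns null for out-of-range positions when ANSI mode is off (the default). It reuses df and max_number_of_items from above.

from pyspark.sql.functions import *

alt = df.withColumn("pos", explode(sequence(lit(1), lit(max_number_of_items))))\
        .select("id", "time", "user",
                concat(lit("item"), col("pos")).alias("item_num"),
                # element_at is 1-based and yields null past the array's end;
                # coalesce turns a null items column into an empty array first
                expr("element_at(coalesce(items, array()), pos)").alias("col"))
alt.show()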

4 Comments

  • Not really sure why, but when I attempt to replicate your example, I keep getting "AnalysisException: 'No such field index in 0, items'"
  • I suspect you attempted to make my logic null-safe or empty-array-safe and that introduced a column naming mismatch. I have updated the answer to handle null and empty arrays. If the issue persists, I will need debugging logs to investigate further.
  • Thank you! I think I figured it out. It was probably a combination of the empty-array handling as well as this line: .select("id", "time", "user", concat(lit("item"), col("items").getField("index")).alias("item_num"), col("items").getField("items").alias("col")) - the .getField calls should be getItem calls instead, right?
  • @paytur arrays_zip returns a struct, hence getField is used to access the contents of the struct (see the schema sketch below)
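
To illustrate that last comment, a quick schema check (illustrative only, not part of the thread) shows that arrays_zip yields an array of structs whose field names come from the input aliases:

from pyspark.sql.functions import *

spark.createDataFrame([(["a", "b"],)], ["items"])\
     .select(arrays_zip(sequence(lit(1), lit(2)).alias("index"),
                        col("items").alias("items")).alias("zipped"))\
     .printSchema()
# root
#  |-- zipped: array
#  |    |-- element: struct
#  |    |    |-- index: integer
#  |    |    |-- items: string
# (output abridged; if the inputs aren't aliased, the struct fields get
# positional names like "0", which matches the "No such field index" error above)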
