
I have an example json data file which has the following structure:

{
    "Header": {
        "Code1": "abc",
        "Code2": "def",
        "Code3": "ghi",
        "Code4": "jkl"
    },
    "TimeSeries": {
        "2020-11-25T03:00:00+00:00": {
            "UnitPrice": 1000,
            "Amount": 10000
        },
        "2020-11-26T03:00:00+00:00": {
            "UnitPrice": 1000,
            "Amount": 10000
        }
    }
}

When I parse this in Databricks with:

df = spark.read.json("/FileStore/test.txt")

I get two struct columns as output: Header and TimeSeries. I want to flatten the TimeSeries structure so that it has the following schema:

Date
UnitPrice
Amount 
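For concreteness, here is the target shape applied to the sample file in plain Python, independent of Spark (a minimal sketch using only the standard library; the inline `raw` string is a stand-in for the file contents above):

```python
import json

# Stand-in for the contents of /FileStore/test.txt shown above.
raw = """
{
    "Header": {"Code1": "abc", "Code2": "def", "Code3": "ghi", "Code4": "jkl"},
    "TimeSeries": {
        "2020-11-25T03:00:00+00:00": {"UnitPrice": 1000, "Amount": 10000},
        "2020-11-26T03:00:00+00:00": {"UnitPrice": 1000, "Amount": 10000}
    }
}
"""
data = json.loads(raw)

# Flatten: one row per timestamp key, carrying the nested fields alongside it.
rows = [
    {"Date": ts, "UnitPrice": v["UnitPrice"], "Amount": v["Amount"]}
    for ts, v in data["TimeSeries"].items()
]
for row in rows:
    print(row)
```

The difficulty in Spark is exactly the step that is trivial here: the timestamp is a *key* of the `TimeSeries` object, so it becomes a column name rather than a value.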

Because the date field is a key rather than a value, I can currently only access it by iterating over the column names and building the dot-notation paths dynamically:

from pyspark.sql.functions import lit

def flatten_json(data):
  columnlist = data.select("TimeSeries.*")
  df3 = None
  # Build one row per timestamp key, then union the per-key results together.
  for name in columnlist.columns:
    df1 = data.select("Header.*").withColumn("Timeseries", lit(name)).withColumn("join", lit("a"))
    df2 = data.select("TimeSeries." + name + ".*").withColumn("join", lit("a"))
    if df3 is None:
      df3 = df1.join(df2, on=["join"], how="inner")
    else:
      df3 = df3.union(df1.join(df2, on=["join"], how="inner"))
  return df3

This is far from ideal. Does anyone know a better method to create the described dataframe?

1 Answer

The idea:

  • Step 1: Extract Header and TimeSeries separately.

  • Step 2: For each field in the TimeSeries object, extract the Amount and UnitPrice, together with the name of the field, stuff them into a struct.

  • Step 3: Merge all these structs into an array column, and explode it.

  • Step 4: Extract Timeseries, Amount and UnitPrice from the exploded column.

  • Step 5: Cross join with the Header row.

import pyspark.sql.functions as F

header_df = df.select("Header.*")
timeseries_df = df.select("TimeSeries.*")

# One struct per timestamp field, carrying the field name plus its nested values.
cols = [
    F.struct(
        F.lit(name).alias("Timeseries"),
        F.col(name).getItem("Amount").alias("Amount"),
        F.col(name).getItem("UnitPrice").alias("UnitPrice"),
    ).alias("ts_" + str(idx))
    for idx, name in enumerate(timeseries_df.schema.fieldNames())
]

# Collect the structs into an array, explode it to get one row per timestamp,
# then pull the individual fields out of the exploded struct.
combined = F.explode(F.array(cols)).alias("comb")
timeseries = timeseries_df.select(combined).select("comb.Timeseries", "comb.Amount", "comb.UnitPrice")
result = header_df.crossJoin(timeseries)
result.show(truncate=False)

Output:

+-----+-----+-----+-----+-------------------------+------+---------+
|Code1|Code2|Code3|Code4|Timeseries               |Amount|UnitPrice|
+-----+-----+-----+-----+-------------------------+------+---------+
|abc  |def  |ghi  |jkl  |2020-11-25T03:00:00+00:00|10000 |1000     |
|abc  |def  |ghi  |jkl  |2020-11-26T03:00:00+00:00|10000 |1000     |
+-----+-----+-----+-----+-------------------------+------+---------+