I have a dataframe in PySpark with 3 columns - json, date and object_id:

-----------------------------------------------------------------------------------------
|json                                                              |date      |object_id|
-----------------------------------------------------------------------------------------
|{'a':{'b':0,'c':{'50':0.005,'60':0,'100':0},'d':0.01,'e':0,'f':2}}|2020-08-01|xyz123   |
|{'a':{'m':0,'n':{'50':0.005,'60':0,'100':0},'d':0.01,'e':0,'f':2}}|2020-08-02|xyz123   |
|{'g':{'h':0,'j':{'50':0.005,'80':0,'100':0},'d':0.02}}            |2020-08-03|xyz123   |
-----------------------------------------------------------------------------------------

Now I have a list of variables: [a.c.60, a.n.60, a.d, g.h]. I need to extract only these variables from the json column of the dataframe above and add them as columns with their respective values.

So in the end, the dataframe should look like:

-------------------------------------------------------------------------------------------------------
|json                                                    |date      |object_id|a.c.60|a.n.60|a.d |g.h |
-------------------------------------------------------------------------------------------------------
|{'a':{'b':0,'c':{'50':0.005,'60':0,'100':0},'d':0.01,...|2020-08-01|xyz123   |0     |null  |0.01|null|
|{'a':{'m':0,'n':{'50':0.005,'60':0,'100':0},'d':0.01,...|2020-08-02|xyz123   |null  |0     |0.01|null|
|{'g':{'h':0,'j':{'50':0.005,'80':0,'100':0},'d':0.02}}  |2020-08-03|xyz123   |null  |null  |null|0   |
-------------------------------------------------------------------------------------------------------

Please help me get this result dataframe. The main problem is that the incoming JSON data has no fixed structure: it can be nested in any form, but I only need to extract the four given variables. I have achieved this in Pandas by flattening the JSON string and then extracting the 4 variables, but in Spark it is proving difficult.

1 Answer

There are 2 ways to do it:

  1. Use the get_json_object function, like this:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

df = spark.createDataFrame(['{"a":{"b":0,"c":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                            '{"a":{"m":0,"n":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                            '{"g":{"h":0,"j":{"50":0.005,"80":0,"100":0},"d":0.02}}'],
                           StringType())

df3 = df.select(F.get_json_object(F.col("value"), "$.a.c.60").alias("a_c_60"),
                F.get_json_object(F.col("value"), "$.a.n.60").alias("a_n_60"),
                F.get_json_object(F.col("value"), "$.a.d").alias("a_d"),
                F.get_json_object(F.col("value"), "$.g.h").alias("g_h"))

will give:

>>> df3.show()
+------+------+----+----+
|a_c_60|a_n_60| a_d| g_h|
+------+------+----+----+
|     0|  null|0.01|null|
|  null|     0|0.01|null|
|  null|  null|null|   0|
+------+------+----+----+
  2. Declare the schema explicitly (only the necessary fields), convert the JSON into structs using the from_json function with that schema, and then extract individual values from the structs. This could be more performant than JSON Path:
from pyspark.sql.types import DoubleType, StringType, StructField, StructType
import pyspark.sql.functions as F

aSchema = StructType([
    StructField("c", StructType([
        StructField("60", DoubleType(), True)
    ]), True),
    StructField("n", StructType([
        StructField("60", DoubleType(), True)
    ]), True),
    StructField("d", DoubleType(), True),
])
gSchema = StructType([
    StructField("h", DoubleType(), True)
])

schema = StructType([
    StructField("a", aSchema, True),
    StructField("g", gSchema, True)
])

df = spark.createDataFrame(['{"a":{"b":0,"c":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                            '{"a":{"m":0,"n":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                            '{"g":{"h":0,"j":{"50":0.005,"80":0,"100":0},"d":0.02}}'],
                           StringType())

df2 = df.select(F.from_json("value", schema=schema).alias('data')).select('data.*')
df2.select(df2.a.c['60'], df2.a.n['60'], df2.a.d, df2.g.h).show()

will give

+------+------+----+----+
|a.c.60|a.n.60| a.d| g.h|
+------+------+----+----+
|   0.0|  null|0.01|null|
|  null|   0.0|0.01|null|
|  null|  null|null| 0.0|
+------+------+----+----+
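
Both approaches above hard-code the four paths. Since the question starts from a list of variables, here is a rough sketch of how the expressions could be derived from that list instead. The helper names (json_path, column_alias, build_tree, to_ddl and the two extract_* functions) are made up for illustration, the json column name is taken from the question, and every leaf is assumed to be a double, as in the explicit schema above. The schema is rendered as a DDL string, which from_json also accepts in place of a StructType.

```python
def json_path(variable):
    """'a.c.60' -> '$.a.c.60', the path form passed to get_json_object."""
    return "$." + variable


def column_alias(variable):
    """'a.c.60' -> 'a_c_60', so the new column names contain no dots."""
    return variable.replace(".", "_")


def build_tree(variables):
    """Nest the dotted paths: ['a.d', 'g.h'] -> {'a': {'d': {}}, 'g': {'h': {}}}."""
    tree = {}
    for variable in variables:
        node = tree
        for key in variable.split("."):
            node = node.setdefault(key, {})
    return tree


def to_ddl(tree, top_level=True):
    """Render the tree as a DDL schema string. Assumption: every leaf is a DOUBLE."""
    separator = " " if top_level else ": "
    fields = []
    for name, children in tree.items():
        dtype = f"STRUCT<{to_ddl(children, False)}>" if children else "DOUBLE"
        fields.append(f"`{name}`{separator}{dtype}")
    return ", ".join(fields)


def extract_with_get_json_object(df, variables, json_col="json"):
    """Method 1: one get_json_object call per variable, original columns kept."""
    import pyspark.sql.functions as F  # lazy import: the helpers above are plain Python

    new_cols = [
        F.get_json_object(F.col(json_col), json_path(v)).alias(column_alias(v))
        for v in variables
    ]
    return df.select("*", *new_cols)


def extract_with_from_json(df, variables, json_col="json"):
    """Method 2: parse once with a generated schema, then walk into the struct."""
    import pyspark.sql.functions as F

    parsed = F.from_json(F.col(json_col), to_ddl(build_tree(variables)))
    new_cols = []
    for v in variables:
        value = parsed
        for key in v.split("."):
            value = value[key]  # struct field access, e.g. parsed["a"]["c"]["60"]
        new_cols.append(value.alias(column_alias(v)))
    return df.select("*", *new_cols)


variables = ["a.c.60", "a.n.60", "a.d", "g.h"]
print([json_path(v) for v in variables])
print(to_ddl(build_tree(variables)))
```

With the question's dataframe this would be called as extract_with_get_json_object(df, variables) or extract_with_from_json(df, variables). Note that get_json_object returns strings (hence 0 rather than 0.0 in the first output above), so a cast may be needed afterwards.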

2 Comments

Thanks Alex Ott, the first method worked for me. I did not try the second one though. Thanks a lot for your help :)
Hi @Alex Ott, what would the code be if each row is a list of dictionaries, like this: df = spark.createDataFrame(['[{"a":{"b":0,"c":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}]', '[{"a":{"m":0,"n":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}]', '[{"g":{"h":0,"j":{"50":0.005,"80":0,"100":0},"d":0.02}}]'], StringType())
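
For the list-of-dictionaries case in the comment above, one possible approach (a sketch only, not run against that exact data) is to keep using get_json_object and put an array subscript in front of the path, e.g. $[0].a.c.60 for the first element of each array. The helper names array_json_path and extract_from_array are hypothetical, and the value column name comes from the createDataFrame call in the comment.

```python
def array_json_path(variable, index=0):
    """'a.c.60' -> '$[0].a.c.60': address one element of a top-level JSON array."""
    return f"$[{index}].{variable}"


def extract_from_array(df, variables, json_col="value", index=0):
    """Pull each variable out of the index-th dictionary of the JSON array."""
    import pyspark.sql.functions as F  # lazy import: array_json_path is plain Python

    new_cols = [
        F.get_json_object(F.col(json_col), array_json_path(v, index))
        .alias(v.replace(".", "_"))
        for v in variables
    ]
    return df.select(*new_cols)


print(array_json_path("a.c.60"))  # prints $[0].a.c.60
```

If the arrays can hold more than one dictionary, this only reads the element at the given index; parsing with from_json and an array schema, then exploding, would be the alternative to look at.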
