
I am trying to read a JSON file and parse 'jsonString' and its underlying fields, which include an array, into a PySpark dataframe.

Here are the contents of the JSON file.

[{"jsonString": "{\"uid\":\"value1\",\"adUsername\":\"value3\",\"courseCertifications\":[{\"uid\":\"value2\",\"courseType\":\"TRAINING\"},{\"uid\":\"TEST\",\"courseType\":\"TRAINING\"}],\"modifiedBy\":\"value4\"}","transactionId": "value5", "tableName": "X"},
 {"jsonString": "{\"uid\":\"value11\",\"adUsername\":\"value13\",\"modifiedBy\":\"value14\"}","transactionId": "value15", "tableName": "X1"},
 {"jsonString": "{\"uid\":\"value21\",\"adUsername\":\"value23\",\"modifiedBy\":\"value24\"}","transactionId": "value25", "tableName": "X2"}]

I am able to parse the contents of the string 'jsonString' and select the required columns using the logic below:

from pyspark.sql.functions import array, explode, get_json_object

df = spark.read.json('path.json', multiLine=True)
df = df.withColumn('courseCertifications', explode(array(get_json_object(df['jsonString'], '$.courseCertifications'))))

Now my end goal is to parse the field "courseType" from "courseCertifications" and create one row per instance.

I am using the logic below to get "courseType":

df = df.withColumn('new',get_json_object(df.courseCertifications, '$[*].courseType'))

I am able to get the contents of "courseType", but only as a string, as shown below:

[Row(new=u'["TRAINING","TRAINING"]')]

My end goal is to create a dataframe with the columns transactionId, jsonString.uid, jsonString.adUsername, jsonString.courseCertifications.uid, and jsonString.courseCertifications.courseType.

  • I need to retain all the rows and create multiple rows, one per array element of courseCertifications.uid/courseCertifications.courseType.

1 Answer

An elegant way to resolve this is to define the schema of the JSON string and then parse it with the from_json function:

import pyspark.sql.functions as f
from pyspark.shell import spark
from pyspark.sql.types import ArrayType, StringType, StructType, StructField

df = spark.read.json('your_path', multiLine=True)
schema = StructType([
    StructField('uid', StringType()),
    StructField('adUsername', StringType()),
    StructField('modifiedBy', StringType()),
    StructField('courseCertifications', ArrayType(
        StructType([
            StructField('uid', StringType()),
            StructField('courseType', StringType())
        ])
    ))
])

df = df \
    .withColumn('tmp', f.from_json(df.jsonString, schema)) \
    .withColumn('adUsername', f.col('tmp').adUsername) \
    .withColumn('uid', f.col('tmp').uid) \
    .withColumn('modifiedBy', f.col('tmp').modifiedBy) \
    .withColumn('tmp', f.explode(f.col('tmp').courseCertifications)) \
    .withColumn('course_uid', f.col('tmp').uid) \
    .withColumn('course_type', f.col('tmp').courseType) \
    .drop('jsonString', 'tmp')
df.show()

Output:

+-------------+------+----------+----------+----------+-----------+
|transactionId|uid   |adUsername|modifiedBy|course_uid|course_type|
+-------------+------+----------+----------+----------+-----------+
|value5       |value1|value3    |value4    |value2    |TRAINING   |
|value5       |value1|value3    |value4    |TEST      |TRAINING   |
+-------------+------+----------+----------+----------+-----------+

5 Comments

Thanks @Kafels! This works. Instead of a customized schema, I am doing the following to infer the structure of 'jsonString', since it may contain different columns depending on the JSON file: json_schema = spark.read.json(df.rdd.map(lambda row: row.jsonString)).schema and then df = df.withColumn('jsonString', from_json(df['jsonString'], json_schema)). Please share your thoughts.
Your suggested code is very good, because then it isn't necessary to remap the schema whenever the JSON file has a new or removed column.
Thanks @Kafels, I notice that only records having the field "courseCertifications" are retained, and all other records are dropped. My requirement is to fetch all records and populate NULL if the "courseCertifications" field is not present for a row.
Edit the JSON in your question and add more values to debug.
Updated the question with more values and also added more info on the output dataframe.
