
I am working on processing CDC data received via Kafka topics and loading it into Databricks Delta tables. I have it all working, except for a nested JSON string that does not get parsed when using from_json or spark.read.json.

When I infer the schema of the JSON at level 1 using "spark.read.json(df.rdd.map(lambda row: row.value)).schema", the column INPUT_DATA is inferred as a plain string and loaded as a string object. Below are a sample JSON string, the code I tried, and the expected results.

I have many topics to process, and each topic has a different schema, so I would like to process them dynamically. I would prefer not to store the schemas, since they may change over time, and I want my code to handle those changes automatically.

I'd appreciate any help, as I have spent a whole day trying to figure this out. Thanks in advance.

Sample JSON with a nested tree:

after = {
    "id_transaction": "121",
    "product_id": 25,
    "transaction_dt": 1662076800000000,
    "creation_date": 1662112153959000,
    "product_account": "40012",
    "input_data": "{\"amount\":[{\"type\":\"CASH\",\"amount\":1000.00}],\"currency\":\"USD\",\"coreData\":{\"CustId\":11021,\"Cust_Currency\":\"USD\",\"custCategory\":\"Premium\"},\"context\":{\"authRequired\":false,\"waitForConfirmation\":false,\"productAccount\":\"CA12001\"},\"brandId\":\"TOYO-2201\",\"dealerId\":\"1\",\"operationInfo\":{\"trans_Id\":\"3ED23-89DKS-001AA-2321\",\"transactionDate\":1613420860087},\"ip_address\":null,\"last_executed_step\":\"PURCHASE_ORDER_CREATED\",\"last_result\":\"OK\",\"output_dataholder\":\"{\"DISCOUNT_AMOUNT\":\"0\",\"BONUS_AMOUNT_APPLIED\":\"10000\"}",
    "dealer_id": 1,
    "dealer_currency": "USD",
    "Cust_id": 11021,
    "process_status": "IN_PROGRESS",
    "tot_amount": 10000,
    "validation_result_code": "OK_SAVE_AND_PROCESS",
    "operation": "Create",
    "timestamp_ms": 1675673484042
}
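
To illustrate the problem concretely, here is a minimal repro sketch (my own illustration; the "after"/"op" envelope matches what my code below selects, and the print call is just for inspection):

import json

# Wrap the sample record the way it arrives from Kafka:
# one JSON string per row in a `value` column
payload = json.dumps({"after": after, "op": "c"})
df = spark.createDataFrame([(payload,)], ["value"])

inferred = spark.read.json(df.rdd.map(lambda r: r.value)).schema
# Prints StringType() -- the escaped, nested JSON is not parsed further
print(inferred["after"].dataType["input_data"].dataType)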

I created the following script to collect all the column names from the JSON structure:

import json

json_keys = {}
child_members = []
table_column_schema = {}
column_schema = []
dbname = "mydb"
tbl_name = "tbl_name"

def get_table_keys(dbname):
    # Pull one raw Kafka value from the bronze table and parse it
    table_values_extracted = f"select value from {dbname}.{tbl_name} limit 1"
    cmd_key_pair_data = spark.sql(table_values_extracted)
    jsonkeys = cmd_key_pair_data.collect()[0][0]
    json_keys = json.loads(jsonkeys)
    # Top-level columns come from the keys of the "after" payload
    column_names_as_keys = json_keys["after"].keys()
    value_column_data = json_keys["after"].values()
    column_schema = list(column_names_as_keys)
    # For values that are JSON-encoded strings (e.g. input_data), parse
    # them and also collect the keys of any nested objects
    for i in value_column_data:
        if "{" in str(i) and "}" in str(i):
            a = json.loads(i)
            for i2 in a.values():
                if str(i2).startswith("{") and str(i2).endswith("}"):
                    column_schema = column_schema + list(i2.keys())
    table_column_schema[tbl_name] = column_schema
    return 0

get_table_keys(dbname)

The following code processes the JSON and creates a dataframe with all the nested JSON fields as columns:

from pyspark.sql.functions import from_json, to_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType, MapType
import time

dbname = "mydb"
tbl_name = "tbl_name"
start = time.time()

df = spark.sql(f'select value from {dbname}.{tbl_name} limit 2')
tbl_columns = table_column_schema[tbl_name]

# Hand-built schema: input_data as a map of strings, everything else as
# plain strings (assembled here, though the inferred schema below is
# what is actually applied)
data = []
for i in tbl_columns:
    if i == 'input_data':
        data.append(StructField(f'{i}', MapType(StringType(), StringType()), True))
    else:
        data.append(StructField(f'{i}', StringType(), True))

# Infer the schema of the raw value column; input_data is inferred as a
# plain StringType here, which is the problem
schema2 = spark.read.json(df.rdd.map(lambda row: row.value)).schema
print(type(schema2))

df2 = df.withColumn("value", from_json("value", schema2)).select(col('value.after.*'), col('value.op'))

Note: VALUE is the raw-payload column in my Delta table (bronze layer).
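
For completeness, here is a sketch of the direction I am exploring with schema_of_json, which derives the schema from a sampled row instead of the RDD API (a single-row sample may miss fields that only appear in other rows):

import pyspark.sql.functions as F

# Outer pass: derive the envelope schema from one sampled value
sample = df.select('value').limit(1).collect()[0][0]
parsed = df.withColumn('value', F.from_json('value', F.schema_of_json(F.lit(sample)))) \
           .select('value.after.*', 'value.op')

# input_data is still a JSON-encoded string at this point, so repeat
# the same trick one level down
inner = parsed.select('input_data').limit(1).collect()[0][0]
parsed = parsed.withColumn('input_data', F.from_json('input_data', F.schema_of_json(F.lit(inner))))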

Current dataframe output: (screenshot: input_data still shown as one raw JSON-string column)

Expected dataframe output: (screenshot: each nested field of input_data as its own column)

  • Have you tried removing all the \ characters from the string before parsing it with from_json? That may be what's blocking it, since it isn't valid JSON. Commented Feb 10, 2023 at 8:42
  • Yes, I have tried. When I print the datatype for input_data, it shows as String. Thanks. Commented Feb 10, 2023 at 9:04
  • I faced a similar situation; you can try .withColumn('temp', explode(split('input_data', ','))). Commented Feb 10, 2023 at 10:32
  • Is your JSON string for input_data valid? \"brandId\":TOYO-2201 does not look well-formed. Commented Feb 11, 2023 at 10:26
  • Does this answer your question? Pyspark: Parse a column of json strings Commented Feb 11, 2023 at 11:36

1 Answer

You can use the RDD API to infer the schema of input_data and from_json to parse it as JSON.

import pyspark.sql.functions as f

# Infer the schema from the input_data strings themselves, then parse them
schema = spark.read.json(df.rdd.map(lambda r: r.input_data)).schema
df = df.withColumn('input_data', f.from_json('input_data', schema))

# Flatten the parsed struct into top-level columns and drop the original
new_cols = df.columns + df.select('input_data.*').columns
df = df.select('*', 'input_data.*').toDF(*new_cols).drop('input_data')

df.show(truncate=False)

+-------+----------------+---------------+---------+--------------+---------+--------------+---------------+----------+-------------+----------+----------------+----------------------+----------------+---------+-----------------------+---------------------+--------+--------+----------+----------------------+-----------+---------------------------------------+-----------------+
|Cust_id|creation_date   |dealer_currency|dealer_id|id_transaction|operation|process_status|product_account|product_id|timestamp_ms |tot_amount|transaction_dt  |validation_result_code|amount          |brandId  |context                |coreData             |currency|dealerId|ip_address|last_executed_step    |last_result|operationInfo                          |output_dataholder|
+-------+----------------+---------------+---------+--------------+---------+--------------+---------------+----------+-------------+----------+----------------+----------------------+----------------+---------+-----------------------+---------------------+--------+--------+----------+----------------------+-----------+---------------------------------------+-----------------+
|11021  |1662112153959000|USD            |1        |121           |Create   |IN_PROGRESS   |40012          |25        |1675673484042|10000     |1662076800000000|OK_SAVE_AND_PROCESS   |[{1000.0, CASH}]|TOYO-2201|{false, CA12001, false}|{11021, USD, Premium}|USD     |1       |null      |PURCHASE_ORDER_CREATED|OK         |{3ED23-89DKS-001AA-2321, 1613420860087}|{10000, 0}       |
+-------+----------------+---------------+---------+--------------+---------+--------------+---------------+----------+-------------+----------+----------------+----------------------+----------------+---------+-----------------------+---------------------+--------+--------+----------+----------------------+-----------+---------------------------------------+-----------------+
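
Since the schema is re-inferred from the data on every run, new fields in input_data are picked up automatically, which matches your no-stored-schemas requirement. A sketch of applying the same two-pass pattern across several topic tables (database and table names here are hypothetical):

for tbl in ['topic_a', 'topic_b']:
    src = spark.table(f'mydb.{tbl}')
    # Outer pass: parse the raw Kafka value
    outer = spark.read.json(src.rdd.map(lambda r: r.value)).schema
    parsed = src.withColumn('value', f.from_json('value', outer)) \
                .select('value.after.*', 'value.op')
    # Inner pass: parse the nested input_data string, as above
    inner = spark.read.json(parsed.rdd.map(lambda r: r.input_data)).schema
    parsed = parsed.withColumn('input_data', f.from_json('input_data', inner))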

1 Comment

Thank you Sai saran and Lamanus; I will try both suggestions today and keep you posted.
