
I have one full set of nested JSON, and I need to update it with the latest values from another nested JSON.

Can anyone help me with this?

I want to implement this in PySpark.

The full-set JSON looks like this:

{
    "email": "[email protected]", 
    "firstName": "name01", 
    "id": 6304,
    "surname": "Optional",
    "layer01": {
        "key1": "value1", 
        "key2": "value2", 
        "key3": "value3", 
        "key4": "value4", 
        "layer02": {
            "key1": "value1", 
            "key2": "value2"
        }, 
        "layer03": [
            {
                "inner_key01": "inner value01"
            }, 
            {
                "inner_key02": "inner_value02"
            }
        ]
    }, 
    "surname": "Required only$uid"
}

The latest JSON looks like this:

{
    "email": "[email protected]", 
    "firstName": "name01", 
    "surname": "Optional",
    "id": 6304,
    "layer01": {
        "key1": "value1", 
        "key2": "value2", 
        "key3": "value3", 
        "key4": "value4", 
        "layer02": {
            "key1": "value1_changedData", 
            "key2": "value2"
        }, 
        "layer03": [
            {
                "inner_key01": "inner value01"
            }, 
            {
                "inner_key02": "inner_value02"
            }
        ]
    }, 
    "surname": "Required only$uid"
}

In the above, for id=6304 we have received updates for the layer01.layer02.key1 and email address fields.

So I need to update these values in the full JSON. Kindly help me with this.
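Outside Spark, the update being asked for amounts to a recursive deep merge of two nested dicts, where the latest values overlay the full set. A minimal plain-Python sketch (the function name deep_merge and the trimmed sample data are illustrative, not from the question):

```python
def deep_merge(full, latest):
    """Recursively overlay `latest` onto `full`, returning a new dict.

    Nested dicts are merged key by key; any other value (including
    lists) from `latest` replaces the corresponding value in `full`.
    """
    merged = dict(full)
    for key, value in latest.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


full = {
    "id": 6304,
    "layer01": {
        "key1": "value1",
        "layer02": {"key1": "value1", "key2": "value2"},
    },
}
# Partial update: only layer01.layer02.key1 changed
latest = {"id": 6304, "layer01": {"layer02": {"key1": "value1_changedData"}}}

updated = deep_merge(full, latest)
# updated["layer01"]["layer02"]["key1"] is now "value1_changedData",
# while untouched keys like layer01.key1 survive from the full JSON
```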

  • Do you always want to get for each id the values from the latest JSON version regardless of null updates for example? Commented Jan 27, 2021 at 11:09
  • @blackbishop, yes, based on id we need to update the JSON. Can you help me with sample code in Python? Commented Jan 29, 2021 at 10:10

1 Answer


You can load the two JSON files into Spark DataFrames and do a left join to get the updates from the latest JSON data:

from pyspark.sql import functions as F

full_json_df = spark.read.json(full_json_path, multiLine=True)
latest_json_df = spark.read.json(latest_json_path, multiLine=True)

# For each id that exists in the latest data, take every column from the
# latest row; otherwise fall back to the values from the full JSON.
updated_df = full_json_df.alias("full").join(
    latest_json_df.alias("latest"),
    F.col("full.id") == F.col("latest.id"),
    "left"
).select(
    F.col("full.id"),
    *[
        F.when(F.col("latest.id").isNotNull(), F.col(f"latest.{c}")).otherwise(F.col(f"full.{c}")).alias(c)
        for c in full_json_df.columns if c != "id"
    ]
)

updated_df.show(truncate=False)

#+----+------------+---------+-----------------------------------------------------------------------------------------------------+--------+
#|id  |email       |firstName|layer01                                                                                              |surname |
#+----+------------+---------+-----------------------------------------------------------------------------------------------------+--------+
#|6304|[email protected]|name01   |[value1, value2, value3, value4, [value1_changedData, value2], [[inner value01,], [, inner_value02]]]|Optional|
#+----+------------+---------+-----------------------------------------------------------------------------------------------------+--------+
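The column-by-column selection above reduces to a simple rule: for each id in the full set, take the latest row's values when a match exists, otherwise keep the full row. A plain-Python restatement of that rule (the sample rows here are illustrative, not the Spark execution):

```python
# Rows keyed by id, mimicking the two DataFrames after the join
full_rows = {6304: {"email": "[email protected]", "firstName": "name01"}}
latest_rows = {6304: {"email": "[email protected]", "firstName": "name01_new"}}

# Left join semantics: every id from the full set survives; when the
# latest side has a matching id, its row wins wholesale.
updated = {
    rid: latest_rows.get(rid, row)
    for rid, row in full_rows.items()
}
```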

Update:

If the schema changes between the full and latest JSONs, you can load the two files into the same DataFrame (this way the schemas are merged) and then deduplicate per id:

from pyspark.sql import Window
from pyspark.sql import functions as F

merged_json_df = spark.read.json("/path/to/{full_json.json,latest_json.json}", multiLine=True)

# order priority: latest file then full
w = Window.partitionBy(F.col("id")).orderBy(F.when(F.input_file_name().like('%latest%'), 0).otherwise(1))

updated_df = merged_json_df.withColumn("rn", F.row_number().over(w))\
    .filter("rn = 1")\
    .drop("rn")

updated_df.show(truncate=False)
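The keep-first-row-per-id logic that the window plus row_number implements can be sketched outside Spark as well; here a source tag on each row stands in for F.input_file_name(), and the field names are illustrative:

```python
rows = [
    {"id": 6304, "source": "full",   "key1": "value1"},
    {"id": 6304, "source": "latest", "key1": "value1_changedData"},
    {"id": 7001, "source": "full",   "key1": "only_in_full"},
]

# Order priority: latest first, then full (mirrors the window's orderBy)
priority = {"latest": 0, "full": 1}

deduped = {}
for row in sorted(rows, key=lambda r: priority[r["source"]]):
    # The first row seen per id wins, like filtering row_number() == 1
    deduped.setdefault(row["id"], row)

result = sorted(deduped.values(), key=lambda r: r["id"])
# id 6304 keeps the latest row; id 7001 falls back to the full row
```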

10 Comments

Thank you for the response. One more issue I am facing: in the latest nested JSON I will not get the complete JSON, only a partial one. If I apply the above code I get an error like (org.apache.spark.sql.AnalysisException: cannot resolve 'latest.Document' given input columns:). Can you help me with this?
I am getting the latest JSON like this: { "email": "[email protected]", "firstName": "name01", "surname": "Optional", "id": 6304, "layer01": { "key1": "value1", "key2": "value2", "key3": "value3", "key4": "value4", "layer02": { "key1": "value1_changedData", "key2": "value2" },}
If I am not getting some column in the latest JSON, is there any possibility to pull that column from the full JSON? With the above code, full and latest have to have the same number of columns (I mean the same schema).
The above code is failing if the schemas mismatch, but I will not get the same schema for the latest JSON. Can you please help me with this?
@Pradeep Ok. Do you have any date in the data that identifies latest from full? Or maybe using the file names?
