
I have a Kafka producer sending large amounts of data in the following format:

{
  '1000': 
    {
       '3': 
        {
           'seq': '1', 
           'state': '2', 
           'CMD': 'XOR' 
        }
    },
 '1001': 
    {
       '5': 
        {
           'seq': '2', 
           'state': '2', 
           'CMD': 'OR' 
        }
    },
 '1003': 
    {
       '5': 
        {
           'seq': '3', 
           'state': '4', 
           'CMD': 'XOR' 
        }
    }
}

The data I want is in the innermost object: {'seq': '1', 'state': '2', 'CMD': 'XOR'}, and the keys at the two outer levels ('1000' and '3') are variable. Please note that the values above are only an example; the original dataset is huge, with lots of variable keys. Only the keys in the innermost object ({'seq', 'state', 'CMD'}) are constant.

I have tried using the generic formats to read the data, but I am getting incorrect results because the outer levels have variable keys, and I am not sure how to define a schema to parse data in this format.

The output I am trying to achieve is a dataframe of the format

seq    state     CMD
----------------------
 1       2       XOR
 2       2        OR
 3       4       XOR
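To make the traversal concrete, here is the same flattening expressed in plain Python on the sample payload (illustrative only; the actual pipeline uses Spark). Because the outer keys ('1000', '3', etc.) are variable, the trick is to iterate over the values at each level and ignore the keys, keeping only the constant inner fields:

```python
import json

# Sample payload from the question (keys at the two outer levels are variable).
payload = """
{
  "1000": {"3": {"seq": "1", "state": "2", "CMD": "XOR"}},
  "1001": {"5": {"seq": "2", "state": "2", "CMD": "OR"}},
  "1003": {"5": {"seq": "3", "state": "4", "CMD": "XOR"}}
}
"""

rows = [
    (inner["seq"], inner["state"], inner["CMD"])
    for outer in json.loads(payload).values()  # skip variable keys '1000', '1001', ...
    for inner in outer.values()                # skip variable keys '3', '5', ...
]
print(rows)  # [('1', '2', 'XOR'), ('2', '2', 'OR'), ('3', '4', 'XOR')]
```

This mirrors what Spark's explode() does on a map column: it discards the unpredictable key names and surfaces the values, which is why a map-based schema (rather than a struct with fixed field names) fits this data.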
  • What is the final outcome? Are you willing to get it as a dataframe? More clarification on the expected output will help here. Commented Nov 2, 2020 at 6:10
  • @dsk... I've added the expected output. thanks for the comment. Commented Nov 2, 2020 at 6:16
  • Can you please check now if the solution below is what you are looking for? Would appreciate it if you could accept and upvote .. :) Commented Nov 2, 2020 at 6:32

1 Answer


This can be a working solution for you - use explode() and getItem() as below.

Load the JSON into a DataFrame

a_json = {
  '1000': 
    {
       '3': 
        {
           'seq': '1', 
           'state': '2', 
           'CMD': 'XOR' 
        }
    }
}
df = spark.createDataFrame([a_json])
df.show(truncate=False)

+-----------------------------------------+
|1000                                     |
+-----------------------------------------+
|[3 -> [CMD -> XOR, state -> 2, seq -> 1]]|
+-----------------------------------------+

Logic

from pyspark.sql import functions as F

df = df.select("*", F.explode("1000").alias("x", "y"))
df = df.withColumn("seq", df.y.getItem("seq")).withColumn("state", df.y.getItem("state")).withColumn("CMD", df.y.getItem("CMD"))
df.show(truncate=False)


+-----------------------------------------+---+----------------------------------+---+-----+---+
|1000                                     |x  |y                                 |seq|state|CMD|
+-----------------------------------------+---+----------------------------------+---+-----+---+
|[3 -> [CMD -> XOR, state -> 2, seq -> 1]]|3  |[CMD -> XOR, state -> 2, seq -> 1]|1  |2    |XOR|
+-----------------------------------------+---+----------------------------------+---+-----+---+

Updating the Code based on Further Inputs

# Assuming that all the JSON columns are in a single row, first combine them into an array column.
df = df.withColumn("array_col", F.array("1000", "1001", "1003"))
# Then explode and getItem
df = df.withColumn("explod_col", F.explode("array_col"))
df = df.select("*", F.explode("explod_col").alias("x", "y"))
df_final = df.withColumn("seq", df.y.getItem("seq")).withColumn("state", df.y.getItem("state")).withColumn("CMD", df.y.getItem("CMD"))
df_final.select("seq","state","CMD").show()
+---+-----+---+
|seq|state|CMD|
+---+-----+---+
+---+-----+---+
|  1|    2|XOR|
|  2|    2| OR|
|  3|    4|XOR|
+---+-----+---+

6 Comments

Hi, the keys are variable for the first two loops, so using F.explode("1000") doesn't solve the problem. I have updated the example in the question.
Also, since I am getting data from a Kafka stream, I am storing the values in data frame column "value". df = df.select("*", F.explode("value").alias("x", "y")) is also giving me an error: cannot resolve 'explode(value)' due to data type mismatch: input to function explode should be array or map type, not string;;
I have solved that, can you please check the updated answer. The idea here is to first create an exploded column from the value column - df = df.withColumn("explod_col", F.explode("value")) - and then separate the key and value again into two different columns like this - df = df.select("*", F.explode("explod_col").alias("x", "y"))
the data set is huge and 1000, 1001, 1003 aren't the only values in it. There are thousands of such keys. the example in the question is only to give an idea of the format of the dataset.
Buddy, use df.columns in order to put all the columns in the array; eventually you can modify it or pass a list of your own .. df = df.withColumn("array_col", F.array(df.columns))
