
I have a dataframe like this:

col1   | col2
-------+---------------------------------------------------------------------------------------------
test:1 | {"test1:subtest1":[{"Id":"17","cName":"c1"}], "test1:subtest2":[{"Id":"01","cName":"c2"}]}
test:2 | {"test1:subtest2":[{"Id":"18","cName":"c13","pScore":0.00203}]}

I want an output like this:

col1   | col2           | Id | cName | pScore  |
------------------------------------------------
test:1 | test1:subtest1 | 17 | c1    | null    | 
test:1 | test1:subtest2 | 01 | c2    | null    | 
test:2 | test1:subtest2 | 18 | c13   | 0.00203 | 

This is a follow-up to this question: Casting a column to JSON/dict and flattening JSON values in a column in pyspark

I am new to pyspark and would appreciate any help on this. I tried the solution given in that post, but it kept giving me this error:

TypeError: type object argument after ** must be a mapping, not list

I also tried the following:

test = sqlContext.read.json(df.rdd.map(lambda r: r.col2))

But this gave me an output like the following:

test1:subtest1             | test1:subtest2
---------------------------+---------------------------------------------
[{"Id":"17","cName":"c1"}] | [{"Id":"01","cName":"c2"}]
null                       | [{"Id":"18","cName":"c13","pScore":0.00203}]

I am stuck on how to join the output above back to col1 and get the desired output.

Any help is much appreciated, thanks in advance!

1 Answer

You can use the from_json() function; the key is to define the json_schema, which you can create manually or, if you are using PySpark 2.4+, generate with the schema_of_json() function (the code below was tested under PySpark 2.4.0; a sketch of the fully manual approach is shown after the note below):

from pyspark.sql import functions as F

# define all keys with a list:
my_keys = ['test1:subtest1', 'test1:subtest2']

# find a sample json string for a single key with all sub-fields, then construct its json_schema
key_schema = df.select(F.schema_of_json('{"test1:subtest1":[{"Id":"17","cName":"c1","pScore":0.00203}]}').alias('schema')).first().schema

>>> key_schema
u'struct<test1:subtest1:array<struct<Id:string,cName:string,pScore:double>>>'

# use the above sample key_schema to create the json_schema for all keys
schema = u'struct<' + ','.join([r'`{}`:array<struct<Id:string,cName:string,pScore:double>>'.format(k) for k in my_keys]) + r'>'

>>> schema 
u'struct<`test1:subtest1`:array<struct<Id:string,cName:string,pScore:double>>,`test1:subtest2`:array<struct<Id:string,cName:string,pScore:double>>>'

Note: backticks are required around a field name when it contains special characters such as ":".
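If you prefer to create the schema manually (for example on PySpark versions before 2.4, where schema_of_json() is unavailable), here is a minimal sketch using Spark SQL types; the names element and schema_manual are just illustrative:

from pyspark.sql.types import StructType, StructField, ArrayType, StringType, DoubleType

# each key maps to an array of structs with the three sub-fields
element = StructType([
    StructField('Id', StringType()),
    StructField('cName', StringType()),
    StructField('pScore', DoubleType())
])

# one top-level field per json key; no backticks needed here since
# we build StructField objects instead of a DDL string
schema_manual = StructType([StructField(k, ArrayType(element)) for k in my_keys])

schema_manual can then be passed to from_json() in place of the DDL string used below.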

Once we have the schema, the JSON data can be parsed from col2:

df1 = df.withColumn('data', F.from_json('col2', schema)).select('col1', 'data.*')

>>> df1.printSchema()
root
 |-- col1: string (nullable = true)
 |-- test1:subtest1: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Id: string (nullable = true)
 |    |    |-- cName: string (nullable = true)
 |    |    |-- pScore: double (nullable = true)
 |-- test1:subtest2: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Id: string (nullable = true)
 |    |    |-- cName: string (nullable = true)
 |    |    |-- pScore: double (nullable = true)

>>> df1.show(2,0)
+------+--------------+--------------------+
|col1  |test1:subtest1|test1:subtest2      |
+------+--------------+--------------------+
|test:1|[[17, c1,]]   |[[01, c2,]]         |
|test:2|null          |[[18, c13, 0.00203]]|
+------+--------------+--------------------+

Then you can use select and union to normalize the dataframe:

df_new = df1.select('col1', F.lit('test1:subtest1').alias('col2'), F.explode(F.col('test1:subtest1')).alias('arr')) \
            .union(
                df1.select('col1', F.lit('test1:subtest2'), F.explode(F.col('test1:subtest2')))
           ).select('col1', 'col2', 'arr.*')  

>>> df_new.show()
+------+--------------+---+-----+-------+
|  col1|          col2| Id|cName| pScore|
+------+--------------+---+-----+-------+
|test:1|test1:subtest1| 17|   c1|   null|
|test:1|test1:subtest2| 01|   c2|   null|
|test:2|test1:subtest2| 18|  c13|0.00203|
+------+--------------+---+-----+-------+

Use reduce()

When there are many unique keys in the JSON strings, use the reduce function to create df_new (a sketch that avoids enumerating the keys at all follows the code below):

from functools import reduce     

df_new = reduce(lambda x,y: x.union(y)
          , [ df1.select('col1', F.lit(k).alias('col2'), F.explode(F.col(k)).alias('arr')) for k in my_keys ]
         ).select('col1', 'col2', 'arr.*')
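As a side note (not part of the original answer), if the keys are not known in advance at all, one alternative sketch is to parse col2 as a map and explode it, which avoids my_keys entirely; this assumes every array element carries the same three sub-fields:

from pyspark.sql import functions as F
from pyspark.sql.types import MapType, ArrayType, StructType, StructField, StringType, DoubleType

# map each top-level json key to an array of structs
map_schema = MapType(StringType(), ArrayType(StructType([
    StructField('Id', StringType()),
    StructField('cName', StringType()),
    StructField('pScore', DoubleType())
])))

# explode the map into (key, value) rows, then explode each array of structs
df_alt = df.withColumn('m', F.from_json('col2', map_schema)) \
           .select('col1', F.explode('m').alias('col2', 'arr')) \
           .withColumn('e', F.explode('arr')) \
           .select('col1', 'col2', 'e.*')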

1 Comment

Beautiful! Thanks a lot for this!
