
I have created a DataFrame as follows:

+----+-------+-------+
| age| number|name   |
+----+-------+-------+
|  16|     12|A      |
|  16|     13|B      |
|  17|     16|E      |
|  17|     17|F      |
+----+-------+-------+

How to convert it into following json:

[{
  "age": 16,
  "values": [{"number": 12, "name": "A"}, {"number": 13, "name": "B"}]
},
{
  "age": 17,
  "values": [{"number": 16, "name": "E"}, {"number": 17, "name": "F"}]
}]

2 Answers


Assuming df is your DataFrame:

from pyspark.sql import functions as F

new_df = df.select(
    "age",
    # pack each row's fields into a single struct column
    F.struct(
        F.col("number"),
        F.col("name"),
    ).alias("values")
).groupBy(
    "age"
).agg(
    # collect all structs for each age into one array
    F.collect_list("values").alias("values")
)

new_df.toJSON()
# or
new_df.write.json(...)
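
For reference, a minimal end-to-end sketch of the above (my assumptions: a local SparkSession named spark, and sample rows mirroring the question's table):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Sample data mirroring the question's table (for illustration only).
df = spark.createDataFrame(
    [(16, 12, "A"), (16, 13, "B"), (17, 16, "E"), (17, 17, "F")],
    ["age", "number", "name"],
)

new_df = (
    df.select("age", F.struct("number", "name").alias("values"))
      .groupBy("age")
      .agg(F.collect_list("values").alias("values"))
)

# toJSON() yields one JSON string per row (row order within a group
# is not guaranteed), e.g.:
# {"age":16,"values":[{"number":12,"name":"A"},{"number":13,"name":"B"}]}
for line in new_df.toJSON().collect():
    print(line)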

2 Comments

If you have null values in the data, how do you handle them when using F.struct? If we use the script above, it removes cells with null values.
@Amol It removes only null grouping keys, i.e. rows where age is null. Do you think the OP has this case, or are you pointing out an edge case that may simply not occur here?
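
An aside on the null question above (my hedged sketch, not from the thread): one simple way to keep cells that contain nulls is to substitute placeholder values before building the struct, for example with DataFrame.fillna. The fill values below are arbitrary.

# Replace nulls before building the struct so no cell is dropped.
# The fill values (0, "unknown") are illustrative placeholders.
df_filled = df.fillna({"number": 0, "name": "unknown"})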

You can convert the DataFrame to an RDD and apply your transformations:

import json

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

NewSchema = StructType([StructField("age", IntegerType()),
                        StructField("values", StringType())])


# group the (number, name) dicts by age, then serialize each list to a JSON string
res_df = df.rdd.map(lambda row: (row[0], [{'number': row[1], 'name': row[2]}]))\
    .reduceByKey(lambda x, y: x + y)\
    .map(lambda row: (row[0], json.dumps(row[1])))\
    .toDF(NewSchema)

res_df.show(20, False)

Output of res_df.show():

+---+-----------------------------------------------------------+
|age|values                                                     |
+---+-----------------------------------------------------------+
|16 |[{"number": 12, "name": "A"}, {"number": 13, "name": "B"}]|
|17 |[{"number": 17, "name": "F"}, {"number": 16, "name": "E"}]|
+---+-----------------------------------------------------------+

Saving the DataFrame as a JSON file:

res_df.coalesce(1).write.format('json').save('output.json')
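
Note (my addition, not from the original answer): save('output.json') creates a directory named output.json containing part files; coalesce(1) merely ensures there is a single part file inside it. Also, because values was serialized with json.dumps into a StringType column, the written records will carry it as an escaped JSON string rather than a nested array.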

3 Comments

Converting a DataFrame to an RDD and back to a DataFrame can really hurt performance.
@Steven Maybe, but I doubt it really impacts performance, because behind every DataFrame there are RDDs!
