
I have a spark data frame as follows

+---------+--------------+------------------+
| item_id | popular_tags | popularity_score |
+---------+--------------+------------------+
| id_1    | Samsung      | 0.4              |
| id_1    | long battery | 0.8              |
| id_2    | Apple        | 0.9              |
| id_2    | UI           | 0.9              |
+---------+--------------+------------------+

I want to group this data frame by item_id and write it out as a file where each line is a JSON object:

{"id_1": {"Samsung": {"popularity_score": 0.4}, "long battery": {"popularity_score": 0.8}}}
{"id_2": {"Apple": {"popularity_score": 0.9}, "UI": {"popularity_score": 0.9}}}

I tried using the to_json and collect_list functions, but I get a list, not a nested JSON object. This is a big distributed data frame, so converting it to pandas or collecting it onto a single machine is not an option.
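
For reference, a minimal reproduction of the data and of the attempt described above (a sketch: spark is assumed to be an existing SparkSession, and column names are taken from the table):

import pyspark.sql.functions as F

# Rebuild the example data frame from the table above.
df = spark.createDataFrame(
    [('id_1', 'Samsung', 0.4), ('id_1', 'long battery', 0.8),
     ('id_2', 'Apple', 0.9), ('id_2', 'UI', 0.9)],
    ['item_id', 'popular_tags', 'popularity_score']
)

# The attempt: to_json over collect_list serializes the collected structs
# as a JSON array, e.g. [{"popular_tags":"Samsung",...}, ...],
# not as the nested object shown above.
attempt = df.groupBy('item_id').agg(
    F.to_json(
        F.collect_list(F.struct('popular_tags', 'popularity_score'))
    ).alias('col')
)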

1 Answer

You'll need to create some map types for your JSON (note that map_from_entries requires Spark 2.4+):

import pyspark.sql.functions as F

# Collect (tag, score-struct) entries per item and turn them into a map,
# so to_json renders a nested JSON object rather than an array.
df2 = df.groupBy('item_id').agg(
    F.map_from_entries(
        F.collect_list(
            F.struct('popular_tags', F.struct('popularity_score'))
        )
    ).alias('m')
).select(
    F.to_json(
        F.create_map('item_id', 'm')
    ).alias('col')
)

df2.show(truncate=False)
+-------------------------------------------------------------------------------------+
|col                                                                                  |
+-------------------------------------------------------------------------------------+
|{"id_2":{"Apple":{"popularity_score":0.9},"UI":{"popularity_score":0.9}}}            |
|{"id_1":{"Samsung":{"popularity_score":0.4},"long battery":{"popularity_score":0.8}}}|
+-------------------------------------------------------------------------------------+
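
Since each row of df2 already holds one complete JSON string, the line-per-object file the question asks for is a plain text write (the output path below is a placeholder):

df2.write.mode('overwrite').text('/path/to/output')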

Without map_from_entries, you might have to rely on some dirty hacks: serialize an array of single-entry maps, then patch the result with string replacements. Beware that this edits the JSON as plain text, so it can misfire if values contain brackets or '},{' sequences:

df2 = df.groupBy('item_id').agg(
    F.collect_list(
        F.create_map('popular_tags', F.struct('popularity_score'))
    ).alias('m')
).select(
    F.regexp_replace(
        F.regexp_replace(
            F.to_json(F.create_map('item_id', 'm')),
            '(\\[|\\])',   # strip the array brackets
            ''
        ),
        '\\},\\{',         # merge adjacent single-entry objects
        ','
    ).alias('col')
)

df2.show(truncate=False)
+-------------------------------------------------------------------------------------+
|col                                                                                  |
+-------------------------------------------------------------------------------------+
|{"id_2":{"Apple":{"popularity_score":0.9},"UI":{"popularity_score":0.9}}}            |
|{"id_1":{"Samsung":{"popularity_score":0.4},"long battery":{"popularity_score":0.8}}}|
+-------------------------------------------------------------------------------------+

3 Comments

I kind of get what you are doing. Thanks, but I am using Spark 2.2, which does not have this map_from_entries function. Can I achieve that functionality using the Spark 2.2 functions?
@NG_21 ... that's a bit difficult. I added a hacky way to do that, not sure if it works for you.
I just figured out a way combining the solution you mentioned with a UDF-based approach, which works on Spark 2.2, although I would prefer Spark functions over a UDF for performance. Thanks for the help.
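
For later readers: the comment above does not include the code, but a UDF-based version along these lines should work on Spark 2.2 (a sketch of that idea, not the commenter's exact solution; to_nested_json is an illustrative name):

import json
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

# Build the nested dict in plain Python and serialize it per group.
# Sketch only: this trades the native-function performance noted above
# for compatibility with Spark 2.2.
def to_nested_json(item_id, pairs):
    return json.dumps(
        {item_id: {tag: {'popularity_score': score} for tag, score in pairs}}
    )

nested_json_udf = F.udf(to_nested_json, StringType())

df2 = df.groupBy('item_id').agg(
    F.collect_list(
        F.struct('popular_tags', 'popularity_score')
    ).alias('pairs')
).select(
    nested_json_udf('item_id', 'pairs').alias('col')
)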
