
I have two Spark dataframes that look as follows:

> cities_df

+----------+---------------------------+
|   city_id|                     cities|
+----------+---------------------------+
|       22 |[Milan, Turin, Rome]       |
|       15 |[Naples, Florence, Genoa]  |
|       43 |[Houston, San Jose, Boston]|
|       56 |[New York, Dallas, Chicago]|
+----------+---------------------------+


> countries_df

+----------+----------------------------------+
|country_id|                         countries|
+----------+----------------------------------+
|      680 |{'country': [56, 43], 'add': []}  |
|       11 |{'country': [22, 15], 'add': [32]}|
+----------+----------------------------------+

The values in the country lists of countries_df are city IDs from the cities_df dataframe.

I need to merge these dataframes so that the city IDs in each country list are replaced by the corresponding city names from cities_df.

Expected output:

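+----------+----------------------------------+------------------------------------------------------+
|country_id|countries                         |grouped_cities                                        |
+----------+----------------------------------+------------------------------------------------------+
|680       |{'country': [56, 43], 'add': []}  |[New York, Dallas, Chicago, Houston, San Jose, Boston]|
|11        |{'country': [22, 15], 'add': [32]}|[Milan, Turin, Rome, Naples, Florence, Genoa]         |
+----------+----------------------------------+------------------------------------------------------+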

The resulting grouped_cities value doesn't have to be an array; a plain string is fine too.

How can I get this result using PySpark?


2 Answers


Inputs:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()  # already defined in the pyspark shell
cities_df = spark.createDataFrame(
    [(22, ['Milan', 'Turin', 'Rome']),
     (15, ['Naples', 'Florence', 'Genoa']),
     (43, ['Houston', 'San Jose', 'Boston']),
     (56, ['New York', 'Dallas', 'Chicago'])],
    ['city_id', 'cities']
)
countries_df = spark.createDataFrame(
    [(680, {'country': [56, 43], 'add': []}),
     (11, {'country': [22, 15], 'add': [32]})],
    ['country_id', 'countries']
)

Script:

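# One row per (country, city_id) pair
df_expl = countries_df.withColumn('city_id', F.explode('countries.country'))
# Attach the list of city names for each city_id
df_joined = df_expl.join(cities_df, 'city_id', 'left')
# Re-group per country and concatenate the city lists into one array
df = df_joined.groupBy('country_id').agg(
    F.first('countries').alias('countries'),
    F.flatten(F.collect_list('cities')).alias('grouped_cities')
)
df.show(truncate=0)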
# +----------+----------------------------------+------------------------------------------------------+
# |country_id|countries                         |grouped_cities                                        |
# +----------+----------------------------------+------------------------------------------------------+
# |11        |{add -> [32], country -> [22, 15]}|[Naples, Florence, Genoa, Milan, Turin, Rome]         |
# |680       |{add -> [], country -> [56, 43]}  |[Houston, San Jose, Boston, New York, Dallas, Chicago]|
# +----------+----------------------------------+------------------------------------------------------+

2 Comments

Thank you so much, but I get an error: AttributeError: Can't extract value from countries#1062: need struct type but got string. How can I debug and resolve it?
It means that your countries column is a string, not a dictionary (map). The answer works when the column is a map, but not when it is a string. Converting the column from string to map deserves a separate question, as it's not trivial.

Another way of doing it: use select to add an exploded city_id column to countries_df, join it with cities_df, then group by country_id and the countries column cast to a string (a map column can't be used as a grouping key). Code below.

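from pyspark.sql.functions import col, collect_set, explode, flatten

new = (
    cities_df
    .join(countries_df.select('*', explode('countries.country').alias('city_id')), how='left', on='city_id')
    .groupby('country_id', col('countries').cast('string').alias('countries'))
    .agg(flatten(collect_set('cities')).alias('cities'))
)
new.show(truncate=False)  # .show() returns None, so don't assign its result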


+----------+----------------------------------+------------------------------------------------------+
|country_id|countries                         |cities                                                |
+----------+----------------------------------+------------------------------------------------------+
|11        |{add -> [32], country -> [22, 15]}|[Milan, Turin, Rome, Naples, Florence, Genoa]         |
|680       |{add -> [], country -> [56, 43]}  |[New York, Dallas, Chicago, Houston, San Jose, Boston]|
+----------+----------------------------------+------------------------------------------------------+
             

4 Comments

Thanks a lot for the great solution, but I got the following error: AttributeError: 'NoneType' object has no attribute 'join'
One of the DataFrames you are trying to join has a .show() appended to it; .show() returns None, so that variable ends up as None.
Yep, I removed .show(), but this didn't help
I used the same DataFrames as in the other answer and it works fine. It's hard to say more until I see which DataFrame is giving you issues, but it should work.
