Say I have two dataframes with 4 columns each. The first 3 columns are string types, and the 4th column is an array type. I would like to concatenate these two dataframes so that the resulting dataframe will fulfill the following:

In rows where the first 3 columns' values are identical between the two dataframes, the row in the result dataframe will contain the identical values, and the array column will contain a union of all the values in each of the original dataframes' 4th column arrays.

Rows that don't have a partner in the other dataframe with identical values in the first 3 columns will appear unchanged in the result dataframe.

Example:

DF1 = [
Row(str1="StringA", str2="StringB", str3="StringC", arr=["array_member_a"]),
Row(str1="String1", str2="String2", str3="String3", arr=["array_member_1"])]

DF2 = [ 
Row(str1="StringA", str2="StringB", str3="StringC", arr=["array_member_d"]),
Row(str1="String1", str2="String8", str3="String9", arr=["array_member_x"])]

result_DF = [
Row(str1="StringA", str2="StringB", str3="StringC", arr=["array_member_a", "array_member_d"]),
Row(str1="String1", str2="String2", str3="String3", arr=["array_member_1"]),
Row(str1="String1", str2="String8", str3="String9", arr=["array_member_x"])]
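The desired semantics can be sketched in plain Python (no Spark needed) to make the expected behavior unambiguous; `merge_rows` is a hypothetical helper name, and rows are modeled as `(str1, str2, str3, arr)` tuples:

```python
# Sketch of the merge semantics: group rows by the first 3 columns,
# union the array columns while keeping first-seen order.
def merge_rows(rows1, rows2):
    merged = {}
    for str1, str2, str3, arr in rows1 + rows2:
        key = (str1, str2, str3)
        seen = merged.setdefault(key, [])
        for member in arr:
            if member not in seen:  # union: skip duplicates
                seen.append(member)
    return [key + (arr,) for key, arr in merged.items()]

df1 = [("StringA", "StringB", "StringC", ["array_member_a"]),
       ("String1", "String2", "String3", ["array_member_1"])]
df2 = [("StringA", "StringB", "StringC", ["array_member_d"]),
       ("String1", "String8", "String9", ["array_member_x"])]

result = merge_rows(df1, df2)
```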

1 Answer

Question: What's the difference between a join and a group-by? Answer: Only the axis of the aggregation.

It's much simpler to aggregate distinct rows than it is to aggregate distinct columns, so let's reinterpret your problem.

First we "join" the dataframes on the row axis with a union. Unlike a join, which would place the arrays we want to aggregate in different columns of a single row, the union produces multiple rows with a single array column to aggregate:

on = ['_1', '_2', '_3']

(df1
 .union(df2))

[Row(_1='StringA', _2='StringB', _3='StringC', _4=['array_member_a']),
 Row(_1='String1', _2='String2', _3='String3', _4=['array_member_1']),
 Row(_1='StringA', _2='StringB', _3='StringC', _4=['array_member_d']),
 Row(_1='String1', _2='String8', _3='String9', _4=['array_member_x'])]

Now we group on the first 3 columns and collect the arrays. collect_set produces an array of arrays, so it must be flattened, and the flattened result deduplicated, before we get the union we want:

from pyspark.sql.functions import array_distinct, collect_set, flatten

(df1
 .union(df2)
 .groupby(on).agg(array_distinct(flatten(collect_set('_4'))).alias('_4')))

[Row(_1='String1', _2='String2', _3='String3', _4=['array_member_1']),
 Row(_1='StringA', _2='StringB', _3='StringC', _4=['array_member_a', 'array_member_d'])]
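For clarity, here is what the `array_distinct(flatten(collect_set('_4')))` pipeline does for a single group, illustrated in plain Python (an analogy, not Spark code):

```python
# One group's collected values, as collect_set might return them:
# an array of the group's arrays.
collected = [["array_member_a"], ["array_member_d", "array_member_a"]]

# flatten: merge the inner arrays into one array.
flat = [member for arr in collected for member in arr]

# array_distinct: drop duplicates, keeping first-seen order.
distinct = list(dict.fromkeys(flat))
# distinct == ["array_member_a", "array_member_d"]
```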