
I have a Spark dataframe, originating from Google Analytics, that looks like the following:

id     customDimensions (Array<Struct>)
100    [ {"index": 1, "value": "Earth"}, {"index": 2, "value": "Europe"}]
101    [ {"index": 1, "value": "Mars" }]

I also have a "custom dimensions metadata" dataframe that looks like this:

index   name
1       planet
2       continent

I'd like to use the indexes in the metadata df to expand my custom dimensions into columns. The result should look like the following:

id     planet     continent
100    Earth      Europe
101    Mars       null
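
For reference, a minimal sketch that reconstructs these two example DataFrames (untested; it assumes an active SparkSession named spark):

from pyspark.sql import functions as F

# Original dataframe: one row per id, with an array of {index, value} structs
df = spark.createDataFrame(
    [
        (100, [(1, "Earth"), (2, "Europe")]),
        (101, [(1, "Mars")]),
    ],
    "id INT, customDimensions ARRAY<STRUCT<index: INT, value: STRING>>",
)

# Custom dimensions metadata: maps each index to a column name
metadata = spark.createDataFrame(
    [(1, "planet"), (2, "continent")],
    "index INT, name STRING",
)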

I have tried the following approach, and it works fine; however, it is extremely non-performant. I'd like to know if there's a better approach.

# (Assumes: from pyspark.sql import functions as F)
# Select the two relevant columns
cd = df.select('id', 'customDimensions')

# Explode customDimensions so that each row now has a {index, value}
cd = cd.withColumn('customDimensions', F.explode(cd.customDimensions))

# Put the index and value into their own columns
cd = cd.select('id', 'customDimensions.index', 'customDimensions.value')

# Join with metadata to obtain the name from the index
metadata = metadata.select('index', 'name')
cd = (cd
         .join(metadata, cd.index == metadata.index, 'left')
         .drop(metadata.index))

# Pivot cd so that each row has the id, and we have columns for each custom dimension
piv = cd.groupBy('id').pivot('name').agg(F.first(F.col('value')))

# Join back to restore the other columns
return df.join(piv, df.id == piv.id).drop(piv.id)

Assumptions:

  • There are up to 250 custom dimension indexes, and the names are only known through the metadata dataframe
  • The original dataframe has several other columns that I would like to maintain (hence the join at the end of my solution)
1 Comment

I presume the metadata DataFrame is much, much smaller than the original dataframe. You may get better performance by collecting the distinct values of the name column and passing them in as the values parameter to pivot. Something like: piv = cd.groupBy("id").pivot("name", values=[row["name"] for row in metadata.select("name").distinct().collect()]).agg(F.first("value")). Yes, there's an upfront cost to calling collect, but it may well beat the cost of a pivot without the values argument. Commented Dec 7, 2018 at 18:31
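
Spelled out, that suggestion looks roughly like this (untested; it assumes the same cd and metadata DataFrames as in the question):

# Collect the dimension names once so pivot() does not need an extra job
# to compute the distinct pivot values
names = [row["name"] for row in metadata.select("name").distinct().collect()]

piv = cd.groupBy("id").pivot("name", values=names).agg(F.first("value"))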

1 Answer


Joins are very costly operations because they result in data shuffling. If you can, you should avoid them or look to optimize them.

There are two joins in your code. The last join, which gets the other columns back, can be avoided altogether. The other join, with the metadata dataframe, can be optimized: since the metadata df has at most 250 rows and is very small, you can use the broadcast() hint in the join. That avoids shuffling the larger dataframe.

I have made some suggested code changes, but they are untested since I don't have your data.

# List of the original df columns, minus the array we are about to explode
df_columns = [c for c in df.columns if c != 'customDimensions']

# Explode customDimensions so that each row now has a single {index, value} struct
cd = df.withColumn('customDimensions', F.explode(df.customDimensions))

# Put the index and value into their own columns
cd = cd.select(*df_columns, 'customDimensions.index', 'customDimensions.value')

# Join with metadata to obtain the name from the index;
# broadcasting the small metadata df avoids shuffling the large dataframe
metadata = metadata.select('index', 'name')
cd = cd.join(F.broadcast(metadata), 'index', 'left')

# Pivot cd so that each row keeps the original columns and gains one column per custom dimension
piv = cd.groupBy(df_columns).pivot('name').agg(F.first(F.col('value')))

return piv

1 Comment

I suspect that the pivot here is more costly than the join.
