1

I'm currently working on a PySpark project where I need to perform a join between two large dataframes. One dataframe contains around 10 million entries with short strings as keywords(2-5 words), while the other dataframe holds 30 million records with variations(5-10 word strings), merchants, and counts.

The goal is to join the dataframes based on the condition that the keywords in the first one are contained within the variations of the second dataframe. However, the current code is running for over 3 hours on a large EMR cluster and still hasn't finished.

EMR configuration

5 task nodes: m5.16xlarge (32cores/256GB per node) Master node: m5.8xlarge (4cores/64GB)

spark-submit command:

time spark-submit --master yarn --deploy-mode client --conf spark.yarn.maxAppAttempts=1 --packages org.apache.hadoop:hadoop-aws:2.7.0 --num-executors 30 --conf spark.driver.memoryOverhead=6g --conf spark.executor.memoryOverhead=6g --executor-cores 5 --executor-memory 42g --driver-memory g 42 --conf spark.yarn.executor.memoryOverhead=409 join_code.py

Here's a simplified version of the code I'm using:

# Code for join
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DataFrameJoin").getOrCreate()

# Loading dataframes
keywords_df = spark.read.parquet("keywords.parquet")
variations_df= spark.read.parquet("variations.parquet")

# Cross-joining based on keyword containment
result = keywords_df.join(variations_df,F.col(variations).contains(F.col(keyword)),how='left')
result.show()
14
  • 1
    hmm.. this might need a creative solution. does variations have any patterns ( does the variation always start with the keyword or is the keyword always after a specific word?). OR maybe is there a date column you could add to your join or filter the data on? Commented Aug 25, 2023 at 1:50
  • No, there's nothing else I have in the table to do a match on. Commented Aug 25, 2023 at 9:54
  • I hope you are partitioning the dataframes in suitable number of partitions. Another thing you can do is convert the keywords into unique numbers in both the tables and then do contain operation. String comparison is expensive as compared interger comparison. Commented Aug 25, 2023 at 15:46
  • 1
    Thanks for mentioning partition here - I checked the partitions. My large dataframe of 30M rows(500MB) was getting partitioned into 50 only. I repartitioned it into 3000, speed has gone up. Smaller dataframe(10M rows, and is partitioned into 2 by default, which I understood that is not right. I did some trial-errors with partition numbers. Used ganglia and spark UI, takes 40 mins now. Commented Aug 25, 2023 at 19:37
  • 1
    Your datasize seems to be very small. If you optimize is properly. This task should take about <10 minutes. I have optimized datasizes of 1B rows and it took about 5 minutes to finish on similar cluster sizes. Commented Aug 26, 2023 at 15:26

1 Answer 1

1

If Fuzzy matching is taking lot of your time, then following solution could help. It uses rapidfuzz library which is highly optimized so should run faster. There are mutiple ratio options to choose from. Take a look at this github page and test which ratio suits your needs the best.

https://github.com/maxbachmann/RapidFuzz

from pyspark.sql.types import *
from pyspark import SparkContext, SQLContext
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
import rapidfuzz

sc = SparkContext('local')
sqlContext = SQLContext(sc)

keyword_given = [
    ["green pstr",],
    ["greenpstr",],
    ["wlmrt", ],
    ["walmart",],
    ["walmart super",]
    ]

keywordColumns = ["keyword"]
keyword_df = sqlContext.createDataFrame(data=keyword_given, schema = keywordColumns)

print("keyword_df dataframe")
keyword_df.show(truncate=False)


variations = [
            ("type green pstr", "ABC", 100),
            ("type green pstr","PQR",200),
            ("type green pstr", "NZSD", 2999),
            ("wlmrt payment","walmart",200),
            ("wlmrt solutions", "walmart", 200),
            ("nppssdwlmrt", "walmart", 2000)
             ]

variationsColumns = ["variations", "entity", "ID"]
variations_df = sqlContext.createDataFrame(data=variations, schema = variationsColumns)

print("variations_df dataframe")
variations_df.show(truncate=False)

def evalutate_helper_spark(keyw, var):
    return rapidfuzz.fuzz.partial_ratio(keyw, var)


calculate_ratio = udf(lambda keyw, var : evalutate_helper_spark(keyw, var))

middle_df = variations_df.crossJoin(keyword_df)

middle_df = middle_df.withColumn("partial_ratio", calculate_ratio(F.col("keyword"), F.col("variations")))


print("middle df show")
middle_df.show(n=100, truncate=False)

Here's the output :

keyword_df dataframe
+-------------+
|keyword      |
+-------------+
|green pstr   |
|greenpstr    |
|wlmrt        |
|walmart      |
|walmart super|
+-------------+

variations_df dataframe
+---------------+-------+----+
|variations     |entity |ID  |
+---------------+-------+----+
|type green pstr|ABC    |100 |
|type green pstr|PQR    |200 |
|type green pstr|NZSD   |2999|
|wlmrt payment  |walmart|200 |
|wlmrt solutions|walmart|200 |
|nppssdwlmrt    |walmart|2000|
+---------------+-------+----+

middle df show
+---------------+-------+----+-------------+------------------+
|variations     |entity |ID  |keyword      |partial_ratio     |
+---------------+-------+----+-------------+------------------+
|type green pstr|ABC    |100 |green pstr   |100.0             |
|type green pstr|ABC    |100 |greenpstr    |88.88888888888889 |
|type green pstr|ABC    |100 |wlmrt        |33.333333333333336|
|type green pstr|ABC    |100 |walmart      |25.0              |
|type green pstr|ABC    |100 |walmart super|40.0              |
|type green pstr|PQR    |200 |green pstr   |100.0             |
|type green pstr|PQR    |200 |greenpstr    |88.88888888888889 |
|type green pstr|PQR    |200 |wlmrt        |33.333333333333336|
|type green pstr|PQR    |200 |walmart      |25.0              |
|type green pstr|PQR    |200 |walmart super|40.0              |
|type green pstr|NZSD   |2999|green pstr   |100.0             |
|type green pstr|NZSD   |2999|greenpstr    |88.88888888888889 |
|type green pstr|NZSD   |2999|wlmrt        |33.333333333333336|
|type green pstr|NZSD   |2999|walmart      |25.0              |
|type green pstr|NZSD   |2999|walmart super|40.0              |
|wlmrt payment  |walmart|200 |green pstr   |46.15384615384615 |
|wlmrt payment  |walmart|200 |greenpstr    |50.0              |
|wlmrt payment  |walmart|200 |wlmrt        |100.0             |
|wlmrt payment  |walmart|200 |walmart      |83.33333333333334 |
|wlmrt payment  |walmart|200 |walmart super|70.0              |
|wlmrt solutions|walmart|200 |green pstr   |40.0              |
|wlmrt solutions|walmart|200 |greenpstr    |36.36363636363637 |
|wlmrt solutions|walmart|200 |wlmrt        |100.0             |
|wlmrt solutions|walmart|200 |walmart      |83.33333333333334 |
|wlmrt solutions|walmart|200 |walmart super|70.0              |
|nppssdwlmrt    |walmart|2000|green pstr   |42.85714285714286 |
|nppssdwlmrt    |walmart|2000|greenpstr    |46.15384615384615 |
|nppssdwlmrt    |walmart|2000|wlmrt        |100.0             |
|nppssdwlmrt    |walmart|2000|walmart      |83.33333333333334 |
|nppssdwlmrt    |walmart|2000|walmart super|55.55555555555556 |
+---------------+-------+----+-------------+------------------+
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.