
I have a Spark Dataframe with columns id and hashes, where the column hashes contains a Seq of integer values of length n. Example:

+---+---------------+
|id |hashes         |
+---+---------------+
|0  |[1, 2, 3, 4, 5]|
|1  |[1, 5, 3, 7, 9]|
|2  |[9, 3, 6, 8, 0]|
+---+---------------+

I want to get a dataframe with all the rows for which the arrays in hashes match in at least one position. More formally, I want a dataframe with an additional column matches that, for each row r, contains the Seq of ids of all other rows k such that hashes[r][i] == hashes[k][i] for at least one value of i.

For my example data, the result would be:

+---+---------------+-------+
|id |hashes         |matches|
+---+---------------+-------+
|0  |[1, 2, 3, 4, 5]|[1]    |
|1  |[1, 5, 3, 7, 9]|[0]    |
|2  |[9, 3, 6, 8, 0]|[]     |
+---+---------------+-------+

1 Answer


In Spark 3, the following code compares the arrays between rows, keeping only the pairs of rows whose arrays share at least one element at the same position, where df is your input dataframe:

    import org.apache.spark.sql.functions._

    df.join(
        // self join on: at least one position holds equal values in both arrays
        df.withColumnRenamed("id", "id2").withColumnRenamed("hashes", "hashes2"),
        exists(arrays_zip(col("hashes"), col("hashes2")), x => x("hashes") === x("hashes2"))
      )
      .groupBy("id")
      .agg(first(col("hashes")).as("hashes"), collect_list("id2").as("matches"))
      .withColumn("matches", filter(col("matches"), x => x.notEqual(col("id"))))

Detailed description

First, we perform a self cross join, filtered by your condition of at least one element at the same position in the two hashes arrays.
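For clarity, here is an equivalent, more explicit sketch of that step: cross join first, then filter (the snippet above expresses the same thing directly as a join condition; the renamed and pairs names are just for illustration):

    import org.apache.spark.sql.functions._

    // Self cross join: every row of df paired with every row of a renamed copy,
    // then kept only if at least one position matches.
    val renamed = df.withColumnRenamed("id", "id2").withColumnRenamed("hashes", "hashes2")
    val pairs = df.crossJoin(renamed)
      .where(exists(arrays_zip(col("hashes"), col("hashes2")), x => x("hashes") === x("hashes2")))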

To build the condition, we zip the two hashes arrays: one from the first dataframe, one from the second joined dataframe, which is just the first dataframe with its columns renamed. By zipping, we get an array of structs {"hashes": x, "hashes2": y}, and we then only need to check whether this array contains an element where x = y. The complete condition is written as follows:

exists(arrays_zip(col("hashes"), col("hashes2")), x => x("hashes") === x("hashes2"))
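As a worked illustration of the condition, here is a standalone sketch using literal arrays taken from the example data (run in a spark-shell where spark is the SparkSession; the a, b and anyPositionMatches aliases are just for display):

    import org.apache.spark.sql.functions._

    // Rows id=0 ([1, 2, 3, 4, 5]) and id=1 ([1, 5, 3, 7, 9]):
    // arrays_zip pairs them element-wise into [{1,1}, {2,5}, {3,3}, {4,7}, {5,9}],
    // and exists returns true thanks to positions 0 (1 == 1) and 2 (3 == 3).
    spark.range(1).select(
      exists(
        arrays_zip(
          array(lit(1), lit(2), lit(3), lit(4), lit(5)).as("a"),
          array(lit(1), lit(5), lit(3), lit(7), lit(9)).as("b")
        ),
        x => x("a") === x("b")
      ).as("anyPositionMatches")
    ).show() // prints: true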

Then, we aggregate by column id to collect all the id2 values of the rows that were kept, meaning the rows matching your condition.

To keep the "hashes" column: since two rows with the same "id" have equal "hashes" values, we take the first occurrence of "hashes" for each "id". And we collect all the "id2" values using collect_list:

.agg(first(col("hashes")).as("hashes"), collect_list("id2").as("matches"))
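With the example data, the grouped result at this point would look like the following (note that each row also matches itself, which is why the final filter step is needed; the order of elements inside collect_list is not guaranteed):

    +---+---------------+-------+
    |id |hashes         |matches|
    +---+---------------+-------+
    |0  |[1, 2, 3, 4, 5]|[0, 1] |
    |1  |[1, 5, 3, 7, 9]|[1, 0] |
    |2  |[9, 3, 6, 8, 0]|[2]    |
    +---+---------------+-------+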

And finally, we filter out of column "matches" the id of the current row:

.withColumn("matches", filter(col("matches"), x => x.notEqual(col("id"))))

If you need the "id" column to be in order, you can add an orderBy clause:

.orderBy("id")

Run

With a dataframe df containing the following values:

+---+---------------+
|id |hashes         |
+---+---------------+
|0  |[1, 2, 3, 4, 5]|
|1  |[1, 5, 3, 7, 9]|
|2  |[9, 3, 6, 8, 0]|
+---+---------------+

You get the following output:

+---+---------------+-------+
|id |hashes         |matches|
+---+---------------+-------+
|0  |[1, 2, 3, 4, 5]|[1]    |
|1  |[1, 5, 3, 7, 9]|[0]    |
|2  |[9, 3, 6, 8, 0]|[]     |
+---+---------------+-------+

Limits

The join is a cartesian product, which is very expensive. Although the condition filters the results, it can still lead to a huge amount of computation/shuffle on big datasets and may perform very poorly.

If you are using a Spark version before 3.0, you have to replace some of the built-in Spark functions with user-defined functions.
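A minimal sketch of that replacement, assuming id is an integer column and hashes an array of integers (the UDF names existsMatch and removeSelf are just for illustration; adjust the types to your schema):

    import org.apache.spark.sql.functions._

    // Replaces exists(arrays_zip(...), ...): true if any position matches.
    val existsMatch = udf { (a: Seq[Int], b: Seq[Int]) =>
      a.zip(b).exists { case (x, y) => x == y }
    }
    // Replaces filter(col("matches"), ...): drops the row's own id from the list.
    val removeSelf = udf { (ids: Seq[Int], id: Int) => ids.filterNot(_ == id) }

    df.join(
        df.withColumnRenamed("id", "id2").withColumnRenamed("hashes", "hashes2"),
        existsMatch(col("hashes"), col("hashes2"))
      )
      .groupBy("id")
      .agg(first(col("hashes")).as("hashes"), collect_list("id2").as("matches"))
      .withColumn("matches", removeSelf(col("matches"), col("id")))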


1 Comment

Hi, I'm new to Spark and I'm using a version < 3.0. How would you go about replacing the exists function with a UDF?
