In Spark 3, the following code compares arrays between rows, keeping only the pairs of rows whose two hashes arrays share at least one element at the same position. df is your input dataframe:
import org.apache.spark.sql.functions._

df.join(
    df.withColumnRenamed("id", "id2").withColumnRenamed("hashes", "hashes2"),
    // keep pairs of rows whose arrays agree on at least one position
    exists(arrays_zip(col("hashes"), col("hashes2")), x => x("hashes") === x("hashes2"))
  )
  .groupBy("id")
  .agg(first(col("hashes")).as("hashes"), collect_list("id2").as("matches"))
  // drop the row's own id, which always matches itself
  .withColumn("matches", filter(col("matches"), x => x.notEqual(col("id"))))
Detailed description
First, we perform a self cross join (joining df with itself), filtered by your condition of at least one element at the same position in the two hashes arrays.
To build the condition, we zip the two hashes arrays, one from the first dataframe and one from the second joined dataframe, which is just the first dataframe with its columns renamed. Zipping gives an array of structs {"hashes": x, "hashes2": y}, and we then only need to check whether there exists an element in this array where x = y. The complete condition is written as follows:
exists(arrays_zip(col("hashes"), col("hashes2")), x => x("hashes") === x("hashes2"))
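For instance, here is a minimal sketch of what this condition evaluates to on one pair of arrays (the demo dataframe and its values are mine, assuming a spark-shell session where spark.implicits._ is in scope):

import org.apache.spark.sql.functions._
import spark.implicits._

val demo = Seq((Seq(1, 2, 3), Seq(1, 5, 9))).toDF("hashes", "hashes2")
demo.select(
  arrays_zip(col("hashes"), col("hashes2")).as("zipped"),
  exists(arrays_zip(col("hashes"), col("hashes2")),
         x => x("hashes") === x("hashes2")).as("matched")
).show(false)
// "zipped" pairs the elements position by position: (1, 1), (2, 5), (3, 9),
// and "matched" is true because the pair at position 0 agrees (1 === 1)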
Then, we aggregate by the column "id" to collect all the "id2" values of the rows that were kept, i.e. the rows matching your condition. To keep the "hashes" column: since two rows with the same "id" have equal "hashes" arrays, we take the first occurrence of "hashes" for each "id". And we collect all the "id2" values using collect_list:
.agg(first(col("hashes")).as("hashes"), collect_list("id2").as("matches"))
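With the example data from the Run section below, the join keeps the pairs (0, 0), (0, 1), (1, 0), (1, 1) and (2, 2), so this aggregation produces the following intermediate result (element order inside collect_list is not guaranteed):

+---+---------------+-------+
|id |hashes         |matches|
+---+---------------+-------+
|0  |[1, 2, 3, 4, 5]|[0, 1] |
|1  |[1, 5, 3, 7, 9]|[0, 1] |
|2  |[9, 3, 6, 8, 0]|[2]    |
+---+---------------+-------+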
And finally, we filter the id of the current row out of the "matches" column, since every row matched itself in the join:
.withColumn("matches", filter(col("matches"), x => x.notEqual(col("id"))))
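Note that the lambda passed to filter can reference another column of the row; a minimal sketch in the same spark-shell session as above (the demo dataframe is mine):

val demo2 = Seq((0, Seq(0, 1))).toDF("id", "matches")
demo2.withColumn("matches", filter(col("matches"), x => x.notEqual(col("id"))))
     .show(false)
// id 0 keeps [1]: its own id is removed from the list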
If you need the result ordered by "id", you can add an orderBy clause:
.orderBy("id")
Run
With a dataframe df containing the following values:
+---+---------------+
|id |hashes |
+---+---------------+
|0 |[1, 2, 3, 4, 5]|
|1 |[1, 5, 3, 7, 9]|
|2 |[9, 3, 6, 8, 0]|
+---+---------------+
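This dataframe can be built, for instance, as follows (a sketch assuming a spark-shell session with spark.implicits._ in scope):

val df = Seq(
  (0, Seq(1, 2, 3, 4, 5)),
  (1, Seq(1, 5, 3, 7, 9)),
  (2, Seq(9, 3, 6, 8, 0))
).toDF("id", "hashes")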
You get the following output:
+---+---------------+-------+
|id |hashes |matches|
+---+---------------+-------+
|0 |[1, 2, 3, 4, 5]|[1] |
|1 |[1, 5, 3, 7, 9]|[0] |
|2 |[9, 3, 6, 8, 0]|[] |
+---+---------------+-------+
Limits
The join is a cartesian product, which is very expensive. Although the condition filters the results, it can lead to a huge amount of computation and shuffle on big datasets, and performance may be very poor.
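Since the condition is not an equi-join, you can confirm the strategy Spark picks by calling explain on the resulting dataframe (result here is a hypothetical val holding the output of the code above) and looking for a CartesianProduct or BroadcastNestedLoopJoin node in the physical plan:

result.explain()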
If you use a Spark version earlier than 3.0, you have to replace some built-in Spark functions with user-defined functions: in particular, the higher-order functions exists and filter are only available in the Scala API since Spark 3.0.
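For example, the join condition could be rewritten with a UDF along the following lines (a sketch under that assumption; the UDF name is mine and it is not tested on Spark 2.x). The filter on the "matches" column would need a similar UDF taking both the array and the id:

import org.apache.spark.sql.functions.{col, udf}

// true when the two arrays share at least one element at the same position
val sharesElementAtSamePosition = udf { (a: Seq[Int], b: Seq[Int]) =>
  a.zip(b).exists { case (x, y) => x == y }
}

df.join(
  df.withColumnRenamed("id", "id2").withColumnRenamed("hashes", "hashes2"),
  sharesElementAtSamePosition(col("hashes"), col("hashes2"))
)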