
What I'm trying to achieve is fairly simple: I want to check whether each ID (a UUID) ever has a certain behavioral status. If it does, I want all of the records associated with that ID. For example, if one of the IDs below has a status of "three", I want to keep all of the records for that ID. So far I can achieve this in the following two ways:

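import org.apache.spark.sql.functions.broadcast
import spark.implicits._ // for $"..." and the String encoder used by map (spark-shell imports this automatically)

// first method: collect the matching IDs to the driver, then filter with isin
val dfList = df.filter($"status" === "three").select($"id").distinct.map(_.getString(0)).collect.toList
val dfTransformedOne = df.filter($"id".isin(dfList:_*))

// second method: join against the distinct matching IDs, hinting that they should be broadcast
val dfIds = df.filter($"status" === "three").select($"id").distinct
val dfTransformedTwo = df.join(broadcast(dfIds), Seq("id"))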
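For comparison, here is a minimal sketch (an editor's addition, not one of the two methods above) that replaces the inner join in the second method with a `left_semi` join and drops the explicit `broadcast` hint. A semi join returns each left-hand row at most once, so the `distinct` is no longer needed. Without the hint, Spark only broadcasts the ID side on its own if its estimated size is below `spark.sql.autoBroadcastJoinThreshold`; otherwise it uses a shuffle-based join, which avoids shipping hundreds of millions of IDs to every executor. The object `SemiJoinSketch`, the helper `keepIdsWithStatus`, and the local-mode `SparkSession` are hypothetical names and assumptions for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object SemiJoinSketch {
  // Hypothetical helper: keep every row whose id has at least one row with the target status.
  def keepIdsWithStatus(df: DataFrame, target: String): DataFrame = {
    // IDs that have the target status; duplicates are harmless for a semi join.
    val matchingIds = df.filter(col("status") === target).select("id")
    // left_semi returns only df's columns and never repeats a df row,
    // no matter how many times its id appears in matchingIds.
    df.join(matchingIds, Seq("id"), "left_semi")
  }

  def main(args: Array[String]): Unit = {
    // Local session for trying the sketch; on a cluster, reuse your existing session.
    val spark = SparkSession.builder().master("local[*]").appName("semi-join-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("1234", "one"), ("1234", "two"), ("1234", "three"),
      ("234", "one"), ("234", "one"), ("234", "two")
    ).toDF("id", "status")

    keepIdsWithStatus(df, "three").show()
    spark.stop()
  }
}
```

Whether this beats the broadcast version depends on how many IDs match and how the data is partitioned, so it is worth timing both on a realistic sample.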

Both methods work fine with the sample data I'm working with. However, I run into performance issues as the data volume grows, because I could have millions to hundreds of millions of IDs to filter for. Is there a more efficient way to do this, or is it just a case of bumping up the hardware I'm using?

Below is the example data and expected output.

val df = Seq(
  ("1234", "one"), 
  ("1234", "two"), 
  ("1234", "three"), 
  ("234", "one"), 
  ("234", "one"), 
  ("234", "two")
  ).toDF("id", "status")

df.show
+----+------+
|  id|status|
+----+------+
|1234|   one|
|1234|   two|
|1234| three|
| 234|   one|
| 234|   one|
| 234|   two|
+----+------+

dfTransformedOne.show()  // dfTransformedTwo.show() gives the same rows
+----+------+
|  id|status|
+----+------+
|1234|   one|
|1234|   two|
|1234| three|
+----+------+

1 Answer

Grouping and aggregating before filtering introduces a shuffle, but it removes the need to collect a large list of IDs to the driver. Whether it is faster depends on your data distribution, cluster size, and network connectivity, so it is worth testing:

val df = Seq(
  ("1234", "one"), 
  ("1234", "two"), 
  ("1234", "three"), 
  ("234", "one"), 
  ("234", "one"), 
  ("234", "two")
  ).toDF("id", "status")

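import org.apache.spark.sql.functions.{array_contains, collect_list, explode}

df.groupBy("id")
  .agg(collect_list("status").as("statuses"))   // one array of statuses per id (this is the shuffle)
  .filter(array_contains($"statuses", "three")) // keep only ids that have "three"
  .withColumn("status", explode($"statuses"))   // turn the array back into one row per status
  .select("id", "status")
  .show(false)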

Gives the intended result:

+----+------+
|id  |status|
+----+------+
|1234|one   |
|1234|two   |
|1234|three |
+----+------+
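A related variant (a swapped-in technique, not the answerer's code) performs the same per-ID check with a window function instead of `groupBy`/`collect_list`/`explode`. Partitioning the window by `id` still shuffles the data once, but each original row passes through unchanged, so extra columns survive and there is no collect-then-explode round trip. As with the answer, whether it is faster on your data needs measuring. `WindowFlagSketch`, `keepIdsWithStatus`, and the temporary `hasTarget` column are hypothetical names for this sketch.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max, when}

object WindowFlagSketch {
  // Hypothetical helper: keep every row whose id has at least one row with the target status.
  def keepIdsWithStatus(df: DataFrame, target: String): DataFrame = {
    // No orderBy, so the frame is the whole partition: every row sees all rows of its id.
    val byId = Window.partitionBy("id")
    // 1 if this row has the target status, otherwise 0.
    val isTarget = when(col("status") === target, 1).otherwise(0)
    df.withColumn("hasTarget", max(isTarget).over(byId)) // 1 for every row of a matching id
      .filter(col("hasTarget") === 1)
      .drop("hasTarget")
  }

  def main(args: Array[String]): Unit = {
    // Local session for trying the sketch; on a cluster, reuse your existing session.
    val spark = SparkSession.builder().master("local[*]").appName("window-flag-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("1234", "one"), ("1234", "two"), ("1234", "three"),
      ("234", "one"), ("234", "one"), ("234", "two")
    ).toDF("id", "status")

    keepIdsWithStatus(df, "three").show(false)
    spark.stop()
  }
}
```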