What I'm trying to achieve is fairly simple: I want to check whether each ID (a UUID) ever has a certain "status" (a behavioral status). If it does, I want all the records associated with that ID returned. For example, if one of the IDs below has a status of "three", I want to keep all the records associated with that ID. So far I can achieve this in the following two ways:
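import org.apache.spark.sql.functions.broadcast
import spark.implicits._ // needed for the $"col" syntax and the String encoder used by map

// first method: collect the matching IDs to the driver, then filter with isin
val dfList = df.filter($"status" === "three").select($"id").distinct.map(_.getString(0)).collect.toList
val dfTransformedOne = df.filter($"id".isin(dfList: _*))

// second method: join against the distinct matching IDs with a broadcast hint
val dfIds = df.filter($"status" === "three").select($"id").distinct
val dfTransformedTwo = df.join(broadcast(dfIds), Seq("id"))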
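A small variation on the second method is a `left_semi` join. It returns only the columns of `df` and emits each row of `df` at most once, so duplicate IDs on the right side cannot multiply rows. The sketch below assumes a local `SparkSession` and the `id`/`status` columns from the sample data; `SemiJoinFilter` and `keepIdsWithStatus` are hypothetical names. It leaves the join strategy to Spark's planner rather than forcing a broadcast, since broadcasting the ID set only helps while it fits comfortably in executor memory, which is unlikely with hundreds of millions of distinct IDs.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object SemiJoinFilter {
  // Hypothetical helper: keep every row whose id has at least one row with the `target` status
  def keepIdsWithStatus(df: DataFrame, target: String): DataFrame = {
    // Distinct ids that ever had the target status
    val matchingIds = df.filter(col("status") === target).select("id").distinct()
    // left_semi keeps only the left-side columns and emits each left row at most once;
    // no broadcast hint, so Spark chooses the join strategy itself
    df.join(matchingIds, Seq("id"), "left_semi")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("semi-join-filter").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("1234", "one"), ("1234", "two"), ("1234", "three"),
      ("234", "one"), ("234", "one"), ("234", "two")
    ).toDF("id", "status")

    keepIdsWithStatus(df, "three").show()
    spark.stop()
  }
}
```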
Both methods work fine with the sample data I'm working with; however, I'm running into performance issues as the amount of data grows, because I could have millions to hundreds of millions of IDs to filter for. Is there a more efficient way to do this, or is it just a case of bumping up the hardware I'm using?
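Another option, swapped in here rather than taken from the post, is a window aggregate: partition by `id`, compute a per-ID flag that is 1 if any row of that ID has the target status, and keep the rows where the flag is set. This needs one shuffle by `id` and neither collects IDs to the driver nor builds a separate ID table to join. Whether it outperforms the join depends on data size and skew (all rows of a single ID end up in one task), so treat it as a sketch to benchmark rather than a guaranteed win. `WindowFilter`, `filterByWindowFlag` and the temporary column `has_target` are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max, when}

object WindowFilter {
  // Hypothetical helper: keep all rows of every id that has at least one row with `target` status
  def filterByWindowFlag(df: DataFrame, target: String): DataFrame = {
    // No orderBy, so the window frame covers every row of the id
    val byId = Window.partitionBy("id")
    df
      // Per row: 1 if the status matches, else 0; the max over the id's rows is 1 if any row matched
      .withColumn("has_target", max(when(col("status") === target, 1).otherwise(0)).over(byId))
      .filter(col("has_target") === 1)
      .drop("has_target")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("window-filter").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("1234", "one"), ("1234", "two"), ("1234", "three"),
      ("234", "one"), ("234", "one"), ("234", "two")
    ).toDF("id", "status")

    filterByWindowFlag(df, "three").show()
    spark.stop()
  }
}
```

On the sample data this should keep only the three rows for ID `1234`, though the row order is not guaranteed.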
Below is the example data and expected output.
val df = Seq(
("1234", "one"),
("1234", "two"),
("1234", "three"),
("234", "one"),
("234", "one"),
("234", "two")
).toDF("id", "status")
df.show
+----+------+
|  id|status|
+----+------+
|1234|   one|
|1234|   two|
|1234| three|
| 234|   one|
| 234|   one|
| 234|   two|
+----+------+
dfTransformedOne.show() // dfTransformedTwo.show() gives the same rows, possibly in a different order
+----+------+
|  id|status|
+----+------+
|1234|   one|
|1234|   two|
|1234| three|
+----+------+
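Because Spark does not guarantee row order after a join or shuffle, checking a rewrite against the expected output above is easiest on sorted, collected rows. The sketch below restates the post's two methods as functions; `CompareMethods`, `methodOne`, `methodTwo` and `sortedRows` are hypothetical names, and it is only suitable for small test data since it collects everything to the driver.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{broadcast, col}

object CompareMethods {
  // Collect rows as sorted (id, status) pairs so that row order does not affect comparisons
  def sortedRows(df: DataFrame): List[(String, String)] =
    df.select("id", "status").collect().map(r => (r.getString(0), r.getString(1))).toList.sorted

  // First method from the post: collect matching ids to the driver, then filter with isin
  def methodOne(df: DataFrame, spark: SparkSession): DataFrame = {
    import spark.implicits._
    val ids = df.filter(col("status") === "three").select("id").distinct().as[String].collect().toList
    df.filter(col("id").isin(ids: _*))
  }

  // Second method from the post: join against the distinct matching ids with a broadcast hint
  def methodTwo(df: DataFrame): DataFrame = {
    val ids = df.filter(col("status") === "three").select("id").distinct()
    df.join(broadcast(ids), Seq("id"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("compare-methods").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("1234", "one"), ("1234", "two"), ("1234", "three"),
      ("234", "one"), ("234", "one"), ("234", "two")
    ).toDF("id", "status")

    // Expected rows from the output table above, in sorted order
    val expected = List(("1234", "one"), ("1234", "three"), ("1234", "two"))
    println(sortedRows(methodOne(df, spark)) == expected)
    println(sortedRows(methodTwo(df)) == expected)
    spark.stop()
  }
}
```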