I am working with a relatively large dataframe (close to 1 billion rows) in PySpark. The dataframe is in "long" format, and I would like a unique index for each group defined by multiple columns (the same columns I would pass to a groupBy). An example dataframe:
+--------------+-------+---------+------+------+
|classification|   id_1|     id_2|     t|     y|
+--------------+-------+---------+------+------+
|             1| person|    Alice|   0.1| 0.247|
|             1| person|    Alice|   0.2| 0.249|
|             1| person|    Alice|   0.3| 0.255|
|             0| animal|   Jaguar|   0.1| 0.298|
|             0| animal|   Jaguar|   0.2| 0.305|
|             0| animal|   Jaguar|   0.3| 0.310|
|             1| person|    Chris|   0.1| 0.267|
+--------------+-------+---------+------+------+
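(For reproducibility, the example dataframe can be built with something like the following; the column types and the existing SparkSession are my assumptions here.)

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Small toy version of the real dataframe, with the same column names.
df = spark.createDataFrame(
    [
        (1, "person", "Alice", 0.1, 0.247),
        (1, "person", "Alice", 0.2, 0.249),
        (1, "person", "Alice", 0.3, 0.255),
        (0, "animal", "Jaguar", 0.1, 0.298),
        (0, "animal", "Jaguar", 0.2, 0.305),
        (0, "animal", "Jaguar", 0.3, 0.310),
        (1, "person", "Chris", 0.1, 0.267),
    ],
    ["classification", "id_1", "id_2", "t", "y"],
)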
I would like an operation that assigns the same index to every row within a group of ["classification", "id_1", "id_2"]. Example output:
+--------------+-------+---------+------+------+----+
|classification|   id_1|     id_2|     t|     y| idx|
+--------------+-------+---------+------+------+----+
|             1| person|    Alice|   0.1| 0.247|   1|
|             1| person|    Alice|   0.2| 0.249|   1|
|             1| person|    Alice|   0.3| 0.255|   1|
|             0| animal|   Jaguar|   0.1| 0.298|   2|
|             0| animal|   Jaguar|   0.2| 0.305|   2|
|             0| animal|   Jaguar|   0.3| 0.310|   2|
|             1| person|    Chris|   0.1| 0.267|   3|
+--------------+-------+---------+------+------+----+
I can't just use monotonically_increasing_id(), since that produces a unique ID per row rather than per group. What I've done, hopefully as a stop-gap, is to build a separate dataframe of the distinct group keys, assign a unique index to each, and join it back into the original:
from pyspark.sql import functions as F

# One row per distinct group key, each tagged with a unique (non-consecutive) ID.
df_groups = (
    df
    .select("classification", "id_1", "id_2")
    .dropDuplicates()
    .withColumn("idx", F.monotonically_increasing_id())
)

# Attach the group IDs back onto the original rows.
df = df.join(other=df_groups, on=["classification", "id_1", "id_2"])
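One variant of the same join (untested at this scale, and assuming the number of distinct groups is small enough to broadcast) would be to mark df_groups for broadcast so the large side doesn't have to be shuffled for the join:

# df_groups only holds the distinct group keys, so it should be much smaller than df;
# marking it for broadcast avoids shuffling the ~1 billion row side during the join.
df = df.join(F.broadcast(df_groups), on=["classification", "id_1", "id_2"])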
Even so, the dropDuplicates plus join can be a pretty hefty operation at this row count, so I'm wondering if there is a native Spark operation that effectively does the same thing.