
I am working with a relatively large dataframe (close to 1 billion rows) in PySpark. This dataframe is in "long" format, and I would like to have a unique index for each group defined by a groupBy over multiple columns. An example dataframe:

+--------------+-------+---------+------+------+
|classification|   id_1|     id_2|     t|     y|
+--------------+-------+---------+------+------+
|             1| person|    Alice|   0.1| 0.247|
|             1| person|    Alice|   0.2| 0.249|
|             1| person|    Alice|   0.3| 0.255|
|             0| animal|   Jaguar|   0.1| 0.298|
|             0| animal|   Jaguar|   0.2| 0.305|
|             0| animal|   Jaguar|   0.3| 0.310|
|             1| person|    Chris|   0.1| 0.267|
+--------------+-------+---------+------+------+

Here I would like to perform an operation such that I can index each group of ["classification", "id_1", "id_2"]. Example output is:

+--------------+-------+---------+------+------+----+
|classification|   id_1|     id_2|     t|     y| idx|
+--------------+-------+---------+------+------+----+
|             1| person|    Alice|   0.1| 0.247|   1|
|             1| person|    Alice|   0.2| 0.249|   1|
|             1| person|    Alice|   0.3| 0.255|   1|
|             0| animal|   Jaguar|   0.1| 0.298|   2|
|             0| animal|   Jaguar|   0.2| 0.305|   2|
|             0| animal|   Jaguar|   0.3| 0.310|   2|
|             1| person|    Chris|   0.1| 0.267|   3|
+--------------+-------+---------+------+------+----+

I cannot use monotonically_increasing_id() since I don't want a unique ID per row. What I've done, hopefully as a stop-gap, is to create another dataframe, create unique indices for each group, then join that dataframe back into the original.

from pyspark.sql import functions as F

df_groups = (
    df
    .select("classification", "id_1", "id_2")
    .dropDuplicates()
    .withColumn(
        "idx", F.monotonically_increasing_id()
    )
)

df = df.join(other=df_groups, on=["classification", "id_1", "id_2"])

This can be a pretty hefty operation, so I'm wondering if there is any native Spark operation that effectively does the same thing.


2 Answers


What you are asking can actually be accomplished in many different ways! Here I will just describe two, which should be enough for you to decide what works for your specific use case :)

1. Hashing
Hashing is a very common way to derive a single value for something (or for a collection of somethings). You can do this with the native Spark function hash (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.hash.html) or with the Python standard library (https://docs.python.org/3/library/hashlib.html). In either case, you hash the values of the classification, id_1, and id_2 columns together to get a unique index for each group.
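
A minimal sketch of the standard-library route, assuming a Python UDF and a "||" delimiter (neither is in the original answer); the built-in F.hash shown in the other answer avoids the UDF overhead:

import hashlib

from pyspark.sql import functions as F, types as T

@F.udf(returnType=T.StringType())
def group_hash(classification, id_1, id_2):
    # Join the group columns with a delimiter, then hash the result;
    # str() handles the integer classification column.
    key = "||".join(str(v) for v in (classification, id_1, id_2))
    return hashlib.sha256(key.encode("utf-8")).hexdigest()

df = df.withColumn("idx", group_hash("classification", "id_1", "id_2"))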

2. Concatenating the values together
The downside of hashing is that a hash value tells you nothing about what went into it. That is not a problem if all you need is a unique index, but if you also want to look at the index AND know exactly which group it refers to, hashing is not that helpful. Instead, you can build your own human-readable index by gluing the columns together into a single string; putting a delimiter between the column values makes this more robust. You can do this with the native PySpark concat function: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.concat.html
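
A rough sketch of this approach, using concat_ws (so the delimiter is only written once) and assuming "||" as the delimiter:

from pyspark.sql import functions as F

# Glue the group columns into one human-readable key, e.g. "1||person||Alice".
df = df.withColumn(
    "idx",
    F.concat_ws("||", "classification", "id_1", "id_2"),
)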

This answer is a little short and straight to the point, but I am happy to add/edit context if you have any questions about it :)


1 Comment

For me, the problem with (2) is that my columns can actually have pretty long strings, so I think this might make the dataset significantly larger. But (1) is excellent and I think I'll be implementing that. Thank you!

Sadly, there is no built-in function that does this directly (assuming only the expected output, with sequential group indices, is accepted).

As @welp also mentioned, if you are dealing with about 1 billion unique groups, a hash is probably the most efficient way:

from pyspark.sql import functions as F

group_cols = ["classification", "id_1", "id_2"]

df = df.withColumn(
    "idx",
    F.hash(*group_cols)
)
df.show()

Now, these hash values are not what I would call readable, but it should work if you just want to do operations based on the "idx":

+--------------+------+------+---+-----+-----------+
|classification|  id_1|  id_2|  t|    y|        idx|
+--------------+------+------+---+-----+-----------+
|             1|person| Alice|0.1|0.247| 2070863403|
|             1|person| Alice|0.2|0.249| 2070863403|
|             1|person| Alice|0.3|0.255| 2070863403|
|             0|animal|Jaguar|0.1|0.298|-1362073686|
|             0|animal|Jaguar|0.2|0.305|-1362073686|
|             0|animal|Jaguar|0.3| 0.31|-1362073686|
|             1|person| Chris|0.1|0.267|  -77853758|
+--------------+------+------+---+-----+-----------+

If strings are fine, you can use sha2 instead; it will be a tiny bit slower but more human-readable:

df = df.withColumn(
    "idx",
    # First 8 hex characters of the SHA-256 of the delimited group columns
    F.substring(F.sha2(F.concat_ws("||", *group_cols), 256), 1, 8)
)

Output:

+--------------+------+------+---+-----+--------+
|classification|  id_1|  id_2|  t|    y|     idx|
+--------------+------+------+---+-----+--------+
|             1|person| Alice|0.1|0.247|48e2b794|
|             1|person| Alice|0.2|0.249|48e2b794|
|             1|person| Alice|0.3|0.255|48e2b794|
|             0|animal|Jaguar|0.1|0.298|64c141cc|
|             0|animal|Jaguar|0.2|0.305|64c141cc|
|             0|animal|Jaguar|0.3| 0.31|64c141cc|
|             1|person| Chris|0.1|0.267|f5bc105a|
+--------------+------+------+---+-----+--------+

If you need exactly the expected output (sequential indices), your current approach is probably the fastest one. One way to speed it up a little (only if df_groups has fewer than about 10 million rows) is to broadcast the smaller dataframe, so the join can skip shuffling the big one.

df = df.join(other=F.broadcast(df_groups), on=["classification", "id_1", "id_2"])

Summary:

If you are dealing with massive datasets with billions of unique groups and you don't want strings, use hash(). If strings are fine and you want something a bit more human-readable (it still works for billions of unique groups), use sha2.

If you want the indexes to be 1, 2, 3, ... and you have fewer than about 10 million unique groups, use your current code plus broadcast.

Otherwise, use your current code, which can work for roughly 10-500 million unique groups (given enough memory).

1 Comment

Thanks, these are great ideas. I actually (despite what my question implied) don't care about the index being sequential and starting with 1, 2, 3, ... etc. So the hash solution is a good one. Basically what I want is a way of shortening my groupBy groups. Instead of grouping by 3, 4, or 5 columns, I can create an index on those columns and then group by that instead.
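
A minimal sketch of that follow-up idea, grouping by the hashed index instead of the original columns (the avg aggregation is just an assumed placeholder, not from the post):

from pyspark.sql import functions as F

# Replace the multi-column group key with a single hashed index,
# then group by that index alone.
df = df.withColumn("idx", F.hash("classification", "id_1", "id_2"))
result = df.groupBy("idx").agg(F.avg("y").alias("mean_y"))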
