
I need to add an index column to a dataframe with three very simple constraints:

  • start from 0

  • be sequential

  • be deterministic

I'm sure I'm missing something obvious, because the examples I'm finding look very convoluted for such a simple task, or use non-sequential, non-deterministic, monotonically increasing IDs. I don't want to zip with index and then have to split the previously separate columns back out of the single column they end up in, because my dataframes are in the terabytes and it just seems unnecessary. I don't need to partition by anything or order by anything, yet the examples I'm finding do this (using window functions and row_number). All I need is a simple sequence of integers from 0 to df.count. What am I missing here?
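
For context, here is roughly what I mean by non-sequential IDs (a toy example I put together, not my real data; the exact values depend on partitioning):

from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id

spark = SparkSession.builder.getOrCreate()

# Toy DataFrame split across two partitions.
toy = spark.createDataFrame([("a",), ("b",), ("c",), ("d",)], ["value"]).repartition(2)

# The IDs are monotonically increasing but jump between partitions,
# e.g. 0, 1, 8589934592, 8589934593 instead of 0, 1, 2, 3.
toy.withColumn("id", monotonically_increasing_id()).show()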


3 Comments

  • DataFrames are inherently unordered. This is one of the core reasons they work for parallel processing: any executor can pick up any part of the data and do its work. You can introduce an order (as you've shown), but how can it be deterministic if you don't order by anything? Commented Sep 13, 2018 at 17:01
  • Btw, I believe monotonically_increasing_id will be deterministic as long as you don't change the number of partitions. Commented Sep 13, 2018 at 17:03
  • Fair enough, maybe I'm using the word index out of context here. What I mean is: how can I add a column with an ordered, monotonically increasing by 1 sequence 0:df.count? Commented Sep 13, 2018 at 17:05

2 Answers


What I mean is: how can I add a column with an ordered, monotonically increasing by 1 sequence 0:df.count? (from comments)

You can use row_number() here, but for that you'd need to specify an orderBy(). Since you don't have an ordering column, just use monotonically_increasing_id().

from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql import Window

df = df.withColumn(
    "index",
    row_number().over(Window.orderBy(monotonically_increasing_id()))-1
)

Also, row_number() starts at 1, so you'd have to subtract 1 to have it start from 0. The last value will be df.count() - 1.
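
For example, on a small toy DataFrame (made up here, not from the question), this gives:

from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql import Window

spark = SparkSession.builder.getOrCreate()

# Toy DataFrame just to show the shape of the result.
toy = spark.createDataFrame([("a",), ("b",), ("c",)], ["value"])

toy.withColumn(
    "index",
    row_number().over(Window.orderBy(monotonically_increasing_id())) - 1
).show()

+-----+-----+
|value|index|
+-----+-----+
|    a|    0|
|    b|    1|
|    c|    2|
+-----+-----+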


I don't want to zip with index and then have to separate the previously separated columns that are now in a single column

You can use zipWithIndex if you follow it with a call to map, to avoid having all of the separated columns turn into a single column:

cols = df.columns
# Pair each Row with its index, then flatten back into (index, *original columns).
df = df.rdd.zipWithIndex().map(lambda row: (row[1],) + tuple(row[0])).toDF(["index"] + cols)
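
If you also care about keeping the original column types (toDF() with only column names re-infers the schema from the data), a variant is to pass an explicit schema instead. This is a sketch, assuming the same df as above:

from pyspark.sql.types import StructType, StructField, LongType

# Prepend an "index" field to the original schema so the existing column
# types survive the round trip through the RDD.
indexed_schema = StructType([StructField("index", LongType(), False)] + list(df.schema.fields))

df = df.rdd.zipWithIndex() \
    .map(lambda row: (row[1],) + tuple(row[0])) \
    .toDF(indexed_schema)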


Not sure about the performance, but here is a trick.

Note: toPandas() will collect all of the data to the driver.

from pyspark.sql import SparkSession

# speed up toPandas using arrow
spark = SparkSession.builder.appName('seq-no') \
        .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
        .config("spark.sql.execution.arrow.enabled", "true") \
        .getOrCreate()

df = spark.createDataFrame([
    ('id1', "a"),
    ('id2', "b"),
    ('id2', "c"),
], ["ID", "Text"])

df1 = spark.createDataFrame(df.toPandas().reset_index()).withColumnRenamed("index","seq_no")

df1.show()

+------+---+----+
|seq_no| ID|Text|
+------+---+----+
|     0|id1|   a|
|     1|id2|   b|
|     2|id2|   c|
+------+---+----+

2 Comments

df.toPandas()? Are you kidding? If toPandas() were an option, Spark would never have been used in the first place!
I have already noted that; in some cases we don't have a choice, which is why df.toPandas() exists.
