
I have a DataFrame with a single column, as shown below.

Type
'BAT'
'BAT'
'BALL'
'BAT'
'BALL'
'BALL'
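
For reference, a minimal sketch to reproduce this sample DataFrame (assuming an existing SparkSession named spark):

from pyspark.sql import functions as F

# sample data matching the 'Type' column shown above
df = spark.createDataFrame(
    [('BAT',), ('BAT',), ('BALL',), ('BAT',), ('BALL',), ('BALL',)],
    ['Type'],
)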

To the above DataFrame I have added a new column called 'const'.

df = df.withColumn('const',F.lit(1))

How do I perform a cumulative sum using Window.partitionBy() on the 'const' column and create a new row_id column?

Expected Output

Type  row_id
'BAT'   1
'BAT'   2
'BALL'  3
'BAT'   4
'BALL'  5
'BALL'  6

I also don't want to use RDDs; everything should stay in DataFrames for performance reasons.

EDIT

  • I want the row_id to increment by +1
  • I don't want to use the monotonically_increasing_id() function, for the reason above

2 Answers


If you just want a row index without taking the values into account, then use:

from pyspark.sql import functions as F

df = df.withColumn('row_id', F.monotonically_increasing_id())

This will create a unique index for each row.
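
Note that these ids are unique and increasing but not guaranteed to be consecutive; the gaps depend on how the data is partitioned. A quick way to inspect them, as a sketch:

# ids are unique per row, but may jump between partitions
df.withColumn('row_id', F.monotonically_increasing_id()).show()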

If you want to take your values into account and assign the same index to duplicate values, then use rank:

from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window().partitionBy().orderBy("type")
df = df.withColumn('row_id',F.rank().over(w))

You can of course achieve the same with sum or row_number, but I think the two methods above are better.

import sys
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window().partitionBy().orderBy().rowsBetween(-sys.maxsize,0)
df = df.withColumn('row_id',F.sum("const").over(w))

or

from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window().partitionBy().orderBy("const")
df = df.withColumn('row_id',F.row_number().over(w))
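
On Spark 2.1 and later, the -sys.maxsize boundary in the sum variant above can be written with Window.unboundedPreceding instead; a sketch, still assuming the 'const' column from the question:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# cumulative sum over all rows from the start of the partition up to the current row
w = Window().partitionBy().orderBy("const").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn('row_id', F.sum("const").over(w))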

7 Comments

Thanks, F.row_number() is working for me, but F.sum("const") is not. How do I fix it?
The F.sum("const") code is giving the value 10 in all the rows.
One quick question: what if my data has more than sys.maxsize rows?
How do I use UNBOUNDED PRECEDING, as mentioned in this blog: databricks.com/blog/2015/07/15/…
Window.unboundedPreceding appeared in 2.1; otherwise you have to use sys.maxsize. The frame is unbounded if this is Window.unboundedPreceding, or any value less than or equal to max(-sys.maxsize, -9223372036854775808).

Using the row_number() function directly may change the original row order when the window is ordered by a column that has the same value in all rows. To avoid that, I would first use monotonically_increasing_id() to create a new column "row_order", which preserves the original row order (since it produces a monotonically increasing number). Then use row_number() with the window ordered by the generated "row_order" column. Here is an example:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = df.withColumn('row_order',F.monotonically_increasing_id())
w = Window().partitionBy().orderBy("row_order")
df = df.withColumn('row_id',F.row_number().over(w))
df = df.drop("row_order")

This will ensure you keep the original row order in your table after applying the window.
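
As a quick check, applied to the sample data from the question this should reproduce the expected output:

# row_id should now run 1..6 in the original row order, matching the expected output above
df.select("Type", "row_id").show()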
