I have the below requirement to aggregate data on a Spark dataframe in Scala.

I have a Spark dataframe with two columns:

mo_id   sales
201601  11.01
201602  12.01
201603  13.01
201604  14.01
201605  15.01
201606  16.01
201607  17.01
201608  18.01
201609  19.01
201610  20.01
201611  21.01
201612  22.01

As shown above, the dataframe has two columns, 'mo_id' and 'sales'. I want to add a new column (agg_sales) to the dataframe which should hold the sum of sales up to the current month, as shown below.

mo_id   sales   agg_sales
201601  10      10
201602  20      30
201603  30      60
201604  40      100
201605  50      150
201606  60      210
201607  70      280
201608  80      360
201609  90      450
201610  100     550
201611  110     660
201612  120     780

Description:

For the month 201603, agg_sales will be the sum of sales from 201601 to 201603. For the month 201604, agg_sales will be the sum of sales from 201601 to 201604, and so on.

Can anyone please help me do this?

Versions used: Spark 1.6.2 and Scala 2.10.

  • Did you mean for the sales to be formatted like the first dataset or the second?
  • I have a first dataframe with two columns. In the next dataframe I want to add a new column (agg_sales), so the new dataset will have three columns in total (mo_id, sales, agg_sales).
  • Check my answer below.

1 Answer

You are looking for a cumulative sum which can be accomplished with a window function:

scala> val df = sc.parallelize(Seq((201601, 10), (201602, 20), (201603, 30), (201604, 40), (201605, 50), (201606, 60), (201607, 70), (201608, 80), (201609, 90), (201610, 100), (201611, 110), (201612, 120))).toDF("id","sales")
df: org.apache.spark.sql.DataFrame = [id: int, sales: int]

scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> val ordering = Window.orderBy("id")
ordering: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@75d454a4

scala> df.withColumn("agg_sales", sum($"sales").over(ordering)).show 
16/12/27 21:11:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+------+-----+---------+
|    id|sales|agg_sales|
+------+-----+---------+
|201601|   10|       10|
|201602|   20|       30|
|201603|   30|       60|
|201604|   40|      100|
|201605|   50|      150|
|201606|   60|      210|
|201607|   70|      280|
|201608|   80|      360|
|201609|   90|      450|
|201610|  100|      550|
|201611|  110|      660|
|201612|  120|      780|
+------+-----+---------+

Note that I defined the ordering on the ids; you would probably want some sort of timestamp to order the summation.
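
For completeness, here is a rough standalone sketch of the same idea outside the shell. It assumes a Spark 1.6 build with Hive support, since window functions in 1.6 require a HiveContext (the shell preconfigures this for you); the object name is hypothetical. The explicit rowsBetween(Long.MinValue, 0) frame means "unbounded preceding to current row" counted by physical rows, so duplicate ordering values would not be summed together as RANGE peers.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

// Hypothetical app name; a sketch, not a drop-in job.
object CumulativeSales {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CumulativeSales"))
    val sqlContext = new HiveContext(sc) // window functions need Hive support in 1.6
    import sqlContext.implicits._

    val df = sc.parallelize(Seq(
      (201601, 10), (201602, 20), (201603, 30)
    )).toDF("mo_id", "sales")

    // Explicit frame: unbounded preceding .. current row, by row position,
    // so rows sharing the same mo_id are not collapsed into one peer group.
    val w = Window.orderBy("mo_id").rowsBetween(Long.MinValue, 0)

    df.withColumn("agg_sales", sum($"sales").over(w)).show()
  }
}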

4 Comments

  • Thanks. Let me try.
  • I get "No Partition Defined for Window operation! Moving all data to a single partition".
  • Just a warning, no need to worry about it (if your data were big enough to worry about partitioning, you would have already defined one).
  • Yes, the actual dataset I am working on has a column which I can choose as a partition key. Thanks.
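
To make that last comment concrete, here is a sketch of the partitioned variant, continuing the shell session above. store_id is a hypothetical stand-in for whatever grouping column the real dataset has, and df stands for that real dataframe; note the running total then restarts within each group.

// store_id is hypothetical: substitute the real partition-key column.
val byKey = Window.partitionBy("store_id")
                  .orderBy("mo_id")
                  .rowsBetween(Long.MinValue, 0)

// The cumulative sum now runs per store_id, and Spark can spread the
// window computation across partitions instead of a single one,
// which also silences the warning above.
df.withColumn("agg_sales", sum($"sales").over(byKey))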
