46

Is there a way to specify a custom aggregation function for Spark DataFrames over multiple columns?

I have a table like this of the type (name, item, price):

john | tomato | 1.99
john | carrot | 0.45
bill | apple  | 0.99
john | banana | 1.29
bill | taco   | 2.59

I would like to aggregate each item and its cost into a list per person, like this:

john | (tomato, 1.99), (carrot, 0.45), (banana, 1.29)
bill | (apple, 0.99), (taco, 2.59)

Is this possible in dataframes? I recently learned about collect_list but it appears to only work for one column.

5 Answers

114

Consider using the struct function to group the columns together before collecting as a list:

import org.apache.spark.sql.functions.{collect_list, struct}
import sqlContext.implicits._

val df = Seq(
  ("john", "tomato", 1.99),
  ("john", "carrot", 0.45),
  ("bill", "apple", 0.99),
  ("john", "banana", 1.29),
  ("bill", "taco", 2.59)
).toDF("name", "food", "price")

df.groupBy($"name")
  .agg(collect_list(struct($"food", $"price")).as("foods"))
  .show(false)

Outputs:

+----+---------------------------------------------+
|name|foods                                        |
+----+---------------------------------------------+
|john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
|bill|[[apple,0.99], [taco,2.59]]                  |
+----+---------------------------------------------+
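If the order of elements inside each list matters, keep in mind that collect_list on its own makes no ordering promise after a shuffle. A minimal sketch, assuming Spark 2.x or later (the local SparkSession setup and the name `sorted` are illustrative and not part of the original answer): wrapping the collected structs in sort_array orders them by the first struct field, then the second.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, sort_array, struct}

// Local session for illustration only; in spark-shell a session named `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("collect-structs").getOrCreate()
import spark.implicits._

val df = Seq(
  ("john", "tomato", 1.99),
  ("john", "carrot", 0.45),
  ("bill", "apple", 0.99),
  ("john", "banana", 1.29),
  ("bill", "taco", 2.59)
).toDF("name", "food", "price")

// Collect (food, price) structs per name, then sort each array.
// Structs compare field by field, so this sorts by food first, then price.
val sorted = df.groupBy($"name")
  .agg(sort_array(collect_list(struct($"food", $"price"))).as("foods"))

sorted.show(false)
```

Sorting by food name is just one choice. To order by price instead, put price first in the struct.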

3 Comments

I want to mention that this approach looks cleaner than the accepted answer, but unfortunately doesn't work with spark 1.6, because collect_list() doesn't accept a struct.
Works in Spark 2.1
This works better than the accepted answer as the order is preserved.
38

The easiest way to do this as a DataFrame is to first collect two lists, and then use a UDF to zip the two lists together. Something like:

import org.apache.spark.sql.functions.{col, collect_list, udf}
import sqlContext.implicits._

val zipper = udf[Seq[(String, Double)], Seq[String], Seq[Double]](_.zip(_))

val df = Seq(
  ("john", "tomato", 1.99),
  ("john", "carrot", 0.45),
  ("bill", "apple", 0.99),
  ("john", "banana", 1.29),
  ("bill", "taco", 2.59)
).toDF("name", "food", "price")

val df2 = df.groupBy("name").agg(
  collect_list(col("food")) as "food",
  collect_list(col("price")) as "price" 
).withColumn("food", zipper(col("food"), col("price"))).drop("price")

df2.show(false)
// +----+---------------------------------------------+
// |name|food                                         |
// +----+---------------------------------------------+
// |john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
// |bill|[[apple,0.99], [taco,2.59]]                  |
// +----+---------------------------------------------+

8 Comments

I used col(...) instead of $"..." for a reason -- I find col(...) works with less work inside of things like class definitions.
Is there any function to realign columns like for example in the zip function tell it to first add an element from the tail of the column and remove one from the head and then zip them? In this case you can have for example next price for the items if you read prices daily and there is a time column.
The answer assumes (maybe correctly) that collect_list() will preserve the order of elements on the two columns food & price. Meaning that food and price from the same row will end up at the same index in the two collected lists. Is this order preserving behavior guaranteed? (it would make sense, but I'm not sure by looking at the scala code for collect_list, not a scala programmer).
As far as I know, there is no guarantee that the order of elements will be the same; cf. stackoverflow.com/questions/40407514/…
I used a variation of this solution to zip five lists together. This gave me the opportunity to write the best line of code of my career so far: _ zip _ zip _ zip _ zip _
10

A better way than the zip function may be to wrap the two columns in a struct, since UDFs and UDAFs tend to hurt performance.

This would probably work as well:

df.select('name, struct('food, 'price).as("tuple"))
  .groupBy('name)
  .agg(collect_list('tuple).as("tuples"))

6

To your point that collect_list appears to work for only one column: for collect_list to work on multiple columns, wrap the columns you want to aggregate in a struct. For example:

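import org.apache.spark.sql.functions.{collect_list, struct}

// Alias the column "foods" so it matches the output shown below
val aggregatedData = df.groupBy("name").agg(collect_list(struct("item", "price")).as("foods"))

aggregatedData.show(false)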
+----+------------------------------------------------+
|name|foods                                           |
+----+------------------------------------------------+
|john|[[tomato, 1.99], [carrot, 0.45], [banana, 1.29]]|
|bill|[[apple, 0.99], [taco, 2.59]]                   |
+----+------------------------------------------------+

2

Here is another option: convert the DataFrame to an RDD of key-value pairs and call groupByKey on it. The result is a collection of key-value pairs in which each value is a list of tuples.

df.show
+----+------+----+
|  _1|    _2|  _3|
+----+------+----+
|john|tomato|1.99|
|john|carrot|0.45|
|bill| apple|0.99|
|john|banana|1.29|
|bill|  taco|2.59|
+----+------+----+


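// Go through df.rdd so this also works on Spark 2.x, where df.map returns a Dataset and needs an encoder
val tuples = df.rdd.map(row => row(0) -> (row(1), row(2)))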
tuples: org.apache.spark.rdd.RDD[(Any, (Any, Any))] = MapPartitionsRDD[102] at map at <console>:43

tuples.groupByKey().map{ case(x, y) => (x, y.toList) }.collect
res76: Array[(Any, List[(Any, Any)])] = Array((bill,List((apple,0.99), (taco,2.59))), (john,List((tomato,1.99), (carrot,0.45), (banana,1.29))))
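The result above is typed as Any throughout because row(i) returns Any. A minimal sketch of a typed variant, assuming Spark 2.x or later and the column layout (String, String, Double) from the question (the session setup and the names `typedPairs` and `grouped` are illustrative): reading each field with getString and getDouble keeps concrete types in the result.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in spark-shell a session named `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("group-by-key").getOrCreate()
import spark.implicits._

val df = Seq(
  ("john", "tomato", 1.99),
  ("john", "carrot", 0.45),
  ("bill", "apple", 0.99),
  ("john", "banana", 1.29),
  ("bill", "taco", 2.59)
).toDF("name", "food", "price")

// Read each field with its concrete type instead of Any.
val typedPairs = df.rdd.map(row => row.getString(0) -> (row.getString(1), row.getDouble(2)))

// Group by name and bring the (small) result back to the driver as a Map.
// The order of elements inside each list is not guaranteed.
val grouped: Map[String, List[(String, Double)]] =
  typedPairs.groupByKey().mapValues(_.toList).collect().toMap

grouped.foreach(println)
```

Note that groupByKey shuffles every value for a key to one place, so for large groups the DataFrame-based collect_list approach is usually the safer default.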
