Let's say I have the following DataFrame:

[Row(user='bob', values=[0.5, 0.3, 0.2]),
Row(user='bob', values=[0.1, 0.3, 0.6]),
Row(user='bob', values=[0.8, 0.1, 0.1])]

I would like to groupBy user and do something like avg(values) where the average is taken over each index of the array values like this:

[Row(user='bob', averages=[0.466667, 0.233333, 0.3])]

How can I do this in PySpark?

1 Answer

You can expand the array and compute the average for each index. This assumes every array has the same length, because n is read from the first row only.

Python

from pyspark.sql.functions import array, avg, col

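# Array length, taken from the first row (all rows are assumed to match)
n = len(df.select("values").first()[0])

# One avg() per array index, packed back into a single array column
df.groupBy("user").agg(
    array(*[avg(col("values")[i]) for i in range(n)]).alias("averages")
)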
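
To make the role of the `*` in the call above concrete, here is a minimal plain-Python sketch that needs no Spark session. `make_array` is a hypothetical stand-in for `pyspark.sql.functions.array`, and the strings stand in for the `avg(col("values")[i])` column expressions; both are assumptions made only for illustration.

```python
def make_array(*columns):
    # Hypothetical stand-in for pyspark.sql.functions.array:
    # it simply collects its positional arguments into a list.
    return list(columns)

n = 3
# Placeholder strings instead of real Column expressions.
exprs = ["avg(values[{}])".format(i) for i in range(n)]

# With *, each list element becomes its own positional argument,
# i.e. make_array(exprs[0], exprs[1], exprs[2]).
unpacked = make_array(*exprs)

# Without *, the whole list is passed as a single argument.
not_unpacked = make_array(exprs)

print(len(unpacked), len(not_unpacked))
```

The list comprehension builds one expression per index, and the unpacking hands them over as separate arguments.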

Scala

import spark.implicits._
import org.apache.spark.sql.functions.{avg, size}

val df = Seq(
  ("bob", Seq(0.5, 0.3, 0.2)),
  ("bob", Seq(0.1, 0.3, 0.6))
).toDF("user", "values")

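// Array length, taken from the first row
val n = df.select(size($"values")).as[Int].first
// One column per index; `until` excludes n, so no out-of-range column is added
val values = (0 until n).map(i => $"values"(i))

df.select($"user" +: values: _*).groupBy($"user").avg()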
10 Comments

What does the * do in this case? Also, is there a way like in Pandas where I could pass each group to a user-defined function and do the operation in there? Thanks.
* is standard Python argument unpacking. No, Python doesn't support UDAFs. You can use RDDs directly or define one on the JVM.
Thanks! I think RDD makes sense here.
If you want to give RDD a try you can use a subset (compute_stats without collect) of this answer.
@Gevorg Here you are. You may also find interesting stackoverflow.com/q/41731865/1560062
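
For the RDD route mentioned in the comments, one possible shape is a pair of combiner functions that carry per-index sums and a row count. This is only a sketch and not the code from the linked answer: `ZERO`, `seq_op`, `comb_op`, `finish` and `aggregate_locally` are hypothetical names, and the aggregation is simulated over plain lists so it runs without Spark. With a real RDD of `(user, values)` pairs, the same functions could presumably be passed to `rdd.aggregateByKey(ZERO, seq_op, comb_op).mapValues(finish)`.

```python
# Accumulator: (per-index sums, number of rows seen)
ZERO = ([], 0)

def seq_op(acc, values):
    # Fold one array into the accumulator without mutating it.
    sums, count = acc
    if not sums:
        sums = [0.0] * len(values)
    return [s + v for s, v in zip(sums, values)], count + 1

def comb_op(a, b):
    # Merge two partial accumulators (e.g. from different partitions).
    if not a[0]:
        return b
    if not b[0]:
        return a
    return [x + y for x, y in zip(a[0], b[0])], a[1] + b[1]

def finish(acc):
    # Turn sums and count into per-index averages.
    sums, count = acc
    return [s / count for s in sums]

def aggregate_locally(partitions):
    # Local simulation of a per-key aggregation across partitions.
    merged = {}
    for part in partitions:
        partial = {}
        for user, values in part:
            partial[user] = seq_op(partial.get(user, ZERO), values)
        for user, acc in partial.items():
            merged[user] = comb_op(merged.get(user, ZERO), acc)
    return {user: finish(acc) for user, acc in merged.items()}

rows = [
    ("bob", [0.5, 0.3, 0.2]),
    ("bob", [0.1, 0.3, 0.6]),
    ("bob", [0.8, 0.1, 0.1]),
]
# Two fake partitions, so comb_op is exercised as well.
averages = aggregate_locally([rows[:2], rows[2:]])
print(averages)
```

Keeping the sums and the count separate until the end means partial results can be merged in any order, which is what a distributed aggregation needs.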