I have 200 million rows in about 1,000 groups, looking like this:

Group     X             Y             Z          Q           W
group1  0.054464866 0.002248819 0.299069804 0.763352879 0.395905106
group2  0.9986218   0.023649037 0.50762069  0.212225807 0.619571705
group1  0.839928517 0.290339179 0.050407454 0.75837838  0.495466007
group1  0.021003132 0.663366686 0.687928832 0.239132224 0.020848608
group1  0.393843426 0.006299292 0.141103438 0.858481036 0.715860852
group2  0.045960198 0.014858905 0.672267793 0.59750871  0.893646818

I want to run the same function (say, a linear regression of X on [X, Z, Q, W]) on each of the groups. I could have used Window.partitionBy etc., but I have my own function. At the moment, I do the following:

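df.select("Group").distinct.collect.toList.foreach { row =>
  // collect returns Row objects, so extract the group name before filtering
  val group = row.getString(0)
  val dfGroup = df.filter(col("Group") === group)
  dfGroup.withColumn("res", myUdf(col("X"), col("Y"), col("Z"), col("Q"), col("W")))
}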

Is there a better way to do this?

  • A UDF acts on a single row, so why not just apply your UDF to the original DataFrame? There is no point in selecting one group at a time. Commented Jun 24, 2017 at 12:07
  • If you want to do a linear regression (which involves all records of a group), I would do something like df.repartition($"Group").mapPartitions{rows => rows.toSeq.groupBy(row => row.getAs[String]("Group")).mapValues(...)} Commented Jun 24, 2017 at 12:12

1 Answer

You have at least two options, depending on whether you prefer the DataFrame or the Dataset API.

DataFrame with UDAF

df
  .groupBy("group")
  .agg(myUdaf(col("col1"), col("col2")))

where myUdaf is a user-defined aggregate function (UDAF).

An example of how to implement a UDAF can be found here: https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html
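To give a rough idea of what such a UDAF might look like, here is a minimal sketch that computes the least-squares slope of one column on another per group. The name SlopeUdaf and the choice of a two-column regression are assumptions made for illustration; a regression on several predictors would need a larger buffer and a matrix solve. It uses the UserDefinedAggregateFunction API that was current when this was written; as far as I know, Spark 3.0 deprecated it in favor of Aggregator.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Hypothetical UDAF: slope b of the least-squares line y = a + b*x within each group.
object SlopeUdaf extends UserDefinedAggregateFunction {
  // Two input columns, passed in the order (x, y).
  def inputSchema: StructType =
    new StructType().add("x", DoubleType).add("y", DoubleType)

  // Running sums that are sufficient to compute the slope at the end.
  def bufferSchema: StructType = new StructType()
    .add("n", LongType)
    .add("sx", DoubleType).add("sy", DoubleType)
    .add("sxy", DoubleType).add("sxx", DoubleType)

  def dataType: DataType = DoubleType
  def deterministic: Boolean = true

  // Start every buffer at zero.
  def initialize(buf: MutableAggregationBuffer): Unit = {
    buf(0) = 0L
    for (i <- 1 to 4) buf(i) = 0.0
  }

  // Fold one input row into the buffer, skipping rows with nulls.
  def update(buf: MutableAggregationBuffer, in: Row): Unit = {
    if (!in.isNullAt(0) && !in.isNullAt(1)) {
      val x = in.getDouble(0)
      val y = in.getDouble(1)
      buf(0) = buf.getLong(0) + 1L
      buf(1) = buf.getDouble(1) + x
      buf(2) = buf.getDouble(2) + y
      buf(3) = buf.getDouble(3) + x * y
      buf(4) = buf.getDouble(4) + x * x
    }
  }

  // Combine partial buffers computed on different partitions.
  def merge(b1: MutableAggregationBuffer, b2: Row): Unit = {
    b1(0) = b1.getLong(0) + b2.getLong(0)
    for (i <- 1 to 4) b1(i) = b1.getDouble(i) + b2.getDouble(i)
  }

  // Closed-form slope; NaN when x has no variance (or the group is empty).
  def evaluate(buf: Row): Any = {
    val n = buf.getLong(0).toDouble
    val denom = n * buf.getDouble(4) - buf.getDouble(1) * buf.getDouble(1)
    if (denom == 0.0) Double.NaN
    else (n * buf.getDouble(3) - buf.getDouble(1) * buf.getDouble(2)) / denom
  }
}
```

With the columns from the question, it could be applied as df.groupBy("Group").agg(SlopeUdaf(col("X"), col("Y")).as("slope")). Because the buffer only holds running sums, Spark can pre-aggregate on each partition before shuffling, which is the main attraction of this route for 200 million rows.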

Dataset

You can use the groupByKey and mapGroups transformations from the Dataset API:

ds
  .groupByKey(_.group)
  .mapGroups{case (group, values) =>
    (group, aggregator(values))
  }

where aggregator is a Scala function that aggregates the collection of objects in a group into a single result.
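As an illustration, an aggregator could look roughly like the sketch below. It is plain Scala with no Spark dependency, so it can be tried locally first. The Obs and Fit case classes and their field names are hypothetical and not taken from the question's schema. One thing to keep in mind is that mapGroups passes the values as an Iterator, which can be traversed only once, so the sketch consumes it in a single foldLeft.

```scala
// Hypothetical record and result types; the names are assumptions for illustration.
final case class Obs(group: String, x: Double, y: Double)
final case class Fit(n: Long, intercept: Double, slope: Double)

object GroupFit {
  // Least-squares fit of y = intercept + slope * x.
  // The iterator is consumed exactly once, accumulating running sums.
  def aggregator(values: Iterator[Obs]): Fit = {
    val (n, sx, sy, sxy, sxx) =
      values.foldLeft((0L, 0.0, 0.0, 0.0, 0.0)) {
        case ((cnt, ax, ay, axy, axx), o) =>
          (cnt + 1L, ax + o.x, ay + o.y, axy + o.x * o.y, axx + o.x * o.x)
      }
    val denom = n * sxx - sx * sx
    if (denom == 0.0) Fit(n, Double.NaN, Double.NaN) // empty group or constant x
    else {
      val slope = (n * sxy - sx * sy) / denom
      Fit(n, (sy - slope * sx) / n, slope)
    }
  }

  // Local stand-in for groupByKey + mapGroups, handy for checking the function
  // on a small sample before running it on the cluster.
  def fitLocally(rows: Seq[Obs]): Map[String, Fit] =
    rows.groupBy(_.group).map { case (g, rs) => g -> aggregator(rs.iterator) }
}

// On a Dataset[Obs] the same function would plug in as:
//   ds.groupByKey(_.group).mapGroups((group, values) => (group, GroupFit.aggregator(values)))
```

Unlike a UDAF, mapGroups does no partial aggregation: all rows of a group are shuffled to one task before aggregator runs. With roughly 1,000 groups that is usually acceptable, but a single very large group could become a bottleneck.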

If you don't need to aggregate, you can simply transform the values with map, for example:

values.map(v => fun(...))