
I'd like to implement a function in Scala/Spark that takes multiple reducers/aggregators and executes them all in a single pass over the data. Basically, I pass in the reduce functions and their initial values, and the function should build one compound reduce operation out of them.

Here is what the logic would look like in Python:

from functools import reduce

def reduce_at_once(data, reducer_funcs_inits):
    reducer_funcs, inits = zip(*reducer_funcs_inits)

    complete_reducer_func = lambda acc, y: tuple(rf(a_x, y) for a_x, rf in zip(acc, reducer_funcs))

    return list(reduce(complete_reducer_func, data, inits))

data = list(range(1, 20))
reducer_funcs_inits = [(lambda acc, y: acc + y, 0), # sum
                       (lambda acc, y: acc * y, 1)  # product
                       ]
print(reduce_at_once(data, reducer_funcs_inits))
# [190, 121645100408832000]

How can I do something like this in Scala (Spark)? The issue seems to be that the list of reducers has a length I only know at call time, and its elements may have different types (the initial accumulator of each reduce) depending on which reducers I want to include (not necessarily only numbers like here).

  • You should add the python label also. (commented Dec 5, 2015 at 15:31)

1 Answer


You can always use

def reduce_at_once(data: Any, reducer_funcs_inits: Any*)

but this is very rarely what you want. In particular, here you actually need

case class ReducerInit[A, B](f: (B, A) => B, init: B)

def reduce_at_once[A](data: Seq[A], rfis: ReducerInit[A, _]*): Seq[_]
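For comparison, here is a minimal sketch of the easy case, assuming every reducer shares one accumulator type B (which the question says cannot be assumed in general). Then no existential types or casts are needed, and the single pass is just one foldLeft that advances all accumulators together. The name reduceAtOnceSame is hypothetical, and BigInt is used only so that the product does not overflow Int.

```scala
// Hypothetical helper (not a library API): every reducer uses the same accumulator type B.
def reduceAtOnceSame[A, B](data: Seq[A], reducers: Seq[((B, A) => B, B)]): Seq[B] = {
  val (fs, inits) = reducers.unzip
  // One pass over data: each element advances every accumulator with its own function.
  data.foldLeft(inits) { (accs, y) =>
    accs.zip(fs).map { case (acc, f) => f(acc, y) }
  }
}

// Same input as the Python example: range(1, 20) is 1..19.
val data = (1 until 20).map(BigInt(_))
val result = reduceAtOnceSame[BigInt, BigInt](
  data,
  Seq(
    ((acc: BigInt, y: BigInt) => acc + y, BigInt(0)), // sum
    ((acc: BigInt, y: BigInt) => acc * y, BigInt(1))  // product
  )
)
println(result)
```

As soon as the accumulators have different types, the Seq[((B, A) => B, B)] parameter no longer type-checks, and that is what forces the casts to Any.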

Unfortunately, implementing reduce_at_once is going to be pretty ugly, because the different accumulator types have to be erased to Any:

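def reduce_at_once[A](data: Seq[A], rfis: ReducerInit[A, _]*): Seq[_] = {
  // Erase each accumulator type to Any so that all reducers fit in one Seq
  val rfs = rfis.map(_.f.asInstanceOf[(Any, A) => Any])
  val inits = rfis.map(_.init.asInstanceOf[Any])

  // For each element y, advance every accumulator with its own reducer
  val crf = (acc: Seq[Any], y: A) => acc.zip(rfs).map { case (a_x, rf) => rf(a_x, y) }

  // A single pass over the data
  data.foldLeft(inits)(crf)
}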
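A cast-free alternative, if the set of reducers is fixed where you write the call, is to combine reducers pairwise so that the combined accumulator is a tuple. This sketch assumes the same ReducerInit shape as above; both and run are hypothetical helper names. Each combination is itself a ReducerInit, so it nests to any number of reducers while the compiler tracks each accumulator's type (Long, BigInt and List[Int] here). The trade-off is nested tuples instead of a flat Seq, and the number of reducers must be known at compile time rather than only at call time.

```scala
case class ReducerInit[A, B](f: (B, A) => B, init: B)

// Hypothetical helper: pair two reducers into one whose accumulator is a tuple.
def both[A, B, C](r1: ReducerInit[A, B], r2: ReducerInit[A, C]): ReducerInit[A, (B, C)] =
  ReducerInit[A, (B, C)](
    (acc: (B, C), y: A) => (r1.f(acc._1, y), r2.f(acc._2, y)),
    (r1.init, r2.init)
  )

// Hypothetical helper: run a (possibly combined) reducer in a single pass.
def run[A, B](data: Seq[A], r: ReducerInit[A, B]): B = data.foldLeft(r.init)(r.f)

val sum     = ReducerInit[Int, Long]((acc: Long, y: Int) => acc + y, 0L)
val product = ReducerInit[Int, BigInt]((acc: BigInt, y: Int) => acc * y, BigInt(1))
// Non-numeric accumulator: collect multiples of 5 (in reverse order).
val fives   = ReducerInit[Int, List[Int]]((acc: List[Int], y: Int) => if (y % 5 == 0) y :: acc else acc, Nil)

val ((s, p), f5) = run(1 to 20, both(both(sum, product), fives))
println((s, p, f5))
```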

To check:

val data = 1 to 20

val rf1 = ReducerInit[Int, Int](_ + _, 0)
val rf2 = ReducerInit[Int, Int](_ * _, 1)

println(reduce_at_once(data, rf1, rf2))

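gives ArrayBuffer(210, -2102132736). Note that 1 to 20 includes 20, unlike Python's range(1, 20), so the sum is 210 rather than 190, and that the product overflows Int (Long or BigInt would avoid this).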
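On Spark itself a single fold function is not quite enough: RDD.aggregate(zeroValue)(seqOp, combOp) folds each partition with seqOp and then merges the per-partition results with combOp, so every reducer also needs a merge function, and its zero value should be neutral for that merge because Spark starts every partition (and the final merge) from it. The sketch below works under those assumptions and simulates aggregate locally with plain Scala collections, so it runs without Spark; MergeableReducer, both and aggregateLocally are hypothetical names. With a real RDD you would pass r.zero, r.seqOp and r.combOp to rdd.aggregate instead of calling aggregateLocally.

```scala
// Hypothetical type: a reducer plus the merge step that Spark's RDD.aggregate calls combOp.
final case class MergeableReducer[A, B](seqOp: (B, A) => B, combOp: (B, B) => B, zero: B)

// Pair two reducers; both the per-element step and the merge step work component-wise.
def both[A, B, C](r1: MergeableReducer[A, B], r2: MergeableReducer[A, C]): MergeableReducer[A, (B, C)] =
  MergeableReducer[A, (B, C)](
    (acc: (B, C), y: A) => (r1.seqOp(acc._1, y), r2.seqOp(acc._2, y)),
    (x: (B, C), z: (B, C)) => (r1.combOp(x._1, z._1), r2.combOp(x._2, z._2)),
    (r1.zero, r2.zero)
  )

// Local stand-in for rdd.aggregate(r.zero)(r.seqOp, r.combOp):
// fold every partition separately, then merge the partial results.
def aggregateLocally[A, B](partitions: Seq[Seq[A]], r: MergeableReducer[A, B]): B =
  partitions.map(_.foldLeft(r.zero)(r.seqOp)).foldLeft(r.zero)(r.combOp)

val sum = MergeableReducer[Int, Long](
  (acc: Long, y: Int) => acc + y, (a: Long, b: Long) => a + b, 0L)
val product = MergeableReducer[Int, BigInt](
  (acc: BigInt, y: Int) => acc * y, (a: BigInt, b: BigInt) => a * b, BigInt(1))

// 1 to 20 split into four "partitions" of up to 6 elements each.
val partitions = (1 to 20).grouped(6).toList
val (s, p) = aggregateLocally(partitions, both(sum, product))
println((s, p))
```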
