
I'm sure this is something very simple but I didn't find anything related to this.

My code is simple:

... 
stream = stream.map(mapper) 
stream = stream.reduceByKey(reducer) 
... 

Nothing extraordinary. The output looks like this:

... 
key1  value1 
key2  [value2, value3] 
key3  [[value4, value5], value6] 
... 

And so on. Sometimes I get a flat value (if there is only a single one), and sometimes nested lists that can be really, really deep (on my simple test data they were 3 levels deep).

I tried searching through the sources for something like 'flat', but found only the flatMap method, which is (as I understand it) not what I need.

I don't know why those lists are nested. My guess is that they were handled by different processes (workers?) and then joined together without flattening.

Of course, I can write code in Python that will unfold and flatten that list. But I believe this is not a normal situation - I think almost everybody needs a flat output.

itertools.chain stops unfolding at the first non-iterable value it finds. In other words, it still needs some coding (as in the previous paragraph).
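For reference, this is roughly the kind of pure-Python workaround I have in mind - a hypothetical recursive helper that unfolds arbitrarily nested lists (exactly the extra coding I'd like to avoid):

>>> def flatten(value):
...     # recursively unfold arbitrarily nested lists into a single flat list
...     if not isinstance(value, list):
...         return [value]
...     return [leaf for item in value for leaf in flatten(item)]
...
>>> flatten([["value4", "value5"], "value6"])
['value4', 'value5', 'value6']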

So - how to flatten the list using PySpark's native methods?

Thanks

2 Comments

  • What's your reduce function (reducer)? Commented Jan 12, 2014 at 18:45
  • @JoshRosen just "return [key, value]" Commented Jan 13, 2014 at 5:20

2 Answers


The problem here is your reduce function. For each key, reduceByKey calls your reduce function with pairs of values and expects it to produce combined values of the same type.

For example, say that I wanted to perform a word count operation. First, I can map each word to a (word, 1) pair, then I can reduceByKey(lambda x, y: x + y) to sum up the counts for each word. At the end, I'm left with an RDD of (word, count) pairs.

Here's an example from the PySpark API Documentation:

>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]
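Putting the two steps together, a minimal end-to-end word count sketch (assuming a SparkContext named sc and some made-up text lines) could look like this:

>>> lines = sc.parallelize(["spark makes counting easy", "counting words with spark"])
>>> counts = (lines
...     .flatMap(lambda line: line.split())   # split each line into words
...     .map(lambda word: (word, 1))          # pair each word with a count of 1
...     .reduceByKey(lambda x, y: x + y))     # sum the counts per word
>>> sorted(counts.collect())
[('counting', 2), ('easy', 1), ('makes', 1), ('spark', 2), ('with', 1), ('words', 1)]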

To understand why your example didn't work, you can imagine the reduce function being applied something like this:

reduce(reduce(reduce(firstValue, secondValue), thirdValue), fourthValue) ...
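Since your reducer apparently wraps its two arguments in a new list, each application adds another level of nesting. You can reproduce the effect with plain Python's reduce (a sketch, assuming your reduce function is essentially lambda a, b: [a, b]):

>>> from functools import reduce
>>> reducer = lambda a, b: [a, b]   # assumed shape of your reduce function
>>> reduce(reducer, ["value4", "value5", "value6"])
[['value4', 'value5'], 'value6']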

Based on your reduce function, it sounds like you might be trying to implement the built-in groupByKey operation, which groups each key with a list of its values.
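Using the toy RDD from the documentation example above, that would be (just a sketch of the idea):

>>> sorted(rdd.groupByKey().mapValues(list).collect())
[('a', [1, 1]), ('b', [1])]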

Also, take a look at combineByKey, a generalization of reduceByKey() that allows the reduce function's input and output types to differ (reduceByKey is implemented in terms of combineByKey).
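For instance, here's a sketch of using combineByKey to collect values into flat lists (the pairs RDD is just made-up sample data matching your keys):

>>> pairs = sc.parallelize([("key1", "value1"), ("key2", "value2"), ("key2", "value3")])
>>> flat = pairs.combineByKey(
...     lambda v: [v],              # createCombiner: start a new list for the first value
...     lambda acc, v: acc + [v],   # mergeValue: append a value within a partition
...     lambda a, b: a + b)         # mergeCombiners: concatenate partial lists
>>> sorted(flat.collect())
[('key1', ['value1']), ('key2', ['value2', 'value3'])]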


1 Comment

ouch... I must say, Spark's approach differs from many MR frameworks. It takes some time to port some working MRJob or Disco code there.

Alternatively, stream.groupByKey().mapValues(lambda x: list(x)).collect() gives

key1 [value1]
key2 [value2, value3]
key3 [value4, value5, value6]

2 Comments

or just .groupByKey().mapValues(list)
or .reduceByKey(lambda a,b: (a if type(a) == list else [a]) + (b if type(b) == list else [b])).collect()
