
I'm refactoring pipeline code in Python.

Assume we have a series of generator functions that we want to chain together to form a data-processing pipeline.

Example:

#!/usr/bin/python

def foo1(g):
    for i in g:
        yield i + 1

def foo2(g):
    for i in g:
        yield 10 + i

def foo3(g):
    for i in g:
        yield 'foo3:' + str(i)

res = foo3(foo2(foo1(range(0, 5))))

for i in res:
    print(i)

Output:

foo3:11
foo3:12
foo3:13
foo3:14
foo3:15

I do not think foo3(foo2(foo1(range(0, 5)))) is a Pythonic way to achieve my pipeline goal, especially when the number of stages in the pipeline is large.

I wish I could rewrite it using chaining, like in jQuery. Something similar to:

range(0, 5).foo1().foo2().foo3()

Or maybe

l = [range(0, 5), foo1, foo2, foo3]
res = runner.run(l)

But I'm new to generators and couldn't find a way to achieve this.

Any help will be welcome.

3 Comments

  • Maybe with itertools.accumulate or functools.reduce? (Both are part of the standard library.)
  • This sounds like an XY problem. If you are doing number crunching on arrays/lists, consider using numpy/pandas.
  • maxymoo's answer is possibly the best, but you can also abuse operator overloading in devious ways (this is frowned upon in Python); see this for inspiration: stackoverflow.com/questions/33658355/…

7 Answers

Answer (score 35)

I sometimes like to use a left fold (called reduce in Python) for this type of situation:

from functools import reduce

def pipeline(*steps):
    return reduce(lambda x, y: y(x), steps)

res = pipeline(range(0, 5), foo1, foo2, foo3)

Or even better:

def compose(*funcs):
    return lambda x: reduce(lambda f, g: g(f), funcs, x)

p = compose(foo1, foo2, foo3)
res = p(range(0, 5))
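
Since each stage is a generator, the composed pipeline stays lazy. A small sketch to illustrate, using itertools.count as a stand-in infinite source:

import itertools

p = compose(foo1, foo2, foo3)

# Nothing is evaluated until items are pulled, so an infinite source is fine.
for item in itertools.islice(p(itertools.count()), 3):
    print(item)  # foo3:11, foo3:12, foo3:13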

9 Comments

  • Apart from being less efficient than the chained foo3(foo2(foo1(...))), this isn't exactly what I would call more readable, even if there are lots of functions.
  • Well, it's a matter of taste. You could define an alias for this like @john1024 did, but for me I just recognise the pattern as a pipeline and focus on the list of functions as the meaningful part. The main thing is that they're pipelined left-to-right instead of right-to-left as in the function composition in the question.
  • @MSeifert - how is this less efficient? It strikes me as identically efficient.
  • @MSeifert: now imagine when you have 10-20 different generators with names that are actually descriptive.
  • Yes, the efficiency doesn't suffer if one operates on largish data sets. I don't think the answer is wrong or not recommendable. I just wanted to point out that in my opinion something like foo1(foo2(...)) is more readable in the long run.

Answer (score 4)

I do not think foo3(foo2(foo1(range(0, 5)))) is a pythonic way to achieve my pipeline goal. Especially when the number of stages in the pipeline is large.

There is a fairly trivial, and in my opinion clear, way of chaining generators: assigning the result of each to a variable, where each can have a descriptive name.

range_iter = range(0, 5)
foo1_iter = foo1(range_iter)
foo2_iter = foo2(foo1_iter)
foo3_iter = foo3(foo2_iter)

for i in foo3_iter:
  print(i)

I prefer this to something that uses a higher-order function, e.g. a reduce or similar:

  • In my real cases, each foo* generator function often needs its own additional parameters, which is tricky when using a reduce (see the sketch after this list).

  • In my real cases, the steps in the pipeline are not dynamic at runtime: it seems a bit odd/unexpected (to me) to have a pattern that seems more appropriate for a dynamic case.

  • It's a bit inconsistent with how regular functions are typically written, where each is called explicitly and the result of each is passed to the call of the next. Yes, there is a bit of duplication, but I'm happy with "calling a function" being duplicated since (to me) it's really clear.

  • No need for an import: it uses core language features.
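
For the first point, here is a minimal sketch of how a stage with an extra parameter fits into this style (the scale stage and its factor argument are made up for illustration):

def scale(g, factor):
    # Hypothetical stage that needs an extra parameter besides the input iterable.
    for i in g:
        yield i * factor

range_iter = range(0, 5)
scaled_iter = scale(range_iter, factor=10)  # extra argument passed explicitly
foo3_iter = foo3(scaled_iter)

for i in foo3_iter:
    print(i)  # foo3:0, foo3:10, foo3:20, foo3:30, foo3:40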


Answer (score 3)

Although this implementation requires some overhead, I prefer to use the >> operator for chaining pipeline steps, similar to how tasks are arranged into a DAG in Airflow.

def foo1(g):
    for i in g:
        yield i + 1


def foo2(g):
    for i in g:
        yield 10 + i


def foo3(g):
    for i in g:
        yield "foo3:" + str(i)


def print_loop(g):
    for i in g:
        print(i)


class PipelineOperator:
    def __init__(self, task):
        self.task = task

    def __rrshift__(self, x):
        # Handles `x >> self`: plain iterables and generators don't define
        # __rshift__, so Python falls back to this reflected method.
        return self.task(x)


foo1_t = PipelineOperator(foo1)
foo2_t = PipelineOperator(foo2)
foo3_t = PipelineOperator(foo3)
print_loop_t = PipelineOperator(print_loop)

(range(0, 5) >> foo1_t >> foo2_t >> foo3_t >> print_loop_t)
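
If wrapping every function by hand feels noisy, the same idea can be written as a decorator. A minimal sketch (the pipeline_step name and add_one stage are made up for illustration):

def pipeline_step(func):
    # Replace the generator function with an operator-aware wrapper.
    return PipelineOperator(func)

@pipeline_step
def add_one(g):
    for i in g:
        yield i + 1

# `range(0, 5) >> add_one` now yields 1, 2, 3, 4, 5 lazily.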


Answer (score 2)

Following up on your runner.run approach, let's define this utility function:

def recur(ops):
    return ops[0](recur(ops[1:])) if len(ops)>1 else ops[0]

As an example:

>>> ops = foo3, foo2, foo1, range(0, 5)
>>> list( recur(ops) )
['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']

Alternative: backward ordering

def backw(ops):
    return ops[-1](backw(ops[:-1])) if len(ops)>1 else ops[0]

For example:

>>> list( backw([range(0, 5), foo1, foo2, foo3]) )
['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']

3 Comments

  • It won't work if you have more than 1000 functions though (Python's default recursion limit)!
  • I think having the functions in the reverse order is confusing. What about return ops[-1](recur(ops[:-1])) if len(ops)>1 else ops[0]?
  • @rbierman Very good. Yes, that works also. Answer updated with code for reverse ordering.

Answer (score 2)

Here is another answer, in case the functions in your example are one-off (single-use) functions. Careful variable naming and generator expressions can be helpful for small operations.

>>> g = range(0, 5)
>>> foo1 = (x+1 for x in g)
>>> foo2 = (x+10 for x in foo1)
>>> foo3 = ('foo3:' + str(x) for x in foo2)
>>> for x in foo3:
...     print(x)
...
foo3:11
foo3:12
foo3:13
foo3:14
foo3:15
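
The same chain can also be written inline without the intermediate names, although it becomes harder to read as the number of stages grows; a quick sketch:

res = ('foo3:' + str(x) for x in (x + 10 for x in (x + 1 for x in range(0, 5))))
print(list(res))  # ['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']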


Answer (score 2)

You can compose curried generator functions using PyMonad:

# Assumes the type hints come from typing, and that Just and curry are imported
# from the installed PyMonad version (exact import paths vary between versions).
from typing import Iterable, Sized


def main():
    odds = list * \
        non_divisibles(2) * \
        lengths * \
        Just(["1", "22", "333", "4444", "55555"])
    print(odds.getValue())    # prints [1, 3, 5]


@curry
def lengths(words: Iterable[Sized]) -> Iterable[int]:
    return map(len, words)


@curry
def non_divisibles(div: int, numbers: Iterable[int]) -> Iterable[int]:
    return (n for n in numbers if n % div)

Another alternative is to start with a Monad and compose the generators using fmap calls - this syntax is familiar to Java 8 Stream users:

def main():
    odds = Just(["1", "22", "333", "4444", "55555"]) \
        .fmap(lengths) \
        .fmap(non_divisibles(2)) \
        .fmap(list) \
        .getValue()
    print(odds)   #prints [1, 3, 5]


def lengths(words: Iterable[Sized]) -> Iterable[int]:
    return map(len, words)


@curry
def non_divisibles(div: int, numbers: Iterable[int]) -> Iterable[int]:
    return (n for n in numbers if n % div)

Note that lengths doesn't need to be decorated with @curry in this case (non_divisibles still does, because it is partially applied as non_divisibles(2)). Because lengths and non_divisibles return lazy iterators, nothing is actually computed until the final list step forces the chain.


Answer (score 2)

For future readers: another solution that is very pythonic (IMHO):

steps = [
    foo1,
    foo2,
    foo3,
]

res = range(0, 5)
for step in steps:
    res = step(res)

for i in res:
    print(i)

Output:

foo3:11
foo3:12
foo3:13
foo3:14
foo3:15

This is essentially doing the same thing as the functools.reduce approach in maxymoo's answer. The laziness of generators allows this simple formulation without functools.

1 Comment

  • This works great and reads well for non-lazy functions with side effects in a pipeline too. Including from functools import partial and passing in additional custom parameters as necessary worked for me; see the sketch below.
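
For illustration, here is a minimal sketch of what that comment describes, assuming a hypothetical multiply stage that needs an extra factor argument bound with functools.partial:

from functools import partial

def multiply(factor, g):
    # Hypothetical stage with an extra parameter in addition to the input iterable.
    for i in g:
        yield i * factor

steps = [
    foo1,
    partial(multiply, 3),  # bind the extra argument; the loop still passes only `res`
    foo3,
]

res = range(0, 5)
for step in steps:
    res = step(res)

for i in res:
    print(i)  # foo3:3, foo3:6, foo3:9, foo3:12, foo3:15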
