7

Say I have two async generators:

async def get_rules():
    while True:
        yield 'rule=1'
        asyncio.sleep(2)


async def get_snapshots():
    while True:
        yield 'snapshot=1'
        asyncio.sleep(5)

I want to merge them into a single async generator that returns 2-tuples, with the latest value from both. Sort of combineLatest.

What is the best way to do this?

2
  • Can you clarify when you want the combined generator to yield? Does it only yield when both of the sub-generators do, or when either one does? Commented Jan 5, 2017 at 0:11
  • @Blckknght When either one does. Though the more I learn about asyncio, the less sure I become that this should work. I'm starting to think asyncio wants me to use a task, and somehow communicate the results of these functions with a queue or a channel of sorts. Commented Jan 5, 2017 at 0:35

2 Answers 2

7

You might want to have a look at aiostream, especially stream.merge and stream.accumulate:

import asyncio
from itertools import count
from aiostream import stream


async def get_rules():
    for x in count():
        await asyncio.sleep(2)
        yield 'rule', x


async def get_snapshots():
    for x in count():
        await asyncio.sleep(5)
        yield 'snapshot', x


async def main():
    xs = stream.merge(get_rules(), get_snapshots())
    ys = stream.map(xs, lambda x: {x[0]: x[1]})
    zs = stream.accumulate(ys, lambda x, e: {**x, **e}, {})

    async with zs.stream() as streamer:
        async for z in streamer:
            print(z)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

Output:

{}
{'rule': 0}
{'rule': 1}
{'rule': 1, 'snapshot': 0}
{'rule': 2, 'snapshot': 0}
[...]

See the project page and the documentation for further information.

Disclaimer: I am the project maintainer.

Sign up to request clarification or add additional context in comments.

2 Comments

Neat lib as long as your project can live with GPL license.
@Rotareti Thanks! Personally, I'm fine with copyleft licenses :)
1

I came up with this:

async def combine(**generators):
    """Given a bunch of async generators, merges the events from
    all of them. Each should have a name, i.e. `foo=gen, bar=gen`.
    """
    combined = Channel()
    async def listen_and_forward(name, generator):
        async for value in generator:
            await combined.put({name: value})
    for name, generator in generators.items():
        asyncio.Task(listen_and_forward(name, generator))

    async for item in combined:
        yield item


async def combine_latest(**generators):
    """Like "combine", but always includes the latest value from
    every generator.
    """
    current = {}
    async for value in combine(**generators):
        current.update(value)
        yield current

Call it like so:

async for item in combine_latest(rules=rulesgen, snap=snapgen):
    print(item)

Output looks like this:

{'rules': 'rule-1'}
{'rules': 'rule-1', 'snap': 'snapshot-1'}
{'rules': 'rule-1', 'snap': 'snapshot-1'}
....

I am using aiochannel, but a normal asyncio.Queue should be fine, too.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.