
UPDATED QUESTION FOR CLARITY:

suppose I have 2 processing generator functions:

def gen1(): # just for examples,
  yield 1   # yields actually carry 
  yield 2   # different computation weight 
  yield 3   # in my case

def gen2():
  yield 4
  yield 5
  yield 6

I can chain them with itertools:

from itertools import chain

mix = chain(gen1(), gen2())

and then I can wrap it in another generator function,

def mix_yield():
   for item in mix:
      yield item

or, if I simply want to call next(mix), that works too.
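For example, the chained iterator hands out gen1's items first:

print(next(mix))  # 1
print(next(mix))  # 2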

My question is: how can I do the equivalent in asynchronous code? I need it to:

  • return results one by one via yield (or with a next-style iterator)
  • yield the fastest-resolved result first (async)

PREV. UPDATE:

After experimenting and researching, I found the aiostream library, which describes itself as an async version of itertools, so here is what I did:

import asyncio
from aiostream import stream

async def gen1():
    await asyncio.sleep(0)
    yield 1
    await asyncio.sleep(0)
    yield 2
    await asyncio.sleep(0)
    yield 3

async def gen2():
    await asyncio.sleep(0)
    yield 4
    await asyncio.sleep(0)
    yield 5
    await asyncio.sleep(0)
    yield 6

a_mix = stream.combine.merge(gen1(), gen2())

async def a_mix_yield():
    async for item in a_mix:  # a plain `for` fails here: the merge object is not a regular iterable
        yield item

but I still can't do next(a_mix)

TypeError: 'merge' object is not an iterator

or next(await a_mix)

raise StreamEmpty()

I can, however, still collect it into a list:

print(await stream.list(a_mix))
# [1, 2, 4, 3, 5, 6]

so one goal is completed, one more to go:

  • return results one by one via yield (or with a next-style iterator)

    - the fastest-resolved result first (async)

  • Your code above is just creating a couple of generators and iterating through them, which is why you see them printed in order. You could iterate through gen2 first and it would print 4,5,6,1,2,3. Perhaps you should find a different example to show what you're trying to do. Commented Nov 22, 2018 at 14:37
  • In my case gen1() and gen2() don't yield at the same time. I will update my question; I think I've already found the answer with aiostream (I hope). Commented Nov 22, 2018 at 15:00
  • Sorry people for the confusion, I updated the question for clarity. Commented Nov 22, 2018 at 17:49

3 Answers


Python's next built-in function is just a convenient way of invoking the underlying __next__ method on the object. The async equivalent of __next__ is the __anext__ method on the async iterator. Prior to Python 3.10 there was no anext global function in the standard library (the aiostream library provided one), but one could easily write it:

async def anext(aiterator):
    return await aiterator.__anext__()
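
On Python 3.10 and later the built-in anext behaves like this helper, and additionally accepts an optional default to return instead of raising StopAsyncIteration once the iterator is exhausted (using a throwaway iterator name for illustration):

item = await anext(some_aiter)        # raises StopAsyncIteration when exhausted
item = await anext(some_aiter, None)  # returns None instead (Python 3.10+)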

The async iterator is in turn obtained from an async iterable by calling the __aiter__ (in analogy to __iter__ provided by regular iterables). Async iteration driven manually looks like this:

a_iterator = obj.__aiter__()          # regular method
elem1 = await a_iterator.__anext__()  # async method
elem2 = await a_iterator.__anext__()  # async method
...

__anext__ will raise StopAsyncIteration when no more elements are available. To loop over async iterators one should use async for.
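
Conceptually, async for is just this manual pattern packaged up; a rough desugaring of `async for x in aiterable:` (a sketch, eliding some details) looks like:

it = aiterable.__aiter__()
while True:
    try:
        x = await it.__anext__()
    except StopAsyncIteration:
        break
    ...  # loop body runs here with x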

Here is a runnable example, based on your code, using both __anext__ and async for to exhaust the stream set up with aiostream.stream.combine.merge:

async def main():
    a_mix = stream.combine.merge(gen1(), gen2())
    async with a_mix.stream() as streamer:
        mix_iter = streamer.__aiter__()
        print(await mix_iter.__anext__())
        print(await mix_iter.__anext__())
        print('remaining:')
        async for x in mix_iter:
            print(x)

asyncio.get_event_loop().run_until_complete(main())  # or asyncio.run(main()) on Python 3.7+

9 Comments

Thank you so much, I had to re-read it multiple times and connect the dots before completely understanding it.
@Ardhi Another good resource is the PEP that introduced them.
Just mentioning that entering a streaming context in aiostream is meant to be done using async with zs.stream() as streamer:, as seen in this demonstration.
@Vincent Thanks, I've now amended the answer to use the advertised pattern.
+1. One potential improvement to this answer would be to highlight how easily you can make an async equivalent of next given this knowledge - something like async def anext(async_iterator): return await async_iterator.__anext__()

I came across this answer and I looked at the aiostream library. Here is the code I came up with to merge multiple async generators. It does not use any library.

import asyncio
from typing import Any, AsyncGenerator, Set

async def merge_generators(gens: Set[AsyncGenerator[Any, None]]) -> AsyncGenerator[Any, None]:
    # map each in-flight __anext__() task to the generator it was pulled from
    pending_tasks = {asyncio.ensure_future(g.__anext__()): g for g in gens}
    while pending_tasks:
        done, _ = await asyncio.wait(pending_tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
        for d in done:
            try:
                result = d.result()
                yield result
                # schedule fetching the next item from the same generator
                dg = pending_tasks[d]
                pending_tasks[asyncio.ensure_future(dg.__anext__())] = dg
            except StopAsyncIteration:
                pass  # this generator is exhausted; just drop it
            finally:
                del pending_tasks[d]
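
A minimal usage sketch (reusing gen1 and gen2 from the question):

async def main():
    async for item in merge_generators({gen1(), gen2()}):
        print(item)

asyncio.run(main())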

Hope this helps you and let me know if there are any bugs in this.



I've managed to merge and chain several async generators with these simple helpers.

Different error-handling strategies can be applied, so for clarity I don't show any error handling here.

import asyncio


async def merge(*streams):
    """Yield items from several async generators as soon as each arrives."""
    n = len(streams)
    queue = asyncio.Queue()
    signal = object()  # unique sentinel marking the end of one stream
    async def enqueue(stream):
        async for event in stream:
            await queue.put(event)
        await queue.put(signal)  # tell the consumer this stream is done
    tasks = [asyncio.create_task(enqueue(stream)) for stream in streams]
    while n > 0:
        event = await queue.get()
        if event is signal:
            n -= 1  # one producer finished
        else:
            yield event
    await asyncio.wait(tasks)  # let all producer tasks wind down


async def chain(*streams):
    """Exhaust each async generator fully, one after another, in order."""
    for stream in streams:
        async for item in stream:
            yield item

Example usage:

async def gen(name, n, nap):
    for i in range(n):
        await asyncio.sleep(nap)
        yield f"Event #{i} for {name}"


async def main():
    print("Merging 2 async generators")
    g1 = gen("task A", 3, 0.5)
    g2 = gen("task B", 6, 0.3)
    async for item in merge(g1, g2):
        print(f"  {item}")

    print("Chaining 2 async generators")
    g1 = gen("task A", 3, 0.5)
    g2 = gen("task B", 6, 0.3)
    async for item in chain(g1, g2):
        print(f"  {item}")

