
My code is as follows. I want the two sleeps to overlap so that the script takes 1+2*3=7 seconds to run. Instead, it still takes 3*(1+2)=9 seconds.

How can I modify the code so that the sleeps overlap?

import asyncio

async def g():
    for i in range(3):
        await asyncio.sleep(1)
        yield i

async def main():
    async for x in g():
        print(x)
        await asyncio.sleep(2)

loop = asyncio.get_event_loop()
res = loop.run_until_complete(main())
loop.close()

  • You are still running the sleep in g before yielding, and thus before the sleep in main. The async/await syntax exists to do multiple tasks concurrently, not to do one task concurrently. You have only one task, so there is nothing that can run concurrently. Commented Jul 5, 2019 at 11:29
  • What is your expected outcome? Do you expect g to yield i before sleeping? Do you expect main to receive x while g sleeps? Do you expect g to prepare the next i while main sleeps? Commented Jul 5, 2019 at 11:36
  • I assume your sleep is some actual work in your real code? Is it computational or I/O? Commented Jul 5, 2019 at 11:40
  • Yes, I expect g to prepare the next i while main sleeps! The sleep stands in for some actual work in my real code. Commented Jul 5, 2019 at 11:42

2 Answers


The point of async/await is to interleave tasks, not functions/generators. For example, when you await asyncio.sleep(1), your current coroutine is delayed along with the sleep. Similarly, an async for delays its coroutine until the next item is ready.
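As a small illustration of that difference, here is a sketch that records the order of events when two coroutines are awaited one after the other, and when they are run as separate tasks via `gather`. The `step` helper, its names, and the delays are hypothetical and only serve the demonstration.

```python
import asyncio


async def step(name, delay, log):
    # hypothetical unit of work: log, wait, log again
    log.append(f"{name} start")
    await asyncio.sleep(delay)
    log.append(f"{name} end")


async def sequential():
    # one coroutine awaiting each step in turn: nothing overlaps
    log = []
    await step("a", 0.03, log)
    await step("b", 0.01, log)
    return log


async def concurrent_tasks():
    # gather wraps each coroutine in its own task, so the sleeps overlap
    log = []
    await asyncio.gather(step("a", 0.03, log), step("b", 0.01, log))
    return log


sequential_log = asyncio.run(sequential())
concurrent_log = asyncio.run(concurrent_tasks())
print(sequential_log)  # a finishes before b even starts
print(concurrent_log)  # both start, and the shorter b finishes first
```

In the sequential case the second step cannot start until the first has finished, which is the situation in the question's `async for` loop.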

To run the two parts concurrently, you must run each part as a separate task. Use a Queue to exchange items between them: the consumer is then only delayed until the next item is available, and the producer can keep preparing items in the meantime.

from asyncio import Queue, sleep, run, gather


# the original async generator
async def g():
    for i in range(3):
        await sleep(1)
        yield i


async def producer(queue: Queue):
    async for i in g():
        print('send', i)
        await queue.put(i)  # the queue is unbounded, so this does not wait for the consumer
    await queue.put(None)


async def consumer(queue: Queue):
    x = await queue.get()  # resume once an item is available
    while x is not None:
        print('got', x)
        await sleep(2)
        x = await queue.get()


async def main():
    queue = Queue()
    # tasks only share the queue
    await gather(
        producer(queue),
        consumer(queue),
    )


run(main())
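Note that `Queue()` without a `maxsize` is unbounded, so a producer that is faster than the consumer can run arbitrarily far ahead. If that is a concern, a bounded queue makes `put` wait until there is room. The following is a minimal sketch of that idea, not part of the code above: the producer here deliberately has no delay of its own, and the `backlog` bookkeeping is a hypothetical addition used only to show how many items are waiting after each `put`.

```python
import asyncio


async def producer(queue, backlog):
    for i in range(5):
        await queue.put(i)  # suspends while a bounded queue is full
        backlog.append(queue.qsize())  # items waiting right after the put
    await queue.put(None)  # sentinel: no more items


async def consumer(queue, received):
    x = await queue.get()
    while x is not None:
        received.append(x)
        await asyncio.sleep(0.001)  # stand-in for slow processing
        x = await queue.get()


async def main(maxsize):
    queue = asyncio.Queue(maxsize=maxsize)  # maxsize=0 means unbounded
    received, backlog = [], []
    await asyncio.gather(producer(queue, backlog), consumer(queue, received))
    return received, max(backlog)


received, bounded_backlog = asyncio.run(main(maxsize=1))
_, unbounded_backlog = asyncio.run(main(maxsize=0))
print(received)
print(bounded_backlog, unbounded_backlog)
```

With `maxsize=1` the producer is at most one item ahead, which still lets it prepare the next item while the consumer is busy.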

If you regularly need this functionality, you can also put it into a helper object that wraps an asynchronous iterable. The helper encapsulates the queue and separate task. You can apply the helper directly on an async iterable in an async for statement.

from asyncio import Queue, sleep, run, ensure_future


# helper to consume iterable as concurrent task
async def _enqueue_items(async_iterable, queue: Queue, sentinel):
    async for item in async_iterable:
        await queue.put(item)
    await queue.put(sentinel)


async def concurrent(async_iterable):
    """Concurrently fetch items from ``async_iterable``"""
    queue = Queue()
    sentinel = object()
    consumer = ensure_future(  # concurrently fetch items for the iterable
        _enqueue_items(async_iterable, queue, sentinel)
    )
    try:
        item = await queue.get()
        while item is not sentinel:
            yield item
            item = await queue.get()
    finally:
        consumer.cancel()


# the original generator
async def g():
    for i in range(3):
        await sleep(1)
        yield i


# the original main - modified with `concurrent`
async def main():
    async for x in concurrent(g()):
        print(x)
        await sleep(2)


run(main())
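To check that the helper gives the overlap asked for in the question, one option is to log when items are produced and when they are received. The sketch below repeats the helper so that it runs on its own. `UNIT`, the `log` parameter, and `consume` are hypothetical additions, and the consumer delay is 5 units rather than 2 so that no two events fall on the same instant.

```python
import asyncio

UNIT = 0.02  # hypothetical time scale, so the demo runs in under a second


async def _enqueue_items(async_iterable, queue, sentinel):
    async for item in async_iterable:
        await queue.put(item)
    await queue.put(sentinel)


async def concurrent(async_iterable):
    """Concurrently fetch items from ``async_iterable``"""
    queue = asyncio.Queue()
    sentinel = object()
    feeder = asyncio.ensure_future(
        _enqueue_items(async_iterable, queue, sentinel)
    )
    try:
        item = await queue.get()
        while item is not sentinel:
            yield item
            item = await queue.get()
    finally:
        feeder.cancel()


async def g(log):
    for i in range(3):
        await asyncio.sleep(1 * UNIT)
        log.append(f"produced {i}")
        yield i


async def consume(make_items):
    log = []
    async for x in make_items(log):
        log.append(f"consumed {x}")
        await asyncio.sleep(5 * UNIT)  # the consumer's own work
    return log


plain_log = asyncio.run(consume(lambda log: g(log)))
prefetch_log = asyncio.run(consume(lambda log: concurrent(g(log))))
print(plain_log)     # production and consumption strictly alternate
print(prefetch_log)  # items 1 and 2 are produced while item 0 is being processed
```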

1 Comment

@cjs840312 Since this is a pattern that occurs regularly, I've now added an example helper that can be wrapped around existing async generators.

As an alternative to using a Queue, this solution chains Futures together, so that each Future's result is the current item plus another Future for retrieving the next item (much like a linked list):

from asyncio import sleep, get_event_loop, run, create_task

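async def aiter(fut, async_generator):
    # note: this name shadows the built-in aiter() added in Python 3.10
    try:
        async for item in async_generator:
            # resolve the pending future with the item and a new future for the next one
            fut, prev_fut = get_event_loop().create_future(), fut
            prev_fut.set_result((item, fut))
        else:
            # the generator is exhausted: signal the end of iteration
            fut.set_exception(StopAsyncIteration())
    except Exception as e:
        # forward any error from the generator to the consumer
        fut.set_exception(e)


async def concurrent(async_generator):
    fut = get_event_loop().create_future()
    # keep a reference to the task so that it is not garbage collected mid-run
    task = create_task(aiter(fut, async_generator))

    try:
        while True:
            item, fut = await fut
            yield item
    except StopAsyncIteration:
        return
    finally:
        # stop fetching if the consumer exits early
        task.cancel()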

As an added bonus, this solution correctly handles an exception raised in g(): it is re-raised in main(), with a traceback that is useful for debugging.
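The exception handling can be tried out with a generator that fails partway through. The sketch below restates the Future-chaining idea in a self-contained form. `_feed` and `failing` are hypothetical names, and `failing` has no delays, so this only shows how the error travels to the consumer.

```python
import asyncio


async def _feed(fut, async_generator):
    # resolve each future with (item, next_future); errors go into the pending future
    loop = asyncio.get_running_loop()
    try:
        async for item in async_generator:
            fut, prev_fut = loop.create_future(), fut
            prev_fut.set_result((item, fut))
        fut.set_exception(StopAsyncIteration())  # normal end of iteration
    except Exception as e:
        fut.set_exception(e)


async def concurrent(async_generator):
    fut = asyncio.get_running_loop().create_future()
    task = asyncio.create_task(_feed(fut, async_generator))
    try:
        while True:
            item, fut = await fut
            yield item
    except StopAsyncIteration:
        return
    finally:
        task.cancel()


async def failing():
    # hypothetical generator that breaks after two items
    yield 1
    yield 2
    raise ValueError("boom")


async def main():
    seen = []
    try:
        async for x in concurrent(failing()):
            seen.append(x)
    except ValueError as exc:
        return seen, str(exc)
    return seen, None


seen, error = asyncio.run(main())
print(seen, error)
```

The items produced before the failure are still delivered, and the `ValueError` only surfaces once the consumer reaches that point in the chain.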

1 Comment

Very nice because it guarantees a constant memory usage!
