The point of async/await is to interleave tasks, not functions/generators. For example, when you await asyncio.sleep(1), your current coroutine is delayed along with the sleep. Similarly, an async for delays its coroutine until the next item is ready.
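To make the delay concrete, here is a minimal sketch of the single-task case. The names `numbers`, `sequential` and `log` are made up for illustration, and the delays are shortened so it finishes quickly. Because the generator and the loop body run in the same task, the generator does not start preparing the next item until the loop body has finished with the current one.

```python
import asyncio

async def numbers(log):
    # hypothetical stand-in for an async generator that needs time per item
    for i in range(3):
        await asyncio.sleep(0.01)
        log.append(f"produced {i}")
        yield i

async def sequential():
    log = []
    async for x in numbers(log):
        # the generator stays suspended at its `yield` during this sleep
        await asyncio.sleep(0.02)
        log.append(f"handled {x}")
    return log

log = asyncio.run(sequential())
print(log)
```

The log strictly alternates between "produced" and "handled" entries, so the total time is the sum of all the delays.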
To run the two parts concurrently, you must create each part as a separate task. Use a Queue to exchange items between them: the consumer waits only until an item is available, and with the default unbounded Queue the producer never waits to hand one over.
from asyncio import Queue, sleep, run, gather

# the original async generator
async def g():
    for i in range(3):
        await sleep(1)
        yield i

async def producer(queue: Queue):
    async for i in g():
        print('send', i)
        await queue.put(i)  # returns at once: the default Queue is unbounded
    await queue.put(None)  # sentinel: tell the consumer there are no more items

async def consumer(queue: Queue):
    x = await queue.get()  # resume once an item is available
    while x is not None:
        print('got', x)
        await sleep(2)
        x = await queue.get()

async def main():
    queue = Queue()
    # tasks only share the queue
    await gather(
        producer(queue),
        consumer(queue),
    )

run(main())
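If the producer should not run arbitrarily far ahead of the consumer, one possible variation (not part of the code above) is a bounded queue. The sketch below assumes `Queue(maxsize=1)`, shortened delays, and two illustrative lists, `sizes` and `received`, that exist only to observe what happens. With a full queue, `put` suspends the producer until the consumer has taken an item.

```python
import asyncio

async def producer(queue, sizes):
    for i in range(3):
        await asyncio.sleep(0.01)
        await queue.put(i)           # suspends here while the queue is full
        sizes.append(queue.qsize())  # record how many items are buffered
    await queue.put(None)            # sentinel: no more items

async def consumer(queue, received):
    x = await queue.get()
    while x is not None:
        received.append(x)
        await asyncio.sleep(0.02)    # simulated work on the item
        x = await queue.get()

async def main():
    queue = asyncio.Queue(maxsize=1)  # assumption: buffer at most one item
    sizes, received = [], []
    await asyncio.gather(producer(queue, sizes), consumer(queue, received))
    return sizes, received

sizes, received = asyncio.run(main())
print(received, max(sizes))
```

A larger `maxsize` lets the producer prefetch more items, at the cost of holding more of them in memory.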
If you regularly need this functionality, you can also put it into a helper that wraps an asynchronous iterable. The helper encapsulates the queue and the separate task, so you can apply it directly to an async iterable in an async for statement.
from asyncio import Queue, sleep, run, ensure_future

# helper to consume iterable as concurrent task
async def _enqueue_items(async_iterable, queue: Queue, sentinel):
    async for item in async_iterable:
        await queue.put(item)
    await queue.put(sentinel)

async def concurrent(async_iterable):
    """Concurrently fetch items from ``async_iterable``"""
    queue = Queue()
    sentinel = object()
    consumer = ensure_future(  # concurrently fetch items for the iterable
        _enqueue_items(async_iterable, queue, sentinel)
    )
    try:
        item = await queue.get()
        while item is not sentinel:
            yield item
            item = await queue.get()
    finally:
        consumer.cancel()

# the original generator
async def g():
    for i in range(3):
        await sleep(1)
        yield i

# the original main - modified with `concurrent`
async def main():
    async for x in concurrent(g()):
        print(x)
        await sleep(2)

run(main())
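To check that the helper really overlaps the two parts, the following sketch repeats it in self-contained form and records the order of events. The `log` list, the renamed `fetcher` variable and the shortened delays are assumptions added for illustration only.

```python
import asyncio

async def _enqueue_items(async_iterable, queue, sentinel):
    async for item in async_iterable:
        await queue.put(item)
    await queue.put(sentinel)

async def concurrent(async_iterable):
    """Same helper as above, repeated so the sketch runs on its own."""
    queue = asyncio.Queue()
    sentinel = object()
    fetcher = asyncio.ensure_future(
        _enqueue_items(async_iterable, queue, sentinel)
    )
    try:
        item = await queue.get()
        while item is not sentinel:
            yield item
            item = await queue.get()
    finally:
        fetcher.cancel()

async def g(log):
    for i in range(3):
        await asyncio.sleep(0.01)   # shortened "production" delay
        log.append(f"produced {i}")
        yield i

async def main():
    log = []
    async for x in concurrent(g(log)):
        await asyncio.sleep(0.1)    # shortened "handling" delay
        log.append(f"handled {x}")
    return log

log = asyncio.run(main())
print(log)
```

On a typical run, later items are produced while the first one is still being handled, which is the overlap that the plain async for loop cannot provide.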
... sleeping before yielding, and thus before the sleep in main. The async/await syntax exists to do multiple tasks concurrently, not to do one task concurrently. You have only one task, so there is nothing that can run concurrently.
... g to yield i before sleeping? Do you expect main to receive x while g sleeps? Do you expect g to prepare the next i while main sleeps?
... sleep is some actual work in your real code? Is it computational or I/O?
... g to prepare the next i while main sleeps!
... sleep is some actual work in my real code?