
I have a queue stored in Redis lists, and I'm trying to create an async consumer for it. But I can't get the async function to run concurrently inside the loop; it behaves like a synchronous call.

import asyncio

async def worker():
    print("starting sleep")
    await asyncio.sleep(2)
    print("slept")

async def main():
    while True:
        await worker()

asyncio.run(main())

Here is a short and simple example of my implementation. I'm expecting to see 'starting sleep' messages until the first 'slept' message appears, i.e. for the first 2 seconds.

  • You await worker, so nothing inside main will progress until worker is done… What else did you expect? Commented Jun 5, 2020 at 15:02
  • I'm just expecting to see "starting sleep" messages until the first "slept" message, which appears after 2 secs. So "starting sleep" messages for the first 2 secs after the run. @deceze thanks for attention Commented Jun 5, 2020 at 15:04
  • No, for this you'd need to keep launching many workers without awaiting them, but you'd also need to put some await inside main, otherwise nothing else could ever run. An await puts a "break" into the code which allows other code to run. There's only one thread and tasks are cooperatively multitasking. Meaning, whenever a task awaits the completion of something else, it yields control back to the event loop, which will schedule other tasks to run in the meantime. Commented Jun 5, 2020 at 15:12
  • 1
    Use asyncio.wait to run workers in parallel and block until all are complete. Commented Jun 5, 2020 at 15:18
  • Yes, I want to spawn them in parallel, but I couldn't find any practical examples of how to do it. When I call them via asyncio.wait or directly, it raises a "coroutine was never awaited" warning, as expected. Commented Jun 5, 2020 at 15:22

2 Answers


main is literally awaiting the completion of worker. Until worker is done, main won't progress. async tasks don't run in the background like in multithreading.

What you want is to keep launching new workers without awaiting each one of them. However, if you just keep doing this in a loop like this:

while True:
    worker()

then you will never see any output of those workers, since this is an endless loop which never gives anything else the chance to run. You'd need to "break" this loop in some way to allow workers to progress. Here's an example of that:

import asyncio

async def worker():
    print("starting sleep")
    await asyncio.sleep(2)
    print("slept")

async def main():
    while True:
        asyncio.ensure_future(worker())
        await asyncio.sleep(0.5)

asyncio.run(main())

This will produce the expected outcome:

starting sleep
starting sleep
starting sleep
starting sleep
slept
starting sleep
slept
...

The await inside main transfers control back to the event loop, which now has the chance to run the piled-up worker tasks. When those worker tasks await, they in turn transfer control back to the event loop, which will transfer it back to either main or a worker as their awaited sleep completes.

Note that this is only for illustration purposes; if and when you interrupt this program, you'll see notices about unawaited tasks which haven't completed. You should keep track of your tasks and await them all to completion at the end somewhere.
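As a sketch of that bookkeeping (not part of the original answer): collect the spawned tasks in a list and await them all at the end with asyncio.gather. The loop is bounded here purely so the program can finish.

```python
import asyncio

async def worker():
    print("starting sleep")
    await asyncio.sleep(2)
    print("slept")

async def main():
    tasks = []
    for _ in range(5):  # bounded loop, so the program can exit cleanly
        tasks.append(asyncio.ensure_future(worker()))
        await asyncio.sleep(0.5)
    # wait for every spawned worker before returning,
    # so no task is left unawaited at shutdown
    await asyncio.gather(*tasks)
    return tasks

finished = asyncio.run(main())
```

With this structure, interrupting after the loop still leaves all workers awaited, so no "task was never awaited" warnings appear on exit.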


3 Comments

Now I can see my mistake clearly. I'm already using await in main because of aioredis; I just needed to spawn the workers in the background. I changed that and everything is working great. Thanks for your suggestions.
Hi @deceze, your solution helped me understand and partially solve my requirement, thanks for that. As you mentioned in the answer, how do I track the tasks that are still pending? My script runs fine for some time and then stops without completing all the awaited tasks. Thanks.
Is there a difference between ensure_future() and create_task()? i.e., are both just doing the same thing => creating future tasks?

Here is an example using asyncio.wait:

import asyncio

async def worker():
    print("starting sleep")
    await asyncio.sleep(2)
    print("slept")

async def main():
    # wrap each coroutine in a Task; since Python 3.11,
    # asyncio.wait no longer accepts bare coroutines
    tasks = [asyncio.create_task(worker()) for _ in range(10)]
    await asyncio.wait(tasks)

asyncio.run(main())

It spawns all the workers together.
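An equivalent sketch (using only the standard library) with asyncio.gather, which also collects the workers' return values in submission order. The return value added to worker here is purely to demonstrate that ordering.

```python
import asyncio

async def worker(n):
    print("starting sleep")
    await asyncio.sleep(2)
    print("slept")
    return n  # returned only to show gather preserves order

async def main():
    # schedule all coroutines concurrently and collect their
    # results in the order they were submitted
    return await asyncio.gather(*(worker(n) for n in range(10)))

results = asyncio.run(main())
```

All ten workers sleep concurrently, so the whole run takes about 2 seconds rather than 20.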

1 Comment

I need to create workers dynamically, because I need them when I receive a task from the Redis queue. deceze's answer covers my issue completely. Thanks a lot for your time too.
