3

I am experiencing a behaviour that I don't understand while trying to implement a select()-like functionality with asyncio (on Python 3.6):

  • I have 2 queues (A & B) where messages are Queue.put by async producers
  • I have an async consumer (the selector) that polls the queues and takes the first message available (ie. a select()-like functionality using asyncio.wait_for.

The polling works like this:

poll = list(_.get() for _ in queues)
while True:
    done, pending = await asyncio.wait(tuple(poll), return_when=asyncio.FIRST_COMPLETED)

Then, iterating on the done futures, I replace the corresponding entries in poll with a fresh Queue.get:

    for f in done:
        try:
            i = poll.index(f._coro)
            ...
            poll[i] = queues[i].get()

Now, the odd behaviour that I experience is that the selection works, but some of the done futures .result() return None, and the messages sent through the queues are lost:

PUT B ('B', 0)/0
GET B ('B', 0)/0
PUT A ('A', 0)/0
GET A None/0         << ('A', 0) is never received
PUT A ('A', 1)/0
GET A ('A', 1)/0
PUT B ('B', 1)/0
GET B None/0
PUT A ('A', 2)/0
GET A None/0
PUT B ('B', 2)/0
GET B None/0

Here's the code

#!/usr/bin/env python3
import asyncio, random

async def queue_generator( name, queue, speed=1 ):
    counter = 0
    while True:
        t = (random.random() + 0.5) * speed
        await asyncio.sleep(t)
        m = (name, counter)
        print ("PUT {0} {1}/{2:d}".format(name, m, queue.qsize()))
        queue.put_nowait(m)
        counter += 1

async def select( *queues ):
    poll = list(_.get() for _ in queues)
    while True:
        # That's the select()-like functionalit
        done, pending = await asyncio.wait(tuple(poll), return_when=asyncio.FIRST_COMPLETED)
        for f in done:
            i = poll.index(f._coro)
            # That's where sometimes v is None
            v = f.result()
            print ("GET {0} {1}/{2}".format("AB"[i], v, queues[i].qsize()))
            poll[i] = queues[i].get()
        await asyncio.sleep(0.5)

if __name__ == "__main__":
    loop     = asyncio.get_event_loop()
    queue_a  = asyncio.Queue(10)
    queue_b  = asyncio.Queue(10)
    tasks = (
        loop.create_task(queue_generator("A", queue_a, 3)),
        loop.create_task(queue_generator("B", queue_b, 3)),
        loop.create_task(select(queue_a, queue_b))
    )
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

As this happens whenever there is a change in the queue that currently has a message, I assume the problem lies in doing something with the pending futures. In fact, adding

    for f in pending:
        f.cancel()

solves the issue, but I'd like to re-use these futures as much as I can. I suppose the problem comes from the fact that asyncio.wait_for silently transforms the list of generators into tasks.

3
  • 1
    get is a coroutine. You need to use either await queue.get() or queue.get_nowait(). Commented Sep 8, 2017 at 15:45
  • 1
    Related: awaitchannel Commented Sep 8, 2017 at 15:47
  • @Vincent isn't the await asyncio.wait(tuple(poll)...) doing exactly that? By doing await queue.get() sequentially I would block the whole process if let's say the first queue has no message but the second one gets filled. Commented Sep 8, 2017 at 16:34

1 Answer 1

1

I didn't dig deeply into what happens, but moving poll creating inside loop seems to fix things:

async def select( *queues ):
    while True:    
        poll = list(_.get() for _ in queues)  # HERE    
        # That's the select()-like functionalit
        done, pending = await asyncio.wait(tuple(poll), return_when=asyncio.FIRST_COMPLETED)

poll is a list of coroutines. Each unique coroutine usually should be awaited once. Doing otherwise may lead to strange things.

Sign up to request clarification or add additional context in comments.

1 Comment

It does fix the issue, and the explanation is that because Queue.get() is a coroutine, it must not be awaited twice, which would happen for pending coroutines after the first loop. In any case, select() is typically done within the loop, not before. Thanks!

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.