I am experiencing a behaviour that I don't understand while trying to implement a select()-like functionality with asyncio (on Python 3.6):
- I have 2 queues (A & B) where messages are
Queue.putby async producers - I have an async consumer (the selector) that polls the queues and takes the first message available (ie. a
select()-like functionality usingasyncio.wait_for.
The polling works like this:
poll = list(_.get() for _ in queues)
while True:
done, pending = await asyncio.wait(tuple(poll), return_when=asyncio.FIRST_COMPLETED)
Then, iterating on the done futures, I replace the corresponding entries in poll with a fresh Queue.get:
for f in done:
try:
i = poll.index(f._coro)
...
poll[i] = queues[i].get()
Now, the odd behaviour that I experience is that the selection works, but some of the done futures .result() return None, and the messages sent through the queues are lost:
PUT B ('B', 0)/0
GET B ('B', 0)/0
PUT A ('A', 0)/0
GET A None/0 << ('A', 0) is never received
PUT A ('A', 1)/0
GET A ('A', 1)/0
PUT B ('B', 1)/0
GET B None/0
PUT A ('A', 2)/0
GET A None/0
PUT B ('B', 2)/0
GET B None/0
Here's the code
#!/usr/bin/env python3
import asyncio, random
async def queue_generator( name, queue, speed=1 ):
counter = 0
while True:
t = (random.random() + 0.5) * speed
await asyncio.sleep(t)
m = (name, counter)
print ("PUT {0} {1}/{2:d}".format(name, m, queue.qsize()))
queue.put_nowait(m)
counter += 1
async def select( *queues ):
poll = list(_.get() for _ in queues)
while True:
# That's the select()-like functionalit
done, pending = await asyncio.wait(tuple(poll), return_when=asyncio.FIRST_COMPLETED)
for f in done:
i = poll.index(f._coro)
# That's where sometimes v is None
v = f.result()
print ("GET {0} {1}/{2}".format("AB"[i], v, queues[i].qsize()))
poll[i] = queues[i].get()
await asyncio.sleep(0.5)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
queue_a = asyncio.Queue(10)
queue_b = asyncio.Queue(10)
tasks = (
loop.create_task(queue_generator("A", queue_a, 3)),
loop.create_task(queue_generator("B", queue_b, 3)),
loop.create_task(select(queue_a, queue_b))
)
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
As this happens whenever there is a change in the queue that currently has a message, I assume the problem lies in doing something with the pending futures. In fact, adding
for f in pending:
f.cancel()
solves the issue, but I'd like to re-use these futures as much as I can. I suppose the problem comes from the fact that asyncio.wait_for silently transforms the list of generators into tasks.
getis a coroutine. You need to use eitherawait queue.get()orqueue.get_nowait().await asyncio.wait(tuple(poll)...)doing exactly that? By doingawait queue.get()sequentially I would block the whole process if let's say the first queue has no message but the second one gets filled.