I have one producer and three consumers. Each consumer must acquire a global lock before it can process an item. The program runs but never finishes: the consumers never break out of their while loops. Can you tell me what is going wrong?

import asyncio
import random

async def produce(queue, n):
    for x in range(1, n + 1):
        # produce an item
        print('producing {}/{}'.format(x, n))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        item = str(x)
        # put the item in the queue
        await queue.put(item)

    # indicate the producer is done
    await queue.put(None)


async def consume(queue, lock):
    while True:
        # wait for an item from the producer
        item = await queue.get()
        if item is None:
            # the producer emits None to indicate that it is done
            break
        async with lock:
            # process the item
            print('consuming item {}...'.format(item))
            # simulate i/o operation using sleep
            await asyncio.sleep(0.3)


async def main():
    # create the lock and the queue inside the running event loop
    lock = asyncio.Lock()
    queue = asyncio.Queue()
    consumers = [consume(queue, lock) for _ in range(3)]
    # run the producer and the three consumers concurrently
    await asyncio.gather(produce(queue, 10), *consumers)


asyncio.run(main())

2 Answers

The problem is in the consumer:

        if item is None:
            # the producer emits None to indicate that it is done
            break

The None sentinel is picked up by only one consumer, and the others are left waiting forever for the next value to arrive through the queue. A simple fix is to put the sentinel back on the queue before quitting:

        if item is None:
            # The producer emits None to indicate that it is done.
            # Propagate it to other consumers and quit.
            await queue.put(None)
            break
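
Put together, a minimal runnable sketch of this fix might look like the following. It assumes Python 3.7+ (for asyncio.run), shortens the sleeps, and adds a hypothetical consumed list that is not in the original code and exists only so the result can be checked.

```python
import asyncio


async def produce(queue, n):
    for x in range(1, n + 1):
        await asyncio.sleep(0.01)  # simulate i/o
        await queue.put(str(x))
    # a single sentinel is enough, because each consumer passes it on
    await queue.put(None)


async def consume(queue, lock, consumed):
    while True:
        item = await queue.get()
        if item is None:
            # propagate the sentinel to the other consumers and quit
            await queue.put(None)
            break
        async with lock:
            await asyncio.sleep(0.01)  # simulate i/o
            consumed.append(item)


async def main(n=10, n_consumers=3):
    queue = asyncio.Queue()
    lock = asyncio.Lock()
    consumed = []  # hypothetical helper, only used to inspect the result
    consumers = [consume(queue, lock, consumed) for _ in range(n_consumers)]
    await asyncio.gather(produce(queue, n), *consumers)
    return consumed, queue.qsize()


consumed, leftover = asyncio.run(main())
print(len(consumed), leftover)
```

One side effect of this approach is that a single None stays in the queue after all consumers have exited, which matters only if the queue is reused afterwards.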

Alternatively, produce could enqueue as many None sentinels as there are consumers - but that would require the producer to know how many consumers there are, which is not always desirable.
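
For comparison, the one-sentinel-per-consumer variant could be sketched roughly as below. The n_consumers parameter and the consumed list are assumptions added for illustration; neither appears in the original code.

```python
import asyncio


async def produce(queue, n, n_consumers):
    for x in range(1, n + 1):
        await asyncio.sleep(0.01)  # simulate i/o
        await queue.put(str(x))
    # one sentinel per consumer, so every consumer receives its own None
    for _ in range(n_consumers):
        await queue.put(None)


async def consume(queue, lock, consumed):
    while True:
        item = await queue.get()
        if item is None:
            # this consumer's sentinel: nothing to propagate, just quit
            break
        async with lock:
            await asyncio.sleep(0.01)  # simulate i/o
            consumed.append(item)


async def main(n=10, n_consumers=3):
    queue = asyncio.Queue()
    lock = asyncio.Lock()
    consumed = []  # hypothetical helper, only used to inspect the result
    consumers = [consume(queue, lock, consumed) for _ in range(n_consumers)]
    await asyncio.gather(produce(queue, n, n_consumers), *consumers)
    return consumed, queue.qsize()


items_seen, remaining = asyncio.run(main())
print(len(items_seen), remaining)
```

Here the queue ends up empty, at the cost of coupling the producer to the number of consumers.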

Adding to the answer @user4815162342 provided, try:

if item is None and queue.qsize() == 0:
    await queue.put(None)
    break

I had a case where the consumer itself also had to queue.put() items back onto the same queue to rerun the function, and it hung at the end unless both conditions were checked.
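
A rough sketch of that situation follows. The (name, attempts_left) job tuples and the done list are hypothetical and only illustrate consumers re-enqueuing work; the lock is left out for brevity. Note that when None is received while real items are still queued, the sentinel has already been taken off the queue, so this sketch puts it back and continues instead of falling through to process None as an item.

```python
import asyncio


async def consume(queue, done):
    while True:
        item = await queue.get()
        if item is None:
            was_empty = queue.qsize() == 0
            # always put the sentinel back so it is never lost
            await queue.put(None)
            if was_empty:
                # nothing but the sentinel was left: quit
                break
            # re-enqueued items are still pending: keep consuming
            continue
        name, attempts_left = item
        await asyncio.sleep(0.01)  # simulate i/o
        if attempts_left > 0:
            # hypothetical "rerun": push the job back onto the same queue
            await queue.put((name, attempts_left - 1))
        else:
            done.append(name)


async def main():
    queue = asyncio.Queue()
    done = []  # hypothetical helper, only used to inspect the result
    for job in [("a", 1), ("b", 0), ("c", 2)]:
        queue.put_nowait(job)
    queue.put_nowait(None)  # the producer's end-of-stream sentinel
    await asyncio.gather(*(consume(queue, done) for _ in range(3)))
    return done, queue.qsize()


done, left_in_queue = asyncio.run(main())
print(sorted(done), left_in_queue)
```

A consumer that is still processing a job keeps running until it sees the sentinel with nothing else queued, so jobs re-enqueued after other consumers have quit are still picked up.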
