12

I need to know when a Queue is closed and won't have any more items, so that I can end the iteration.

I did it by putting a sentinel in the queue:

from Queue import Queue

class IterableQueue(Queue): 

    _sentinel = object()

    def __iter__(self):
        return self

    def close(self):
        self.put(self._sentinel)

    def next(self):
        item = self.get()
        if item is self._sentinel:
            raise StopIteration
        else:
            return item

Given that this is a very common use for a queue, is there no built-in implementation?

1
  • I either use the sentinel, or a flag within the thread to stop the iteration over the queue. For the latter, I usually wait with a timeout. Commented Jul 2, 2012 at 5:30

4 Answers

16

A sentinel is a reasonable way for a producer to send a message that no more queue tasks are forthcoming.

FWIW, your code can be simplified quite a bit with the two-argument form of iter(), which keeps calling a function until it returns the sentinel:

from Queue import Queue

class IterableQueue(Queue): 

    _sentinel = object()

    def __iter__(self):
        return iter(self.get, self._sentinel)

    def close(self):
        self.put(self._sentinel)
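
The code above targets Python 2, where the module is named Queue. As a rough sketch only, the same idea on Python 3 (module queue) might look like the following. The produce helper and the thread setup are illustrative assumptions and not part of the original answer.

```python
import queue
import threading


class IterableQueue(queue.Queue):
    """queue.Queue that can be iterated until close() is called."""

    _sentinel = object()

    def __iter__(self):
        # iter(callable, sentinel) calls self.get() repeatedly and stops
        # as soon as the returned value equals the sentinel.
        return iter(self.get, self._sentinel)

    def close(self):
        self.put(self._sentinel)


def produce(q, items):
    # Hypothetical producer: push every item, then signal the end.
    for item in items:
        q.put(item)
    q.close()


q = IterableQueue()
producer = threading.Thread(target=produce, args=(q, range(5)))
producer.start()
received = list(q)  # blocks until the sentinel arrives
producer.join()
print(received)  # [0, 1, 2, 3, 4]
```

Note that a single sentinel stops only one consumer. With several consumers, each one needs its own sentinel, or the sentinel has to be put back after it is seen.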

4

The multiprocessing module has its own version of Queue that does include a close method. I am not sure how it behaves with threading, but it's worth a try. I don't see why it shouldn't work the same way:

from multiprocessing import Queue

q = Queue()
q.put(1)
q.get_nowait()
# 1
q.close()
q.get_nowait()
# ...
# IOError: handle out of range in select()

You could just catch the IOError as the close signal.

TEST

from multiprocessing import Queue
from threading import Thread

def worker(q):
    while True:
        try:
            item = q.get(timeout=.5)
        except IOError:
            print "Queue closed. Exiting thread."
            return
        except:
            continue
        print "Got item:", item

q = Queue()
for i in xrange(3):
    q.put(i)
t = Thread(target=worker, args=(q,))
t.start()
# Got item: 0
# Got item: 1
# Got item: 2
q.close()
# Queue closed. Exiting thread.

Though to be honest, it's not much different from setting a flag on the Queue.Queue. The multiprocessing.Queue is just using a closed file descriptor as a flag:

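from Queue import Queue, Empty
from threading import Thread

def worker2(q):
    while True:
        # q.closed is an ad-hoc attribute set by the producer below
        if q.closed:
            print "Queue closed. Exiting thread."
            return
        try:
            item = q.get(timeout=.5)
        except Empty:
            # nothing arrived within the timeout; re-check the flag
            continue
        print "Got item:", item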

q = Queue()
q.closed = False
for i in xrange(3):
    q.put(i)
t = Thread(target=worker2, args=(q,))
t.start()
# Got item: 0
# Got item: 1
# Got item: 2
q.closed = True
# Queue closed. Exiting thread.
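
As a hedged Python 3 variant of the flag approach, the sketch below keeps the flag in a separate threading.Event instead of an attribute on the queue. The names worker, closed and results are illustrative assumptions. Unlike worker2 above, this version exits only once the flag is set and the queue has been drained, so items put before the flag was set are not lost. It assumes a single consumer and a producer that stops putting items before setting the flag.

```python
import queue
import threading


def worker(q, closed, results):
    # Poll with a short timeout so the flag is re-checked regularly.
    while True:
        try:
            item = q.get(timeout=0.05)
        except queue.Empty:
            # Check the flag first, then the queue: once the flag is set
            # no further items will arrive, so empty really means done.
            if closed.is_set() and q.empty():
                return
            continue
        results.append(item)


q = queue.Queue()
closed = threading.Event()
results = []
t = threading.Thread(target=worker, args=(q, closed, results))
t.start()
for i in range(3):
    q.put(i)
closed.set()  # producer is finished
t.join()
print(results)  # [0, 1, 2]
```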


0

An old question, and variations of self._sentinel = object() will work. Revisiting this in 2021, I would instead suggest using concurrent.futures with None as the sentinel:

# Note: this is Python 3.8+ code                                                                                                                                                   

import queue
import time
import functools
import random
from concurrent.futures import ThreadPoolExecutor

def worker(tup):
    (q,i) = tup
    print(f"Starting thread {i}")
    partial_sum = 0
    numbers_added = 0
    while True:
        try:
            item = q.get()
            if item is None:
                # 'propagate' this 'sentinel' to anybody else                                                                                                                      
                q.put(None)
                break
            numbers_added += 1
            partial_sum += item
            # need to pretend that we're doing something asynchronous                                                                                                              
            time.sleep(random.random()/100)

        except Exception as e:
            print(f"(warning) Thread {i} got an exception {e}, that shouldn't happen.")
            break

    print(f"Thread {i} is done, saw a total of {numbers_added} numbers to add up")
    return partial_sum

MAX_RANGE = 1024
MAX_THREADS = 12

with ThreadPoolExecutor() as executor:

    # create a queue with numbers to add up                                                                                                                                        
    (q := queue.Queue()).queue = queue.deque(range(MAX_RANGE))

    # kick off the threads                                                                                                                                                         
    future_partials = executor.map(worker, [(q,i) for i in range(MAX_THREADS)])

    # they'll be done more or less instantly, but we'll make them wait                                                                                                             
    print("Threads launched with first batch ... sleeping 2 seconds")
    time.sleep(2)

    # threads are still available for more work!                                                                                                                                   
    for i in range(MAX_RANGE):
        q.put(i)

    print("Finished giving them another batch, this time we're not sleeping")

    # now we tell them all to wrap it up                                                                                                                                           
    q.put(None)
    # this will nicely catch the outputs                                                                                                                                           
    sum = functools.reduce(lambda x, y: x+y, future_partials)
    print(f"Got total sum {sum} (correct answer is {(MAX_RANGE-1)*MAX_RANGE}")

# Starting thread 0                                                                                                                                                                
# Starting thread 1                                                                                                                                                                
# Starting thread 2                                                                                                                                                                
# Starting thread 3                                                                                                                                                                
# Starting thread 4                                                                                                                                                                
# Starting thread 5                                                                                                                                                                
# Starting thread 6                                                                                                                                                                
# Starting thread 7                                                                                                                                                                
# Starting thread 8                                                                                                                                                                
# Starting thread 9                                                                                                                                                                
# Starting thread 10                                                                                                                                                               
# Starting thread 11                                                                                                                                                               
# Threads launched with first batch ... sleeping 2 seconds                                                                                                                         
# Finished giving them another batch, this time we're not sleeping                                                                                                                 
# Thread 0 is done, saw a total of 175 numbers to add up                                                                                                                           
# Thread 3 is done, saw a total of 178 numbers to add up                                                                                                                           
# Thread 11 is done, saw a total of 173 numbers to add up                                                                                                                          
# Thread 4 is done, saw a total of 177 numbers to add up                                                                                                                           
# Thread 9 is done, saw a total of 169 numbers to add up                                                                                                                           
# Thread 1 is done, saw a total of 172 numbers to add up                                                                                                                           
# Thread 7 is done, saw a total of 162 numbers to add up                                                                                                                           
# Thread 10 is done, saw a total of 161 numbers to add up                                                                                                                          
# Thread 5 is done, saw a total of 169 numbers to add up                                                                                                                           
# Thread 2 is done, saw a total of 157 numbers to add up                                                                                                                           
# Thread 6 is done, saw a total of 169 numbers to add up                                                                                                                           
# Thread 8 is done, saw a total of 186 numbers to add up                                                                                                                           
# Got total sum 1047552 (correct answer is 1047552      

                                                                                                                       

Note how the de facto 'master thread' just needs to push None into the queue once, similar to a condition variable 'signal', which the threads all pick up (and propagate).
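
The propagation step can be isolated in a much smaller sketch. This is not the answer's code, just a reduced illustration using plain threading.Thread. NUM_WORKERS and the totals list are assumed names.

```python
import queue
import threading


def worker(q, totals, index):
    total = 0
    while True:
        item = q.get()
        if item is None:
            # Put the sentinel back so the next worker sees it too.
            q.put(None)
            break
        total += item
    totals[index] = total


NUM_WORKERS = 4
q = queue.Queue()
totals = [0] * NUM_WORKERS
threads = [
    threading.Thread(target=worker, args=(q, totals, i))
    for i in range(NUM_WORKERS)
]
for t in threads:
    t.start()
for n in range(100):
    q.put(n)
q.put(None)  # a single sentinel is enough for all workers
for t in threads:
    t.join()
print(sum(totals))  # 4950
```

One consequence of this design is that a single None is left in the queue after all workers have exited, so the queue should not be reused without removing it first.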

Also, this does not use multiprocessing.Queue, which is heavier-weight than the standard (thread-safe) queue. The above code can also easily be modified to use ProcessPoolExecutor or a hybrid of both (in either case, yes, you would need to use multiprocessing.Queue).

(Side note: generally speaking, if classes are needed to solve a "fundamental" issue in any given generation of Python, there are often new options in more modern versions.)

(Second side note: The only reason the code is Python 3.8+ is because I'm a fan of assignment expressions, which, in line with the above side note, resolves the historical issue of how to initialize a queue from a list without having to resort to non-functional solutions.)


0

I have implemented an asyncio.Queue-compatible queue that can be iterated with an async for loop. See queutils.IterableQueue.

Features

  • asyncio.Queue interface
  • AsyncIterable support: async for item in queue:
  • Automatic termination of the consumers with QueueDone exception when the queue has been emptied
  • Producers must be registered with add_producer() and they must notify the queue with finish() once they have finished adding items
  • Countable interface that counts the number of items marked with task_done(), exposed through the count property
  • Countable property can be disabled with count_items=False. This is useful when you want to sum the count of multiple IterableQueues.

Install

pip install queutils

Usage

Producers fill a queue

A producer is a "process" that adds items to the queue. A producer needs to be registered with the queue using the add_producer() coroutine. Once a producer has added all the items it intends to, it notifies the queue with finish().

from queutils import IterableQueue

async def producer(
    Q: IterableQueue[int], N: int
) -> None:

    # Add a producer to add items to the queue
    await Q.add_producer()
    
    for i in range(N):
        await Q.put(i)
    
    # notify the queue that this producer does not add more
    await Q.finish()
    
    return None

Consumers take items from the queue

A consumer is a "process" that takes items from a queue with the get() coroutine. Since IterableQueue is AsyncIterable, it can be iterated over with async for.

from queutils.iterablequeue import IterableQueue

async def consumer(Q: IterableQueue[int]):
    """
    Consume the queue
    """
    async for i in Q:
        print(f"consumer: got {i} from the queue")        
    print(f"consumer: queue is done")
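
For comparison, here is a minimal standard-library sketch of the same producer/consumer shape, using a plain asyncio.Queue subclass with a sentinel. It makes no claim about how queutils works internally. ClosableQueue, _DONE and close() are hypothetical names introduced only for this illustration, and it handles a single consumer only.

```python
import asyncio

_DONE = object()  # hypothetical sentinel, not part of queutils


class ClosableQueue(asyncio.Queue):
    """asyncio.Queue usable with `async for` until close() is awaited."""

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await self.get()
        if item is _DONE:
            raise StopAsyncIteration
        return item

    async def close(self):
        # Signal the (single) consumer that no more items will follow.
        await self.put(_DONE)


async def producer(q, n):
    for i in range(n):
        await q.put(i)
    await q.close()


async def consumer(q):
    # Collect items until the sentinel ends the iteration.
    return [item async for item in q]


async def main():
    q = ClosableQueue()
    _, items = await asyncio.gather(producer(q, 3), consumer(q))
    return items


items = asyncio.run(main())
print(items)  # [0, 1, 2]
```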
