I am learning Python and wrote some simple scripts to give myself practical examples of various topics. One of them is this script, which demonstrates how queue.Queue() can be used with threading.Thread() to create background workers. It acts quite strangely, though. I ran some time trials. With just one thread it does roughly what you would expect: it takes about 2 seconds per task, so (actually just under??) 40 seconds to complete the 20 tasks. With four threads it again does what you would expect: it does the tasks 4 at a time and so takes around 10 secs. So how on earth does it take 0.01 seconds (1 s.f.) when I run 20 threads? Surely it must take 2 secs?

Here is the code:

import threading
from queue import Queue
import time

q = Queue()
tLock = threading.Lock()

def run() :

    while True :
        task = q.get()  
        print('Just finished task number',task)
        q.task_done()   
        time.sleep(2)

def main() :
    # worker threads are activated  
    for x in range(20) :
        t = threading.Thread(target=run)
        t.daemon = True
        t.start()
    #20 jobs are put in the queue   
    for x in range(1,21) :
        q.put(x)
    #waits until queue is empty and then continues
    q.join()

if __name__ == '__main__' :
    startTime = time.time()
    main()
    print('Time taken was', time.time() - startTime)

1 Answer

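You're not actually blocking the progress of the main thread. Each worker calls q.task_done() before it sleeps, so q.join() returns as soon as every task has been fetched and acknowledged, not when the sleeps have finished. With 20 threads all 20 tasks are picked up at once, q.join() returns almost immediately, and the main thread prints the time while the daemon workers are still sleeping. This is also why one thread takes just under 40 seconds: the last task is marked done before its sleep, so only 19 sleeps are waited for.

The "proper"(*) way is to make sure all threads are done, by joining all threads: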

def main() :
    # keep reference to threads
    threads = [threading.Thread(target=run) for _ in range(20)]
    # start all threads
    for t in threads:
        t.start()

    #20 jobs are put in the queue   
    for x in range(1,21) :
        q.put(x)
    #waits until queue is empty and then continues
    q.join()
    # join all threads
    for t in threads:
        t.join()

(*) But this won't work as is: your threads are in an infinite loop, so they never finish, even after all tasks are done, and t.join() blocks forever.

So another way is to make sure the work (here, the sleep) happens before you report the task as done:

def run() :
    while True :
        task = q.get()  
        # simulate processing time *before* actual reporting
        time.sleep(2)
        print('Just finished task number',task)  
        q.task_done()
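
To see the effect of the ordering in isolation, here is a minimal sketch that times q.join() for both variants. The helper timed_run, the ack_first flag, and the shortened 0.2-second DELAY are assumptions made for illustration; they are not part of the original script.

```python
import threading
import time
from queue import Queue

DELAY = 0.2  # hypothetical, shortened stand-in for the 2-second sleep


def timed_run(ack_first, n_workers=4, n_tasks=4):
    """Return how long q.join() blocks for one ordering of task_done()."""
    q = Queue()

    def worker():
        while True:
            q.get()
            if ack_first:
                q.task_done()      # the queue counts the task as finished here
                time.sleep(DELAY)  # the "work" happens after the acknowledgement
            else:
                time.sleep(DELAY)  # the "work" happens first
                q.task_done()

    # daemon workers, as in the question, so they do not keep the process alive
    for _ in range(n_workers):
        threading.Thread(target=worker, daemon=True).start()

    start = time.perf_counter()
    for i in range(n_tasks):
        q.put(i)
    q.join()  # returns once task_done() has been called for every task
    return time.perf_counter() - start


early = timed_run(ack_first=True)
late = timed_run(ack_first=False)
print('task_done() before sleep:', round(early, 3))
print('task_done() after sleep: ', round(late, 3))
```

With one worker per task, the first figure should be close to zero and the second close to DELAY, which mirrors the 0.01-second result in the question.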

Still, the threads remain blocked in q.get() once the queue is empty. What you could have is a message telling the threads to quit. Something like:

def run() :
    while True :
        task = q.get()
        if task == 'stop':
            # acknowledge the stop message too, otherwise q.join() never returns
            q.task_done()
            break
        # simulate processing time *before* actual reporting
        time.sleep(2)
        print('Just finished task number',task)
        q.task_done()

And now simply have the main thread put one stop message per thread on the queue, so that every thread finally quits its infinite loop:

def main() :
    # keep reference to threads
    threads = [threading.Thread(target=run) for _ in range(20)]
    # start all threads
    for t in threads:
        t.start()

    #20 jobs are put in the queue   
    for x in range(1,21):
        q.put(x)

    for x in range(20):
        # stop all threads after tasks are done
        q.put('stop')

    # waits until queue is empty and then continues
    q.join()

    # join all threads
    for t in threads:
        t.join()

Tip: You shouldn't use "magic numbers" such as 20. Have a module-level constant named THREADS_COUNT so you only have to change one place when you want to test different configurations.
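
Putting the pieces together, a minimal sketch of the whole script could look like the following. THREADS_COUNT comes from the tip above; TASKS_COUNT, TASK_SECONDS, STOP, the finished list and its lock are hypothetical names added for illustration, and the sleep is shortened so the script runs quickly. Note that the worker calls q.task_done() for the stop message as well, which q.join() needs in order to return.

```python
import threading
import time
from queue import Queue

THREADS_COUNT = 4    # change this one value to test different configurations
TASKS_COUNT = 8      # hypothetical: number of jobs to queue
TASK_SECONDS = 0.1   # hypothetical: shortened stand-in for the 2-second sleep
STOP = 'stop'        # message that tells a worker to quit

q = Queue()
finished = []                  # hypothetical: records completed task numbers
finished_lock = threading.Lock()


def run():
    while True:
        task = q.get()
        if task == STOP:
            q.task_done()      # stop messages count towards q.join() too
            break
        time.sleep(TASK_SECONDS)   # simulate processing *before* reporting
        with finished_lock:
            finished.append(task)
        q.task_done()


def main():
    # keep references to the threads so they can be joined later
    threads = [threading.Thread(target=run) for _ in range(THREADS_COUNT)]
    for t in threads:
        t.start()

    for x in range(1, TASKS_COUNT + 1):
        q.put(x)

    # one stop message per worker, queued after the real tasks
    for _ in range(THREADS_COUNT):
        q.put(STOP)

    q.join()               # every task and stop message has been acknowledged
    for t in threads:
        t.join()           # every worker has left its loop
    return threads


start_time = time.perf_counter()
threads = main()
print('Time taken was', time.perf_counter() - start_time)
```

Because the queue is first-in, first-out, the stop messages are only handed out after all real tasks have been fetched, so no task is skipped.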

2 Comments

Such a great answer, I keep coming back to this as my main reference on the subject so have edited the title to make it a bit more visible for others.
Thanks. If you have any suggestions for making it better, they're welcome.
