
I want to use the multiprocessing module to speed up traversing a directory structure. First I did some research and found this Stack Overflow thread:

How do I run os.walk in parallel in Python?

However, when I tried to adapt the code from that thread, I kept running into a problem. Here is a little script I wrote just to test out Pool and figure out how it works:

import os

from multiprocessing.pool import Pool
from multiprocessing import Process
from multiprocessing import JoinableQueue as Queue

def scan():
    print "Hi!"
    while True:
        print "Inside loop"
        directory = unsearched.get()
        print "Got directory"
        unsearched.task_done()
        print "{0}".format(directory)

if __name__ == '__main__':

    # Put those directories on the queue
    unsearched = Queue()
    top_dirs = ['a', 'b', 'c']
    for d in top_dirs:
        unsearched.put(d)
    print unsearched

    # Scan the directories
    processes = 1
    pool = Pool(processes)
    for i in range(processes):
        print "Process {0}".format(i)
        pool.apply_async(scan)

    # Block until all the tasks are done
    unsearched.join()
    print 'Done'

What happens is that the script enters the loop inside the scan function and just sits there:

PS C:\Test> python .\multiprocessing_test.py
<multiprocessing.queues.JoinableQueue object at 0x000000000272F630>
Process 0
Hi!
Inside loop

I'm sure I'm missing something simple here.

1 Answer

This actually runs fine for me on Linux, but it does hang on Windows. That's because on Windows, everything inside the if __name__ ... guard doesn't get executed in the child process, which of course includes the definition of unsearched. That means scan throws an exception when it tries to use unsearched, but that exception is never consumed in the parent, so you don't see a traceback in the console. Instead, the script just hangs.

To make this work on both Windows and Linux, you can use the initializer/initargs keyword arguments when you create the Pool, to bring unsearched into scope in the child:

def initializer(q):
    # Runs once in each worker process at startup, so the queue
    # becomes a global in the child even when it can't be inherited.
    global unsearched
    unsearched = q

...

Then replace your old Pool call with this:

pool = Pool(processes, initializer=initializer, initargs=(unsearched,))

2 Comments

Great! It works! Is there a way to see a traceback from a child process?
@ChristopherSpears Normally you would get the traceback when you try to fetch the result of the apply_async call. If you store the result of each of those calls in a list and call result.get() on each one prior to calling unsearched.join(), I think you'll receive the exception rather than hang forever.
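To illustrate that comment, here is a minimal sketch (the failing function and its error are invented for demonstration): a task raises in the child, and the exception re-surfaces in the parent when you call get() on the AsyncResult:

```python
from multiprocessing.pool import Pool

def boom():
    # Deliberately fail inside the worker process.
    raise ValueError('raised in the child')

if __name__ == '__main__':
    pool = Pool(1)
    result = pool.apply_async(boom)
    try:
        # get() re-raises the child's exception in the parent,
        # complete with a traceback, instead of silently hanging.
        result.get(timeout=5)
    except ValueError as exc:
        print('caught: %s' % exc)
    pool.close()
    pool.join()
```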
