I want to implement a file crawler that stores data in MongoDB. I would like to use multiprocessing as a way to 'hand off' blocking tasks such as unzipping files, crawling files, and uploading to MongoDB. Certain tasks depend on other tasks (e.g., a file needs to be unzipped before the files inside it can be crawled), so I would like a task, once completed, to be able to add new tasks to the same task queue.

Below is what I currently have:

import multiprocessing


class Worker(multiprocessing.Process):
    def __init__(self, task_queue: multiprocessing.Queue):
        super(Worker, self).__init__()
        self.task_queue = task_queue

    def run(self):
        for (function, *args) in iter(self.task_queue.get, None):
            print(f'Running: {function.__name__}({*args,})')

            # Run the provided function with its parameters in child process
            function(*args)

            self.task_queue.task_done()


def foo(task_queue: multiprocessing.Queue) -> None:
    print('foo')
    # Add new task to queue from this child process
    task_queue.put((bar, 1))


def bar(x: int) -> None:
    print(f'bar: {x}')


def main():
    # Start workers on separate processes
    workers = []
    manager = multiprocessing.Manager()
    task_queue = manager.Queue()
    for i in range(multiprocessing.cpu_count()):
        worker = Worker(task_queue)
        workers.append(worker)
        worker.start()

    # Run foo on child process using the queue as parameter
    task_queue.put((foo, task_queue))

    for _ in workers:
        task_queue.put(None)

    # Block until workers complete and join main process
    for worker in workers:
        worker.join()

    print('Program completed.')


if __name__ == '__main__':
    main()

Expected Behaviour:

Running: foo((<AutoProxy[Queue] object, typeid 'Queue' at 0x1b963548908>,))
foo
Running: bar((1,))
bar: 1
Program completed.

Actual Behaviour:

Running: foo((<AutoProxy[Queue] object, typeid 'Queue' at 0x1b963548908>,))
foo
Program completed.

I am quite new to multiprocessing so any help would be greatly appreciated.

4 Comments

  • You're putting a None into the queue for each of the workers (causing them to end) before any of the workers have had a chance to add their own elements to the queue. In fact, you have an interesting problem: you can't stop any worker until every worker is in a wait state. As long as one worker is still working, there's a chance it will add a lot more work to the queue. I'm not sure of the solution. Commented Dec 1, 2020 at 4:33
  • Rather unimportant, but the code says print('Program completed.') while your output is Program ended. Commented Dec 1, 2020 at 7:57
  • @lenin Whoops, I wanted to introduce better logging. Fixed. Commented Dec 2, 2020 at 23:13
  • @FrankYellin, ah yes, that makes total sense. I've looked further into waiting until the queue is empty and believe I have found a solution. Thanks. Commented Dec 2, 2020 at 23:14

1 Answer


As @FrankYellin noted, this happens because None is put into task_queue before bar can be added.

Assuming that, at every point during the program, the queue is either non-empty or still has a task being processed (which is true in my case), the queue's join method can be used. According to the docs:

Blocks until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
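
To see that pairing in isolation, below is a minimal sketch using the standard-library queue.Queue, which follows the same task_done()/join() protocol as the manager's queue proxy:

import queue

q = queue.Queue()
q.put('job-1')
q.put('job-2')

# Every get() must be matched by a task_done(); otherwise join() blocks forever.
print(q.get())
q.task_done()
print(q.get())
q.task_done()

# The unfinished-task count is back at zero, so join() returns immediately.
q.join()
print('all tasks done')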

Below is the updated code:

import multiprocessing


class Worker(multiprocessing.Process):
    def __init__(self, task_queue: multiprocessing.Queue):
        super(Worker, self).__init__()
        self.task_queue = task_queue

    def run(self):
        for (function, *args) in iter(self.task_queue.get, None):
            print(f'Running: {function.__name__}({*args,})')

            # Run the provided function with its parameters in child process
            function(*args)

            self.task_queue.task_done() # <-- Notify queue that task is complete


def foo(task_queue: multiprocessing.Queue) -> None:
    print('foo')
    # Add new task to queue from this child process
    task_queue.put((bar, 1))


def bar(x: int) -> None:
    print(f'bar: {x}')


def main():
    # Start workers on separate processes
    workers = []
    manager = multiprocessing.Manager()
    task_queue = manager.Queue()
    for i in range(multiprocessing.cpu_count()):
        worker = Worker(task_queue)
        workers.append(worker)
        worker.start()

    # Run foo on child process using the queue as parameter
    task_queue.put((foo, task_queue))

    # Block until all items in queue are popped and completed
    task_queue.join() # <---

    for _ in workers:
        task_queue.put(None)

    # Block until workers complete and join main process
    for worker in workers:
        worker.join()

    print('Program completed.')


if __name__ == '__main__':
    main()

This seems to work fine. I will update this if I discover anything new. Thank you all.
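
One caveat worth noting: if function(*args) raises an exception, task_done() is never called, the unfinished-task count never reaches zero, and task_queue.join() will block forever. A defensive variant of the worker (just a sketch of one possible refinement) wraps the call in try/finally:

import multiprocessing


class Worker(multiprocessing.Process):
    def __init__(self, task_queue):
        super().__init__()
        self.task_queue = task_queue

    def run(self):
        for (function, *args) in iter(self.task_queue.get, None):
            print(f'Running: {function.__name__}({*args,})')
            try:
                # Run the provided function with its parameters in the child process
                function(*args)
            finally:
                # Decrement the unfinished-task count even if the task raised,
                # so that task_queue.join() cannot block forever
                self.task_queue.task_done()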


1 Comment

Excellent solution. I forgot about task_queue.task_done, which solves the race condition I couldn't resolve: how to know when the queue is empty and no tasks are still running.
