I want to implement a file crawler that stores data in MongoDB. I would like to use multiprocessing as a way to 'hand off' blocking tasks such as unzipping files, crawling files, and uploading to MongoDB. Certain tasks are reliant on other tasks (e.g., a file needs to be unzipped before the files inside it can be crawled), so I would like the ability to complete a task and add new ones to the same task queue from within a worker.
Below is what I currently have:
import multiprocessing


class Worker(multiprocessing.Process):
    def __init__(self, task_queue: multiprocessing.Queue):
        super(Worker, self).__init__()
        self.task_queue = task_queue

    def run(self):
        for (function, *args) in iter(self.task_queue.get, None):
            print(f'Running: {function.__name__}({*args,})')
            # Run the provided function with its parameters in child process
            function(*args)
            self.task_queue.task_done()


def foo(task_queue: multiprocessing.Queue) -> None:
    print('foo')
    # Add new task to queue from this child process
    task_queue.put((bar, 1))


def bar(x: int) -> None:
    print(f'bar: {x}')


def main():
    # Start workers on separate processes
    workers = []
    manager = multiprocessing.Manager()
    task_queue = manager.Queue()

    for i in range(multiprocessing.cpu_count()):
        worker = Worker(task_queue)
        workers.append(worker)
        worker.start()

    # Run foo on child process using the queue as parameter
    task_queue.put((foo, task_queue))

    for _ in workers:
        task_queue.put(None)

    # Block until workers complete and join main process
    for worker in workers:
        worker.join()

    print('Program completed.')


if __name__ == '__main__':
    main()
Expected Behaviour:
Running: foo((<AutoProxy[Queue] object, typeid 'Queue' at 0x1b963548908>,))
foo
Running: bar((1,))
bar: 1
Program completed.
Actual Behaviour:
Running: foo((<AutoProxy[Queue] object, typeid 'Queue' at 0x1b963548908>,))
foo
Program completed.
I am quite new to multiprocessing, so any help would be greatly appreciated.
The problem is that you put None into the queue for each of the workers (causing them to end) before any of the workers has had a chance to add its own elements to the queue. In fact, you have an interesting problem: you can't stop any worker until every worker is in a wait state, because as long as one worker is still working, there's a chance it will add more work to the queue. I'm not sure of the solution.
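That said, here is one pattern that should work (a minimal, untested sketch reusing the Worker, foo and bar definitions above). Your worker already calls task_done() after each task, and a manager Queue proxies queue.Queue, so the proxy also exposes join(). The main process can therefore call task_queue.join() to block until every task, including tasks enqueued by the workers themselves, has been marked done, and only then send the sentinels:

def main():
    manager = multiprocessing.Manager()
    task_queue = manager.Queue()

    # Start workers on separate processes
    workers = [Worker(task_queue) for _ in range(multiprocessing.cpu_count())]
    for worker in workers:
        worker.start()

    # Seed the queue with the initial task
    task_queue.put((foo, task_queue))

    # Block until every put() has been matched by a task_done() call,
    # including tasks that workers added while running.
    task_queue.join()

    # All work is finished; now it is safe to send one sentinel per worker.
    for _ in workers:
        task_queue.put(None)

    for worker in workers:
        worker.join()

    print('Program completed.')

This relies on an ordering guarantee already present in your code: foo puts (bar, 1) into the queue before it returns, so the queue's unfinished-task count rises to 2 before foo's own task_done() brings it back to 1, and join() cannot return until bar has also completed.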