
Let's say I have a big list of music files of varying lengths that need to be converted, or images of varying sizes that need to be resized, or something like that. The order doesn't matter, so it is perfect for splitting across multiple processors.

If I use multiprocessing.Pool's map function, it seems like all the work is divided up ahead of time, without taking into account the fact that some files may take longer to process than others.

What happens is that if I have 12 processors, then near the end of processing, one or two processors are left with two or three files still to process while the other processors, which could be put to use, sit idle.

Is there some sort of queue implementation that can keep all processors loaded until there is no more work left to do?

5 Answers


There is a Queue class within the multiprocessing module specifically for this purpose.
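
A minimal sketch of that pattern, with a placeholder convert() function and file list standing in for the real work: each worker pulls the next file from the shared queue the moment it finishes the previous one, so a slow file never leaves the other processors idle.

import multiprocessing

def convert(path):
    # Placeholder for the real per-file work (transcoding, resizing, ...).
    print('processed', path)

def worker(queue):
    # Keep pulling file names until the None sentinel arrives.
    for path in iter(queue.get, None):
        convert(path)

if __name__ == '__main__':
    files = ['a.flac', 'b.flac', 'c.flac']  # stand-in for the real list
    queue = multiprocessing.Queue()
    for f in files:
        queue.put(f)

    n = multiprocessing.cpu_count()
    for _ in range(n):
        queue.put(None)  # one sentinel per worker so each one exits

    workers = [multiprocessing.Process(target=worker, args=(queue,))
               for _ in range(n)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()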

Edit: If you are looking for a complete framework for parallel computing that features a map() function using a task queue, have a look at the parallel computing facilities of IPython. In particular, you can use the TaskClient.map() function to get a load-balanced mapping onto the available processors.


Comments

I have tried finding a working example of multiprocessing.Queue but have yet to find one. I came across this one a little while ago and just got around to testing it: jeetworks.org/node/81. It only uses 1 of my 12 CPUs, even when I changed num_processes=2 and num_jobs=200000 (so that it wouldn't finish so quickly). I think I'll ask another StackOverflow question about where to find a working example of multiprocessing.Queue, and then I'll mark yours as the answer.
Queue worked fine. The example in the comment above was just a bad one: it took longer to put together the work to do in a single process than it did to process it using multiple processors. I put something together with a nearly identical Worker(multiprocessing.Process) class, and it works fine.

This is trivial to do with jug:

from glob import glob
from jug import Task

def process_image(img):
    ...  # do the actual resizing/conversion here

images = glob('*.jpg')
for im in images:
    Task(process_image, im)

Now, just run jug execute a few times to spawn worker processes.



As for queue implementations, there are several.

Look at the Celery project. http://celeryproject.org/

So, in your case, you can run 12 conversions (one on each CPU) as Celery tasks, add a callback function (to the conversion or to the task), and in that callback start a new conversion task whenever one of the previous conversions finishes.
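
A rough sketch of that idea using the present-day Celery API (the module name, broker URL, and task body here are assumptions). With a task queue you don't even need the callback chaining: enqueue one task per file and start a worker with 12 processes, and each process pulls the next task as soon as it frees up.

# tasks.py -- a sketch, assuming a local Redis broker
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def convert(path):
    # Placeholder for the real conversion work.
    print('converting', path)

# Elsewhere, enqueue one task per file:
#     from tasks import convert
#     for path in files:
#         convert.delay(path)
# Then start a worker that keeps 12 processes busy:
#     celery -A tasks worker --concurrency=12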



The Python threading library that has brought me the most joy is Parallel Python (PP). With PP it is trivial to use a thread-pool approach with a single queue to achieve what you need.
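
Roughly like this (a sketch of the PP API from memory, so treat the details as assumptions): pp.Server keeps its own job queue and hands each submitted job to the next free worker.

import pp

def convert(path):
    # Placeholder for the real work.
    return path

files = ['a.jpg', 'b.jpg', 'c.jpg']  # stand-in file list
job_server = pp.Server()  # defaults to one worker per local CPU
jobs = [job_server.submit(convert, (f,)) for f in files]
for job in jobs:
    print(job())  # calling a job blocks until its result is ready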



This is not the case if you use Pool.imap_unordered: it hands work to the pool one item at a time (with the default chunksize of 1), so a worker that finishes early simply picks up the next file instead of sitting idle.
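
For example (convert() and the file list are placeholders), results arrive as they finish rather than in submission order, and every worker stays busy until the input is exhausted:

from multiprocessing import Pool

def convert(path):
    # Placeholder for the real per-file work.
    return path

if __name__ == '__main__':
    files = ['a.flac', 'b.flac', 'c.flac']  # stand-in file list
    with Pool(processes=12) as pool:
        # The default chunksize of 1 hands out one file at a time,
        # so a worker asks for more work the moment it finishes.
        for result in pool.imap_unordered(convert, files):
            print(result)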

