
The code looks like this:

def function():
    for i in range(100):
        image1 = image_list[0, i]
        image2 = image_list[1, i]

        pool = Pool(3)
        for j in range(3):
            pool.apply_async(some_function, (image1, image2))
        pool.close()
        pool.join()
        # do other stuff...

The function works fine when i=0, but when i=1 (the second iteration of the outer for loop), the pool does not seem to work properly: some_function is no longer called.

So how should I handle this kind of multiprocessing task?

  • Do you mean to create a pool for every i and to close it for every j? Commented Aug 30, 2022 at 10:04

1 Answer


See How to create a Minimal, Reproducible Example.

You should create the pool once, before the for i in range(100): loop, rather than over and over again. And if pool.close() and pool.join() sit inside your for j in range(3): block, then after you call apply_async when j is 0 (i.e. the first time), you are immediately calling pool.close() and pool.join(). This will (1) block until that first submitted task completes and (2) prevent you from submitting any more tasks to the pool (i.e. for j equal to 1 and 2). What you want is closer to the following, assuming you want to submit all 300 tasks and then wait for their completion:

def function():
    pool = Pool()
    for i in range(100):
        image1 = image_list[0,i]
        image2 = image_list[1,i]
        for j in range(3):
            pool.apply_async(some_function, (image1, image2))
    # Wait for the 300 tasks to complete:
    pool.close()
    pool.join()

Note that we are no longer using a pool size of 3 but rather a pool size equal to the number of CPU cores we have since we are submitting all 300 tasks at once. I am also assuming that some_function is heavily CPU bound and so there is nothing to be gained by using a larger pool.
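
One thing the version above does not show is what happens when some_function raises: apply_async stores the exception in the returned AsyncResult, and it only surfaces when get() is called, so a failing worker can look like a task that was never run. Below is a minimal sketch of the submit-everything approach that keeps the results, assuming the worker takes its two inputs as positional arguments. The names run_all, repeats and use_threads are illustrative and not part of the original code.

```python
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool


def run_all(worker, pairs, repeats=3, use_threads=False):
    """Submit every task up front, then collect results in submission order."""
    # use_threads is a hypothetical switch: ThreadPool has the same
    # interface as Pool and accepts workers that cannot be pickled
    # (lambdas, closures), which is handy for quick experiments.
    pool_cls = ThreadPool if use_threads else Pool
    with pool_cls() as pool:  # no size given: defaults to the CPU count
        pending = [
            pool.apply_async(worker, pair)
            for pair in pairs
            for _ in range(repeats)
        ]
        # get() blocks until the task has finished and re-raises any
        # exception the worker raised; without it, failures stay silent.
        return [result.get() for result in pending]
```

With a module-level some_function this might be called as run_all(some_function, [(image_list[0, i], image_list[1, i]) for i in range(100)]), placed under an if __name__ == "__main__": guard on platforms that start worker processes with spawn.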

If, however, you want to submit tasks 3 at a time and wait for them to complete before submitting the next 3, then:

def function():
    pool = Pool(3)
    for i in range(100):
        image1 = image_list[0,i]
        image2 = image_list[1,i]
        async_results = [pool.apply_async(some_function, (image1, image2)) for j in range(3)]
        # wait for all 3 tasks to complete without using pool.close()
        # and pool.join():
        for async_result in async_results:
            # Wait for this task to complete and get return value,
            # which we are not saving:
            async_result.get()
    # Only when we have finished submitting our 300 tasks do we clean up
    # the pool:
    pool.close()
    pool.join()
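
If the return values are wanted per batch, Pool.starmap may be a shorter way to express the same submit-3-and-wait step, since it blocks until every task in the batch has finished. This is a sketch only, assuming the worker takes its two inputs as positional arguments. The names run_in_batches, repeats and use_threads are hypothetical and not part of the original code.

```python
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool


def run_in_batches(worker, pairs, repeats=3, use_threads=False):
    """Run `repeats` tasks per pair, waiting for each batch before the next."""
    # use_threads is a hypothetical switch: ThreadPool shares the Pool
    # interface and accepts workers that cannot be pickled.
    pool_cls = ThreadPool if use_threads else Pool
    batches = []
    # The pool is created once and reused for every batch.
    with pool_cls(repeats) as pool:
        for pair in pairs:
            # starmap calls worker(*pair) for each entry and blocks
            # until the whole batch is done, returning results in order.
            batches.append(pool.starmap(worker, [pair] * repeats))
    return batches
```

Each inner list holds the return values for one pair, so the result for image pair i is batches[i].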

Note that I provided two versions of the code since you were not clear on what exactly you wanted to do. See How do I ask a good question?.
