I've been reading threads like this one, but none of them seems to work for my case. I'm trying to parallelize the following toy example, which fills a NumPy array inside a for loop, using multiprocessing in Python:

import numpy as np
from multiprocessing import Pool
import time


def func1(x, y=1):
    return x**2 + y

def func2(n, parallel=False):
    my_array = np.zeros((n))
    
    # Parallelized version:
    if parallel:
        pool = Pool(processes=6)
        for idx, val in enumerate(range(1, n+1)):
            result = pool.apply_async(func1, [val])
            my_array[idx] = result.get()
        pool.close()

    # Not parallelized version:
    else:
        for i in range(1, n+1):
            my_array[i-1] = func1(i)

    return my_array

def main():
    start = time.time()
    my_array = func2(60000)
    end = time.time()
    
    print(my_array)
    print("Normal time: {}\n".format(end-start))

    start_parallel = time.time()
    my_array_parallelized = func2(60000, parallel=True)
    end_parallel = time.time()

    print(my_array_parallelized)
    print("Time based on multiprocessing: {}".format(end_parallel-start_parallel))


if __name__ == '__main__':
    main()

The multiprocessing-based code seems to work and gives the right results. However, it takes far longer than the non-parallelized version. Here is the output of both versions:

[2.00000e+00 5.00000e+00 1.00000e+01 ... 3.59976e+09 3.59988e+09
 3.60000e+09]
Normal time: 0.01605963706970215

[2.00000e+00 5.00000e+00 1.00000e+01 ... 3.59976e+09 3.59988e+09
 3.60000e+09]
Time based on multiprocessing: 2.8775112628936768

My intuition tells me that there should be a better way of capturing the results from pool.apply_async(). What am I doing wrong? What is the most efficient way to accomplish this? Thx.

  • Why do you expect this to be improved with multiprocessing? Is this a toy example? You should be using a vectorized operation for this, not an interpreter-level loop (let alone multiprocessing). Commented Apr 22, 2021 at 18:28
  • "My intuition tells me that there should be a better way of capturing the results from pool.apply_async()." Can you elaborate on what you mean by this? Commented Apr 22, 2021 at 18:29
  • In my real program (not in this toy example), I use a function like "func2" that calls an external program to generate some images inside a for loop. I want those calls to run in parallel, so I can't use a vectorized implementation. What I'm trying to achieve with this toy example is something similar to this tutorial, but using NumPy arrays (instead of lists). Commented Apr 22, 2021 at 18:42
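
For the real use case described in the last comment (calls to an external program), a thread pool may be enough, since each thread mostly waits on its child process rather than running Python code. The following is a minimal sketch under that assumption, not the asker's actual program: run_external and run_all are hypothetical names, and a child Python interpreter stands in for the external image generator.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor


def run_external(value):
    # Hypothetical stand-in for the real external program:
    # a child Python process that prints value**2 + 1.
    completed = subprocess.run(
        [sys.executable, "-c", f"print({value}**2 + 1)"],
        capture_output=True,
        text=True,
        check=True,
    )
    return float(completed.stdout)


def run_all(n, max_workers=6):
    # Each worker thread blocks on its own child process,
    # so several external calls are in flight at once.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(run_external, range(1, n + 1)))


if __name__ == "__main__":
    print(run_all(8))
```

executor.map returns the results in input order, so they can be written into an array by index afterwards.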

2 Answers

Creating processes is expensive. On my machine, it takes at least several hundred microseconds per process created. Moreover, the multiprocessing module copies the data to be computed between processes and then gathers the results from the process pool. This inter-process communication is also very slow. The problem is that your computation is trivial and can be done very quickly, likely much faster than all the introduced overhead. The multiprocessing module is only useful when the datasets are fairly small and the computation is intensive (compared to the amount of data transferred).
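
As a rough check, the two timings reported in the question can be divided by the 60000 calls to get a cost per task. This is only back-of-the-envelope arithmetic on those two numbers. It assumes the cost is spread evenly over the tasks, and the pool figure also includes creating the pool and one round trip per task.

```python
# Timings reported in the question for n = 60000 calls of func1.
n_tasks = 60_000
serial_seconds = 0.01605963706970215
pool_seconds = 2.8775112628936768

per_task_serial = serial_seconds / n_tasks  # roughly 0.27 microseconds
per_task_pool = pool_seconds / n_tasks      # roughly 48 microseconds
ratio = pool_seconds / serial_seconds       # roughly 180x slower

print(f"serial: {per_task_serial * 1e6:.2f} us per task")
print(f"pool:   {per_task_pool * 1e6:.2f} us per task")
print(f"ratio:  {ratio:.0f}x")
```

A task would have to take much longer than those tens of microseconds before handing it to the pool could pay off.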

Fortunately, when it comes to numerical computations using Numpy, there is a simple and fast way to parallelize your application: the Numba JIT. Numba can parallelize code if you explicitly use parallel constructs (parallel=True and prange). It uses threads working in shared memory rather than heavyweight processes. Numba can overcome the GIL if your code deals with native types and Numpy arrays instead of pure Python dynamic objects (lists, big integers, classes, etc.). Here is an example:

import numpy as np
import numba as nb
import time

@nb.njit
def func1(x, y=1):
    return x**2 + y

@nb.njit('float64[:](int64)', parallel=True)
def func2(n):
    my_array = np.zeros(n)
    for i in nb.prange(1, n+1):
        my_array[i-1] = func1(i)
    return my_array

def main():
    start = time.time()
    my_array = func2(60000)
    end = time.time()
    
    print(my_array)
    print("Numba time: {}\n".format(end-start))

if __name__ == '__main__':
    main()

Because Numba compiles the code to native machine code and can optimize the loop aggressively, the measured time is close to 0 seconds in this case.

5 Comments

This is probably a much better way to do it, but to be fair: I guess the original code would work if they didn't call result.get() within the for loop, right?
That's really cool. Thank you! BTW, I couldn't use numba/njit if func2 had an object type as one of its arguments, right?
@thisisalsomypassword, if you didn't call result.get() inside the loop, how would you fill the array with each result?
By collecting the AsyncResult objects in a list within the loop and then calling AsyncResult.get() after all processes have started on each result object.
@AntonioSerrano Well, you can use Numba with the GIL in this case, but it will not be very fast, and parallelization is only possible on the parts of the code that do not work on pure Python types. If you are dealing with a lot of Python types, then you can use Cython. While Cython provides a prange parallel built-in, parallelism is not possible on CPython types because of the GIL, and sadly no package can actually overcome this (it is a fundamental issue of CPython). You could try PyPy otherwise, but AFAIK the parallel version is quite inefficient so far.

Here is the solution proposed by @thisisalsomypassword that improves my initial proposal. That is, "collecting the AsyncResult objects in a list within the loop and then calling AsyncResult.get() after all processes have started on each result object":

import numpy as np
from multiprocessing import Pool
import time


def func1(x, y=1):
    time.sleep(0.1)
    return x**2 + y

def func2(n, parallel=False):
    my_array = np.zeros((n))
    
    # Parallelized version:
    if parallel:
        pool = Pool(processes=6)
        ####### HERE COMES THE CHANGE #######
        results = [pool.apply_async(func1, [val]) for val in range(1, n+1)]
        for idx, val in enumerate(results):
            my_array[idx] = val.get()
        #######
        pool.close()

    # Not parallelized version:
    else:
        for i in range(1, n+1):
            my_array[i-1] = func1(i)

    return my_array

def main():
    start = time.time()
    my_array = func2(600)
    end = time.time()
    
    print(my_array)
    print("Normal time: {}\n".format(end-start))

    start_parallel = time.time()
    my_array_parallelized = func2(600, parallel=True)
    end_parallel = time.time()

    print(my_array_parallelized)
    print("Time based on multiprocessing: {}".format(end_parallel-start_parallel))


if __name__ == '__main__':
    main()

Now it works. Time is reduced considerably with Multiprocessing:

Normal time: 60.107836008071
Time based on multiprocessing: 10.049324989318848    

time.sleep(0.1) was added to func1 so that each task is no longer trivial and the pool overhead stops dominating the measurement.
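
To see the effect of moving get() out of the submission loop in isolation, the two variants can be timed side by side. This is a sketch, not the code from the answer: it uses multiprocessing.pool.ThreadPool (threads instead of processes, with the same apply_async/get interface) so that it runs quickly, and slow_square, blocking_fill, deferred_fill and timed are names made up for the illustration.

```python
import time
from multiprocessing.pool import ThreadPool  # thread-backed, same interface as Pool


def slow_square(x, y=1):
    time.sleep(0.05)  # stand-in for non-trivial work
    return x**2 + y


def blocking_fill(pool, n):
    # get() right after each submission waits for that task,
    # so only one task is ever running.
    return [pool.apply_async(slow_square, [val]).get() for val in range(1, n + 1)]


def deferred_fill(pool, n):
    # Submit everything first, then collect the results in order.
    pending = [pool.apply_async(slow_square, [val]) for val in range(1, n + 1)]
    return [result.get() for result in pending]


def timed(fill, n=12, workers=4):
    with ThreadPool(processes=workers) as pool:
        start = time.perf_counter()
        values = fill(pool, n)
        return values, time.perf_counter() - start


if __name__ == "__main__":
    for fill in (blocking_fill, deferred_fill):
        values, elapsed = timed(fill)
        print(f"{fill.__name__}: {elapsed:.2f} s")
```

With 12 tasks of 0.05 s each and 4 workers, the blocking variant needs at least 12 × 0.05 = 0.6 s, while the deferred one can finish in about 3 × 0.05 = 0.15 s.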

2 Comments

You're neglecting that your function func1 only does trivial work. Add time.sleep(0.1) to it, run it with 600 "tasks" and you will see the speedup compared to your non-parallel test.
OK, I edited my proposal above accordingly. Thx!
