
I know there are a lot of topics around similar problems (like How do I make processes able to write in an array of the main program?, Multiprocessing - Shared Array or Multiprocessing a loop of a function that writes to an array in python), but I just don't get it... so sorry for asking again.

I need to do some computations on a huge array and want to speed things up by splitting it into blocks and running my function on those blocks, each block in its own process. The problem is that the blocks are "cut" from one array and the results then have to be written into a new, common array. This is what I have so far (a minimal working example; don't mind the array reshaping, it is necessary for my real-world case):

import time
import numpy as np
import multiprocessing as mp

def calcArray(array, blocksize, n_cores=1):  # n_cores is unused so far -- that is the question
    in_shape = (array.shape[0] * array.shape[1], array.shape[2])
    input_array = array[:, :, :array.shape[2]].reshape(in_shape)
    result_array = np.zeros(in_shape)  # flattened shape, so blockwise assignment works
    # blockwise loop
    pix_count = array.shape[0] * array.shape[1]  # number of rows in the flattened view
    for position in range(0, pix_count, blocksize):
        if position + blocksize < pix_count:
            num = blocksize
        else:
            num = pix_count - position
        result_part = input_array[position:position + num, :] * 2
        result_array[position:position + num] = result_part
    # finalize result
    final_result = result_array.reshape(array.shape)
    return final_result

if __name__ == '__main__':
    start = time.time()
    img = np.ones((4000, 4000, 4))
    result = calcArray(img, blocksize=100, n_cores=4)
    print('Input:\n', img)
    print('\nOutput:\n', result)

How can I now implement multiprocessing in such a way that I set a number of cores and calcArray then assigns a process to each block until n_cores is reached?


With the much appreciated help of @Blownhither Ma, the code now looks like this:

import time, datetime
import numpy as np
from multiprocessing import Pool

def calculate(array):
    return array * 2

if __name__ == '__main__':
    start = time.time()
    CORES = 4
    BLOCKSIZE = 100
    ARRAY = np.ones((4000, 4000, 4))
    pool = Pool(processes=CORES)
    in_shape = (ARRAY.shape[0] * ARRAY.shape[1], ARRAY.shape[2])
    input_array = ARRAY[:, :, :ARRAY.shape[2]].reshape(in_shape)
    result_array = np.zeros(input_array.shape)
    # do it
    pix_count = ARRAY.shape[0] * ARRAY.shape[1]  # number of rows in input_array
    handles = []
    for position in range(0, pix_count, BLOCKSIZE):
        if position + BLOCKSIZE < pix_count:
            num = BLOCKSIZE
        else:
            num = pix_count - position
        ### OLD APPROACH WITH NO PARALLELIZATION ###
        # part = calculate(input_array[position:position + num, :])
        # result_array[position:position + num] = part
        ### NEW APPROACH WITH PARALLELIZATION ###
        handle = pool.apply_async(func=calculate, args=(input_array[position:position + num, :],))
        handles.append(handle)
    # finalize result
    ### OLD APPROACH WITH NO PARALLELIZATION ###
    # final_result = result_array.reshape(ARRAY.shape)
    ### NEW APPROACH WITH PARALLELIZATION ###
    final_result = [h.get() for h in handles]
    final_result = np.concatenate(final_result, axis=0)
    print('Done!\nDuration (hh:mm:ss): {duration}'.format(duration=datetime.timedelta(seconds=time.time() - start)))

The code runs and really does start the number of processes I assigned, but it takes much, much longer than the old approach of just using the loop as-is (about 3 seconds serially versus roughly a minute in parallel). There must be something missing here.
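In the meantime I suspect the per-task overhead is the culprit: with BLOCKSIZE = 100 the loop submits 160,000 tiny jobs, and every one of them pickles its input and its result between processes. Below is a minimal sketch of the coarser variant I am experimenting with, splitting into one chunk per core via np.array_split (the same idea as in the answer below); the timing print is just for my own comparison:

import time
import numpy as np
from multiprocessing import Pool

def calculate(array):
    return array * 2

if __name__ == '__main__':
    CORES = 4
    ARRAY = np.ones((4000, 4000, 4))
    input_array = ARRAY.reshape(-1, ARRAY.shape[2])  # shape (16000000, 4)
    chunks = np.array_split(input_array, CORES)      # one large chunk per worker
    start = time.time()
    pool = Pool(processes=CORES)
    handles = [pool.apply_async(calculate, (chunk,)) for chunk in chunks]
    result = np.concatenate([h.get() for h in handles], axis=0)
    pool.close()
    pool.join()
    final_result = result.reshape(ARRAY.shape)
    print('{} chunks took {:.1f} s'.format(CORES, time.time() - start))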

1 Answer

The core functions here are pool.apply_async and handler.get().

I have recently been working on the same problem and found it useful to write standard utility functions. balanced_parallel applies a function fn to a matrix a in parallel, silently; assigned_parallel explicitly applies fn to each element of a list.
i. I split the array with np.array_split. You could use a block scheme instead.
ii. I concatenate the results rather than assigning them into an empty matrix, since there is no shared memory.

import numpy as np
from multiprocessing import cpu_count, Pool

def balanced_parallel(fn, a, processes=None, timeout=None):
    """ apply fn on slice of a, return concatenated result """
    if processes is None:
        processes = cpu_count()
    print('Parallel:\tstarting {} processes on input with shape {}'.format(processes, a.shape))
    results = assigned_parallel(fn, np.array_split(a, processes), timeout=timeout, verbose=False)
    return np.concatenate(results, 0)


def assigned_parallel(fn, l, processes=None, timeout=None, verbose=True):
    """ apply fn on each element of l, return list of results """
    if processes is None:
        processes = min(cpu_count(), len(l))
    pool = Pool(processes=processes)
    if verbose:
        print('Parallel:\tstarting {} processes on {} elements'.format(processes, len(l)))

    # add jobs to the pool
    handler = [pool.apply_async(fn, args=x if isinstance(x, tuple) else (x, )) for x in l]

    # pool running, join all results
    results = [handler[i].get(timeout=timeout) for i in range(len(handler))]

    pool.close()
    pool.join()  # wait for the workers to shut down cleanly
    return results

In your case, fn would be

def _fn(matrix_part):
    return matrix_part * 2

result = balanced_parallel(_fn, img)
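If you want to check the speed-up, a quick timing wrapper in the style of your own script would do (just an illustration; it assumes img is the (4000, 4000, 4) array from the question):

import time

start = time.time()
result = balanced_parallel(_fn, img)
print('balanced_parallel took {:.1f} s'.format(time.time() - start))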

Follow-up: Your loop should look like this to make parallelization happen.

handles = []
for position in range(0, pix_count, BLOCKSIZE):
    if position + BLOCKSIZE < pix_count:
        num = BLOCKSIZE
    else:
        num = pix_count - position
    handle = pool.apply_async(func=calculate, args=(input_array[position:position + num, :], ))
    handles.append(handle)

# multiple handlers exist at this moment!! Don't `.get()` yet
results = [h.get() for h in handles]
results = np.concatenate(results, axis=0)
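np.concatenate leaves you with a (4000 * 4000, 4) array; if you need the original 3-D shape back, reshape it and shut the pool down afterwards (this continues directly from the loop above; the reshape target is my assumption based on your first snippet):

final_result = results.reshape(ARRAY.shape)  # back to (4000, 4000, 4)
pool.close()   # no further jobs will be submitted
pool.join()    # wait for the workers to exit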

Comments

Well, at least it seems to work :) Now I have to understand it, give me some time to get my head around it, then I'll accept it. Thank you!
@s6hebern Sorry if it's lengthy :) I have encountered this multiple times so I decided to make a general solution. You may take only the essential code
Given my edits, which are based on your answer, do you see anything I missed? To me, it looks like the code has no effect at all
@s6hebern you get the result of each job before the next job is applied, which forces the jobs to run one by one. Simply apply all the jobs first, then get all the results.
@s6hebern aka use one loop for apply, another for get
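To make the last two comments concrete, here is a small reconstruction of the serializing anti-pattern next to the fixed version (a toy (1000, 4) array stands in for the real data; calculate is the doubling function from the question):

import numpy as np
from multiprocessing import Pool

def calculate(array):
    return array * 2

if __name__ == '__main__':
    pool = Pool(processes=4)
    chunks = np.array_split(np.ones((1000, 4)), 4)

    # Anti-pattern: .get() inside the submit loop blocks until each job
    # finishes, so the jobs run one after another.
    serial_results = [pool.apply_async(calculate, (c,)).get() for c in chunks]

    # Fix: submit all jobs first, then collect -- workers run concurrently.
    handles = [pool.apply_async(calculate, (c,)) for c in chunks]
    parallel_results = [h.get() for h in handles]

    pool.close()
    pool.join()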
