1

I need some help because I tried since two days, and I don't know how I can do this. I have function compute_desc that takes multiples arguments (5 to be exact) and I would like to run this in parallel. I have this for now:

def compute_desc(coord, radius, coords, feat, verbose):
    # Compute here my descriptors
    return my_desc # numpy array (1x10 dimensions)

def main():
    points = np.rand.random((1000000, 4))
    coords = points[:, 0:3]
    feat = points[:, 3]
    all_features = np.empty((1000000, 10))
    all_features[:] = np.NAN
    scales = [0.5, 1, 2]
    for radius in scales:
        for index, coord in enumerate(coords):
            all_features[index, :] = compute_desc(coord,
                                                  radius,
                                                  coords,
                                                  feat,
                                                  False)

I would like to parallelize this. I saw several solutions with a Pool, but I don't understand how it works.

I tried with a pool.map(), but I can only send only one argument to the function.

Here is my solution (it doesn't work):

all_features = [pool.map(compute_desc, zip(point, repeat([radius, 
                                                          coords,
                                                          feat, 
                                                          False]
                                                         ) 
                                           ) 
                         )]

but I doubt it can work with a numpy array.

EDIT

This is my minimum code with a pool (it works now):

import numpy as np
from multiprocessing import Pool
from itertools import repeat

def compute_desc(coord, radius, coords, feat, verbose):
    # Compute here my descriptors
    my_desc = np.rand.random((1, 10))
    return my_desc

def compute_desc_pool(args):
    coord, radius, coords, feat, verbose = args
    compute_desc(coord, radius, coords, feat, verbose)

def main():
    points = np.random.rand(1000000, 4)
    coords = points[:, 0:3]
    feat = points[:, 3]
    scales = [0.5, 1, 2]
    for radius in scales:
        with Pool() as pool:
            args = zip(points, repeat(radius),
                       repeat(coords),
                       repeat(feat),
                       repeat(kdtree),
                       repeat(False))
            feat_one_scale = pool.map(compute_desc_pool, args)

        feat_one_scale = np.array(feat_one_scale)
        if radius == scales[0]:
            all_features = feat_one_scale
        else: 
            all_features = np.hstack([all_features, feat_one_scale])

    # Others stuffs

2 Answers 2

2

The generic solution is to pass to Pool.map a sequence of tuples, each tuple holding one set of arguments for your worker function, and then to unpack the tuple in the worker function.

So, just change your function to accept only one argument, a tuple of your arguments, which you already prepared with zip and passed to Pool.map. Then simply unpack args to variables:

def compute_desc(args):
    coord, radius, coords, feat, verbose = args
    # Compute here my descriptors

Also, Pool.map should work with numpy types too, since after all, they are valid Python types.

Just be sure to properly zip 5 sequences, so your function receives a 5-tuple. You don't need to iterate over point in coords, zip will do that for you:

args = zip(coords, repeat(radius), repeat(coords), repeat(feat), repeat(False))
# args is a list of [(coords[0], radius, coords, feat, False), (coords[1], ... )]

(if you do, and give point as a first sequence to zip, the zip will iterate over that point, which is in this case a 3-element array).

Your Pool.map line should look like:

for radius in scales:
    args = zip(coords, repeat(radius), repeat(coords), repeat(feat), repeat(False))
    feat_one_scale = [pool.map(compute_desc_pool, args)]
    # other stuff

A solution specific to your case, where all arguments except one are fixed could be to use functools.partial (as the other answer suggests). Furthermore, you don't even need to unpack coords in the first argument, just pass the index [0..n] in coords, since each invocation of your worker function already receives the complete coords array.

Sign up to request clarification or add additional context in comments.

10 Comments

Nope, it doesn't work, I tried your solution with an intermediate function (I would like to keep my original function), but I have the following error: ValueError: too many values to unpack (expected 6).
Please update your question with the exact code, so the error can be reproduced. And please write it so that anybody can copy/paste it and try it (without missing imports, etc). See stackoverflow.com/help/mcve.
Excellent! So the problem was in your zip expression. Check my update.
it works now! But I'm a little bit surprised, I run this script on a server with 56 processes, I didn't expect my code run 56 time faster, but at least more than 3 times
and adding Pool(), I increase time of computing when I don't have much point (around 1000 by example)
|
1

I assume from your example that four of those five arguments would be constant to all calls to compute_desc_pool. If so, then you can use partial to do this.

from functools import partial
....

def compute_desc_pool(coord, radius, coords, feat, verbose):    
    compute_desc(coord, radius, coords, feat, verbose)

def main():
    points = np.random.rand(1000000, 4)
    coords = points[:, 0:3]
    feat = points[:, 3]
    feat_one_scale = np.empty((1000000, 10))
    feat_one_scale[:] = np.NAN
    scales = [0.5, 1, 2]
    pool = Pool()
    for radius in scales:
        feat_one_scale = [pool.map(partial(compute_desc_pool, radius, coords, 
                                           feat, False), coords)]

2 Comments

In your example why define compute_desc_pool, when it takes the same arguments as compute_desc?
Because this was the structure of the original question. Of course it can be simplified. The question was clearly about passing static parameters alongside the mapped one when using Pool.map() and not about trivial simplifications to code provided only as an example.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.