#!/usr/bin/env python

import multiprocessing

def tempx((the_time)):
    return int(the_time)*int(the_time)

def tempy((the_time, foobar)):
    return int(the_time)/(float(foobar))+100

def mp_handler():
    p = multiprocessing.Pool(2)
    foo = p.map(tempx, [('2')])
    print foo
    foo = p.map(tempy, [('100', '100000')])
    print foo
if __name__ == '__main__':
    mp_handler()

I have two methods with different input parameters: the first takes only the_time, and the second takes the_time and foobar.

I need the results in a particular order, which is why I used the map function. However, as I understand it, the code above doesn't really use the multiprocessing module at all, because I am making two separate map calls. Am I right?

The end goal is to have two methods running simultaneously.

What am I missing here?

Dano, this is an example of what I am doing:

import multiprocessing
import time

def print_timing(func):
    def wrapper(*arg):
        t1 = time.time()
        res = func(*arg)
        t2 = time.time()
        print '%s took %0.3f ms' % (func.func_name, (t2-t1)*1000.0)
        return res
    return wrapper
@print_timing
def case_one(power_pred, power_core, num_thrs, possible_frequency, clamp_range):
    ds1_cur_freq = list()
    ds1_freq_index = list()
    ds1_cur_clamp = list()
    return ds1_cur_freq, ds1_freq_index, ds1_cur_clamp

@print_timing
def case_two(cpower_pred, power_core, num_thrs, possible_frequency, TT_index, DT_index, clamp_range):
    ds2_cur_freq = list()
    ds2_freq_index = list()
    ds2_cur_clamp = list()
    return ds2_cur_freq, ds2_freq_index, ds2_cur_clamp

def defs_finder():
    cpower_pred = list()
    power_pred = list()
    power_core = list()
    num_thrs = 3
    possible_frequency = list()
    clamp_range= list()
    DT_index =1
    TT_index = 0
    p = multiprocessing.Pool(2)
    #Case 1: DS1
#    ds1_cur_freq, ds1_freq_index, ds1_cur_clamp =
    ds1 = p.apply_async(case_one, args=(power_pred, power_core, num_thrs, possible_frequency))
    #Case 1: DS1
 #   ds1_cur_freq, ds1_freq_index, ds1_cur_clamp = case_one(power_pred, power_core, num_thrs, possible_frequency, clamp_range)
    #Case 2: DS2
#    ds2_cur_freq, ds2_freq_index, ds2_cur_clamp = case_two(cpower_pred, power_core, num_thrs, possible_frequency, TT_index, DT_index, clamp_range)
    ds2 = p.apply_async(case_two, args=(cpower_pred, power_core, num_thrs, possible_frequency, TT_index, DT_index))
    print ds1
    print ds2
    print ds1.get()
    print ds2.get()
#    ds1_cur_freq = ds1.get()[0]
#    ds1_freq_index = ds1.get()[1]
#    ds1_cur_clamp = ds1.get()[2]
#    ds2_cur_freq = ds2.get()[0]
#    ds2_freq_index = ds2.get()[1]
#    ds2_cur_clamp = ds2.get()[2]

defs_finder()

This is how it is implemented right now, and the bug is reproduced.


2 Answers


Pool.map is useful if you need to run a particular function on all the elements of an iterable in parallel, and block until the whole iterable has been processed. In your case, you're just passing a single item in the iterable, so you're just running a single function in a subprocess, and blocking until it's done. This is slower than just running the function in the parent process, since you have the added overhead of IPC.
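For contrast, here is the kind of workload Pool.map is designed for: one function fanned out over many inputs, with results coming back in input order. This is only a sketch; square is a made-up stand-in for a real computation.

```python
import multiprocessing

def square(x):
    # Stand-in for a real, CPU-heavy computation.
    return x * x

if __name__ == '__main__':
    p = multiprocessing.Pool(2)
    # One function, many inputs: each element is handed off to a worker,
    # and the returned list matches the order of the input iterable.
    results = p.map(square, [1, 2, 3, 4, 5])
    p.close()
    p.join()
    print(results)  # [1, 4, 9, 16, 25]
```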

If your goal is to run tempx and tempy in parallel with just a single set of arguments, Pool.apply_async is a better option:

import multiprocessing

def tempx(the_time):
    return int(the_time)*int(the_time)

def tempy(the_time, foobar):
    return int(the_time)/(float(foobar))+100

def mp_handler():
    p = multiprocessing.Pool(2)
    foox = p.apply_async(tempx, args=('2',))
    fooy = p.apply_async(tempy, args=('100', '100000'))
    print foox.get()
    print fooy.get()

if __name__ == '__main__':
    mp_handler()

apply_async is non-blocking; it returns an AsyncResult object immediately, which you can use later to actually fetch the result of the asynchronous operation, by calling AsyncResult.get. So, we just call apply_async on both functions to start them in the background, and then call get() on each AsyncResult to wait for them to finish.
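Since get() simply hands back whatever the target function returned, a function that returns several values (as yours do) can be unpacked in the usual way. A minimal sketch, where stats is a made-up function returning a tuple:

```python
import multiprocessing

def stats(values):
    # Returns two results at once, as a tuple.
    total = sum(values)
    return total, total / float(len(values))

if __name__ == '__main__':
    p = multiprocessing.Pool(2)
    res = p.apply_async(stats, args=([1, 2, 3, 4],))
    # get() returns the function's return value; a tuple unpacks as usual.
    total, mean = res.get(timeout=10)
    p.close()
    p.join()
    print(total)  # 10
```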

One other thing to note here: In your example, the work you're doing in the child processes is very light - it doesn't take long for either function to finish. Comparatively, the cost of spawning background processes and passing your functions and their arguments to those background processes via IPC, and then sending the results back, is high. You'll probably find that using multiprocessing is slower than just executing these functions sequentially in the parent process. In order for multiprocessing to be worth using, you'd need to be doing more expensive calculations inside tempx and tempy.
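A rough way to verify this for your own workload is to time both paths side by side. This is just a sketch with a deliberately cheap made-up function; the absolute timings you see will depend on your machine.

```python
import multiprocessing
import time

def cheap(x):
    # Deliberately trivial work, so IPC overhead dominates.
    return x * x

if __name__ == '__main__':
    data = list(range(1000))

    t0 = time.time()
    serial = [cheap(x) for x in data]
    serial_s = time.time() - t0

    p = multiprocessing.Pool(2)
    t0 = time.time()
    parallel = p.map(cheap, data)
    parallel_s = time.time() - t0
    p.close()
    p.join()

    # Same answers either way; for work this cheap, the pool is
    # usually slower because of process startup and IPC costs.
    print(serial == parallel)  # True
    print('serial %.6fs, parallel %.6fs' % (serial_s, parallel_s))
```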


8 Comments

tempx and tempy were examples of how my 'real' functions look. The overhead of computing tempx and tempy serially costs up to 0.5 W of power; that is way too much, and the latency overhead is also higher. In reality, tempx and tempy each compute 4*450*450 floating point values.
@tandem I thought that might be the case. I wanted to point it out anyway, just to be sure.
Dano, I am unable to unpack the values. In serial execution there doesn't seem to be any problem, but now I get this error: cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed. I wonder what that implies? The return value has more than one element, and I used returnvalue.get()[x] to get it. Am I doing something wrong? The returnvalue object is created.
@tandem Are you trying to send a function either to or from the child that isn't defined at the top-level of the module? pickle can only pickle functions that it can find at the top-level; if it's a nested function, pickling will fail with the error you saw.
The function is defined above the p = multiprocessing.Pool(2), just like in the previous example, and the object is created. If by a nested function you mean a def within a def, I do not have that. However, I do call other functions from within tempx, for example.
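The restriction dano describes is easy to check with pickle directly. A minimal sketch (top_level and make_nested are made-up names; the exact exception type varies between Python versions, but the nested case always fails):

```python
import pickle

def top_level(x):
    return x + 1

def make_nested():
    def nested(x):  # defined inside another function
        return x + 1
    return nested

# A module-level function pickles fine: pickle stores it by name
# and looks it up again in the module on unpickling.
blob = pickle.dumps(top_level)

# A nested function cannot be found by name at the module top level,
# so pickling it fails - the same class of error multiprocessing hits
# when it tries to send such a function to a worker process.
try:
    pickle.dumps(make_nested())
except Exception as e:
    print(type(e).__name__)
```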

No, p.map will distribute your computation just fine: it parallelises the evaluation of the function you pass to it over the parameters you pass to it. However, in your code the two p.map calls on tempx and tempy do not run in parallel with each other, if that is what you want.

However, unlike a normal map or a list comprehension ([function(x) for x in your_list]), p.map is asynchronous and will not return the results in a particular order. You will need to sort the results on some key afterwards.

5 Comments

Pool.map will always return the results in the correct order. It blocks until all the elements of the iterable have been processed, and returns a list of the return values that coincides with the order of the original iterable. So p.map(lambda x:x+1, [1,2,3,4]) will always return [2,3,4,5] (lambdas can't actually be passed to Pool.map, but you get the idea). The only Pool function that doesn't maintain the order of the input iterable is Pool.imap_unordered.
Hm, interesting. I always assumed that the whole point of pool.map was to abandon thread safety and execution-order guarantees in exchange for manual control of what gets distributed. And since a debug with a print showed that the execution order was getting offset, I just assumed that p.map was not re-stitching the results properly at the end. Thanks for pointing that out.
Right, the execution can happen in any order, but the Pool will gather the results and put them back in the "correct" order before returning any results. Only imap_unordered will actually return them to the user as soon as they finish, regardless of the original order.
So what would be the iterator object to retrieve the results as they are computed?
To get the results as soon as they're ready, regardless of the order of the original iterable, you'd use for result in p.imap_unordered(func, iterable):.
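The distinction in this comment thread can be shown in one sketch. slow_echo is a made-up worker that sleeps longer for smaller inputs, so completion order differs from input order; map still returns results in input order, while imap_unordered yields them as they finish.

```python
import multiprocessing
import time

def slow_echo(x):
    # Smaller inputs sleep longer, so they *finish* last.
    time.sleep((5 - x) * 0.01)
    return x

if __name__ == '__main__':
    p = multiprocessing.Pool(2)
    # map blocks and always returns results in input order,
    # no matter which worker finished first.
    print(p.map(slow_echo, [1, 2, 3, 4]))  # [1, 2, 3, 4]
    # imap_unordered yields each result as soon as it is ready;
    # iteration order depends on completion time, not input order.
    print(sorted(p.imap_unordered(slow_echo, [1, 2, 3, 4])))  # [1, 2, 3, 4]
    p.close()
    p.join()
```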
