
I have the following function:

def function(arguments, process_number):
    # calculations that build vector_1 and vector_2, of size 100000 each
    return vector_1, vector_2

I need to run this function in parallel for 200 different values of process_number. I am not interested in having the output ordered by process_number, so I am using apply_async(), in the following fashion:

import multiprocessing as mp
import numpy as np

def parallelizing():
    number_of_processes = 200
    pool = mp.Pool(mp.cpu_count())

    size = 100000
    vector_1 = np.zeros(size)
    vector_2 = np.zeros(size)

    returned_object = [pool.apply_async(function, args=(args, procnum))
                       for procnum in range(number_of_processes)]
    pool.close()
    pool.join()
    for r in returned_object:
        result = r.get()   # fetch each (vector_1, vector_2) pair once
        vector_1 += result[0]
        vector_2 += result[1]

So at the end of the day, I just have to add together all the vectors I get back from the function I am parallelizing.

The problem is that a big portion of the time used by the process is actually spent allocating the memory that builds the returned_object list, which I think is really unnecessary, as I simply need each function's return value to be added and then forgotten.

So I am trying to avoid building a huge list of objects, each containing at least two vectors of floats of size 100000, and to get directly to the "addition" step.

Is there a way? Should I define a global variable and write to it inside the function? I fear that might lead to concurrency issues and screw things up. As I said, I really don't care about getting an ordered result, given that I simply need to add things up.
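For what it's worth, here is a minimal sketch of one alternative I have been considering (not something anyone suggested to me; the function body, sizes, and argument values are placeholders for the real calculation): apply_async accepts a callback, and the pool invokes all callbacks in a single result-handler thread in the parent, so each returned pair can be folded into the running totals and discarded immediately, without ever keeping the list of results.

```python
import multiprocessing as mp
import numpy as np

SIZE = 100_000

def function(arguments, process_number):
    # stand-in for the real calculation, which builds two vectors
    vector_1 = np.full(SIZE, float(process_number))
    vector_2 = np.full(SIZE, 2.0 * process_number)
    return vector_1, vector_2

def parallelizing(number_of_processes=200):
    vector_1 = np.zeros(SIZE)
    vector_2 = np.zeros(SIZE)

    def accumulate(result):
        # apply_async callbacks all run in one result-handler thread in the
        # parent, so these in-place additions are serialized, not racy
        nonlocal vector_1, vector_2
        vector_1 += result[0]
        vector_2 += result[1]

    with mp.Pool(mp.cpu_count()) as pool:
        for procnum in range(number_of_processes):
            pool.apply_async(function, args=("args", procnum),
                             callback=accumulate)
        pool.close()
        pool.join()
    return vector_1, vector_2
```

Since the callback runs in the parent's result-handler thread, it should only do the cheap accumulation; the heavy work stays in function.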



Edit, following Cireo's answer below:

OK, just to confirm that I understood. Instead of using the apply_async method, I would do something like the following:

def function(args, queue_1, queue_2):
    # do calculation
    queue_1.put(vector_1)
    queue_2.put(vector_2)

and inside the function that calls the parallelization I simply do

def parallelizing():
    queue_1 = Queue()
    queue_2 = Queue()
    process_id = np.arange(200)
    p = Process(target=function, args=(args, queue_1, queue_2, process_id))
    p.start()
    p.join()

What I don't really understand is how, instead of putting things in a Queue (which to me seems as computationally intensive as creating a list), I can simply add up the returns of my function. If I do Queue.put(), don't I end up with the same list as before?
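The point, as I understand it, can be sketched like this (illustrative code, not from the thread; the worker body and all names stand in for the real calculation): the parent drains the queue in a loop as results arrive, adding each pair into the running totals and then dropping it, so the queue only ever buffers results that have not been consumed yet and no list of 200 results is built.

```python
import multiprocessing as mp
import numpy as np

SIZE = 100_000

def worker(process_number, queue):
    # stand-in for the real calculation
    vector_1 = np.full(SIZE, float(process_number))
    vector_2 = -vector_1
    queue.put((vector_1, vector_2))

def parallelizing(number_of_processes=8):
    queue = mp.Queue()
    procs = [mp.Process(target=worker, args=(n, queue))
             for n in range(number_of_processes)]
    for p in procs:
        p.start()

    total_1 = np.zeros(SIZE)
    total_2 = np.zeros(SIZE)
    # drain the queue *before* joining: a child cannot exit until its
    # queued data is consumed, so get() first, then join()
    for _ in range(number_of_processes):
        v1, v2 = queue.get()
        total_1 += v1
        total_2 += v2

    for p in procs:
        p.join()
    return total_1, total_2
```

Spawning 200 processes at once is heavy; in practice you would cap the number of live workers (e.g. with a pool) while keeping the same drain-as-you-go accumulation.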

  • You can try pool = mp.Pool(60) and check. Commented Apr 22, 2020 at 8:33

2 Answers


It sounds like you've self-diagnosed this problem as the same as

Share the list of lists in multiprocessing

Don't create the list at all, just use a Queue or similar to send results from all your subprocesses back into the main process. You said you don't care about order, so that should be pretty straightforward.

You could also investigate using shared memory, or just sharing the object across processes (though the second option might not be fast enough)

https://docs.python.org/3/library/multiprocessing.shared_memory.html
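A hedged sketch of the shared-memory route (requires Python 3.8+; the sizes, names, and per-worker computation below are placeholders): the parent creates one block, each worker attaches to it by name and adds its vector in place under a lock, so no per-process result ever has to be sent back or stored in a list.

```python
from multiprocessing import Process, Lock, shared_memory
import numpy as np

SIZE = 100_000

def worker(shm_name, lock, process_number):
    # attach to the block created by the parent and view it as an array
    shm = shared_memory.SharedMemory(name=shm_name)
    total = np.ndarray((SIZE,), dtype=np.float64, buffer=shm.buf)
    vector_1 = np.full(SIZE, float(process_number))  # stand-in for the real result
    with lock:  # in-place += on shared data must be serialized across processes
        total += vector_1
    shm.close()

def parallelizing(number_of_processes=4):
    shm = shared_memory.SharedMemory(create=True, size=SIZE * 8)
    try:
        total = np.ndarray((SIZE,), dtype=np.float64, buffer=shm.buf)
        total[:] = 0.0
        lock = Lock()
        procs = [Process(target=worker, args=(shm.name, lock, n))
                 for n in range(number_of_processes)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return total.copy()  # copy out before the block is unlinked
    finally:
        shm.close()
        shm.unlink()
```

Note the trade-off: the lock serializes the additions, so this only pays off when building the vectors dominates the runtime, not the final accumulation.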


2 Comments

I edited my question, could you check if it makes sense or if I misunderstood, please? Thanks
@FrancescoDiLauro I think I understand your question better now. The answer to this will depend on how your calculations are performed - are the 100k elements independent, or do they have to be calculated all at once? If they are independent, then you could perhaps shove each one into the queue as it goes, e.g. queue.put((vec_num, vec_ind, value)). However, there is some overhead for each of these operations, so that will only really make sense for certain speeds of operation. Are you sure your original calculation is being done in an efficient way?

I'm not well acquainted with the multiprocessing module, but I am with threading. I'm not sure which would perform better, but this is how I would thread it:

import numpy as np
from threading import Thread 

def function(arguments, process_number, results, index): 
    print(arguments, process_number)

    # calculations that build vector_1 and vector_2 of size 100000 each
    # example arrays/vectors
    vector_1 = np.zeros(10000)
    vector_2 = np.zeros(10000)
    results[index] = (vector_1, vector_2)

count = 200

threads = [None] * count
results = [None] * count

args = ("example", "arguments")
process_numbers = range(1, count + 1)

for i, process_number in enumerate(process_numbers):
    threads[i] = Thread(target=function, args=(args,process_number, results, i))
    threads[i].start()

for i in range(count):
    threads[i].join()

size = 10000
vector_1 = np.zeros(size)
vector_2 = np.zeros(size)

for result in results:
    vector_1 += result[0]
    vector_2 += result[1]  

4 Comments

and how about adding the results to the vectors?
@FrancescoDiLauro does the updated answer work for you?
I tried, but for some reason the multiprocessing does not start; looking at htop, for instance, I see cores not working. But this still creates and populates a list; is that really necessary?
I looked into it, and apparently 'threading' only executes on a single core, so it works asynchronously, whereas 'multiprocessing' actually uses the specified number of cores. I've worked with it before; I'll try to convert my current implementation.
