
I wrote a script on Linux using Python's multiprocessing module. When I tried to run it on Windows it did not work out of the box, which I found out is related to how child processes are created on Windows: it seems to be crucial that the objects passed to them can be pickled.

My main problem is that I am using large numpy arrays. Above a certain size they no longer seem to be picklable. Broken down to a simple script, I want to do something like this:

### Import modules

import numpy as np
import multiprocessing as mp

number_of_processes = 4

if __name__ == '__main__':

    def reverse_np_array(arr):
        arr = arr + 1
        return arr

    a = np.ndarray((200,1024,1280),dtype=np.uint16)

    def put_into_queue(_Queue,arr):
        _Queue.put(reverse_np_array(arr))


    Queue_list = []
    Process_list = []
    list_of_arrays = []

    for i in range(number_of_processes):
        Queue_list.append(mp.Queue())


    for i in range(number_of_processes):
        Process_list.append(mp.Process(target=put_into_queue, args=(Queue_list[i],a)))

    for i in range(number_of_processes):
        Process_list[i].start()

    for i in range(number_of_processes):
        list_of_arrays.append(Queue_list[i].get())

    for i in range(number_of_processes):
        Process_list[i].join()

I get the following error message:

Traceback (most recent call last):
  File "Windows_multi.py", line 34, in <module>
    Process_list[i].start()
  File "C:\Program Files\Anaconda32\lib\multiprocessing\process.py", line 130, in start
    self._popen = Popen(self)
  File "C:\Program Files\Anaconda32\lib\multiprocessing\forking.py", line 277, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "C:\Program Files\Anaconda32\lib\multiprocessing\forking.py", line 199, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 224, in dump
    self.save(obj)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())

So I am basically creating one large array that I need in all processes, so that each of them can do calculations on it and return the result.

One important thing seems to be to write the function definitions before the if __name__ == '__main__': statement.
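To make that concrete, the layout should roughly look like this (a minimal sketch with a single worker process and the smaller array size; the variable names are just illustrative):

import numpy as np
import multiprocessing as mp

def reverse_np_array(arr):
    # Defined at module level, so the child process can find it after re-importing the script on Windows.
    return arr + 1

def put_into_queue(queue, arr):
    queue.put(reverse_np_array(arr))

if __name__ == '__main__':
    # Only the code that actually spawns processes lives under the guard.
    a = np.zeros((50, 1024, 1280), dtype=np.uint16)
    queue = mp.Queue()
    p = mp.Process(target=put_into_queue, args=(queue, a))
    p.start()
    result = queue.get()  # get() before join(), otherwise a full queue can deadlock
    p.join()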

The whole thing works if I reduce the array to (50,1024,1280). However, even with 4 processes started and 4 cores working, it is slower than running the code without multiprocessing on a single core (on Windows). So I think I have another problem here.
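One way to check whether the data transfer itself dominates is to time just the pickling of the array (a rough sketch; this measures only the serialization in the parent, not the actual transfer to the child):

import pickle
import time
import numpy as np

a = np.zeros((50, 1024, 1280), dtype=np.uint16)

start = time.time()
payload = pickle.dumps(a, pickle.HIGHEST_PROTOCOL)
print("pickling %.0f MB took %.2f s" % (len(payload) / 1e6, time.time() - start))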

The function in my real program will later be part of a Cython module.

I am using the Anaconda distribution with 32-bit Python, since I could not get my Cython package to compile with the 64-bit version (I'll ask about that in a separate thread).

Any help is welcome!!

Thanks! Philipp

UPDATE:

The first mistake I made was having the "put_into_queue" function definition inside the __main__ block.

Then I introduced shared arrays as suggested. However, this uses a lot of memory, and the memory used scales with the number of processes (which of course should not be the case). Any ideas what I am doing wrong here? It does not seem to matter where I place the definition of the shared array (inside or outside __main__), although I think it should go inside __main__. I got this from this post: Is shared readonly data copied to different processes for Python multiprocessing?

import numpy as np
import multiprocessing as mp
import ctypes


shared_array_base = mp.Array(ctypes.c_uint, 1280*1024*20)
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
#print shared_array
shared_array = shared_array.reshape(20,1024,1280)

number_of_processes = 4

def put_into_queue(_Queue,arr):
    _Queue.put(reverse_np_array(arr))
def reverse_np_array(arr):
    arr = arr + 1 + np.random.rand()
    return arr
if __name__ == '__main__':


    #print shared_arra

    #a = np.ndarray((50,1024,1280),dtype=np.uint16)


    Queue_list = []
    Process_list = []
    list_of_arrays = []

    for i in range(number_of_processes):
        Queue_list.append(mp.Queue())


    for i in range(number_of_processes):
        Process_list.append(mp.Process(target=put_into_queue, args=(Queue_list[i],shared_array)))

    for i in range(number_of_processes):
        Process_list[i].start()

    for i in range(number_of_processes):
        list_of_arrays.append(Queue_list[i].get())

    for i in range(number_of_processes):
        Process_list[i].join()
  • Does the answer to this question help you? stackoverflow.com/a/14593135/513688 The idea is to create shared arrays that both parent and child can write to, instead of using pickling. Commented Jan 23, 2014 at 20:46
  • Hi, thanks for the answers, I tried to use shared arrays, but it is not working, see above. Does anyone know why? Cheers Commented Apr 24, 2014 at 22:22
  • You're putting the shared array into the queue. The linked examples don't do this. Start with a working example, verify it works, and make small changes until it stops behaving how you want/expect. Commented Apr 26, 2014 at 0:58
  • Thanks for the hint! Just to verify if I understood the multiprocessing and queue correctly: If I want to have parallel processes from which I need the output, I have to use queues, right? Otherwise I can not get the data? Is the threading and/or Queue (not mp.queue) module more suitable for my application? Since I just want to do independent operations on parts (where "parts" equals amount of cores) of an array. Just thought it might be worth for me to take a step back and check if I use the correct modules. Thanks again! Commented Apr 27, 2014 at 16:25

1 Answer


You didn't include the full traceback; the end is most important. On my 32-bit Python I get the same traceback that finally ends in

  File "C:\Python27\lib\pickle.py", line 486, in save_string
    self.write(BINSTRING + pack("<i", n) + obj)
MemoryError

MemoryError is the exception and it says you ran out of memory.

64-bit Python would get around this, but sending large amounts of data between processes can easily become a serious bottleneck in multiprocessing.
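One way around both the MemoryError and the transfer cost is to allocate the array in shared memory once, in the parent, and have each worker write to its own slice in place, so no large object ever has to be pickled or put on a queue. A rough sketch of that pattern (the worker function, slicing and sizes are only illustrative):

import ctypes
import multiprocessing as mp
import numpy as np

def worker(shared_base, shape, start, stop):
    # Re-wrap the shared buffer as a numpy view inside the child; no data is copied.
    arr = np.ctypeslib.as_array(shared_base.get_obj()).reshape(shape)
    arr[start:stop] += 1  # slices are disjoint, so the lock is not needed here

if __name__ == '__main__':
    shape = (20, 1024, 1280)
    n_procs = 4
    shared_base = mp.Array(ctypes.c_uint16, shape[0] * shape[1] * shape[2])

    chunk = shape[0] // n_procs
    procs = [mp.Process(target=worker,
                        args=(shared_base, shape, i * chunk, (i + 1) * chunk))
             for i in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

    # The parent sees the workers' changes through the same shared buffer.
    result = np.ctypeslib.as_array(shared_base.get_obj()).reshape(shape)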


2 Comments

Thanks for the reply! Yeah, you are right. But how do I solve the problem now? There has to be an elegant way to deal with this; I think people deal with much larger arrays. It will not be a bottleneck in my case since I am sending the arrays only once (forward and back). Commented
@Fips A possible solution is to use a numpy array in shared memory.
