
I'm trying to write to the same shared array from several processes in a Python multiprocessing script.

When I do it outside a class, in a plain script, everything works. But when I do it through a class (using the same code), I get:
RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance

My script is the following (without a class):

import numpy
import ctypes

from multiprocessing import Pool, Array, cpu_count

n = 2

total_costs_matrix_base = Array(ctypes.c_double, n*n)
total_costs_matrix = numpy.ctypeslib.as_array(
                     total_costs_matrix_base.get_obj())
total_costs_matrix = total_costs_matrix.reshape(n,n)


def set_total_costs_matrix( i, j, def_param = total_costs_matrix_base):
    total_costs_matrix[i,j] = i * j

if __name__ == "__main__":

    pool = Pool(processes=cpu_count())
    iterable = []

    for i in range(n):
        for j in range(i+1,n):
            iterable.append((i,j))
    pool.starmap(set_total_costs_matrix, iterable)
    total_costs_matrix.dump('some/path/to/file')

That script works well. The one that doesn't is the following (which uses a class):

import numpy
import ctypes

from multiprocessing import Pool, Array, cpu_count

class CostComputation(object):
    """Computes the cost matrix."""

    def __init__(self):
        self.n = 2

        self.total_costs_matrix_base = Array(ctypes.c_double, self.n*self.n)
        self.total_costs_matrix = numpy.ctypeslib.as_array(
                             self.total_costs_matrix_base.get_obj())
        self.total_costs_matrix = self.total_costs_matrix.reshape(self.n,self.n)


    def set_total_costs_matrix(self, i, j, def_param = None):
        def_param = self.total_costs_matrix_base
        self.total_costs_matrix[i,j] = i * j


    def write_cost_matrix(self):
        pool = Pool(processes=cpu_count())
        iterable = []

        for i in range(self.n):
            for j in range(i+1,self.n):
                iterable.append((i,j))
        pool.starmap(self.set_total_costs_matrix, iterable)
        self.total_costs_matrix.dump('some/path/to/file')

I then call write_cost_matrix from another file, after creating an instance of CostComputation.

I read this answer but still couldn't solve my problem.

I'm using Python 3.4.2 on Mac OS X Yosemite 10.10.4.

EDIT
When using the class CostComputation, the script I'm using is:

from cost_computation import CostComputation

cc = CostComputation()
cc.write_cost_matrix()

The whole error is:

Traceback (most recent call last):
  File "app.py", line 65, in <module>
    cc.write_cost_matrix()
  File "/path/to/cost_computation.py", line 75, in write_cost_matrix
    pool.starmap(self.set_total_costs_matrix, iterable)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 268, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 599, in get
    raise self._value
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 383, in _handle_tasks
    put(task)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 206, in send
    self._send_bytes(ForkingPickler.dumps(obj))
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/sharedctypes.py", line 192, in __reduce__
    assert_spawning(self)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 347, in assert_spawning
    ' through inheritance' % type(obj).__name__
RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance
  • Where, exactly, does the RuntimeError get raised? My guess is that you are accessing CostComputation.total_costs_matrix from a different class, which is triggering this exception. Commented Jul 29, 2015 at 15:47
  • @justhecuke See the edit. I'm not accessing total_costs_matrix from a different class, but I'm actually accessing it many times (because of the multiprocessing) from the same instance of the class. I guess that the solution is the answer given by @ATOzTOA right below. Trying it asap. Commented Jul 29, 2015 at 17:36
  • That's probably a serialization problem. Check that your class can be correctly pickled. Commented Jul 29, 2015 at 17:42
  • have you looked at this thousandfold.net/cz/2014/05/01/… Commented Jul 29, 2015 at 19:16
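
The serialization point raised in the comments can be checked without a pool. The sketch below uses only the standard library; `try_pickle` and the dict standing in for an instance's attributes are hypothetical names chosen for illustration. Pickling a bound method such as self.set_total_costs_matrix also pickles self, so an array stored on the instance would go through the same path.

```python
import ctypes
import pickle
from multiprocessing import Array


def try_pickle(obj):
    """Return the RuntimeError message raised while pickling obj, or None."""
    try:
        pickle.dumps(obj)
    except RuntimeError as exc:
        return str(exc)
    return None


shared = Array(ctypes.c_double, 4)

# Pickling the synchronized array outside of process creation is refused.
direct_error = try_pickle(shared)

# The same happens when the array is merely reachable from the pickled
# object, as it is from an instance's attributes.
nested_error = try_pickle({"total_costs_matrix_base": shared})

print(direct_error)
```

Both calls should report the same message as the traceback above, which suggests the class itself is not at fault: the array is being sent through the task queue instead of being handed over when the workers start.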

2 Answers


Try creating a second class that holds only the shared data, then use an instance of that class in your main class.



As an aside, Array(ctypes.c_double, n*n) creates a synchronized array that carries its own lock (a recursive lock by default), which you can obtain by calling get_lock() on the array instance. Use it when an operation on the array must run as a critical section. In your case each task writes a distinct element, so the lock is unnecessary and you could instead initialize self.total_costs_matrix_base with the more efficient Array(ctypes.c_double, n*n, lock=False). That call returns the raw ctypes array, so you would pass it to numpy.ctypeslib.as_array directly instead of calling get_obj() on it.
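
A minimal single-process sketch of the two variants, using only the standard library; the variable names (`locked`, `raw`, and the two flags) are illustrative and not taken from the question:

```python
import ctypes
from multiprocessing import Array

n = 2

# Default: a synchronized wrapper that carries its own lock.
locked = Array(ctypes.c_double, n * n)
with locked.get_lock():
    # Read-modify-write performed as one critical section.
    locked[0] = locked[0] + 1.0

# lock=False: a plain ctypes array in shared memory, no wrapper and no lock.
raw = Array(ctypes.c_double, n * n, lock=False)
raw[3] = 2.5

wrapper_has_lock = hasattr(locked, "get_lock")
raw_has_lock = hasattr(raw, "get_lock")
```

Holding the lock only matters when several processes may update the same element; writes to disjoint cells, as in the question, do not need it.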

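Unfortunately, if you pass a bound method to a multiprocessing pool and the instance holds a multiprocessing.Array as an attribute, you will get the exception you experienced: the pool pickles the method together with its instance, and the array refuses to be pickled outside of process creation. A solution is to use a multiprocessing.shared_memory.SharedMemory instance (available since Python 3.8):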

import numpy as np

from multiprocessing import Pool, cpu_count, shared_memory, current_process

class SharedNumpyArray:
    def __init__(self, arr):
        self._shape = arr.shape
        self._dtype = arr.dtype
        self._shm = None
        self._creator = current_process().pid

        # Initialize shared memory
        self._acquired_shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
        self._name = self._acquired_shm.name
        _arr = np.ndarray(shape=self._shape, dtype=self._dtype, buffer=self._acquired_shm.buf)
        _arr[:] = arr[:]

    def __getstate__(self):
        # If pickle is being used to serialize this instance to another process,
        # then we do not need to include attribute _acquired_shm.

        if '_acquired_shm' in self.__dict__:
            state = self.__dict__.copy()
            del state['_acquired_shm']
        else:
            state = self.__dict__
        return state

    @property
    def arr(self):
        self._shm = shared_memory.SharedMemory(name=self._name)
        return np.ndarray(shape=self._shape, dtype=self._dtype, buffer=self._shm.buf)

    def __del__(self):
        if self._shm:
            self._shm.close()

    def close_and_unlink(self):
        """Called only by the process that created this instance."""

        if current_process().pid != self._creator:
            raise RuntimeError('Only the creating process may call close_and_unlink')

        if self._shm:
            self._shm.close()
            # Prevent __del__ from trying to close _shm again
            self._shm = None

        if self._acquired_shm:
            self._acquired_shm.close()
            self._acquired_shm.unlink()
            # Make additional call to this method a no-op:
            self._acquired_shm = None

class CostComputation(object):
    """Computes the cost matrix."""

    def __init__(self):
        self.n = 4
        shape = (self.n, self.n)
        dtype = np.float64
        arr = np.zeros(shape, dtype=dtype)
        self._shared_array = SharedNumpyArray(arr=arr)

    def set_total_costs_matrix(self, i, j):
        # Create the shared numpy array from shared memory:
        total_costs_matrix = self._shared_array.arr

        total_costs_matrix[i, j] = i * j

    def write_cost_matrix(self):
        pool = Pool(processes=cpu_count())
        iterable = []

        for i in range(self.n):
            for j in range(self.n):
                iterable.append((i,j))
        pool.starmap(self.set_total_costs_matrix, iterable)

        # Create the shared numpy array from shared memory:
        arr = self._shared_array.arr
        print(arr)
        # We are through with shared memory:
        self._shared_array.close_and_unlink()

if __name__ == '__main__':
    cc = CostComputation()
    cc.write_cost_matrix()
