1

I have written a small multiprocessing program in Python which reads an array of values and runs multiple processes asynchronously to operate on parts of the data array. Each separate process should its own 1-D section of the 2-D array, with no overlap between processes. Once all the processes have completed the shared memory array is to be written out to file, but it's at this point in my code where the expected/computed values are not present in the shared memory array, but the original values are still present. It seems that the assignment of new values within the processes did not stick to the shared memory object. Perhaps there's something going on that I'm not understanding (for example pass by reference vs. pass by value) which is at play causing my trouble?

I have a Processor class which creates a number of worker processes and instantiates a JoinableQueue. The function that is called by each process operates on an indexed slice of the 2-D shared memory array and updates those array values in place, so the input (shared memory) array should have all values replaced by the results of the computation so there should be no need to have a second array for the results. The main function passes the shared memory array and an index value as arguments for the compute function, these are added into a queue from which the process objects will consume items of work. The code is below:

class Processor:
     
    queue = None
                 
    def __init__(self, 
                 number_of_workers=1):
              
        # create a joinable queue
        self.queue = JoinableQueue()
        
        # create the processes
        self.processes = [Process(target=self.compute) for _ in range(number_of_workers)]
        for p in self.processes:
            p.start()
                 
    def add_work_item(self, item):
         
        # add the parameters list to the parameters queue
        self.queue.put(item)
 
    def compute(self):
         
        while True:
              
            # get a list of arguments from the queue
            arguments = self.queue.get()
              
            # if we didn't get one we keep looping
            if arguments is None:
                break
  
            # process the arguments here
            data = arguments[0]
            index = arguments[1] 
                 
            # only process non-empty grid cells, i.e. data array contains at least some non-NaN values
            if (isinstance(data[:, index], np.ma.MaskedArray) and data[:, index].mask.all()) or np.isnan(data[:, index]).all():
             
                pass         
                  
            else:  # we have some valid values to work with
             
                logger.info('Processing latitude: {}'.format(index))
             
                # perform a fitting to gamma     
                results = do_something(data[:, index])
 
                # update the shared array
                data[:, index] = results

            # indicate that the task has completed
            self.queue.task_done()
 
    def terminate(self):
 
        # terminate all processes
        for p in self.processes:
            p.terminate()

    def wait_on_all(self):
 
        #wait until queue is empty
        self.queue.join()

#-----------------------------------------------------------------------------------------------------------------------
if __name__ == '__main__':

    try:
        
        # log some timing info, used later for elapsed time 
        start_datetime = datetime.now()
        logger.info("Start time:    {}".format(start_datetime, '%x'))
        
        # get the command line arguments
        input_file = sys.argv[1]
        input_var_name = sys.argv[2]
        output_file_base = sys.argv[3]
        month_scale = int(sys.argv[4])

        # create the variable name from the indicator, distribution, and month scale
        variable_name = 'spi_gamma_{}'.format(str(month_scale).zfill(2))

        # open the NetCDF files
        with netCDF4.Dataset(input_file) as input_dataset, \
            netCDF4.Dataset(output_file_base + '_' + variable_name + '.nc', 'w') as output_dataset:
             
            # read info from the input dataset and initialize the output for writing

            # create a processor with a number of worker processes
            number_of_workers = 1
            processor = Processor(number_of_workers)
            
            # for each longitude slice
            for lon_index in range(lon_size):
    
                logger.info('\n\nProcessing longitude: {}\n'.format(lon_index))

                # read the longitude slice into a data array     
                longitude_slice = input_dataset.variables[input_var_name][:, lon_index, :]
                
                # reshape into a 1-D array
                original_shape = longitude_slice.shape
                flat_longitude_slice = longitude_slice.flatten()
                
                # convert the array onto a shared memory array which can be accessed from within another process
                shared_array_base = Array(ctypes.c_double, flat_longitude_slice)
                shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
                shared_array = shared_array.reshape(original_shape)
                
                # loop over each latitude point in the longitude slice
                for lat_index in range(lat_size):
                    
                    # have the processor process the shared array at this index
                    arguments = [shared_array, lat_index]
                    processor.add_work_item(arguments)
                    
                # join to the processor and don't continue until all processes have completed
                processor.wait_on_all()
    
                                                 
                # write the fitted longitude slice values into the output NetCDF
                output_dataset.variables[variable_name][:, lon_index, :] = np.reshape(shared_array, (time_size, 1, lat_size))
    
            # all processes have completed
            processor.terminate()
        
    except Exception, e:
        logger.error('Failed to complete', exc_info=True)
        raise

Where am I going wrong, i.e. why are the values of the shared memory array not being updated as I'm expecting?

Update

I have this working now for a single process but when I try to spawn multiple processes I get a pickle error:

pickle.PicklingError: Can't pickle '_subprocess_handle' object: <_subprocess_handle object at 0x00000000021CF9F0>

This occurs when the second process is started, within the Processor.init() function. If I run the below code using a single process (number_of_workers = 1) then I do not encounter this error and my code runs as expected, although not utilizing multiple processors, which is a goal.

class Processor:
     
    queue = None
                 
    def __init__(self, 
                 shared_array,
                 data_shape,
                 number_of_workers=1):
              
        # create a joinable queue
        self.queue = JoinableQueue()
        
        # keep reference to shared memory array
        self.shared_array = shared_array
        self.data_shape = data_shape
        
        # create the processes
        self.processes = [Process(target=self.compute_indicator) for _ in range(number_of_workers)]
        for p in self.processes:
            p.start()
                 
    def add_work_item(self, item):
         
        # add the parameters list to the parameters queue
        self.queue.put(item)
 
    def compute_indicator(self):
         
        while True:
              
            # get a list of arguments from the queue
            arguments = self.queue.get()
              
            # if we didn't get one we keep looping
            if arguments is None:
                break
  
            # process the arguments here
            index = arguments[0] 
                 
            # turn the shared array into a numpy array     
            data = np.ctypeslib.as_array(self.shared_array)
            data = data.reshape(self.data_shape)
                
            # only process non-empty grid cells, i.e. data array contains at least some non-NaN values
            if (isinstance(data[:, index], np.ma.MaskedArray) and data[:, index].mask.all()) \
                or np.isnan(data[:, index]).all() or (data[:, index] < 0).all():
             
                pass         
                  
            else:  # we have some valid values to work with
             
                logger.info('Processing latitude: {}'.format(index))
             
                # perform computation     
                fitted_values = do_something(data[:, index])
 
                # update the shared array
                data[:, index] = fitted_values
                
            # indicate that the task has completed
            self.queue.task_done()
 
    def terminate(self):
 
        # terminate all processes
        for p in self.processes:
            p.terminate()

    def wait_on_all(self):
 
        #wait until queue is empty
        self.queue.join()

#-----------------------------------------------------------------------------------------------------------------------
if __name__ == '__main__':

            # create a shared memory array which can be accessed from within another process
            shared_array_base = Array(ctypes.c_double, time_size * lat_size, lock=False)
                
            # create a processor with a number of worker processes
            number_of_workers = 4
            data_shape = (time_size, lat_size)
            processor = Processor(shared_array_base, data_shape, number_of_workers)

            # for each longitude slice
            for lon_index in range(lon_size):
    
                logger.info('\n\nProcessing longitude: {}\n'.format(lon_index))

                # get the shared memory array and convert into a numpy array with proper dimensions
                longitude_array = np.ctypeslib.as_array(shared_array_base)
                longitude_array = np.reshape(longitude_array, data_shape)

                # read the longitude slice into the shared memory array     
                longitude_array[:] = input_dataset.variables[input_var_name][:, lon_index, :]
                
                # a list of arguments we'll map to the processes of the pool
                arguments_iterable = []
                
                # loop over each latitude point in the longitude slice
                for lat_index in range(lat_size):
                    
                    # have the processor process the shared array at this index
                    processor.add_work_item([lat_index])
                        
                # join to the processor and don't continue until all processes have completed
                processor.wait_on_all()
    
                # get the longitude slice of fitted values from the shared memory array and convert  
                # into a numpy array with proper dimensions which we can then use to write to NetCDF
                fitted_array = np.ctypeslib.as_array(shared_array_base)
                fitted_array = np.reshape(fitted_array, (time_size, 1, lat_size))
                                                 
                # write the longitude slice of computed values into the output NetCDF
                output_dataset.variables[variable_name][:, lon_index, :] = fitted_array
    
            # all processes have completed
            processor.terminate()
5
  • 1
    Why not use the map() method from multiprocessing.Pool instead of writing your own? Commented Apr 25, 2016 at 22:25
  • 1
    Just a thought, but you are creating the Process-es before you create the shared array. Are you sure that the Process-es actually have access to the shared array? It could be that they just get a copy... Commented Apr 25, 2016 at 22:29
  • 1
    shared_array_base should be passed as an argument to the target compute method. Actually, for POSIX systems it just needs to be inherited via fork, but for Windows support it needs to be an argument to allow the name and state of the mmap shared memory to be pickled and piped to the child process. You can wrap the shared array as a NumPy array in each worker process; don't pickle and pipe a copy to each worker like you're currently doing. Commented Apr 25, 2016 at 23:13
  • I want to have a loop where I read only a longitude slice of data into the shared memory array and all processes should operate on the shared array at the index provided as the argument to the compute() function. Once all processes have completed their compute() functions then the shared memory array is reread at the next longitude slice and more work items for processing the new slice (one for each latitude point of the slice) are added to queue, with the understanding that the processes will consume those new work items and compute on those points of the slice and so on. Commented Apr 26, 2016 at 12:42
  • When I use a pool I get a pickle error: cPickle.PicklingError: Can't pickle <class 'multiprocessing.sharedctypes.c_double_Array_71400'>: attribute lookup multiprocessing.sharedctypes.c_double_Array_71400 failed. It seems that the shared memory array can't be pickled when passed as an argument to the function. Commented Apr 26, 2016 at 14:05

1 Answer 1

1

I have now successfully implemented a solution although it is still exhibiting unexpected behaviors: 1) it runs on all CPUs on a Windows environment but the total elapsed time for the process is no faster than running a single processor job (i.e. the same code without any of the multiprocessing.* usages), and 2) when I run the code on a Linux environment (virtual container) I only see one out of four CPUs being utilized. In any event I have now a working code that utilizes a shared memory array, which is what the original question was about. If anyone can see where I'm going wrong which is leading to the two issues mentioned above then please follow up in the comments.

def compute(args):

    # extract the arguments
    lon_index = args[0]
    lat_index = args[1]

    # NOT SHOWN
    # get the data_shape value

    # turn the shared array into a numpy array
    data = np.ctypeslib.as_array(shared_array)
    data = data.reshape(data_shape)

    # perform the computation, update the indexed array slice
    data[:, lon_index, lat_index] = perform_computation(data[:, lon_index, lat_index])  

def init_process(array):

    # put the arguments to the global namespace  
    global shared_array
    shared_array = array


if __name__ == '__main__':

    # NOT SHOWN
    # get the lat_size, lon_size, time_size, lon_stride, and data_shape values

    # create a shared memory array which can be accessed from within another process
    shared_array = Array(ctypes.c_double, time_size * lon_stride * lat_size, lock=False)
    data_shape = (time_size, lon_stride, lat_size)

    # create a processor with a number of worker processes
    number_of_workers = cpu_count()

    # create a Pool, essentially forking with copies of the shared array going to each pooled/forked process
    pool = Pool(processes=number_of_workers, 
                initializer=init_process, 
                initargs=(shared_array))

    # for each slice
    for lon_index in range(0, lon_size, lon_stride):

        # convert the shared memory array into a numpy array with proper dimensions
        slice_array = np.ctypeslib.as_array(shared_array)
        slice_array = np.reshape(slice_array, data_shape)

        # read the longitude slice into the shared memory array     
        slice_array[:] = read_data(data_shape)

        # a list of arguments we'll map to the processes of the pool
        arguments_iterable = []

        # loop over each latitude point in the longitude slice
        for lat_index in range(lat_size):

            for i in range(lon_stride):

                # have the processor process the shared array at this index
                arguments = [i, lat_index]
                arguments_iterable.append(arguments)

                # map the arguments iterable to the compute function
                pool.map(compute, arguments_iterable)

                # get the longitude slice of fitted values from the shared memory array and convert  
                # into a numpy array with proper dimensions which we can then use to write to NetCDF
                fitted_array = np.ctypeslib.as_array(shared_array)
                fitted_array = np.reshape(fitted_array, (time_size, lon_stride, lat_size))

                # NOT SHOWN
                # write the slice of computed values to file

            # all processes have completed, close the pool
            pool.close()
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.