I have written a small multiprocessing program in Python which reads an array of values and runs multiple processes asynchronously to operate on parts of the data array. Each process should operate on its own 1-D section of the 2-D array, with no overlap between processes. Once all the processes have completed, the shared memory array is to be written out to file, but it's at this point in my code that the expected/computed values are not present in the shared memory array; the original values are still there. It seems that the assignment of new values within the processes did not stick to the shared memory object. Perhaps there's something going on that I'm not understanding (for example pass-by-reference vs. pass-by-value) which is causing my trouble?
I have a Processor class which creates a number of worker processes and instantiates a JoinableQueue. The function called by each process operates on an indexed slice of the 2-D shared memory array and updates those array values in place, so the input (shared memory) array should end up with all values replaced by the results of the computation; there should be no need for a second results array. The main function passes the shared memory array and an index value as arguments to the compute function; these are added to a queue from which the process objects consume items of work. The code is below:
import ctypes
import logging
import sys
from datetime import datetime
from multiprocessing import Array, JoinableQueue, Process

import netCDF4
import numpy as np

# (do_something() is the fitting routine, defined elsewhere)
logger = logging.getLogger(__name__)

class Processor:

    def __init__(self,
                 number_of_workers=1):

        # create a joinable queue
        self.queue = JoinableQueue()

        # create and start the worker processes
        self.processes = [Process(target=self.compute) for _ in range(number_of_workers)]
        for p in self.processes:
            p.start()

    def add_work_item(self, item):

        # add the arguments list to the work queue
        self.queue.put(item)

    def compute(self):

        while True:

            # get a list of arguments from the queue
            arguments = self.queue.get()

            # a None work item is the signal to stop
            if arguments is None:
                break

            # unpack the arguments
            data = arguments[0]
            index = arguments[1]

            # only process non-empty grid cells, i.e. the data array contains at least some non-NaN values
            if (isinstance(data[:, index], np.ma.MaskedArray) and data[:, index].mask.all()) \
                    or np.isnan(data[:, index]).all():
                pass
            else:  # we have some valid values to work with

                logger.info('Processing latitude: {}'.format(index))

                # perform a fitting to gamma
                results = do_something(data[:, index])

                # update the shared array
                data[:, index] = results

            # indicate that the task has completed
            self.queue.task_done()

    def terminate(self):

        # terminate all processes
        for p in self.processes:
            p.terminate()

    def wait_on_all(self):

        # wait until the queue is empty
        self.queue.join()
#-----------------------------------------------------------------------------------------------------------------------
if __name__ == '__main__':

    try:
        # log some timing info, used later for elapsed time
        start_datetime = datetime.now()
        logger.info("Start time: {}".format(start_datetime))

        # get the command line arguments
        input_file = sys.argv[1]
        input_var_name = sys.argv[2]
        output_file_base = sys.argv[3]
        month_scale = int(sys.argv[4])

        # create the variable name from the indicator, distribution, and month scale
        variable_name = 'spi_gamma_{}'.format(str(month_scale).zfill(2))

        # open the NetCDF files
        with netCDF4.Dataset(input_file) as input_dataset, \
                netCDF4.Dataset(output_file_base + '_' + variable_name + '.nc', 'w') as output_dataset:

            # read info from the input dataset and initialize the output for writing
            # (the dimension sizes time_size, lat_size, and lon_size are set here -- elided)

            # create a processor with a number of worker processes
            number_of_workers = 1
            processor = Processor(number_of_workers)

            # for each longitude slice
            for lon_index in range(lon_size):

                logger.info('\n\nProcessing longitude: {}\n'.format(lon_index))

                # read the longitude slice into a data array
                longitude_slice = input_dataset.variables[input_var_name][:, lon_index, :]

                # flatten into a 1-D array, remembering the original shape
                original_shape = longitude_slice.shape
                flat_longitude_slice = longitude_slice.flatten()

                # copy the data into a shared memory array which can be accessed from within another process
                shared_array_base = Array(ctypes.c_double, flat_longitude_slice)
                shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
                shared_array = shared_array.reshape(original_shape)

                # loop over each latitude point in the longitude slice
                for lat_index in range(lat_size):

                    # have the processor process the shared array at this index
                    arguments = [shared_array, lat_index]
                    processor.add_work_item(arguments)

                # block until all queued work items have been completed
                processor.wait_on_all()

                # write the fitted longitude slice values into the output NetCDF
                output_dataset.variables[variable_name][:, lon_index, :] = \
                    np.reshape(shared_array, (time_size, 1, lat_size))

            # all work items have completed, so stop the workers
            processor.terminate()

    except Exception:
        logger.error('Failed to complete', exc_info=True)
        raise
Where am I going wrong? That is, why are the values of the shared memory array not being updated as I expect?
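One thing I'm now wondering about: everything placed onto a multiprocessing queue is pickled on its way to the worker process, and unpickling a NumPy array produces an ordinary in-process copy rather than a view onto the shared buffer. The standalone snippet below (not my actual code) seems to confirm that a queued array would arrive in the worker as a private copy:

import ctypes
import pickle
from multiprocessing import Array

import numpy as np

# a shared buffer and a NumPy view onto it, as in the code above
shared_array_base = Array(ctypes.c_double, 4)
view = np.ctypeslib.as_array(shared_array_base.get_obj())

# simulate the pickling round trip performed by JoinableQueue.put()/get()
received = pickle.loads(pickle.dumps(view))
received[:] = 1.0   # the "worker" writes its results here

print(np.shares_memory(view, received))   # False: the worker got a private copy
print(view[:])                            # still all zeros in the shared buffer

If that is right, it would explain why the workers' assignments never show up in the parent's shared array.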
Update
I have this working now for a single process, but when I try to spawn multiple processes I get a pickle error:
pickle.PicklingError: Can't pickle '_subprocess_handle' object: <_subprocess_handle object at 0x00000000021CF9F0>
This occurs when the second process is started, within the Processor.__init__() function. If I run the below code using a single process (number_of_workers=1) then I do not encounter this error and my code runs as expected, although without utilizing multiple processors, which is the goal.
class Processor:

    def __init__(self,
                 shared_array,
                 data_shape,
                 number_of_workers=1):

        # create a joinable queue
        self.queue = JoinableQueue()

        # keep references to the shared memory array and its dimensions
        self.shared_array = shared_array
        self.data_shape = data_shape

        # create and start the worker processes
        self.processes = [Process(target=self.compute_indicator) for _ in range(number_of_workers)]
        for p in self.processes:
            p.start()

    def add_work_item(self, item):

        # add the parameters list to the work queue
        self.queue.put(item)

    def compute_indicator(self):

        while True:

            # get a list of arguments from the queue
            arguments = self.queue.get()

            # a None work item is the signal to stop
            if arguments is None:
                break

            # unpack the arguments
            index = arguments[0]

            # wrap the shared memory array as a numpy array with the proper dimensions
            data = np.ctypeslib.as_array(self.shared_array)
            data = data.reshape(self.data_shape)

            # only process non-empty grid cells, i.e. the data array contains at least some valid values
            if (isinstance(data[:, index], np.ma.MaskedArray) and data[:, index].mask.all()) \
                    or np.isnan(data[:, index]).all() or (data[:, index] < 0).all():
                pass
            else:  # we have some valid values to work with

                logger.info('Processing latitude: {}'.format(index))

                # perform the computation
                fitted_values = do_something(data[:, index])

                # update the shared array
                data[:, index] = fitted_values

            # indicate that the task has completed
            self.queue.task_done()

    def terminate(self):

        # terminate all processes
        for p in self.processes:
            p.terminate()

    def wait_on_all(self):

        # wait until the queue is empty
        self.queue.join()
#-----------------------------------------------------------------------------------------------------------------------
if __name__ == '__main__':

    # (argument parsing, logging, and NetCDF dataset setup as in the code above -- elided)

    # create a shared memory array which can be accessed from within another process
    shared_array_base = Array(ctypes.c_double, time_size * lat_size, lock=False)

    # create a processor with a number of worker processes
    number_of_workers = 4
    data_shape = (time_size, lat_size)
    processor = Processor(shared_array_base, data_shape, number_of_workers)

    # for each longitude slice
    for lon_index in range(lon_size):

        logger.info('\n\nProcessing longitude: {}\n'.format(lon_index))

        # wrap the shared memory array as a numpy array with the proper dimensions
        longitude_array = np.ctypeslib.as_array(shared_array_base)
        longitude_array = np.reshape(longitude_array, data_shape)

        # read the longitude slice into the shared memory array
        longitude_array[:] = input_dataset.variables[input_var_name][:, lon_index, :]

        # loop over each latitude point in the longitude slice
        for lat_index in range(lat_size):

            # have the processor process the shared array at this index
            processor.add_work_item([lat_index])

        # block until all queued work items have been completed
        processor.wait_on_all()

        # get the longitude slice of fitted values from the shared memory array and convert
        # into a numpy array with the proper dimensions for writing to NetCDF
        fitted_array = np.ctypeslib.as_array(shared_array_base)
        fitted_array = np.reshape(fitted_array, (time_size, 1, lat_size))

        # write the longitude slice of computed values into the output NetCDF
        output_dataset.variables[variable_name][:, lon_index, :] = fitted_array

    # all work items have completed, so stop the workers
    processor.terminate()
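If I'm reading the traceback correctly, the failure happens because Process(target=self.compute_indicator) uses a bound method as its target, so spawning a child on Windows pickles self, and by the time the second worker is created self.processes already holds a started Process whose _subprocess_handle cannot be pickled. A stripped-down illustration of that suspicion (toy names, not my actual code):

import time
from multiprocessing import Process

class Holder:

    def __init__(self, number_of_workers=2):
        self.processes = []
        for _ in range(number_of_workers):
            # the target is a bound method, so spawning the child pickles self,
            # including any already-started Process objects in self.processes
            p = Process(target=self.work)
            self.processes.append(p)
            p.start()   # on Windows the second start() fails with a PicklingError

    def work(self):
        time.sleep(0.1)

if __name__ == '__main__':
    Holder()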
Comments received so far:

- Why not use the map() method from multiprocessing.Pool instead of writing your own?

- You create the Process-es before you create the shared array. Are you sure that the Process-es actually have access to the shared array? It could be that they just get a copy...

- shared_array_base should be passed as an argument to the target compute method. Actually, for POSIX systems it just needs to be inherited via fork, but for Windows support it needs to be an argument, to allow the name and state of the mmap shared memory to be pickled and piped to the child process. You can wrap the shared array as a NumPy array in each worker process; don't pickle and pipe a copy to each worker like you're currently doing.
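Following the last comment, a sketch of what passing the shared array to the workers at creation time might look like. This is a minimal, standalone illustration under my own assumptions (a module-level worker function, a doubling operation standing in for the real do_something(), and toy dimensions), not a drop-in replacement for the code above:

import ctypes
from multiprocessing import Array, JoinableQueue, Process

import numpy as np

def worker(shared_array, data_shape, queue):
    # wrap the inherited/piped shared memory as a NumPy view once, in the child
    data = np.ctypeslib.as_array(shared_array).reshape(data_shape)
    while True:
        index = queue.get()
        if index is None:   # poison pill: no more work
            queue.task_done()
            break
        data[:, index] = data[:, index] * 2.0   # stand-in for do_something()
        queue.task_done()

if __name__ == '__main__':

    time_size, lat_size = 4, 3   # toy dimensions

    # lock-free shared memory; each worker writes to a distinct column
    shared_array = Array(ctypes.c_double, time_size * lat_size, lock=False)
    queue = JoinableQueue()

    # the shared Array travels as a Process argument (handled by multiprocessing's
    # pickler on Windows, inherited via fork on POSIX); only indexes go through the queue
    workers = [Process(target=worker, args=(shared_array, (time_size, lat_size), queue))
               for _ in range(2)]
    for p in workers:
        p.start()

    for lat_index in range(lat_size):
        queue.put(lat_index)
    for _ in workers:
        queue.put(None)   # one poison pill per worker

    queue.join()          # block until every queued item is marked done
    for p in workers:
        p.join()

Passing the Array at Process creation time, rather than through the queue, is what lets multiprocessing transfer the shared memory handle itself instead of a pickled copy of the data.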