The code down below is a contrived example that simulates an actual problem I have that uses multiprocessing to speed up the code. The code is run on Windows 10 64-bit OS, python 3.7.5, and ipython 7.9.0
the transformation functions(these functions will be used to transform arrays in main())
from itertools import product
from functools import partial
from numba import njit, prange
import multiprocessing as mp
import numpy as np
@njit(parallel= True)
def transform_array_c(data, n):
ar_len= len(data)
sec_max1= np.empty(ar_len, dtype = data.dtype)
sec_max2= np.empty(ar_len, dtype = data.dtype)
for i in prange(n-1):
sec_max1[i]= np.nan
for sec in prange(ar_len//n):
s2_max= data[n*sec+ n-1]
s1_max= data[n*sec+ n]
for i in range(n-1,-1,-1):
if data[n*sec+i] > s2_max:
s2_max= data[n*sec+i]
sec_max2[n*sec+i]= s2_max
sec_max1[n*sec+ n-1]= sec_max2[n*sec]
for i in range(n-1):
if n*sec+n+i < ar_len:
if data[n*sec+n+i] > s1_max:
s1_max= data[n*sec+n+i]
sec_max1[n*sec+n+i]= max(s1_max, sec_max2[n*sec+i+1])
else:
break
return sec_max1
@njit(error_model= 'numpy', cache= True)
def rt_mean_sq_dev(array1, array2, n):
msd_temp = np.empty(array1.shape[0])
K = array2[n-1]
rs_x= array1[0] - K
rs_xsq = rs_x *rs_x
msd_temp[0] = np.nan
for i in range(1,n):
rs_x += array1[i] - K
rs_xsq += np.square(array1[i] - K)
msd_temp[i] = np.nan
y_i = array2[n-1] - K
msd_temp[n-1] = np.sqrt(max(y_i*y_i + (rs_xsq - 2*y_i*rs_x)/n, 0))
for i in range(n, array1.shape[0]):
rs_x = array1[i] - array1[i-n]+ rs_x
rs_xsq = np.square(array1[i] - K) - np.square(array1[i-n] - K) + rs_xsq
y_i = array2[i] - K
msd_temp[i] = np.sqrt(max(y_i*y_i + (rs_xsq - 2*y_i*rs_x)/n, 0))
return msd_temp
@njit(cache= True)
def transform_array_a(data, n):
result = np.empty(data.shape[0], dtype= data.dtype)
alpharev = 1. - 2 / (n + 1)
alpharev_exp = alpharev
e = data[0]
w = 1.
if n == 2: result[0] = e
else:result[0] = np.nan
for i in range(1, data.shape[0]):
w += alpharev_exp
e = e*alpharev + data[i]
if i > n -3:result[i] = e / w
else:result[i] = np.nan
if alpharev_exp > 3e-307:alpharev_exp*= alpharev
else:alpharev_exp=0.
return result
The multiprocessing part
def func(tup, data): #<-------------the function to be run among all
a_temp= a[tup[2][0]]
idx1 = a_temp > a[tup[2][1]]
idx2= a_temp < b[(tup[2][1], tup[1][1])]
c_final = c[tup[0][1]][idx1 | idx2]
data_final= data[idx1 | idx2]
return (tup[0][0], tup[1][0], *tup[2]), c_final[-1] - data_final[-1]
def setup(a_dict, b_dict, c_dict): #initialize the shared dictionaries
global a,b,c
a,b,c = a_dict, b_dict, c_dict
def main(a_arr, b_arr, c_arr, common_len):
np.random.seed(0)
data_array= np.random.normal(loc= 24004, scale=500, size= common_len)
a_size = a_arr[-1] + 1
b_size = len(b_arr)
c_size = len(c_arr)
loop_combo = product(enumerate(c_arr),
enumerate(b_arr),
(n_tup for n_tup in product(np.arange(1,a_arr[-1]), a_arr) if n_tup[1] > n_tup[0])
)
result = np.zeros((c_size, b_size, a_size -1 ,a_size), dtype = np.float32)
###################################################
#This part simulates the heavy-computation in the actual problem
a= {}
b= {}
c= {}
for i in range(1, a_arr[-1]+1):
a[i]= transform_array_a(data_array, i)
if i in a_arr:
for j in b_arr:
b[(i,j)]= rt_mean_sq_dev(data_array, a[i], i)/data_array *j
for i in c_arr:
c[i]= transform_array_c(data_array, i)
###################################################
with mp.Pool(processes= mp.cpu_count() - 1,
initializer= setup,
initargs= [a,b,c]
) as pool:
mp_res= pool.imap_unordered(partial(func, data= data_array),
loop_combo
)
for item in mp_res:
result[item[0]] =item[1]
return result
if __name__ == '__main__':
mp.freeze_support()
a_arr= np.arange(2,44,2)
b_arr= np.arange(0.4,0.8, 0.20)
c_arr= np.arange(2,42,10)
common_len= 440000
final_res= main(a_arr, b_arr, c_arr, common_len)
For performance reasons, multiple shared "read only" dictionaries are used among all processes to reduce the redundant computations(in the actual problem, the total computation time is reduced by 40% after using shared dictionaries among all the processes). However, The ram usage becomes absurdly higher after using shared dictionaries in my actual problem; memory usage in my 6C/12T Windows computer goes from (8.2GB peak, 5.0GB idle) to (23.9GB peak, 5.0GB idle), a little too high of a cost to pay in order to gain 40% speed up.
Is the high ram usage unavoidable when using multiple shared data among processes is a must? What can be done to my code in order to make it as fast as possible while using as low memory as possible?
Thank you in advance
Note: I tried using imap_unordered() instead of map because I heard it is supposed to reduce the memory usage when the input iterable is large, but I honestly can't see an improvement in the ram usage. Maybe I have done something wrong here?
EDIT: Due to the feedback in the answers, I have already changed the heavy computation part of the code such that it looks less dummy and resembles the computation in the actual problem.
numpy.memmaparrays.numpy.memmaparrays?