I am trying to read in several files, and append certain elements from them to a list. reading the files seems to be slow, so I thought multiprocessing might be able to help me with that. I produced the following code to do what I wanted, essentially in parallel open the numbered file_%i, and pull out the relevant data read_append and append it to a global array res = manager.list() that is shared between processes. sample code given below. However, this fails to work. attempting to print a.shape gives the error message included below the sample code. I am not quite sure how to fix this errant code, and am quite new to multiprocessing. I suspect, this hacky script that i put together using SO answers and the man pages for multiprocessing is far from ideal.
import multiprocessing as mp
import numpy as np
from timeit import default_timer as timer
start = timer()
def read_append(input_list):
val, res_arr = input_list
data_file = np.load('file_%i.npz' %val, mmap_mode = 'r', allow_pickle=True)['data']
for i in range(len(data_file)):
res_arr.append(data_file[i][1])
return None
if __name__ == '__main__':
N= mp.cpu_count()
print(N)
with mp.Manager() as manager:
res = manager.list()
input_list = [(val, res) for val in range(2)]
with mp.Pool(processes = N) as p:
results = p.map(read_append,input_list)
end = timer()
print(end-start)
a = list(res)
print(a.shape)
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
~/anaconda3/lib/python3.7/multiprocessing/managers.py in _callmethod(self, methodname, args, kwds)
810 try:
--> 811 conn = self._tls.connection
812 except AttributeError:
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'
During handling of the above exception, another exception occurred:
FileNotFoundError Traceback (most recent call last)
<ipython-input-13-35028af51086> in <module>
21 end = timer()
22 print(end-start)
---> 23 a = list(res)
24 print(a.shape)
<string> in __len__(self, *args, **kwds)
~/anaconda3/lib/python3.7/multiprocessing/managers.py in _callmethod(self, methodname, args, kwds)
813 util.debug('thread %r does not own a connection',
814 threading.current_thread().name)
--> 815 self._connect()
816 conn = self._tls.connection
817
~/anaconda3/lib/python3.7/multiprocessing/managers.py in _connect(self)
800 if threading.current_thread().name != 'MainThread':
801 name += '|' + threading.current_thread().name
--> 802 conn = self._Client(self._token.address, authkey=self._authkey)
803 dispatch(conn, None, 'accept_connection', (name,))
804 self._tls.connection = conn
~/anaconda3/lib/python3.7/multiprocessing/connection.py in Client(address, family, authkey)
490 c = PipeClient(address)
491 else:
--> 492 c = SocketClient(address)
493
494 if authkey is not None and not isinstance(authkey, bytes):
~/anaconda3/lib/python3.7/multiprocessing/connection.py in SocketClient(address)
617 with socket.socket( getattr(socket, family) ) as s:
618 s.setblocking(True)
--> 619 s.connect(address)
620 return Connection(s.detach())
621
FileNotFoundError: [Errno 2] No such file or directory