
I am trying to read in several files and append certain elements from them to a list. Reading the files seems to be slow, so I thought multiprocessing might help. The code below is meant to open the numbered files file_%i in parallel, pull out the relevant data in read_append, and append it to a global array res = manager.list() shared between the processes. Sample code is given below. However, it fails to work: attempting to print a.shape gives the error message included below the sample code. I am not sure how to fix this, and I am quite new to multiprocessing. I suspect this hacky script, pieced together from SO answers and the multiprocessing documentation, is far from ideal.

import multiprocessing as mp
import numpy as np
from timeit import default_timer as timer
start = timer()
def read_append(input_list):
    val, res_arr = input_list
    data_file = np.load('file_%i.npz' %val, mmap_mode = 'r', allow_pickle=True)['data']
    for i in range(len(data_file)):
        res_arr.append(data_file[i][1])
    return None


if __name__ == '__main__':
    N= mp.cpu_count()
    print(N)
    with mp.Manager() as manager:
        res = manager.list()
        input_list = [(val, res) for val in range(2)]
        with mp.Pool(processes = N) as p:
            results = p.map(read_append,input_list)
end = timer()
print(end-start)
a = list(res)
print(a.shape)


---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
~/anaconda3/lib/python3.7/multiprocessing/managers.py in _callmethod(self, methodname, args, kwds)
    810         try:
--> 811             conn = self._tls.connection
    812         except AttributeError:

AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

FileNotFoundError                         Traceback (most recent call last)
<ipython-input-13-35028af51086> in <module>
     21 end = timer()
     22 print(end-start)
---> 23 a = list(res)
     24 print(a.shape)

<string> in __len__(self, *args, **kwds)

~/anaconda3/lib/python3.7/multiprocessing/managers.py in _callmethod(self, methodname, args, kwds)
    813             util.debug('thread %r does not own a connection',
    814                        threading.current_thread().name)
--> 815             self._connect()
    816             conn = self._tls.connection
    817 

~/anaconda3/lib/python3.7/multiprocessing/managers.py in _connect(self)
    800         if threading.current_thread().name != 'MainThread':
    801             name += '|' + threading.current_thread().name
--> 802         conn = self._Client(self._token.address, authkey=self._authkey)
    803         dispatch(conn, None, 'accept_connection', (name,))
    804         self._tls.connection = conn

~/anaconda3/lib/python3.7/multiprocessing/connection.py in Client(address, family, authkey)
    490         c = PipeClient(address)
    491     else:
--> 492         c = SocketClient(address)
    493 
    494     if authkey is not None and not isinstance(authkey, bytes):

~/anaconda3/lib/python3.7/multiprocessing/connection.py in SocketClient(address)
    617     with socket.socket( getattr(socket, family) ) as s:
    618         s.setblocking(True)
--> 619         s.connect(address)
    620         return Connection(s.detach())
    621 

FileNotFoundError: [Errno 2] No such file or directory

1 Answer
  1. res is not a global variable: it is local to the with mp.Manager() as manager block, so why do you assume it is?
  2. A list does not have a shape attribute; numpy arrays do, so convert to a numpy array first.
  3. You are trying to access the managed list res after the manager process in which it lives has shut down. Move the code that uses res inside the with mp.Manager() as manager block.
  4. Your timer isn't measuring anything useful except in the main process. start = timer() runs at module level, so each child process re-executes it on import and effectively times importing libraries and defining functions. Move it inside the if __name__ == '__main__': block. If you want to time each call to the function, start the timer inside the function and return end - start (see the sketch after the fixed code below).

Example fixed code:

import multiprocessing as mp
import numpy as np
from timeit import default_timer as timer


def read_append(input_list):
    val, res_arr = input_list
    # Load the numbered file and append the second element of each
    # record to the shared, manager-backed list.
    data_file = np.load('file_%i.npz' % val, mmap_mode='r', allow_pickle=True)['data']
    for i in range(len(data_file)):
        res_arr.append(data_file[i][1])
    return None


if __name__ == '__main__':
    start = timer()  # runs in the main process only
    N = mp.cpu_count()
    print(N)
    with mp.Manager() as manager:
        res = manager.list()  # proxy to a list living in the manager process
        input_list = [(val, res) for val in range(2)]
        with mp.Pool(processes=N) as p:
            results = p.map(read_append, input_list)
        # Use res while the manager (and its process) is still alive.
        a = np.array(res)
        print(a.shape)
    end = timer()
    print(end - start)
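
As a sketch of the per-call timing suggested in point 4 (assuming the same file_%i.npz files exist; the returned-duration pattern here is illustrative, not part of the original code):

import multiprocessing as mp
import numpy as np
from timeit import default_timer as timer


def read_append(input_list):
    val, res_arr = input_list
    start = timer()  # time only the work done by this call
    data_file = np.load('file_%i.npz' % val, mmap_mode='r', allow_pickle=True)['data']
    for i in range(len(data_file)):
        res_arr.append(data_file[i][1])
    return timer() - start  # elapsed seconds for this file


if __name__ == '__main__':
    with mp.Manager() as manager:
        res = manager.list()
        input_list = [(val, res) for val in range(2)]
        with mp.Pool(processes=mp.cpu_count()) as p:
            per_call_times = p.map(read_append, input_list)
        print(per_call_times)  # one duration per file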

2 Comments

Thanks, I see my mistake now: the variable res is not global but local to the context of the with statement. Could I have initialized it between def read_append() and if __name__ == '__main__':? Would that be considered a global variable and be accessible to the processes working in the pool?
@jcp That would be a global variable, and it would be assignable within the function only if you use global res inside it. Moreover, if you are thinking that res will then contain the managed list you create inside the if __name__... block, that is incorrect: that block only runs in the main process, not the child processes. Child and parent processes do not share memory (see the sketch below).
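
A minimal sketch of that last point (the worker and variable names here are illustrative, not from the question): a module-level list exists separately in each process, so appends made in the workers never reach the parent's copy.

import multiprocessing as mp

res = []  # module-level "global"; each process gets its own copy


def worker(val):
    res.append(val)   # mutates this child's copy only
    return len(res)   # reflects only appends made in that child


if __name__ == '__main__':
    with mp.Pool(processes=2) as p:
        lengths = p.map(worker, range(4))
    print(res)      # [] -- the parent's copy is untouched
    print(lengths)  # each child saw only its own appends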
