More efficient broadcast of arrays with memmap
Data movement is where IPython's naïve model suffers the most.
But knowing about your cluster lets you make smarter decisions about data movement than a simple rc[:].push.
In [ ]:
import socket
import os, sys, re
import numpy as np
from IPython import parallel
In [ ]:
# Connect to the cluster using the 'dirac' profile and take a view on all engines.
rc = parallel.Client(profile='dirac')
eall = rc[:]
In [ ]:
# Map each engine id to the hostname it runs on (one async call per engine).
engine_hosts = eall.apply_async(socket.gethostname).get_dict()
engine_hosts
In [ ]:
# Invert the engine->host map into host->[engine ids].
host_engines = {}
for engine_id, hostname in engine_hosts.items():
    host_engines.setdefault(hostname, []).append(engine_id)
host_engines
In [ ]:
# Build a symmetric test matrix: A @ A.T is symmetric by construction.
sz = 256
data = np.dot(np.random.random((sz, sz)), np.random.random((sz, sz)).T) if False else np.random.random((sz, sz))
data = np.dot(data, data.T)
In [ ]:
# Baseline: round-trip latency of a no-op task on every engine.
%time _ = rc[:].apply_sync(lambda : None)
In [ ]:
# Naive broadcast: push the full array over the network to every engine individually.
ar = rc[:].push({'data': data}, block=False)
ar.wait_interactive()
In [ ]:
# Make numpy available on all engines.
%px import numpy as np
In [ ]:
def array_to_file(A):
    """Write array *A* to a temporary file and return the filename.

    The file is created with delete=False so it survives the context
    manager and can be opened later (e.g. memmapped) by other processes
    on the same host.  The caller is responsible for cleanup.
    """
    import tempfile
    with tempfile.NamedTemporaryFile(suffix='.np', delete=False) as tf:
        # Bug fix: save the argument A, not the global `data` — the
        # original version wrote the wrong array for any input but `data`.
        np.save(tf, A)
        data_path = tf.name
    return data_path
In [ ]:
@parallel.interactive
def load_memmap(name, path, mode='r+'):
    """Open *path* as a memmapped array and bind it to *name* in the
    engine's interactive (user) namespace."""
    # @parallel.interactive rebinds this function's globals to the user
    # namespace, so the assignment below is visible to later %px code.
    namespace = globals()
    namespace[name] = np.memmap(path, mode=mode)
In [ ]:
def bcast_memmap(data, name, client, host_engines):
    """Broadcast a numpy array efficiently.

    - sends the data to each remote host only once
    - loads it everywhere via np.memmap

    Parameters
    ----------
    data : numpy array to broadcast
    name : variable name to bind on each engine
    client : the parallel.Client to use.  Bug fix: the original body
        used the global ``rc`` throughout, silently ignoring this argument.
    host_engines : dict mapping hostname -> list of engine ids

    Returns a single AsyncResult covering all of the load operations.
    """
    # Write the data to a file just once per machine.
    local_filename = None
    filenames_ars = {}
    for host, engines in host_engines.items():
        h0 = engines[0]
        if host == socket.gethostname():
            # Engines on the controller's host: write locally, no
            # network transfer needed.
            local_filename = array_to_file(data)
        else:
            # Send to one engine per remote host; it writes the temp file.
            filenames_ars[host] = client[h0].apply_async(array_to_file, data)

    # Load the file into a memmapped array on every engine of each host.
    msg_ids = []
    for host, engines in host_engines.items():
        if host == socket.gethostname():
            filename = local_filename
        else:
            # Block until the remote write finished and get its filename.
            filename = filenames_ars[host].get()
        ar = client[engines].apply_async(load_memmap, name, filename)
        msg_ids.extend(ar.msg_ids)

    # One AsyncResult tracking every load across the cluster.
    return parallel.AsyncResult(client, msg_ids=msg_ids)
In [ ]:
%%time
# Efficient broadcast: one network transfer per host, then memmap everywhere.
ar = bcast_memmap(data, 'data', rc, host_engines)
ar.wait_interactive()
In [ ]:
# Verify the broadcast worked: compute the 2-norm of `data` on every engine.
%px np.linalg.norm(data, 2)
You can also do the same thing [with MPI](MPI%20Broadcast.ipynb).