DirectView as multiplexer¶
import os, sys, time
import numpy as np
from IPython.core.display import display
from IPython import parallel
rc = parallel.Client()
The DirectView can be readily understood as an Engine Multiplexer - it does the same thing on all of its engines.
The only difference between running code on a single remote engine and running code in parallel is how many engines the DirectView is instructed to use.
You can create DirectViews by index-access into the Client. This creates
a DirectView using the engines you would get by passing the same index (or slice)
to the Client's ids list.
e0 = rc[0]
eall = rc[:]
even = rc[::2]
odd = rc[1::2]
# this is the one we are going to use:
dview = eall
dview.block = True
Now, the only difference from single-engine remote execution is that the code we run happens on all of the engines of a given view:
for view in (e0, eall, even, odd):
print view, view.apply_sync(os.getpid)
<DirectView 0> 48550
<DirectView [0, 1, 2, 3]> [48550, 48549, 48546, 48551]
<DirectView [0, 2]> [48550, 48546]
<DirectView [1, 3]> [48549, 48551]
The result of multiplexed execution is always a list whose length equals the number of engines in the view.
dview['a'] = 5
dview['a']
[5, 5, 5, 5]
Scatter and Gather¶
Lots of parallel computations involve partitioning data onto processes.
DirectViews have scatter() and gather() methods, to help with this.
Pass any container or numpy array, and IPython will partition the object across the engines with scatter(),
or reconstruct the full object in the Client with gather().
dview.scatter('a',range(16))
dview['a']
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
dview.gather('a')
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
dview.execute("asum = sum(a)")
dview.gather('asum')
[6, 22, 38, 54]
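The same scatter / execute / gather round-trip can be checked locally; a minimal sketch, assuming the 4-engine partition of range(16) shown above:

```python
parts = [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]  # dview['a'] after scatter
asums = [sum(p) for p in parts]  # what execute("asum = sum(a)") computes on each engine
total = sum(asums)               # reduce the gathered partials in the Client
```

Here asums matches the gathered [6, 22, 38, 54], and summing the partials recovers sum(range(16)).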
We can pass a 'flatten' keyword to instruct engines that receive only one item of the list to store the item itself, rather than a one-element sublist:
dview.scatter('id',rc.ids)
dview['id']
[[0], [1], [2], [3]]
dview.scatter('id',rc.ids, flatten=True)
dview['id']
[0, 1, 2, 3]
Scatter and gather also work with numpy arrays
A = np.random.randint(1,10,(16,4))
B = np.random.randint(1,10,(4,16))
display(A)
display(B)
array([[9, 4, 7, 3],
[7, 7, 4, 8],
[1, 7, 1, 9],
[7, 1, 2, 6],
[7, 9, 1, 4],
[4, 7, 4, 2],
[9, 2, 3, 2],
[3, 8, 8, 7],
[8, 6, 5, 9],
[3, 2, 1, 3],
[1, 1, 6, 3],
[7, 3, 8, 5],
[1, 1, 2, 3],
[3, 3, 5, 6],
[8, 1, 9, 2],
[1, 5, 5, 7]])
array([[7, 8, 5, 9, 3, 8, 1, 2, 6, 1, 6, 9, 9, 8, 5, 7],
[1, 3, 1, 3, 6, 8, 1, 5, 8, 4, 6, 9, 9, 4, 2, 9],
[8, 2, 2, 5, 9, 5, 2, 6, 1, 1, 6, 7, 7, 9, 3, 2],
[8, 9, 4, 6, 1, 8, 4, 3, 9, 1, 4, 9, 1, 4, 6, 4]])
dview.scatter('A', A)
dview.scatter('B', B)
display(e0['A'])
display(e0['B'])
array([[9, 4, 7, 3],
[7, 7, 4, 8],
[1, 7, 1, 9],
[7, 1, 2, 6]])
array([[7, 8, 5, 9, 3, 8, 1, 2, 6, 1, 6, 9, 9, 8, 5, 7]])
Exercise: Parallel Matrix Multiply¶
Can you compute the matrix product C=A.dot(B) in parallel? (We are not looking for a brilliant solution, just a correct one.)
%loadpy soln/matmul.py
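The loaded solution is not shown here. As an illustration only, here is a serial sketch of the scatter/compute/gather idea a pdot could use; the function name, the row-wise split, and the engine count are assumptions, not the contents of soln/matmul.py:

```python
import numpy as np

def pdot_sketch(A, B, n_engines=4):
    """Serial stand-in for a parallel dot: split A's rows as scatter would,
    multiply each block by the full B, then concatenate as gather would."""
    blocks = np.array_split(A, n_engines, axis=0)  # mimics dview.scatter('A', A)
    partials = [blk.dot(B) for blk in blocks]      # each engine: C = A.dot(B)
    return np.concatenate(partials)                # mimics dview.gather('C')
```

Because matrix multiplication is row-independent on the left operand, the concatenated block products equal the full product.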
Let's run this, and validate the result against a local computation.
C_ref = A.dot(B)
C1 = pdot(dview, A, B)
# validation:
(C1==C_ref).all()
True
Map¶
DirectViews have a map method, which behaves just like the builtin map, but computed in parallel.
dview.block = True
serial_result = map(lambda x:x**10, range(32))
parallel_result = dview.map(lambda x:x**10, range(32))
serial_result==parallel_result
True
DirectView.map partitions the sequence across the engines,
and then calls the builtin map remotely on each chunk. The result is always
a single IPython task per engine, regardless of the length of the sequence.
amr = dview.map_async(lambda x:x**10, range(32))
amr.msg_ids
['62561534-aea1-4a2a-95bf-dcb111a664a6', 'ebba3e35-c785-4b89-b4a7-f38fb5889c2d', '94390aef-2785-4408-aa45-d01a191a0f83', '2d3f6f9d-35a2-44e8-908b-48915eff9d8f']
amr = dview.map_async(lambda x:x**10, range(3200))
amr.msg_ids
['0220ba6d-2435-4cda-9063-5c0a23755246', '259a6313-bb90-4bbc-8872-d8f968cd06ad', 'b83a3174-0f95-432b-86d8-56ce698fe494', 'baae174f-07b5-4a2e-8f5e-ba5e54a7024d']
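Both calls produce exactly four msg_ids because map partitions by engine, not by item. A local sketch of that per-engine chunking (the contiguous split and helper name are illustrative assumptions):

```python
def map_sketch(f, seq, n_engines=4):
    """Mimic DirectView.map: one contiguous chunk -- hence one task -- per engine."""
    seq = list(seq)
    size = -(-len(seq) // n_engines)  # ceiling division
    chunks = [seq[i * size:(i + 1) * size] for i in range(n_engines)]
    results = [list(map(f, chunk)) for chunk in chunks]  # 4 "tasks", whatever len(seq) is
    return [y for part in results for y in part]         # reassemble in order
```

Whether the sequence has 32 or 3200 items, the number of chunks (and remote tasks) stays fixed at the number of engines.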