Load-balancing with IPython.parallel
import os,sys,time
import numpy as np
from IPython.core.display import display
from IPython import parallel
rc = parallel.Client()
dview = rc[:]
Create a LoadBalancedView
lview = rc.load_balanced_view()
lview
<LoadBalancedView None>
LoadBalancedViews behave very much like a DirectView on a single engine:
Each call to apply() results in a single remote computation,
and the result (or AsyncResult) of that call is returned directly,
rather than in a list, as in the multi-engine DirectView.
e0 = rc[0]
from numpy.linalg import norm
A = np.random.random(1024)
e0.apply_sync(norm, A, 2)
18.635657447025618
lview.apply_sync(norm, A, 2)
18.635657447025618
However, unlike the DirectView of a single engine, you are letting the IPython Scheduler decide which engine should do the work:
e0.apply_sync(os.getpid)
48459
for i in range(2*len(rc.ids)):
    print(lview.apply_sync(os.getpid))
48459
48461
48462
48460
48459
48461
48462
48460
Map
The LoadBalancedView also has a load-balanced version of the builtin map():
lview.block = True
serial_result = map(lambda x:x**10, range(32))
parallel_result = lview.map(lambda x:x**10, range(32))
serial_result==parallel_result
True
Just like apply(), map can be non-blocking: either set block=False or call map_async:
amr = lview.map_async(lambda x:x**10, range(32))
Map results are iterable!
AsyncResults with multiple results are actually iterable before their results arrive.
This means that you can perform map/reduce operations on elements as they come in:
# scatter 'id', so id=0,1,2 on engines 0,1,2
dv = rc[:]
dv.scatter('id', rc.ids, flatten=True)
print(dv['id'])
# create a Reference to `id`. This will be a different value on each engine
ref = parallel.Reference('id')
tic = time.time()
ar = dv.apply(time.sleep, ref)
# results are yielded in engine order, each as soon as it is available
for i,r in enumerate(ar):
    print("%i: %.3f"%(i, time.time()-tic))
[0, 1, 2, 3]
0: 0.010
1: 1.008
2: 2.010
3: 3.013
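The same consume-as-they-arrive pattern can be tried without a cluster. The sketch below is a standard-library analogue, not IPython.parallel: it assumes Python 3's concurrent.futures, whose Executor.map likewise yields results in submission order, each as soon as it is available. The names slow_square and running_total are illustrative only.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    """Sleep for a time proportional to x, then return x squared."""
    time.sleep(0.01 * x)
    return x * x


def running_total(values, workers=4):
    """Reduce results one at a time as the pool hands them back."""
    total = 0
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map returns an iterator immediately; each step of the loop
        # blocks only until the next result (in submission order) is ready
        for result in pool.map(slow_square, values):
            total += result
    return total


print(running_total(range(8)))
```

The reduction starts before the slowest task has finished, which is the point of iterating over an AsyncResult rather than waiting on get().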
Now we submit a bunch of tasks of increasing magnitude, and watch where they happen, iterating through the results as they come.
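def sleep_here(t):
    """sleep here for a time, return where it happened"""
    import time
    time.sleep(t)
    # `id` is the per-engine global scattered above, not the builtin
    return id
# lview.block is True here, so use map_async to get an AsyncMapResult to iterate over
amr = lview.map_async(sleep_here, [.01*t for t in range(100)])
tic = time.time()
for i,r in enumerate(amr):
    print("task %i on engine %i: %.3f" % (i, r, time.time()-tic))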
task 0 on engine 2: 0.001
task 1 on engine 1: 0.001
task 2 on engine 0: 0.001
task 3 on engine 3: 0.001
task 4 on engine 2: 0.001
task 5 on engine 1: 0.002
task 6 on engine 0: 0.004
task 7 on engine 3: 0.005
task 8 on engine 2: 0.005
task 9 on engine 1: 0.005
task 10 on engine 0: 0.026
task 11 on engine 3: 0.032
task 12 on engine 2: 0.077
task 13 on engine 1: 0.104
task 14 on engine 0: 0.149
task 15 on engine 3: 0.183
task 16 on engine 2: 0.231
task 17 on engine 1: 0.276
task 18 on engine 0: 0.332
task 19 on engine 3: 0.375
task 20 on engine 2: 0.433
task 21 on engine 1: 0.489
task 22 on engine 0: 0.555
task 23 on engine 3: 0.608
task 24 on engine 2: 0.677
task 25 on engine 1: 0.754
task 26 on engine 0: 0.833
task 27 on engine 3: 0.883
task 28 on engine 2: 0.959
task 29 on engine 1: 1.040
task 30 on engine 0: 1.126
task 31 on engine 3: 1.202
task 32 on engine 2: 1.293
task 33 on engine 1: 1.374
task 34 on engine 0: 1.492
task 35 on engine 3: 1.569
task 36 on engine 2: 1.646
task 37 on engine 1: 1.742
task 38 on engine 0: 1.861
task 39 on engine 3: 1.953
task 40 on engine 2: 2.049
task 41 on engine 1: 2.154
task 42 on engine 0: 2.287
task 43 on engine 3: 2.386
task 44 on engine 2: 2.493
task 45 on engine 1: 2.608
task 46 on engine 0: 2.760
task 47 on engine 3: 2.863
task 48 on engine 2: 2.989
task 49 on engine 1: 3.113
task 50 on engine 0: 3.266
task 51 on engine 3: 3.379
task 52 on engine 2: 3.503
task 53 on engine 1: 3.640
task 54 on engine 0: 3.806
task 55 on engine 3: 3.938
task 56 on engine 2: 4.070
task 57 on engine 1: 4.213
task 58 on engine 0: 4.384
task 59 on engine 3: 4.523
task 60 on engine 2: 4.670
task 61 on engine 1: 4.829
task 62 on engine 0: 5.021
task 63 on engine 3: 5.160
task 64 on engine 2: 5.315
task 65 on engine 1: 5.483
task 66 on engine 0: 5.670
task 67 on engine 3: 5.838
task 68 on engine 2: 5.997
task 69 on engine 1: 6.182
task 70 on engine 0: 6.380
task 71 on engine 3: 6.546
task 72 on engine 2: 6.718
task 73 on engine 1: 6.912
task 74 on engine 0: 7.117
task 75 on engine 3: 7.295
task 76 on engine 2: 7.481
task 77 on engine 1: 7.680
task 78 on engine 0: 7.900
task 79 on engine 3: 8.087
task 80 on engine 2: 8.284
task 81 on engine 1: 8.493
task 82 on engine 0: 8.724
task 83 on engine 3: 8.935
task 84 on engine 2: 9.155
task 85 on engine 1: 9.344
task 86 on engine 0: 9.586
task 87 on engine 3: 9.796
task 88 on engine 2: 10.021
task 89 on engine 1: 10.238
task 90 on engine 0: 10.489
task 91 on engine 3: 10.709
task 92 on engine 2: 10.943
task 93 on engine 1: 11.181
task 94 on engine 0: 11.438
task 95 on engine 3: 11.665
task 96 on engine 2: 11.906
task 97 on engine 1: 12.158
task 98 on engine 0: 12.425
task 99 on engine 3: 12.659
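Unlike DirectView.map(), which always results in one task per engine,
LoadBalancedView.map() defaults to one task per item in the sequence. This
can be changed by specifying the chunksize keyword argument. With chunksize=4,
each task processes four consecutive items, so results arrive in groups of four: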
# map_async keeps the call non-blocking even though lview.block is True
amr = lview.map_async(sleep_here, [.01*t for t in range(100)], chunksize=4)
tic = time.time()
for i,r in enumerate(amr):
    print("task %i on engine %i: %.3f"%(i, r, time.time()-tic))
task 0 on engine 2: 0.084
task 1 on engine 2: 0.084
task 2 on engine 2: 0.084
task 3 on engine 2: 0.084
task 4 on engine 1: 0.245
task 5 on engine 1: 0.246
task 6 on engine 1: 0.246
task 7 on engine 1: 0.246
task 8 on engine 0: 0.411
task 9 on engine 0: 0.411
task 10 on engine 0: 0.412
task 11 on engine 0: 0.412
task 12 on engine 3: 0.576
task 13 on engine 3: 0.577
task 14 on engine 3: 0.577
task 15 on engine 3: 0.577
task 16 on engine 2: 0.790
task 17 on engine 2: 0.790
task 18 on engine 2: 0.790
task 19 on engine 2: 0.791
task 20 on engine 1: 1.114
task 21 on engine 1: 1.114
task 22 on engine 1: 1.114
task 23 on engine 1: 1.115
task 24 on engine 0: 1.436
task 25 on engine 0: 1.436
task 26 on engine 0: 1.436
task 27 on engine 0: 1.436
task 28 on engine 3: 1.762
task 29 on engine 3: 1.763
task 30 on engine 3: 1.763
task 31 on engine 3: 1.763
task 32 on engine 2: 2.135
task 33 on engine 2: 2.136
task 34 on engine 2: 2.136
task 35 on engine 2: 2.136
task 36 on engine 1: 2.616
task 37 on engine 1: 2.616
task 38 on engine 1: 2.617
task 39 on engine 1: 2.617
task 40 on engine 0: 3.101
task 41 on engine 0: 3.101
task 42 on engine 0: 3.101
task 43 on engine 0: 3.102
task 44 on engine 3: 3.588
task 45 on engine 3: 3.589
task 46 on engine 3: 3.589
task 47 on engine 3: 3.589
task 48 on engine 2: 4.121
task 49 on engine 2: 4.122
task 50 on engine 2: 4.122
task 51 on engine 2: 4.122
task 52 on engine 1: 4.763
task 53 on engine 1: 4.763
task 54 on engine 1: 4.763
task 55 on engine 1: 4.764
task 56 on engine 0: 5.404
task 57 on engine 0: 5.405
task 58 on engine 0: 5.405
task 59 on engine 0: 5.405
task 60 on engine 3: 6.053
task 61 on engine 3: 6.054
task 62 on engine 3: 6.054
task 63 on engine 3: 6.054
task 64 on engine 2: 6.746
task 65 on engine 2: 6.746
task 66 on engine 2: 6.746
task 67 on engine 2: 6.747
task 68 on engine 1: 7.545
task 69 on engine 1: 7.546
task 70 on engine 1: 7.546
task 71 on engine 1: 7.546
task 72 on engine 0: 8.350
task 73 on engine 0: 8.351
task 74 on engine 0: 8.351
task 75 on engine 0: 8.351
task 76 on engine 3: 9.174
task 77 on engine 3: 9.175
task 78 on engine 3: 9.175
task 79 on engine 3: 9.175
task 80 on engine 2: 10.014
task 81 on engine 2: 10.014
task 82 on engine 2: 10.014
task 83 on engine 2: 10.015
task 84 on engine 1: 10.983
task 85 on engine 1: 10.984
task 86 on engine 1: 10.984
task 87 on engine 1: 10.985
task 88 on engine 0: 11.938
task 89 on engine 0: 11.938
task 90 on engine 0: 11.938
task 91 on engine 0: 11.938
task 92 on engine 3: 12.908
task 93 on engine 3: 12.908
task 94 on engine 3: 12.908
task 95 on engine 3: 12.909
task 96 on engine 2: 13.926
task 97 on engine 2: 13.927
task 98 on engine 2: 13.927
task 99 on engine 2: 13.927
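To see why the chunked run finishes later than the one-task-per-item run, the two timings can be approximated with a small simulation. This is an idealised sketch, not IPython's scheduler: it assumes four engines, zero communication overhead, and that each task goes to whichever engine becomes free first. The function names are illustrative.

```python
import heapq


def chunk_durations(durations, chunksize):
    """Group consecutive task durations into chunks; each chunk is one task."""
    return [sum(durations[i:i + chunksize])
            for i in range(0, len(durations), chunksize)]


def simulated_finish_time(durations, n_engines, chunksize=1):
    """Hand each chunk, in order, to whichever engine frees up first."""
    # heap of (time at which the engine becomes free, engine index)
    free_at = [(0.0, engine) for engine in range(n_engines)]
    heapq.heapify(free_at)
    finish = 0.0
    for cost in chunk_durations(durations, chunksize):
        start, engine = heapq.heappop(free_at)
        end = start + cost
        finish = max(finish, end)
        heapq.heappush(free_at, (end, engine))
    return finish


durations = [.01 * t for t in range(100)]
per_item = simulated_finish_time(durations, n_engines=4)
chunked = simulated_finish_time(durations, n_engines=4, chunksize=4)
print("chunksize=1: %.2f s, chunksize=4: %.2f s" % (per_item, chunked))
```

Under these assumptions the model gives about 12.75 s for one item per task and about 13.86 s for chunksize=4, roughly in line with the 12.659 s and 13.927 s observed above. Larger chunks mean fewer, coarser tasks, so the last engine to finish can be left with a disproportionate share of the work.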
def area(w,h):
    return w*h
widths = range(1,4)
heights = range(6,10)
areas = []
for w in widths:
    for h in heights:
        areas.append(area(w,h))
areas
[6, 7, 8, 9, 12, 14, 16, 18, 18, 21, 24, 27]
%loadpy soln/nestedloop.py
Validate the result:
areas == ar.get()
True
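The contents of soln/nestedloop.py are not shown here. As one possible approach (an assumption, not necessarily the loaded solution), the nested loop can be flattened into two equal-length argument sequences so that a single map call covers every (w, h) pair. The sketch uses the builtin map so that it runs without a cluster; because the view's map() also accepts several sequences, the same ws and hs could be passed to lview.map(area, ws, hs). The names pairs, ws, hs and areas_flat are illustrative.

```python
from itertools import product


def area(w, h):
    return w * h


widths = range(1, 4)
heights = range(6, 10)

# every (w, h) combination, in the same order as the nested for-loops
pairs = list(product(widths, heights))
# split the pairs into one sequence per argument of area()
ws, hs = zip(*pairs)

# builtin map stands in for a load-balanced map here
areas_flat = list(map(area, ws, hs))
print(areas_flat)
```

Flattening first means each (w, h) pair becomes an independent task, which is the shape a load-balanced map expects.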