This seems to be trivially parallelizable:
- You've got an outer loop that you run 90 times.
- Each time, you're not mutating any shared arrays except final_emit, and even then only to store into a unique row.
- It looks like most of the work inside the loop is numpy array-wide operations, which will release the GIL.
So (using the futures backport of concurrent.futures, since you seem to be on 2.7):
import numpy as np
import time
import futures

division = 90
freq_division = 50
cd = 3000
boost_factor = np.random.rand(division, division, cd)
freq_bins = np.linspace(1, 60, freq_division)
es = np.random.randint(1, 10, size=(cd, freq_division))
final_emit = np.zeros((division, division, freq_division))

def dostuff(i):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    # ... (the rest of the loop body from your code, unchanged)
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    return np.sum(to_bin_emit, axis=1)

with futures.ThreadPoolExecutor(max_workers=8) as x:
    for i, row in enumerate(x.map(dostuff, xrange(division))):
        final_emit[i] = row
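One quick way to confirm this works: recompute the answer serially and compare. This check is my addition, not part of your code, but since dostuff is deterministic for a given i, the two results should agree:

# Sanity check (mine): run the same function serially and compare.
expected = np.empty_like(final_emit)
for i in xrange(division):
    expected[i] = dostuff(i)
assert np.allclose(final_emit, expected)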
If this works, there are two tweaks to try, either of which might be more efficient. We don't really care which order the results come back in, but map queues them up in order, which can waste a bit of space and time. I don't think it will make much difference (presumably the vast majority of your time is spent doing the calculations, not writing out the results), but without profiling your code it's hard to be sure.
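If you want to check, a crude timing run is enough. Here's a sketch (the timing scaffolding is mine, not part of your code), assuming the setup and dostuff from the example above:

import time

start = time.time()
with futures.ThreadPoolExecutor(max_workers=8) as x:
    for i, row in enumerate(x.map(dostuff, xrange(division))):
        final_emit[i] = row
print('map version: %.3fs' % (time.time() - start))

Time each variant below the same way and keep whichever wins. So, here are two easy ways around the ordering problem.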
Using as_completed lets us use the results in whatever order they finish, rather than in the order we queued them. Something like this:
def dostuff(i):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    # ... (as above)
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    # Return the index along with the row: results no longer arrive
    # in order, so each row has to carry its own destination.
    return i, np.sum(to_bin_emit, axis=1)

with futures.ThreadPoolExecutor(max_workers=8) as x:
    fs = [x.submit(dostuff, i) for i in xrange(division)]
    for f in futures.as_completed(fs):
        # as_completed yields futures, not values, so unwrap with result()
        i, row = f.result()
        final_emit[i] = row
Alternatively, we can make the function insert the rows directly, instead of returning them. This means we're now mutating a shared object from multiple threads, so I think we need a lock here, although I'm not positive (numpy's rules are a bit complicated, and I haven't read your code that thoroughly…). But it probably won't hurt performance significantly, and it's easy. So:
import numpy as np
import threading
# etc.

final_emit = np.zeros((division, division, freq_division))
final_emit_lock = threading.Lock()

def dostuff(i):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    # ... (as above)
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    with final_emit_lock:
        final_emit[i] = np.sum(to_bin_emit, axis=1)

with futures.ThreadPoolExecutor(max_workers=8) as x:
    # Consume the iterator; otherwise any exception raised inside
    # dostuff would be silently swallowed.
    list(x.map(dostuff, xrange(division)))
That max_workers=8 in all of my examples should be tuned for your machine. Too many threads is bad, because they start fighting each other instead of parallelizing; too few threads is even worse, because some of your cores just sit there idle.
If you want this to run on a variety of machines, rather than tuning it for each one, the best guess (for 2.7) is usually:
import multiprocessing
# ...
with futures.ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) as x:
But if you want to squeeze the max performance out of a specific machine, you should test different values. In particular, for a typical quad-core laptop with hyperthreading, the ideal value can be anywhere from 4 to 8, depending on the exact work you're doing, and it's easier to just try all the values than to try to predict.
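For example, a throwaway sweep like this (my sketch, assuming the first version of dostuff, the one that returns its row) finds the sweet spot empirically:

import time

for workers in (1, 2, 4, 6, 8, 12, 16):
    start = time.time()
    with futures.ThreadPoolExecutor(max_workers=workers) as x:
        for i, row in enumerate(x.map(dostuff, xrange(division))):
            final_emit[i] = row
    print('max_workers=%2d: %.3fs' % (workers, time.time() - start))

Run it a couple of times; thread timings are noisy, so trust the trend rather than any single number.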