I wrote a script to launch a number of processes (simple unit tests) in parallel. It runs N jobs total, with at most num_workers processes running at a time.
My first implementation ran the processes in batches of num_workers and seemed to work fine (I use the false command here to test the behavior):
import subprocess

errors = 0
num_workers = 10
N = 100
i = 0
while i < N:
    processes = []
    for j in range(i, min(i + num_workers, N)):
        p = subprocess.Popen(['false'])
        processes.append(p)
    [p.wait() for p in processes]
    exit_codes = [p.returncode for p in processes]
    errors += sum(int(e != 0) for e in exit_codes)
    i += num_workers
print(f"There were {errors}/{N} errors")
However, the tests do not take equal amounts of time, so a whole batch could end up idle waiting for one slow test to finish. I therefore rewrote it to assign a new task as soon as a running one completes:
import subprocess
import os

errors = 0
num_workers = 40
N = 100
assigned = 0
completed = 0
processes = set()
while completed < N:
    if assigned < N:
        # launch the next job
        p = subprocess.Popen(['false'])
        processes.add((assigned, p))
        assigned += 1
    if len(processes) >= num_workers or assigned == N:
        # pool is full (or everything is assigned): block until some child exits
        os.wait()
    # iterate over a copy so entries can be removed while iterating
    for i, p in frozenset(processes):
        if p.poll() is not None:
            completed += 1
            processes.remove((i, p))
            err = p.returncode
            print(i, err)
            if err != 0:
                errors += 1
print(f"There were {errors}/{N} errors")
However, this produces wrong results for the last few processes. For instance, the example above reports 98/100 errors instead of 100/100. I checked, and this has nothing to do with concurrency; the last 2 jobs report exit code 0 for some reason.
Why does this happen?
os.wait() clobbers the return code for poll(): once os.wait() has reaped a child, Popen.poll() can no longer retrieve its real exit status and falls back to 0, so those failures never reach the errors variable.
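A minimal sketch of the clobbering, assuming CPython on a POSIX system (os.waitstatus_to_exitcode needs Python 3.9+). When os.wait() reaps the child first, the later waitpid() inside poll() fails with ECHILD and subprocess falls back to reporting 0:

import os
import subprocess

p = subprocess.Popen(['false'])
pid, status = os.wait()                   # reaps the child before Popen can
assert pid == p.pid                       # it was our only child
print(os.waitstatus_to_exitcode(status))  # 1: the real exit code
print(p.poll())                           # 0: waitpid() fails with ECHILD, so poll() guesses

One possible fix, keeping the os.wait() structure, is to use the status os.wait() already returns instead of calling poll() afterwards. This is a sketch, not a drop-in replacement; the running dict is bookkeeping added here, not part of the original script:

import os
import subprocess

errors = 0
num_workers = 40
N = 100
assigned = 0
completed = 0
running = {}  # pid -> (job index, Popen); bookkeeping added for this sketch

while completed < N:
    # keep the pool full until all N jobs have been handed out
    while assigned < N and len(running) < num_workers:
        p = subprocess.Popen(['false'])
        running[p.pid] = (assigned, p)
        assigned += 1
    # os.wait() reports both who finished and how; use that status directly
    pid, status = os.wait()
    i, p = running.pop(pid)
    err = os.waitstatus_to_exitcode(status)  # Python 3.9+
    p.returncode = err  # we reaped the child ourselves, so record it on the Popen
    print(i, err)
    if err != 0:
        errors += 1
    completed += 1

print(f"There were {errors}/{N} errors")

Alternatively, dropping os.wait() entirely (for example, having a thread pool call subprocess.run for each job) avoids the double-reaping problem altogether.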