class Job(object):
def __init__(self, name):
self.name = name
self.depends = []
self.waitcount = 0
def work(self):
#does some work
def add_dependent(self, another_job)
self.depends.append(another_job)
self.waitcount += 1
so, waitcount is based on the number of jobs you have in depends
job_board = {}
# create a dependency tree
for i in range(1000):
# create random jobs
j = Job(<new name goes here>)
# add jobs to depends if dependent
# record it in job_board
job_board[j.name] = j
# example
# jobC is in self.depends of jobA and jobB
# jobC would have a waitcount of 2
rdyQ = Queue.Queue()
def worker():
try:
job = rdyQ.get()
success = job.work()
# if this job was successful create dependent jobs
if success:
for dependent_job in job.depends:
dependent_job.waitcount -= 1
if dependent_job.waitcount == 0:
rdyQ.put(dependent_job)
and then i would create threads
for i in range(10):
t = threading.Thread( target=worker )
t.daemon=True
t.start()
for job_name, job_obj in job_board.iteritems():
if job_obj.waitcount == 0:
rdyQ.put(job_obj)
while True:
# until all jobs finished wait
Now here is an example:
# example
# jobC is in self.depends of jobA and jobB
# jobC would have a waitcount of 2
now in this scenario, if both jobA and jobB are running and they both tried to decrement waitcount of jobC, weird things were happening
so i put a lock
waitcount_lock = threading.Lock()
and changed this code to:
# if this job was successful create dependent jobs
if success:
for dependent_job in job.depends:
with waitcount_lock:
dependent_job.waitcount -= 1
if dependent_job.waitcount == 0:
rdyQ.put(dependent_job)
and strange things still happen
i.e. same job was being processed by multiple threads, as if the job was put into the queue twice
is it not a best practice to have/modify nested objects when complex objects are being pass amongst threads?
for job_name, job_obj in job_board.iteritems():. If some jobs complete before the for loop gets to them, dependent jobs could be queued by the worker and this loop so they'd run twice. You could create a run list such asrun_this = [job for job in job_board.iteritems() if job.waitcount == 0]and then queue them. That way the decision to queue is made before anything can run.