class Job(object):

    def __init__(self, name):
        self.name = name
        self.depends = []
        self.waitcount = 0

    def work(self):
        # does some work
        pass

    def add_dependent(self, another_job):
        self.depends.append(another_job)
        self.waitcount += 1

So, waitcount is based on the number of jobs you have in depends.

job_board = {}
# create a dependency tree
for i in range(1000):
    # create random jobs
    j = Job(<new name goes here>)

    # add jobs to depends if dependent

    # record it in job_board
    job_board[j.name] = j

# example
# jobC is in self.depends of jobA and jobB
# jobC would have a waitcount of 2

rdyQ = Queue.Queue()

def worker():
    while True:
        job = rdyQ.get()
        success = job.work()

        # if this job was successful, release its dependent jobs
        if success:
            for dependent_job in job.depends:
                dependent_job.waitcount -= 1
                if dependent_job.waitcount == 0:
                    rdyQ.put(dependent_job)

and then I would create threads

for i in range(10):
    t = threading.Thread( target=worker )
    t.daemon=True
    t.start()

for job_name, job_obj in job_board.iteritems():
    if job_obj.waitcount == 0:
        rdyQ.put(job_obj)

while True:
    # until all jobs finished wait
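As an aside, the busy wait at the end can be replaced with the queue's own join()/task_done() accounting. Here is a minimal sketch of that pattern (using the Python 3 module name queue rather than the Python 2 Queue used in the post, and a stand-in computation in place of job.work()):

```python
import queue
import threading

q = queue.Queue()
results = []

def worker():
    while True:
        item = q.get()
        if item is None:
            break                 # sentinel: exit cleanly
        results.append(item * item)  # stand-in for job.work()
        q.task_done()

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for i in range(10):
    q.put(i)
q.join()          # blocks until every put() item has been task_done()'d
for t in threads:
    q.put(None)   # one sentinel per worker so each thread ends cleanly
for t in threads:
    t.join()
print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

With this shape there is no need for daemon threads or a spin loop; the main thread blocks in q.join() until all work is accounted for.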

Now here is an example:

# example
# jobC is in self.depends of jobA and jobB
# jobC would have a waitcount of 2

Now in this scenario, if jobA and jobB are both running and both try to decrement jobC's waitcount, weird things happen.

So I added a lock:

waitcount_lock = threading.Lock()

and changed this code to:

# if this job was successful create dependent jobs
if success:
    for dependent_job in job.depends:
        with waitcount_lock:
            dependent_job.waitcount -= 1
            if dependent_job.waitcount == 0:
                rdyQ.put(dependent_job)

and strange things still happen

i.e. the same job was being processed by multiple threads, as if the job had been put into the queue twice.

Is it not a best practice to modify nested objects when complex objects are being passed amongst threads?

  • You have a problem at for job_name, job_obj in job_board.iteritems():. If some jobs complete before the for loop gets to them, dependent jobs could be queued both by the worker and by this loop, so they'd run twice. You could create a run list such as run_this = [job for job in job_board.itervalues() if job.waitcount == 0] and then queue those. That way the decision to queue is made before anything can run. Commented Sep 23, 2016 at 21:42
  • or just run that section before starting the threads so the queue is already populated? Commented Sep 23, 2016 at 21:52
  • Yes, that would work also. Commented Sep 23, 2016 at 21:53
  • If this works, let me know so I can make it the answer. It's an interesting race condition. Commented Sep 23, 2016 at 21:55
  • It's a race condition. Suppose Y depends on X. X is queued and the worker decrements Y.waitcount, sees that it is zero, and puts it on the queue. Now the main loop looks at Y, sees the same waitcount that the worker decremented, and queues it a second time. Commented Sep 24, 2016 at 5:40

1 Answer


Here's a complete, executable program that appears to work fine. I expect you're mostly seeing "weird" behavior because, as I suggested in a comment, you're counting job successors instead of job predecessors. So I renamed things with "succ" and "pred" in their names to make that much clearer. daemon threads are also usually a Bad Idea, so this code arranges to shut down all the threads cleanly when the work is over. Note too the use of assertions to verify that implicit beliefs are actually true ;-)

import threading
import Queue
import random

NTHREADS = 10
NJOBS = 10000

class Job(object):
    def __init__(self, name):
        self.name = name
        self.done = False
        self.succs = []
        self.npreds = 0

    def work(self):
        assert not self.done
        self.done = True
        return True

    def add_dependent(self, another_job):
        self.succs.append(another_job)
        another_job.npreds += 1

def worker(q, lock):
    while True:
        job = q.get()
        if job is None:
            break
        success = job.work()
        if success:
            for succ in job.succs:
                with lock:
                    assert succ.npreds > 0
                    succ.npreds -= 1
                    if succ.npreds == 0:
                        q.put(succ)
        q.task_done()

jobs = [Job(i) for i in range(NJOBS)]
for i, job in enumerate(jobs):
    # pick some random successors
    possible = xrange(i+1, NJOBS)
    succs = random.sample(possible,
                          min(len(possible),
                              random.randrange(10)))
    for succ in succs:
        job.add_dependent(jobs[succ])

q = Queue.Queue()
for job in jobs:
    if job.npreds == 0:
        q.put(job)
print q.qsize(), "ready jobs initially"

lock = threading.Lock()
threads = [threading.Thread(target=worker,
                            args=(q, lock))
           for _ in range(NTHREADS)]

for t in threads:
    t.start()
q.join()
# add sentinels so threads end cleanly
for t in threads:
    q.put(None)
for t in threads:
    t.join()
for job in jobs:
    assert job.done
    assert job.npreds == 0

CLARIFYING THE LOCK

In a sense, the lock in this code protects "too much". The potential problem it's addressing is that multiple threads may try to decrement the .npreds member of the same Job object simultaneously. Without mutual exclusion, the stored value at the end of that may be anywhere from 1 smaller than its initial value, to the correct result (the initial value minus the number of threads trying to decrement it).
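To make the decrement hazard concrete, here is a small standalone sketch (the Counter class and the counts are invented for illustration): ten threads each perform ten thousand decrements of one shared attribute. Because -= on an attribute is a read-modify-write, not an atomic operation, unlocked decrements can be lost; under the lock, every decrement is accounted for:

```python
import threading

class Counter:
    def __init__(self, value):
        self.value = value

lock = threading.Lock()
counter = Counter(10 * 10000)  # start high enough for all decrements

def dec_many(n):
    for _ in range(n):
        with lock:             # without this, decrements can be lost
            counter.value -= 1

threads = [threading.Thread(target=dec_many, args=(10000,))
           for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.value)  # 0 -- exactly as many decrements as performed
```

Dropping the with lock line makes the final value nondeterministic, which is the "weird things" the question describes.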

But there's no need to also mutate the queue under lock protection. Queues do their own thread-safe locking. So, e.g., the code could be written like so instead:

        for succ in job.succs:
            with lock:
                npreds = succ.npreds = succ.npreds - 1
            assert npreds >= 0
            if npreds == 0:
                q.put(succ)

It's generally best practice to hold a lock for as little time as possible. However, I find this rewrite harder to follow. Pick your poison ;-)


4 Comments

Thank you so much for this code. One question: is there any performance impact in acquiring the lock on each iteration of the loop vs. acquiring one lock for the whole duration of the loop?
Second question: if jobC is in self.depends of both jobA and jobB, then when jobA decrements jobC's waitcount, that new value is reflected in the jobC that jobB's self.depends refers to, right? Just a little paranoid here. I know threads share memory, so I would think so.
To the second comment, yes. jobC is a unique physically shared object across all the lists it may be in. To the first comment, there's no answer short of you timing it both ways in your actual code. Of course it takes more time to acquire and release a lock, say, 100 times instead of 1 time. But the lock also potentially blocks other threads from running for the duration, and perhaps (depending on all the details of your real application) holding the lock longer than semantically necessary will needlessly limit potential concurrency.
Yes, a very enlightening point. Thank you so much! I'll play with your code!
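The lock-granularity trade-off discussed in these comments can be sketched in isolation. Both variants below are correct (the shared dict and the thread counts are invented for illustration); they differ only in how often the lock is acquired and how long other threads may block:

```python
import threading

lock = threading.Lock()
shared = {"n": 0}

# Variant 1: acquire the lock once per iteration (finer-grained,
# more acquire/release overhead, shorter blocking of other threads)
def bump_each(times):
    for _ in range(times):
        with lock:
            shared["n"] += 1

# Variant 2: hold the lock for the whole loop (coarser-grained,
# one acquire/release, but other threads block for the whole loop)
def bump_once(times):
    with lock:
        for _ in range(times):
            shared["n"] += 1

threads = [threading.Thread(target=f, args=(1000,))
           for f in (bump_each, bump_once) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(shared["n"])  # 10000: both variants produce the same result
```

As the answer's comment says, which variant is faster in a real program can only be settled by timing it in that program.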
