0

Given the following program structure I am looking for a way to:

  • Detect child failures in the parent process

  • Terminate all processes (parent and children) and exit with status code 1

  • Achieve the above without relaxing the max queue size

Here's the code:

import multiprocessing


def worker1(queue1, queue2):
    while True:
        item = queue1.get()
        if item == 10:
            raise
        if item == 'stop':
            return
        # do something with item that generates multiple queue entries
        for x in xrange(1000):
            queue2.put(x)


def worker2(queue2, queue3):
    while True:
        item = queue2.get()
        if item == 'stop':
            return
        # do something with item, only one result to pass to the next queue
        queue3.put(1)


def worker3(queue3):
    while True:
        item = queue3.get()
        if item == 'stop':
            return
        # do something with item


def main():
    queue1 = multiprocessing.Queue()
    queue2 = multiprocessing.Queue(100)
    queue3 = multiprocessing.Queue(100)

    pool1 = multiprocessing.Pool(2, worker1, (queue1,queue2,))
    pool2 = multiprocessing.Pool(8, worker2, (queue2,queue3,))
    pool3 = multiprocessing.Pool(4, worker3, (queue3,))

    for i in xrange(100):
        queue1.put(i)
    for _ in range(pool1.__dict__['_processes']):
        queue1.put('stop')
    pool1.close()
    pool1.join()

    for _ in range(pool2.__dict__['_processes']):
        queue2.put('stop')
    pool2.close()
    pool2.join()

    for _ in range(pool3.__dict__['_processes']):
        queue3.put('stop')
    pool3.close()
    pool3.join()

    print 'finished'

if __name__ == '__main__':
    main()

Thanks!

8
  • 1
    Do the code works? If yes, go to CodeReview. If not, explain the problem to us. Commented Apr 2, 2017 at 17:57
  • The code works, but the parent process is unaware of uncaught exceptions in the subprocesses. I am looking for some help to detect them and shutdown all processes (parent and children) once an uncaught exception is raised. Commented Apr 2, 2017 at 18:00
  • Well, the usual way to catch exceptions is to use try: ... except: raise ~exception type here~ Commented Apr 2, 2017 at 18:03
  • sure, but those don't propagate to the parent process - which is where my problem lies Commented Apr 2, 2017 at 18:04
  • Wouldn't kill the process and restart it after a exception solve your problem? I mean, write a daemon to watch for process kill messages. So you will restart the process,and the parent will be notified. Commented Apr 2, 2017 at 18:07

1 Answer 1

1

You are kind of misusing the Pool objects here.

Instead of Pool objects, use Process objects directly.

Program the worker functions to exit with an error code when an exception is raised.

Then you can use the is_alive() method to check if the process is still running, and check the exitcode property of dead processes.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.