I'm trying to use multiprocessing to run several background jobs while the main process acts as a user interface that accepts commands through input(). Each process does its own work and writes its current status into a dictionary that was created with manager.dict() and passed to the Process.

After the processes are created, a loop calls input() to read user commands. The commands are reduced to a minimum here for simplicity.

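import logging
from multiprocessing import Manager
from multiprocessing import Process

logger = logging.getLogger(__name__)  # assumed; the logger setup is not shown in the question
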
with Manager() as manager:
    producers = []
    settings = [{'name':'test'}]

    for setting in settings:
        status = manager.dict()
        logger.info("Start Producer {0}".format(setting['name']))
        producer = Process(target=start_producer, args=(setting, status))
        producer.start()
        producers.append([producer, status])

    logger.info("initialized {0} producers".format(len(producers)))

    while True:
        text_command = input('Enter your command:')
        if text_command == 'exit':
            logger.info("waiting for producers")
            for p in producers:
                p[0].join()
            logger.info("Exit application.")
            break
        elif text_command == 'status':
            for p in producers:
                if 'name' in p[1] and 'status' in p[1]:
                    print('{0}:{1}'.format(p[1]['name'], p[1]['status']))
        else:
            print("Unknown command.")

The method which runs in other processes is pretty simple:

def start_producer(producer_setting: dict, status_dict: dict):
    importer = MyProducer(producer_setting)
    importer.set_status_dict(status_dict)
    importer.run()

I create a MyProducer instance, pass it the status dictionary through a setter, and call the blocking run() method, which returns only when the producer is finished. Calling set_status_dict(status_dict) fills the dictionary with a name and a status element.

When I run the code, the producer seems to get created: I see the "Start Producer test" and "initialized 1 producers" output, followed by the "Enter your command" prompt from input(), but the actual process doesn't seem to run.

When I press Enter to skip the first loop iteration, I get the expected "Unknown command." output and the producer process begins its actual work. After that my "status" command also works as expected.

When I enter 'status' in the first loop iteration, I get a KeyError because 'name' and 'status' are not yet set in the dictionary. Those keys should be set in set_status_dict(), which is called from the function passed as Process(target=...).

Why is that? Shouldn't producer.start() run the complete block of start_producer inside a new process and therefore never hang on the input() of the main process?

How can I start the processes first without any user input and only then wait for input()?

Edit: A complete MCVE program that shows this problem can be found here: https://pastebin.com/k8xvhLhn

Edit: Adding sleep(1) after initializing the processes solves it. But why does this behavior happen in the first place? Shouldn't all the code in start_producer() run in a new process?

  • you cannot share input() between processes. Commented Apr 4, 2018 at 14:37
  • I don't want to share input(). The other processes should run on their own without interacting with input. Commented Apr 4, 2018 at 14:38
  • I (completely) replaced start_producer suite with status_dict['name'] = 'foo'; status_dict['status'] = 'thinking' and everything else seems to be working the way you want. Commented Apr 4, 2018 at 15:19
  • How do you know (why do you think) it is hanging on input()? Commented Apr 4, 2018 at 15:22
  • When I press Enter, "Unknown command." is printed in the console, and after that the other process runs as expected. Commented Apr 4, 2018 at 15:23

1 Answer

I have limited experience with the multiprocessing module, but I was able to get it to behave the way (I think) you want. First I added some print statements at the top of the while loop to see what was going on, and found that it worked if the process was run or joined. I figured you didn't want it to block, so I added the call to run() further up, but run() blocks as well. It turns out that the process simply hadn't finished when the first while loop iteration came around: adding time.sleep(30) at the top of the loop gave the process enough time to get scheduled (by the OS) and run. (On my machine it actually needs only 200 to 300 milliseconds of nap time.)
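
The observation that run() blocks fits how Process is documented to work: run() is the method that invokes the target, so calling it directly executes the target in the calling process, whereas start() arranges for run() to be invoked in a separate process. A minimal sketch that makes this visible by comparing process IDs (report_pid, pid_via_start and pid_via_run are hypothetical helper names, not part of the original code):

```python
import os
from multiprocessing import Process, Queue


def report_pid(queue):
    """Put the PID of whichever process executes this function on the queue."""
    queue.put(os.getpid())


def pid_via_start(queue):
    """Launch a child with start(); the target runs in the new process."""
    child = Process(target=report_pid, args=(queue,))
    child.start()
    pid = queue.get(timeout=10)
    child.join()
    return pid


def pid_via_run(queue):
    """Call run() directly; this is a plain method call in the current process."""
    child = Process(target=report_pid, args=(queue,))
    child.run()  # no new process is created, so this blocks until the target returns
    return queue.get(timeout=10)


if __name__ == '__main__':
    print('parent pid     :', os.getpid())
    print('pid via start():', pid_via_start(Queue()))
    print('pid via run()  :', pid_via_run(Queue()))
```

pid_via_run should report the parent's own PID and pid_via_start a different one, which is why only start() keeps the main process free for input().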

I replaced start_producer with:

def start_producer(producer_setting: dict, status_dict: dict):
##    importer = MyProducer(producer_setting)
##    importer.set_status_dict(status_dict)
##    importer.run()
    #time.sleep(30)
    status_dict['name'] = 'foo'
    status_dict['status'] = 'thinking'

Your code modified:

import time

if __name__ == '__main__':
    with Manager() as manager:
        producers = []
        settings = [{'name':'test'}]

        for setting in settings:
            status = manager.dict()
            logger.info("Start Producer {0}".format(setting['name']))
            producer = Process(target=start_producer, args=(setting, status))
            producer.start()
            # add a call to run() but it blocks
            #producer.run()
            producers.append([producer, status])

        logger.info("initialized {0} producers".format(len(producers)))

        while True:
            time.sleep(30)
            for p, s in producers:
                #p.join()
                #p.run()
                print(f'name:{p.name}|alive:{p.is_alive()}|{s}')
                if 'name' in s and 'status' in s:
                    print('{0}:{1}'.format(s['name'], s['status']))
            text_command = input('Enter your command:')
            if text_command == 'exit':
                logger.info("waiting for producers")
                for p in producers:
                    p[0].join()
                logger.info("Exit application.")
                break
            elif text_command == 'status':
                for p in producers:
                    if 'name' in p[1] and 'status' in p[1]:
                        print('{0}:{1}'.format(p[1]['name'], p[1]['status']))
            else:
                print("Unknown command.")
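
A fixed sleep is a guess at how long process startup takes. As an alternative sketch (not what the code above does), each producer could set a multiprocessing.Event once its initial status is written, and the main process could wait on those events before prompting. The ready event, the wait_until_ready helper and the stand-in producer body are assumptions made for illustration, since MyProducer is not shown:

```python
import time
from multiprocessing import Event, Manager, Process


def start_producer(setting, status, ready):
    """Stand-in for the real producer: publish the initial status, then signal."""
    status['name'] = setting['name']
    status['status'] = 'running'
    ready.set()        # tell the parent that both status keys now exist
    time.sleep(0.5)    # placeholder for the long-running work
    status['status'] = 'finished'


def wait_until_ready(events, timeout=5.0):
    """Return True if every event is set before one shared deadline expires."""
    deadline = time.monotonic() + timeout
    return all(e.wait(max(0.0, deadline - time.monotonic())) for e in events)


if __name__ == '__main__':
    with Manager() as manager:
        producers = []
        for setting in [{'name': 'test'}]:
            status = manager.dict()
            ready = Event()
            p = Process(target=start_producer, args=(setting, status, ready))
            p.start()
            producers.append((p, status, ready))

        if wait_until_ready([r for _, _, r in producers]):
            # The input() loop from the question would start here.
            for _, status, _ in producers:
                print('{0}:{1}'.format(status['name'], status['status']))

        for p, _, _ in producers:
            p.join()
```

The main process then waits only as long as startup actually takes, and the timeout keeps it from waiting forever if a producer fails before signalling.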

5 Comments

Those producer processes will run for a long time; that was the reason I put them in processes in the first place. And yes, run() blocks; that method should be called not by the main process but by the newly created one. The solution with sleep(1) after initializing the processes seems to work fine, thank you. But I'm curious why this happens. I really thought that the whole start_producer() block already runs in another process and should not get blocked. I don't understand why this is happening.
My take on the whole exercise was that it isn't blocked, it just hasn't had time to run or finish. I really don't know how the manager.dict gets updated: whether it is dynamic and happens while the other process is running, or whether it only gets updated when the process finishes, even if the update is the first statement. I really think it happens while the code in the process is executing.
@JackO'neill There are a lot of multiprocessing questions here on SO with answers/comments alluding to a lot of overhead in starting another process. I imagine that overhead includes the OS scheduling the task, which you don't have control over.
OK, that makes sense. I'm fine with waiting a bit before continuing the main thread; I was just curious why. Thanks for your help :)
@JackO'neill If you are really curious you could sprinkle some time.time() statements in the main and sub-process (maybe another item in status) and compare them afterwards.
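
The measurement suggested in the last comment could look roughly like the sketch below. The child stores time.time() in the shared dictionary as its first statement, and the parent compares that with the time at which it called start(). The parent also prints the dictionary while the child is still alive, which is one way to check whether manager.dict updates are visible before the process finishes. timed_producer, startup_delay and the 'child_started' key are hypothetical names:

```python
import time
from multiprocessing import Manager, Process


def timed_producer(status):
    """Record when the child actually begins, then pretend to work."""
    status['child_started'] = time.time()  # first statement executed in the child
    status['status'] = 'working'
    time.sleep(1.0)                        # placeholder for the real job
    status['status'] = 'done'


def startup_delay(parent_called_start, status):
    """Seconds from the parent's start() call to the child's first statement.

    Returns None while the child has not written its timestamp yet.
    """
    child_started = status.get('child_started')
    if child_started is None:
        return None
    return child_started - parent_called_start


if __name__ == '__main__':
    with Manager() as manager:
        status = manager.dict()
        producer = Process(target=timed_producer, args=(status,))
        t0 = time.time()
        producer.start()
        print('right after start():', status.copy())

        # Poll until the child's timestamp shows up (give up after 10 seconds).
        deadline = time.time() + 10
        while startup_delay(t0, status) is None and time.time() < deadline:
            time.sleep(0.01)

        print('child alive:', producer.is_alive(), '| status:', status.copy())
        delay = startup_delay(t0, status)
        if delay is not None:
            print('startup delay: {:.3f} s'.format(delay))

        producer.join()
        print('after join():', status.copy())
```

The measured delay depends on the machine, the operating system and the process start method, so no particular value should be expected.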
