0

Summary

Using Python I want to start multiple processes which run the same executable with different parameters in parallel. When all are finished I want to check there were not errors and then do some more processing.

What I've tried

I have this already:

def main(path_of_script):
    path_of_exe = make_path_to_exe(path_of_script)
    #
    lst_standin_params = [["1", "5"], ["2", "1"]]
    #
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        #
        future_standin_exe = { 
            executor.submit(
                subprocess.Popen(
                    [path_of_exe, standin_arg_lst[TASK_ID_IDX], standin_arg_lst[TASK_DELAY_IDX]]
                )
            ): standin_arg_lst for standin_arg_lst in lst_standin_params 
        }
        #
        for future in concurrent.futures.as_completed(future_standin_exe):
            tmp_rv_holder = future_standin_exe[future]
            #
            try:
                data = future.result()
            except Exception as exc:
                print('An exception occurred: %s' % (exc))

Question

The processes run fine but I'm clearly doing something wrong with respect to checking that each process started by subprocess.Popen has completed successfully. I think I need a way to capture the return value from the call to subprocess.Popen but I'm not sure how to .

The code as is stands throws an exception when the line data = future.result() is executed with an exception can't pickle _thread.lock objects. I'm pretty sure that attempting to use the Future object is the wrong idea but I can't work out how to access the results of the execution.

2
  • maybe create normal function with subprocess.Popen and with functions which catch output - stdout=PIPE, p.stdout.read() - and return this output. And then use this function in ProcessPoolExecutor Commented May 9, 2021 at 13:20
  • 1
    it seems asyncio has method to run external processes with await - docs.python.org/3/library/asyncio-subprocess.html - but it still need to use stdout=PIPE, p.stdout.read() to get output. Commented May 9, 2021 at 13:26

1 Answer 1

4

You should create function which uses stdout=PIPE and p.stdout.read() to catch output

def func(path_of_exe, task_id, task_delay):
    
    p = subprocess.Popen(
        [path_of_exe, task_id, task_delay],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,        
    )
    
    return [p.stdout.read(), p.stderr.read()]

and later use it in executor

    future_standin_exe = { 
        executor.submit(
            func, path_of_exe, standin_arg_lst[TASK_ID_IDX], standin_arg_lst[TASK_DELAY_IDX]
        ): standin_arg_lst for standin_arg_lst in lst_standin_params 
    }

And it has to be func, arg1, arg2, arg3, not func(arg1, arg2, arg3)

And later you can display both outputs.

            data = future.result()
            for item in data:
                print(item)

or

            stdout, stderr = future.result()
            print('stdout:', stdout)
            print('stderr:', stderr)

Minimal code which I used for test.

I didn't have any program to run so I used command ls which gives some output but it is useless.

import concurrent.futures
import subprocess

TASK_ID_IDX = 0
TASK_DELAY_IDX = 0

def func(path_of_exe, standin_arg_lst):
    p = subprocess.Popen(
        [path_of_exe, standin_arg_lst[TASK_ID_IDX], standin_arg_lst[TASK_DELAY_IDX]],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,        
    )
    return [p.stdout.read(), p.stderr.read()]

def make_path_to_exe(path):
    return path

def main(path_of_script):
    path_of_exe = make_path_to_exe(path_of_script)
    #
    lst_standin_params = [["1", "5"], ["2", "1"]]
    #
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        #
        future_standin_exe = { 
            executor.submit(
                func, path_of_exe, standin_arg_lst
            ): standin_arg_lst for standin_arg_lst in lst_standin_params 
        }
        #
        
        for future in concurrent.futures.as_completed(future_standin_exe):
            tmp_rv_holder = future_standin_exe[future]
            #
            try:
                data = future.result()
                for item in data:
                    print(item)
            except Exception as exc:
                print('An exception occurred: %s' % (exc))
                
main('dir')                
Sign up to request clarification or add additional context in comments.

1 Comment

This is a great answer, thank you very much. You pin-pointed the bit I had got wrong, if I could tick it twice I would !

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.