
I have a function "function" that I want to call 10 times with multiprocessing: 2 rounds of 5 processes, one per CPU.

Therefore I need a way to synchronize the processes as described in the code below.

Is this possible without using a multiprocessing pool? I get strange errors if I use one (for example "UnboundLocalError: local variable 'fd' referenced before assignment", even though I don't have such a variable), and the processes also seem to terminate randomly.

If possible I would like to do this without a pool. Thanks!

number_of_cpus = 5
number_of_iterations = 2

# An array for the processes.
processing_jobs = []

# Start 5 processes 2 times.
for iteration in range(0, number_of_iterations):

    # TODO SYNCHRONIZE HERE

    # Start 5 processes at a time.
    for cpu_number in range(0, number_of_cpus):

        # Calculate an offset for the current function call.
        file_offset = iteration * cpu_number * number_of_files_per_process

        p = multiprocessing.Process(target=function, args=(file_offset,))
        processing_jobs.append(p)
        p.start()

    # TODO SYNCHRONIZE HERE

This is an (anonymized) traceback of the errors I get when I run the code in a pool:

Process Process-5:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "python_code_3.py", line 88, in function_x
    xyz = python_code_1.function_y(args)
  File "/python_code_1.py", line 254, in __init__
    self.WK =  file.WK(filename)
  File "/python_code_2.py", line 1754, in __init__
    self.__parse__(name, data, fast_load)
  File "/python_code_2.py", line 1810, in __parse__
    fd.close()
UnboundLocalError: local variable 'fd' referenced before assignment

Most of the processes crash like that, but not all of them, and more of them seem to crash when I increase the number of processes. I also thought this might be due to memory limitations...

5 Comments

  • Why don't you want to use a Pool?
  • This task is really best suited to a Pool; if you could get it working without errors, would you be willing to use one?
  • It'd be helpful if you provided the complete traceback. The code runs fine for me on Linux. What platform are you using?
  • Also, what do you mean by "synchronize"? Do you just mean wait for the first set of 5 processes to complete prior to starting the second set? Do you need to return anything from function back to the parent?
  • Edit it into your question.

2 Answers


Here's how you can do the synchronization you're looking for without using a pool:

import multiprocessing

def function(arg):
    print ("got arg %s" % arg)

if __name__ == "__main__":
    number_of_cpus = 5
    number_of_iterations = 2

    # An array for the processes.
    processing_jobs = []

    # Start 5 processes 2 times.
    for iteration in range(1, number_of_iterations+1):  # Start the range from 1 so we don't multiply by zero.

        # Start 5 processes at a time.
        for cpu_number in range(1, number_of_cpus+1):

            # Calculate an offset for the current function call.
            file_offset = iteration * cpu_number * number_of_files_per_process

            p = multiprocessing.Process(target=function, args=(file_offset,))
            processing_jobs.append(p)
            p.start()

        # Wait for all processes to finish.
        for proc in processing_jobs:
            proc.join()

        # Empty active job list.
        del processing_jobs[:]

        # Write file here
        print("Writing")

Here it is with a Pool:

import multiprocessing

def function(arg):
    print ("got arg %s" % arg)

if __name__ == "__main__":
    number_of_cpus = 5
    number_of_iterations = 2

    pool = multiprocessing.Pool(number_of_cpus)
    for i in range(1, number_of_iterations+1): # Start the range from 1 so we don't multiply by zero
        file_offsets = [number_of_files_per_process * i * cpu_num for cpu_num in range(1, number_of_cpus+1)] 
        pool.map(function, file_offsets)
        print("Writing")
        # Write file here

As you can see, the Pool solution is nicer.

This doesn't solve your traceback problem, though. It's hard for me to say how to fix that without knowing what's actually causing it. You may need to use a multiprocessing.Lock to synchronize access to the shared resource.
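For illustration, here is a minimal sketch of that Lock idea, assuming the shared resource is a single output file; the file name and the worker body below are placeholders, not code from your program:

import multiprocessing

def function(file_offset, lock):
    # Do the CPU-bound work outside the lock.
    result = "processed offset %d" % file_offset

    # Hold the lock only while touching the shared resource.
    with lock:
        with open("shared_output.txt", "a") as fd:  # placeholder file name
            fd.write(result + "\n")

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    jobs = []
    for file_offset in range(5):
        p = multiprocessing.Process(target=function, args=(file_offset, lock))
        jobs.append(p)
        p.start()
    for p in jobs:
        p.join()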


2 Comments

Thanks. I will look both at the traceback and your solution. One question. Don't I have to reinstantiate processing_jobs after deleting it in the outer for-loop?
@user2177047 We're just deleting all the contents of the processing_jobs list, not the object itself. So you can just start appending to it again on the next iteration.
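
As a quick standalone illustration of that point (not part of the original answer):

jobs = [1, 2, 3]
same_list = jobs

del jobs[:]        # empties the existing list object in place
print(jobs)        # []
print(same_list)   # [] -- same object, so it is empty too

jobs = []          # rebinding, by contrast, would create a brand-new list object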

A Pool can be very easy to use. Here's a full example:

source

import multiprocessing

def calc(num):
    return num*2

if __name__=='__main__':  # required for Windows
    pool = multiprocessing.Pool()   # one Process per CPU
    for output in pool.map(calc, [1,2,3]):
        print 'output:',output

output

output: 2
output: 4
output: 6

2 Comments

You should probably use an if __name__ == "__main__": guard so that this will work on Windows.
@shavenwarthog is on the right track. You, @user2177047, need to come up with a better way to share the resource that is being opened in function. The best method would be to open the resource in your parent process, farm work out to your spawned processes, and report back to the parent (which then writes to the file). Alternatively, this SO post may help give you some ideas: stackoverflow.com/questions/659865/…
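
To illustrate that comment, here is a rough sketch (placeholder names throughout) in which the workers do pure computation and only the parent ever opens the output file:

import multiprocessing

def function(file_offset):
    # Pure computation only; no shared files are opened in the worker.
    return "result for offset %d" % file_offset

if __name__ == "__main__":
    number_of_cpus = 5
    offsets = [cpu * 10 for cpu in range(number_of_cpus)]  # placeholder offsets

    pool = multiprocessing.Pool(number_of_cpus)
    results = pool.map(function, offsets)
    pool.close()
    pool.join()

    # The parent is the only process that touches the shared file.
    with open("combined_output.txt", "w") as out:  # placeholder file name
        for line in results:
            out.write(line + "\n")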
