
I am writing a Python program that runs an external model in parallel a certain number of times to build up data across a parameter space. Because of how the external model is written (for good reasons, I promise), I have to make a new copy of the model folder if I want to run more than one instance at a time. I made copies of my main model folder foo and called them foo0, foo1, foo2, and foo3. Now I want each thread to go into its own directory, make some changes, run the model, write to a main file, and then move on to the next run. Each model run can take between 30s to 200s, hence the benefit of parallel over serial runs.

import subprocess
import threading
from joblib import Parallel, delayed

def run_model(base_path):
    # Make some changes using a random number generator in the folder
    ...
    # Run the model using bash on Windows. Note the str(threading.get_ident()) is my
    # attempt to get the thread number 0,1,2,3
    subprocess.run(['bash', '-c', base_path + str(threading.get_ident()) + '/Model.exe'])
    # Write some input for the run to a main file that will store all runs
    with open('Inputs.txt', 'a') as file:
        with open(base_path + str(threading.get_ident()) + '/inp.txt') as inp_file:
            for i, line in enumerate(inp_file):
                if i == 5:
                    file.write(line)

Parallel(n_jobs=4, backend="threading")(delayed(run_model)('Models/foo') for i in range(10000))

However, I keep getting FileNotFoundError: the thread id keeps changing between runs, so the folder it points at does not exist. The model is large, so copying it per thread id (something like a folder named foo + thread_id) is both slow and uses a lot of disk space. Is there any way I can pin each copy of the model to a specific thread, making sure it is not being used by any other thread?


3 Answers


You could structure your program like so:

  1. First, the main thread finds the folders that need to be processed and places them into a thread-safe Queue, making sure during this phase that the queue contains only unique items. A synchronization primitive guards the queue so that only one thread accesses it at a time (Python's queue.Queue handles this locking internally).
  2. Worker threads are started.
  3. Worker threads dequeue work from the synchronized, thread-safe queue and process it (a minimal sketch follows the diagram below).
  4. Threads join when there is no work left in the queue.
  5. When all threads are joined, the work is done.

The picture of this is like:

   Queue of unique dirs is constructed.
                  ||
                  \/
                                         --> Consumer 0
                                         --> Consumer 1
   Queue(DirD -> DirC -> DirB -> DirA)           ...
                                         --> Consumer i
                  ||                             ...
                  \/                     --> Consumer n

            Dirs are processed.
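
A minimal Python sketch of this design, where process_directory is a hypothetical stand-in for the per-directory work (for the question's 10,000 runs, you could enqueue each folder path many times, or have workers put a folder back after each run):

import threading
import queue

work_queue = queue.Queue()            # queue.Queue is thread-safe out of the box

# 1. The main thread enqueues each unique directory exactly once.
for d in ('Models/foo0', 'Models/foo1', 'Models/foo2', 'Models/foo3'):
    work_queue.put(d)

def process_directory(path):
    print('processing', path)         # hypothetical stand-in for the real model run

def worker():
    # 3. Dequeue work until the queue is empty.
    while True:
        try:
            directory = work_queue.get_nowait()
        except queue.Empty:
            return                    # 4. no work left, the thread finishes
        process_directory(directory)

# 2. Start the worker threads.
threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:                     # 5. when all threads are joined, work is done
    t.join()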



Just dedicate a process to each directory:

import subprocess
from multiprocessing import Process, Lock

def run_model(base_path, iterations, output_lock):
    for x in range(iterations):
        #Make some changes using a random number generator in the folder
        ...
        #Run the model using bash on windows.
        subprocess.run(['bash','-c', base_path + '/Model.exe'])
        #Write some input for the run to a main file that will store all runs
        with output_lock:
            with open('Inputs.txt','a') as out_file:
                with open(base_path + '/inp.txt') as inp_file:
                    for i,line in enumerate(inp_file):
                        if i == 5:
                            out_file.write(line)

if __name__ == '__main__':  # required on Windows, where child processes are spawned
    N = 4
    total_runs = 10000
    process_list = list()
    output_lock = Lock()
    for x in range(N):
        arguments = ("Models/foo%s" % x, int(total_runs / N), output_lock)
        p = Process(target=run_model, args=arguments)
        p.daemon = True
        p.start()
        process_list.append(p)
    for p in process_list:
        p.join()

I took the liberty of renaming the output file handle so that it doesn't shadow the built-in name file. Also, I added a lock to protect the output file, and wrapped the setup code in an if __name__ == '__main__': guard, which multiprocessing requires on Windows.

2 Comments

This worked for me, thanks a lot! I changed 'for x in iterations:' to 'for x in range(iterations):' and cast total_runs/N to an int.
I'm glad it worked, but after thinking about it, I think you could use joblib.Parallel and use "Models/foo%s" % (i % 4) to pass the directory name.
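
A minimal sketch of that joblib variant, where run_model is assumed to be the question's function modified to take the folder path directly instead of using get_ident():

from joblib import Parallel, delayed

# (run_model as in the question, minus the get_ident() path logic)
Parallel(n_jobs=4, backend="threading")(
    delayed(run_model)('Models/foo%s' % (i % 4)) for i in range(10000))

# Caveat: joblib does not pin task i to worker i % 4, so two in-flight tasks can
# still map to the same folder; guard each folder with its own lock if that matters.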

Do not rely on threading.get_ident() for anything except its one intended use: checking whether two thread handles refer to the same thread. The identifiers are arbitrary integers, not small indices like 0-3, and they may even be recycled after a thread exits, which is why your folder paths never match and you end up in a mess.
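
A quick standalone demonstration of this (not from the question's code):

import threading

def report():
    # get_ident() returns opaque, platform-specific integers, e.g. 140245...,
    # never a neat 0,1,2,3 sequence, and values can repeat across runs.
    print(threading.get_ident())

threads = [threading.Thread(target=report) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()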

Try refactoring the call run_model('Models/foo') to take an explicit index, for example:

    run_model('Models/foo', i % 4)
    run_model('Models/foo{}'.format(i % 4))

whichever you prefer. Then modify the body of run_model() to use this new parameter to select the folder each worker operates in. That should solve most of the problem.

But more than that, your code lacks synchronization. You need locks or similar coordination around the shared resources, or you'll trade one mess for another: with 10,000 tasks spread over four folders, roughly 2,500 runs target each folder with nothing stopping two of them from overlapping, and every run appends to the same Inputs.txt concurrently. :)
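
As an illustration of that synchronization, here is a sketch under assumptions (not the asker's code): run_model is assumed to be refactored to take the folder path and return the line to record instead of writing it itself.

import threading
from concurrent.futures import ThreadPoolExecutor

folder_locks = [threading.Lock() for _ in range(4)]  # one lock per model copy
output_lock = threading.Lock()                       # guards the shared Inputs.txt

def run_model(base_path):
    # hypothetical stand-in: run the model in base_path, return the line to record
    return base_path + '\n'

def run_one(i):
    k = i % 4
    with folder_locks[k]:                            # folder fooK used by one task at a time
        line = run_model('Models/foo%s' % k)
    with output_lock:                                # serialize writes to the shared file
        with open('Inputs.txt', 'a') as f:
            f.write(line)

with ThreadPoolExecutor(max_workers=4) as pool:      # caps concurrency at 4 threads
    for i in range(10000):
        pool.submit(run_one, i)

ThreadPoolExecutor here is just a stand-in for any bounded pool; joblib's threading backend gives the same cap through n_jobs.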

