
I'm trying to start a variable number of threads to compute the results of functions for one of my automated trading modules. I have about 14 functions, all of which are computationally expensive. I've been calculating each function sequentially, but that takes around 3 minutes to complete, and because my platform is high frequency I need to cut the computation time down to 1 minute or less.

I've read up on multiprocessing and multithreading, but I can't find a solution that fits my needs.

What I'm trying to do is define some number of threads n, divide my list of functions into n groups, and then compute each group of functions in a separate thread. Essentially:

import numpy as np

functionList = [func1, func2, func3, func4]
outputList = ['func1out', 'func2out', 'func3out', 'func4out']  # output names
argsList = [func1args, func2args, func3args, func4args]

# number of threads
n = 3

functionSplit = np.array_split(np.array(functionList), n)
outputSplit = np.array_split(np.array(outputList), n)
argSplit = np.array_split(np.array(argsList), n)

Now I'd like to start n separate threads, each processing the functions according to the split lists. Then I'd like to name the output of each function according to outputList and collect the outputs from every function into a master dict. I'd then loop through the output dict and create a dataframe with column ID numbers according to the information in each column (I already have this part worked out; I just need the multithreading).

Is there any way to do something like this? I've been looking into creating a subclass of threading.Thread and passing the functions, output names, and arguments into the run() method, but I don't know how to name and return the results of the functions from each thread, nor how to call the functions in a list with their corresponding arguments.

The reason I'm doing this is to discover the optimum balance between the number of threads and computation time. Like I said, this will be integrated into a high frequency trading platform I'm developing, where time is my major constraint.

Any ideas?

3 Comments

  • If your functions are CPU-bound, then forget multithreading and look at multiprocessing (at least for CPython). Commented Feb 26, 2017 at 5:53
  • I'm realizing now that multiprocessing is the way to go. Any pointers on how to do this with the Process class from multiprocessing? Commented Feb 26, 2017 at 5:55
  • threading is the wrong module. Use multiprocessing; Pool.map is what you want. Commented Feb 26, 2017 at 5:57

2 Answers


You can use the multiprocessing library, like below:

import multiprocessing

def callfns(fnList, argList, outList, d):
    # Call each function with its own arguments and store the
    # result in the shared dict under its output name
    for fn, args, name in zip(fnList, argList, outList):
        d[name] = fn(*args)

...

manager = multiprocessing.Manager()
d = manager.dict()
processes = []
for i in range(len(functionSplit)):
    process = multiprocessing.Process(target=callfns,
                                      args=(functionSplit[i], argSplit[i], outputSplit[i], d))
    processes.append(process)

for p in processes:
    p.start()

for p in processes:
    p.join()

# use d here

You can use a server process to share a dictionary between these processes. To interact with the server process you need a Manager; manager.dict() then creates a dictionary that lives in the server process. Once all the processes have joined back to the main process, you can use the dictionary d.
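For completeness, here is a minimal self-contained sketch of the same pattern; the dummy functions, their arguments, and the pre-split lists are illustrative stand-ins for your real ones:

import multiprocessing

# Dummy stand-ins for the real, expensive functions
def func1(x, y):
    return x + y

def func2(x, y):
    return x * y

def callfns(fnList, argList, outList, d):
    # Store each function's result under its output name
    for fn, args, name in zip(fnList, argList, outList):
        d[name] = fn(*args)

if __name__ == '__main__':
    # Pre-split groups, one group per process (here: two groups)
    functionSplit = [[func1], [func2]]
    argSplit = [[(1, 2)], [(3, 4)]]
    outputSplit = [['func1out'], ['func2out']]

    manager = multiprocessing.Manager()
    d = manager.dict()

    processes = []
    for fns, args, outs in zip(functionSplit, argSplit, outputSplit):
        p = multiprocessing.Process(target=callfns, args=(fns, args, outs, d))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(dict(d))  # {'func1out': 3, 'func2out': 12}

The same pattern works when each function returns a DataFrame, since values stored in a manager dict only need to be picklable.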

I hope this helps you solve your problem.


3 Comments

For more here
Thank you! Each one of my functions will output a DataFrame, is there any way that I can put the output of each function into a dict? Would I do this in the callfns function and return the dict for each segment? Just not sure how this will work with multiple processes, if they finish at different times. Could you edit your answer to include this?
@denbjornen505 I have updated my answer as you said.
  • You should use multiprocessing instead of threading for CPU-bound tasks.

  • Manually creating and managing processes can be difficult and requires more effort. Check out concurrent.futures and try its ProcessPoolExecutor for maintaining a pool of processes: you can submit tasks to it and retrieve the results (see the first sketch after this list).

  • The Pool.map method from the multiprocessing module takes a function and an iterable, then processes the iterable in chunks, in parallel, to compute faster: the iterable is broken into separate chunks, the chunks are passed to the function in separate processes, and the results are put back together (see the second sketch after this list).
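As a sketch of the concurrent.futures point above (the function names and the max_workers value are illustrative assumptions, not part of the original answer):

import concurrent.futures

# Dummy CPU-bound stand-ins
def slow_square(x):
    return x * x

def slow_cube(x):
    return x ** 3

if __name__ == '__main__':
    tasks = [(slow_square, 4), (slow_cube, 3)]
    results = {}
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        # Map each future back to its task name so results can be
        # collected as they finish, in any order
        futures = {executor.submit(fn, arg): fn.__name__ for fn, arg in tasks}
        for future in concurrent.futures.as_completed(futures):
            results[futures[future]] = future.result()
    print(results)  # {'slow_square': 16, 'slow_cube': 27} (order may vary)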
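And a sketch of the Pool.map point; since map applies a single function, one common workaround (an assumption here, not spelled out above) is to map a small dispatcher over (function, args) pairs:

import multiprocessing

def add(a, b):
    return a + b

def mul(a, b):
    return a * b

def dispatch(task):
    # Unpack one (function, args) pair and call it
    fn, args = task
    return fn(*args)

if __name__ == '__main__':
    tasks = [(add, (1, 2)), (mul, (3, 4))]
    with multiprocessing.Pool(processes=3) as pool:
        # map() splits the task list into chunks and hands them to
        # the worker processes; results come back in input order
        results = pool.map(dispatch, tasks)
    print(results)  # [3, 12]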

