
I'm trying to add multithreading to a very time-consuming program, and I've come across this SO answer: https://stackoverflow.com/a/28463266/3451339, which basically offers this solution:

from multiprocessing.dummy import Pool as ThreadPool

pool = ThreadPool(4)
results = pool.map(my_function, my_array)

# Close the pool and wait for the work to finish
pool.close()
pool.join()

and, for passing multiple arrays:

results = pool.starmap(function, zip(list_a, list_b))

The following is the code I have so far, which needs to be refactored with threading. It iterates over four arrays, has to pass arguments to the function at each iteration, and appends all results to a final container:

    strategies = ['strategy_1', 'strategy_2']
    budgets = [90,100,110,120,130,140,150,160]
    formations=['343','352','433','442','451','532','541']
    models = ['model_1', 'model_2', 'model_3']

    all_teams = pd.DataFrame()

    for strategy in strategies:
        for budget in budgets:
            for formation in formations:
                for model in models:

                    team = function(strategy=strategy, 
                                    budget=budget, 
                                    curr_formation=formation,
                                    model=model)
                       
                    all_teams = all_teams.append(team, ignore_index=True, sort=False)\
                                         .reset_index(drop=True)\
                                         .copy()

Note: each function call makes API web requests.

What is the way to go with multithreading in this scenario?
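For context, the linked starmap pattern maps onto the loop above via itertools.product; a minimal sketch, assuming function returns a DataFrame (or anything pd.concat accepts):

    from itertools import product
    from multiprocessing.dummy import Pool as ThreadPool
    import pandas as pd

    # function, strategies, budgets, formations and models are from the code above
    def run_one(strategy, budget, formation, model):
        return function(strategy=strategy, budget=budget,
                        curr_formation=formation, model=model)

    pool = ThreadPool(8)  # thread count is a tuning knob for I/O-bound work
    teams = pool.starmap(run_one, product(strategies, budgets, formations, models))
    pool.close()
    pool.join()

    all_teams = pd.concat(teams, ignore_index=True, sort=False)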

3 Comments
  • First, multithreading in Python does not make processor-intensive programs faster (it actually makes them slightly slower). Python multithreading excels when you have a large amount of idle time, for example a program that downloads 40 images, or a program that waits for 3 seconds 500 times in parallel (can't figure out the use of that one, but it seems to be popular on forums); see the toy sketch after these comments. It is not fast when generating 40,000,000,000 random numbers (I learned this the hard way). Commented May 5, 2021 at 0:29
  • But can the code be run in parallel somehow? Commented May 5, 2021 at 0:42
  • Not real parallelism; it just splits the work into micro-tasks and switches between them quickly. CPython's GIL means threads never run Python bytecode on more than one core at a time (multiple processes can, though). You can do this pseudo-parallelism, but it's actually slower, unless you're doing web requests. How you do this, I cannot remember, nor do I want to. I upvoted because this is a good question. Commented May 5, 2021 at 0:49
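To illustrate the first comment's point about idle time, a toy sketch in which time.sleep stands in for a web request:

    import time
    from concurrent.futures import ThreadPoolExecutor

    def fake_request(i):
        time.sleep(1)  # simulated network latency
        return i

    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=10) as ex:
        results = list(ex.map(fake_request, range(10)))
    print(f"10 one-second 'requests' in {time.perf_counter() - start:.1f}s")

The ten simulated requests finish in roughly one second instead of ten, because the threads overlap their waiting.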

2 Answers


Python has the multiprocessing module, which can run multiple tasks in parallel, and inside each process you can have multiple threads or async IO code.

Here is a working example which uses 3 processes and multithreading:

import multiprocessing
from multiprocessing import Queue
from threading import Thread

strategies = ['strategy_1', 'strategy_2']
budgets = [90,100,110,120,130,140,150,160]
formations = ['343','352','433','442','451','532','541']
models = ['model_1', 'model_2', 'model_3']

# Shared queue; to reduce write locking, use 3 queues (one per process)
Q = Queue()

# Retrieve asynchronously inside each thread if you want to speed up the process
def function(q, strategy, budget, curr_formation, model):
    q.put("Team")  # placeholder for the real API-calling work

def runTask(model, q):
    threads = []
    for strategy in strategies:
        for budget in budgets:
            for formation in formations:
                t = Thread(target=function, args=(q, strategy, budget, formation, model))
                t.start()
                threads.append(t)
    for t in threads:
        t.join()  # wait for every thread in this process to finish

def main():
    p1 = multiprocessing.Process(target=runTask, args=('model_1', Q))
    p2 = multiprocessing.Process(target=runTask, args=('model_2', Q))
    p3 = multiprocessing.Process(target=runTask, args=('model_3', Q))

    p1.start()
    p2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()

    all_results = []  # 'all' shadows a builtin, so use a different name
    for _ in range(Q.qsize()):
        all_results.append(Q.get())
    print(all_results)
    print(len(all_results))

if __name__ == "__main__":
    main()
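One portability caveat: multiprocessing.Queue.qsize() raises NotImplementedError on macOS, so a hedged alternative is to drain the queue with a timeout instead of counting it:

    import queue  # for the Empty exception

    all_results = []
    while True:
        try:
            all_results.append(Q.get(timeout=1))  # stop once the queue stays empty
        except queue.Empty:
            break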

A useful article: Multiprocessing in Python | Set 2


3 Comments

Thank you for your answer, but your solution is generic and does not acknowledge my code above, does it?
Split the work into 3 parts, one for each model, which will run in 3 processes; this way you reduce the cyclomatic complexity by 1. I will update the example.
@8-BitBorges I think the code is complete now, have a look and let me know

This can be one approach.

Note: thread vs. multiprocess. In another SO answer I provided execution through map; that will not work here, as map passes each worker only a single item per call.
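To make the note concrete, a minimal comparison sketch (f and g are hypothetical workers): map hands each worker exactly one item, while submit forwards arbitrary arguments:

    from concurrent.futures import ThreadPoolExecutor

    def f(packed):               # map style: exactly one argument per call
        url, params = packed
        return url, params

    def g(url, params):          # submit style: arbitrary arguments per call
        return url, params

    with ThreadPoolExecutor() as ex:
        print(list(ex.map(f, [("u1", [1]), ("u2", [2])])))  # args packed into one item each
        print(ex.submit(g, "u1", [1]).result())             # args passed directly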

  1. Run your nested for loops and build a list of parameters ==> financial_options
    financial_options=[]
    for strategy in strategies:
        for budget in budgets:
            for formation in formations:
                for model in models:
                    financial_options.append([strategy,budget,formation,model])
    financial_options_len=len(financial_options)
  2. Create a new function that will handle the API calls
def access_url(url,parameter_list):
    #response=requests.get(url) # request goes here
    print(parameter_list)
    time.sleep(2)
    print("sleep done!")
    return "Hello"#,parameter_list # return type

Now run the threading with these permutation parameters, so the complete program will look like this:

import concurrent.futures
import requests  # just in case needed
from bs4 import BeautifulSoup  # just in case needed
import time
import pandas as pd

def access_url(url, parameter_list):
    # response = requests.get(url)  # the real request goes here
    print(parameter_list)
    time.sleep(2)
    print("sleep done!")
    return "Hello"  # , parameter_list  # return type

def multi_threading():
    test_url = "http://blabla.com/"  # placeholder; the real API base URL goes here

    strategies = ['strategy_1', 'strategy_2']
    budgets = [90,100,110,120,130,140,150,160]
    formations = ['343','352','433','442','451','532','541']
    models = ['model_1', 'model_2', 'model_3']

    start = time.perf_counter()  # start time for performance
    financial_options = []
    decision_results = []
    for strategy in strategies:
        for budget in budgets:
            for formation in formations:
                for model in models:
                    financial_options.append([strategy, budget, formation, model])
    financial_options_len = len(financial_options)
    print(f"Total options: {financial_options_len}")

    future_list = []
    THREAD_MULTI_PROCESSING_LOOP = True
    if THREAD_MULTI_PROCESSING_LOOP:
        with concurrent.futures.ThreadPoolExecutor() as executor:  # through executor
            for each in range(financial_options_len):
                future = executor.submit(access_url, test_url, financial_options[each])  # submit each option
                future_list.append(future)
        for f1 in concurrent.futures.as_completed(future_list):
            r1 = f1.result()
            decision_results.append(r1)

    end = time.perf_counter()  # finish time for performance
    print(f'Threads: Finished in {round(end - start, 2)} second(s)')
    df = pd.DataFrame(decision_results)
    df.to_csv("multithread_for.csv")
    return df, decision_results

df, results = multi_threading()

2 Comments

Thanks for your detailed answer. Just one thing needs to be cleared up for me, please: where did the function() from my working code go? access_url() is not really needed here, because the API calls happen from within function(). Are all function() parameters implicit as well? Must they all be passed in the same order as in the nested for loop?
That's right, your function() can replace access_url(), or you can wrap it like def access_url(url, parameter_list): function(...). The parameters are passed in the 2nd and 3rd positions (access_url, test_url, financial_options[each]); since you were using a nested loop, the list was built in that same sequence. Even though the threads are created in sequence, their completion is not guaranteed to be in the same sequence, because some will take more time as you are calling an API in them (see the sketch below). FYI: you need to define a boundary or task for each thread; that is the reason the function was defined.
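Since completion order is not guaranteed, one way to keep each result tied to the parameters that produced it, as the comment above hints, is to return them together. A minimal sketch with placeholder values:

    import concurrent.futures

    def access_url(url, parameter_list):
        result = "Hello"  # placeholder for the real API call
        return parameter_list, result

    options = [["strategy_1", 90, "343", "model_1"],
               ["strategy_1", 90, "343", "model_2"]]

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(access_url, "http://example.com/", opt) for opt in options]
        for f in concurrent.futures.as_completed(futures):
            params, result = f.result()
            print(params, "->", result)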
