
Is there a way to execute threads in a specific order? I am familiar with the wait() and notify_all() methods on threading.Condition, but they do not seem to work when all of the threads target a single function. The code below should write the csv file in this order: df1, df2, df3, df4.


import threading
import pandas as pd


df1 = pd.DataFrame(columns=["col1","col2","col3"])
df2 = pd.DataFrame(columns=["col1","col2","col3"])
df3 = pd.DataFrame(columns=["col1","col2","col3"])
df4 = pd.DataFrame(columns=["col1","col2","col3"])


def function(df):
    # webscraping: compile web data into the dataframe
    df.to_csv('output.csv', mode='a')


if __name__ == '__main__':
    t1 = threading.Thread(target=function, args=(df1,))
    t2 = threading.Thread(target=function, args=(df2,))
    t3 = threading.Thread(target=function, args=(df3,))
    t4 = threading.Thread(target=function, args=(df4,))
    t1.start()
    t2.start()
    t3.start()
    t4.start()

I want all dataframes to wait inside function() until they can execute in order. With multithreading, threads tend to race each other and can finish out of order. Although multithreading is a good performance-enhancing tool, its downfall comes into play when order matters.

A simple example: if thread 4 finishes compiling its dataframe, it needs to wait for the first three threads to compile their corresponding dataframes and upload them to the csv file before thread 4 can upload.
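(For reference, the Python counterpart of wait()/notifyAll() lives on threading.Condition. A rough sketch of the in-order handoff I'm describing might look like the following; the order argument and the shared turn counter are additions for illustration, not part of my actual code:)

import threading

condition = threading.Condition()
turn = 0  # illustrative shared counter: which thread may write next

def function(df, order):
    global turn
    # webscraping, compile web data to dataframe
    with condition:
        # block until it is this thread's turn to write
        condition.wait_for(lambda: turn == order)
        df.to_csv('output.csv', mode='a')
        turn += 1
        condition.notify_all()  # wake the other waiting threads

(Each thread would then be started as threading.Thread(target=function, args=(df1, 0)) and so on.)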

As always, thanks in advance!!

  • You might want to check out dask.dataframe; something like this could be accomplished with dask pretty easily, and if you were writing to parquet files or another format that supports parallel writes, you could write in parallel too.
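(For anyone curious, a minimal sketch of that dask route, reusing the four frames from the question; the partition count and output name here are placeholders:)

import dask.dataframe as dd
import pandas as pd

# stack the four pandas frames in order into one dask dataframe
ddf = dd.from_pandas(pd.concat([df1, df2, df3, df4]), npartitions=4)
# parquet supports parallel writes, so partitions can be written concurrently
ddf.to_parquet('output.parquet')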

1 Answer


To solve your problem in a clean way, you probably want to be using concurrent.futures instead of threading; it is available on Python 3.2+.

To do so, change function() so that it returns the dataframe instead of writing it, build a list of your arguments in the order you need them written, arglist = [df1, df2, ...], and then do something like

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=len(arglist)) as ex:
    # map() yields the results in the order of arglist,
    # regardless of which thread finishes first
    results = ex.map(function, arglist)
for res in results:
    res.to_csv(..., mode='a')

To be honest, you should really try to use concurrent.futures for everything related to threading or multiprocessing.
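Put together with the code from the question, a full sketch of this approach might look like the following (the scraping step is still a placeholder):

from concurrent.futures import ThreadPoolExecutor
import pandas as pd

# the four empty frames from the question
arglist = [pd.DataFrame(columns=["col1", "col2", "col3"]) for _ in range(4)]

def function(df):
    # webscraping, compile web data to dataframe
    return df  # return instead of writing, so the main thread controls the order

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=len(arglist)) as ex:
        results = ex.map(function, arglist)
        for res in results:
            res.to_csv('output.csv', mode='a')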

It appears I read the question wrong the first time. I'll leave my previous answer here for people who find this through Google:

You can use a lock (see https://docs.python.org/3/library/threading.html#lock-objects) and call lock.acquire() before writing to the csv and lock.release() afterwards. This will stop the writes from overlapping (although, as noted above, a lock alone does not enforce a particular order).

This is not ideal, though; instead, I would suggest returning the dataframes from each thread and just writing them all at the end.

Your code would simply look like

lock = threading.Lock()

def function(df):
    # web stuff
    with lock:
        df.to_csv(...)


6 Comments

Actually, I think I may have read your question wrong: did you actually want to guarantee that the first thread wrote its dataframe first, then the second one second, and so on? In that case you definitely just want to return the dataframes and do the writing afterwards.
Yes, I wanted to guarantee that thread 1 would write, then 2, then 3, then 4. Are you saying I should use .join()? Because wouldn't the program be unable to tell the dataframes apart if I used return while threading is still happening?
Also, I have prerequisites that take place. If I did .join() I would need to redo those prerequisites, which is not an option; this program needs to be as efficient as possible.
Your insight about adding lock.acquire() and lock.release() would solve the problem I had of potential data overlap! That deserves an upvote from me. However, if there is a way to write to the output file in order without returning values, that would be excellent and deserving of the accepted answer. I want to be able to execute this chronological-order upload within function() if at all possible. Thanks a bunch!!
My solution to this is super unclean, and I'm sure there is a better way, but if you passed the previous thread in to each call, so that (for example) the call for the second one is function(df2, t1), then you would be able to simply put t1.join() above the .to_csv() and do the same for all of them (a sketch follows below). If you actually only have 4 threads this is easy to write hard-coded style, but if you have lots it might get a little ugly (although not too bad). Importantly, you'll need to pass None to the first one and have a check: if thread_input is not None: thread_input.join()
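(A sketch of that join-chain idea, reusing the names from the question; prev_thread is the hypothetical extra parameter the comment describes:)

import threading

def function(df, prev_thread):
    # webscraping, compile web data to dataframe
    if prev_thread is not None:
        prev_thread.join()  # wait until the previous thread has written
    df.to_csv('output.csv', mode='a')

if __name__ == '__main__':
    t1 = threading.Thread(target=function, args=(df1, None))
    t2 = threading.Thread(target=function, args=(df2, t1))
    t3 = threading.Thread(target=function, args=(df3, t2))
    t4 = threading.Thread(target=function, args=(df4, t3))
    for t in (t1, t2, t3, t4):
        t.start()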
