
I have a DataFrame of 200k rows. I want to split it into parts and call my function S_Function on each partition.

def S_Function(df):
    # my code here
    return new_df

Main program:

from threading import Thread

N_Threads = 10
Threads = []
Out = []

size = df.shape[0] // N_Threads

for i in range(N_Threads + 1):
    begin = i * size
    end = min(df.shape[0], (i + 1) * size)
    Threads.append(Thread(target=S_Function, args=(df[begin:end],)))

Then I start the threads and wait for them to finish:

for i in range(N_Threads + 1):
    Threads[i].start()

for i in range(N_Threads + 1):
    Out.append(Threads[i].join())

output = pd.concat(Out)

The code works, but the problem is that using threading.Thread did not decrease the execution time:

Sequential code: 16 minutes
Threaded code: 15 minutes

Can someone explain what I should improve, and why this is not working well?

Comments:

  • Do you have to create a whole new DataFrame, or have you considered using apply()? That pandas function can be parallelized in several ways; one of them is pandarallel (a rough sketch follows below these comments). You can find more information in this thread. (Mar 9, 2022 at 9:42)
  • Also, multi-threading will not speed up CPU-bound execution in Python; what you need to look into is called multi-processing. (Mar 9, 2022 at 9:44)
  • Do you use Thread from the threading module? (Mar 9, 2022 at 10:05)
  • @Corralien Yes, I mentioned it at the end (threading.Thread). (Mar 9, 2022 at 10:11)
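
If S_Function can be expressed as a row-wise operation, the pandarallel idea from the first comment could look roughly like the sketch below. This is only an illustration: row_function is a placeholder name for the per-row logic, and the dummy DataFrame stands in for the real 200k-row data.

import pandas as pd
import numpy as np
from pandarallel import pandarallel

pandarallel.initialize()  # starts one worker process per CPU core

def row_function(row):
    # placeholder for the per-row part of S_Function
    return row['A'] * 2

df = pd.DataFrame({'A': np.random.randint(1, 30000000, 200000)})
df['result'] = df.parallel_apply(row_function, axis=1)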

1 Answer


Don't use threading for CPU-bound operations: because of CPython's GIL, only one thread executes Python bytecode at a time, so CPU-bound pandas work never actually runs in parallel. To achieve your goal, I think you should use the multiprocessing module instead.

Try:

import pandas as pd
import numpy as np
import multiprocessing
import time
import functools

# Adjust the chunk size to your data and CPU count
CHUNKSIZE = 20000

def S_Function(df, dictionary):
    # placeholder: your real per-chunk processing goes here
    new_df = df
    return new_df


if __name__ == '__main__':
    # Load your dataframe (dummy data for the example)
    df = pd.DataFrame({'A': np.random.randint(1, 30000000, 200000)})

    # Split the dataframe into chunks of CHUNKSIZE rows
    chunks = (df[i:i+CHUNKSIZE] for i in range(0, len(df), CHUNKSIZE))

    # Extra arguments are bound with functools.partial because pool.map
    # only passes a single argument (the chunk) to the worker function
    dictionary = {'k1': 'v1', 'k2': 'v2'}
    s_func = functools.partial(S_Function, dictionary=dictionary)

    start = time.time()
    with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
        data = pool.map(s_func, chunks)
        out = pd.concat(data)
    end = time.time()

    print(f"Elapsed time: {end - start:.2f} seconds")

Comments:

  • I can't wait to see if it improves the calculation time :)
  • If I have many parameters in S_Function, how can I pass them?
  • Use starmap instead of map (see the sketch below these comments). Can you provide the signature of S_Function, please?
  • The easiest way is to use partial from functools.
  • I finally passed it as a global variable; the new execution time is 45 s :)
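
For reference, here is a minimal sketch of the starmap variant mentioned above, assuming S_Function takes the chunk plus two extra parameters (param1, param2 and their values are placeholder names, not from the original code).

import multiprocessing
import numpy as np
import pandas as pd

CHUNKSIZE = 20000

def S_Function(df, param1, param2):
    # placeholder: apply the real per-chunk work here
    return df

if __name__ == '__main__':
    df = pd.DataFrame({'A': np.random.randint(1, 30000000, 200000)})
    chunks = (df[i:i+CHUNKSIZE] for i in range(0, len(df), CHUNKSIZE))

    # one argument tuple per chunk: (chunk, param1, param2)
    args = [(chunk, 'v1', 'v2') for chunk in chunks]

    with multiprocessing.Pool() as pool:
        out = pd.concat(pool.starmap(S_Function, args))
    print(out.shape)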