
I have an application that reads around 50 large CSV files of roughly 400 MB each. I read each one into a DataFrame and ultimately concatenate them all into one single DataFrame. I want to do this in parallel to speed up the overall process. My code currently looks something like this:

import numpy as np
import pandas as pd
from multiprocessing.pool import ThreadPool
from time import time 

class dataProvider:
    def __init__(self):
        self.df = pd.DataFrame()
        self.pool = ThreadPool(processes=40)
        self.df_abc = pd.DataFrame()
        self.df_xyz = pd.DataFrame()
        self.start = time()

    def get_csv_data(self, filename):
        return pd.read_csv(filename)

    def get_all_csv_data(self, filename):
        self.start = time()
        df_1 = self.pool.apply_async(self.get_csv_data, ('1.csv',), callback=self.concatDf)
        df_2 = self.pool.apply_async(self.get_csv_data, ('2.csv',), callback=self.concatDf)
        total_time = time() - self.start

    def concatDf(self, result):
        self.df_abc = pd.concat([df_1, df_2])  # df_1/df_2 are not in scope here -- see problem 1 below
        self.df_xyz = self.df_abc.iloc[:, 1:]
        return self.df_xyz

I see the following problems with this code:

  1. If the same callback is invoked by both of my apply_async calls, how do I know which call (the df_1 line or the df_2 line) invoked the callback currently running?
  2. I want to concatenate the outputs of the different apply_async calls. How can I do that in the concatDf callback function?
  3. How do I know that the callbacks of all apply_async calls have completed, so that I can return the concatenated DataFrame of all 50 CSVs?
  4. Is there a better and more efficient way to do this?

Thanks

  • Are you limited by RAM? Commented Sep 4, 2020 at 6:22
  • @mrzo I have sufficient RAM, above 300 GB. The idea is to read and concatenate these files in parallel, since each one might take, say, 30 seconds; ideally I do not want 30×50 seconds of reading before the concatenation can even start. Thanks Commented Sep 4, 2020 at 6:29
  • Concatenating the dataframes all at once is much more efficient than concatenating them iteratively, the way you are trying to do it. So why do you want to do that? Have you tried my answer? Commented Sep 4, 2020 at 8:54
  • @mrzo Yes would be going through your answer and will update you. Thanks for your help Commented Sep 4, 2020 at 10:08

1 Answer


Edit: Use this solution only if you have enough RAM available.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import pandas as pd
from glob import glob 

files = glob("*.csv")

def read_file(file):
    return pd.read_csv(file)

# I would recommend trying out whether ThreadPoolExecutor or
# ProcessPoolExecutor is faster on your system:
with ThreadPoolExecutor(4) as pool:
    df = pd.concat(pool.map(read_file, files))
print(df)
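Regarding the apply_async questions in the post: you do not need callbacks at all. Each apply_async call returns an AsyncResult, so keeping those in a list preserves the mapping between result and file, and calling .get() on each one blocks until that task has finished. This is a minimal sketch of that pattern; it writes two tiny CSVs to a temp directory as stand-ins for the real 400 MB files:

```python
import os
import tempfile
import pandas as pd
from multiprocessing.pool import ThreadPool

# Create two small example CSVs (placeholders for the real files).
tmpdir = tempfile.mkdtemp()
for name in ("1.csv", "2.csv"):
    pd.DataFrame({"a": [1, 2], "b": [3, 4]}).to_csv(
        os.path.join(tmpdir, name), index=False
    )
files = [os.path.join(tmpdir, n) for n in ("1.csv", "2.csv")]

with ThreadPool(processes=4) as pool:
    # One AsyncResult per file, in order -- no callback bookkeeping needed,
    # so there is no ambiguity about which result came from which call.
    async_results = [pool.apply_async(pd.read_csv, (f,)) for f in files]
    # .get() waits for each task; once this list comprehension returns,
    # every read has completed.
    dfs = [r.get() for r in async_results]

# Concatenate once at the end, which is cheaper than concatenating
# incrementally inside each callback.
df = pd.concat(dfs, ignore_index=True)
print(df.shape)  # (4, 2)
```

The AsyncResult list also answers the ordering question: async_results[i] always corresponds to files[i], regardless of which task finishes first.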
