
I have a function that writes the contents of a DataFrame into a CSV file.

def writeToCSV(outDf, defFile, toFile, retainFlag=True, delim='\t', quotechar='"'):
    headers = []
    with open(defFile, 'r') as fid:
        for line in fid:
            headers.append(line.replace('\r', '').split('\n')[0].split('\t')[0])
    df = pd.DataFrame([], columns=headers)
    for header in outDf.columns.values:
        if header in headers:
            df[header] = outDf[header]
    df.to_csv(toFile, sep=delim, quotechar=quotechar, index=False, encoding='utf-8')
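
For context, defFile is assumed here to be a tab-separated schema file whose first field on each line is a column name (the question doesn't show its format). A minimal sketch of the header-parsing step, using a hypothetical two-column schema:

```python
import io

# Hypothetical defFile content: one header name per line, first tab-separated field.
def_text = "name\tstring\r\nage\tint\r\n"

headers = []
for line in io.StringIO(def_text):
    # Same parsing as in writeToCSV: drop \r, take text before \n, keep first tab field.
    headers.append(line.replace('\r', '').split('\n')[0].split('\t')[0])

print(headers)  # -> ['name', 'age']
```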

How can I parallelize this process? Currently I am using the following code:

def writeToSchemaParallel(outDf, defFile, toFile, retainFlag=True, delim='\t', quotechar='"'):
    logInfo('Start writingtoSchema in parallel...', 'track')
    headers = []
    with open(defFile, 'r') as fid:
        for line in fid:
            headers.append(line.replace('\r', '').split('\n')[0].split('\t')[0])
    df = pd.DataFrame([], columns=headers)
    for header in outDf.columns.values:
        if header in headers:
            df[header] = outDf[header]
    out_Names = Queue()
    cores = min(int(multiprocessing.cpu_count() / 2), int(len(outDf) / 200000) + 1)
    # cores = 4
    logInfo(str(cores) + ' cores are used...', 'track')
    # split the data for parallel computation
    outDf = splitDf(df, cores)
    logInfo('splitDf called and df divided...', 'track')
    # process the chunks in parallel
    Filenames = []
    procs = []
    fname = toFile.split("_Opera_output")[0]
    for i in range(0, cores):
        filename = fname + "_" + str(i) + ".tsv"
        proc = Process(target=writeToSchema, args=(outDf[i], defFile, filename, retainFlag, delim, quotechar, i))
        procs.append(proc)
        proc.start()
        print 'processing ' + str(i)
        Filenames.append(filename)
    # combine all returned chunks
    # outDf = out_Names.get()
    # for i in range(1, cores):
    #     outDf = outDf.append(out_q.get(), ignore_index=True)
    for proc in procs:
        proc.join()
    logInfo('Now we merge files...', 'track')
    print Filenames
    with open(toFile, 'w') as outfile:
        for fname in Filenames:
            with open(fname) as infile:
                for line in infile:
                    outfile.write(line)

But it doesn't work, and gives the following error:

2017-12-17 16:02:55,078 - track - ERROR: Traceback (most recent call last):
2017-12-17 16:02:55,078 - track - ERROR:   File "C:/Users/sudhir.tiwari/Documents/AMEX2/Workspace/Backup/Trunk/code/runMapping.py", line 257, in <module>
2017-12-17 16:02:55,089 - track - ERROR: writeToSchemaParallel(outDf, defFile, toFile, retainFlag, delim='\t', quotechar='"')
2017-12-17 16:02:55,153 - track - ERROR:   File "C:\Users\sudhir.tiwari\Documents\AMEX2\Workspace\Backup\Trunk\code\utils.py", line 510, in writeToSchemaParallel
2017-12-17 16:02:55,163 - track - ERROR: with open(fname) as infile:
2017-12-17 16:02:55,198 - track - ERROR: IOError: [Errno 2] No such file or directory: 'C:/Users/sudhir.tiwari/Documents/AMEX2/Workspace/Input/work/Schindler_20171130/Schindler_20171130_0.tsv'

And it doesn't write the files at all: when I search the location, no files are found. I am using multiprocessing to write the dataframe into multiple files and then merge them all. splitDf divides the dataframe into n parts.
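
The splitDf helper isn't shown in the question; assuming it just splits the frame into n roughly equal row chunks, a minimal sketch could be:

```python
import numpy as np
import pandas as pd

def splitDf(df, n):
    # Assumed behaviour of the helper referenced in the question:
    # split df into n roughly equal row chunks.
    return np.array_split(df, n)

df = pd.DataFrame({'a': range(10)})
chunks = splitDf(df, 4)
print([len(c) for c in chunks])  # -> [3, 3, 2, 2]
```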

  • You should be using some locking mechanism, I guess. Commented Dec 17, 2017 at 21:20
  • @NanduKalidindi I am using 8 different files to write, so I don't think I need a locking mechanism. Commented Dec 18, 2017 at 6:17
  • How about fitting your logic into a MapReduce problem? If it is possible, the framework will do most of the parallel processing for you. Commented Dec 18, 2017 at 8:23
  • Or perhaps you can write to separate files during the processing and, once all processes are finished, combine these files into a single final file? Commented May 9, 2019 at 13:36

2 Answers


Using multiprocessing will consume more time than the default way (a direct save). To synchronize between processes, use Process and Lock to parallelize the writing. Below is a sample POC:

import pandas as pd
import numpy as np
from multiprocessing import Lock, Process
from time import time

def writefile(df,l):
    l.acquire()
    df.to_csv('dataframe-multiprocessing.csv', index=False, mode='a', header=False)
    l.release()


if __name__ == '__main__':
    a = np.random.randint(1,1000,10000000)
    b = np.random.randint(1,1000,10000000)
    c = np.random.randint(1,1000,10000000)

    df = pd.DataFrame(data={'a':a,'b':b,'c':c})

    print('Iterative way:')
    print()
    new = time()
    df.to_csv('dataframe-conventional.csv', index=False, mode='a', header=False)
    print(time() - new, 'seconds')

    print()    
    print('Multiprocessing way:')
    print()
    new = time()
    l = Lock()
    p = Process(target=writefile, args=(df,l))
    p.start()
    p.join()
    print(time() - new, 'seconds')
    print()

    df1 = pd.read_csv('dataframe-conventional.csv')
    df2 = pd.read_csv('dataframe-multiprocessing.csv')
    print('If both file same or not:')
    print(df1.equals(df2))

Result:

C:\Users\Ariff\Documents\GitHub\testing-code>python pandas_multip.py
Iterative way:

18.323541402816772 seconds

Multiprocessing way:

20.14128303527832 seconds

If both file same or not:
True

1 Comment

You can test with much larger data.

You can monitor your CPU usage while to_csv runs; as long as your CPU is partly idle, you can accelerate the process by using multiprocessing.
A simple example follows:

import numpy as np
import pandas as pd
from joblib import Parallel, delayed


def write_csv(df, filename):
    df.to_csv(filename)


df = pd.DataFrame({'c': ['a'*100]*100_000_000, })

N = 8
parts = np.array_split(df, N)

Parallel(n_jobs=N)(delayed(write_csv)(
    part, f'part_{i}') for i, part in enumerate(parts))

This takes 35 s on my machine.
However, a plain

df.to_csv

takes 3 min.
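
If a single output file is required, the part files can be concatenated afterwards, keeping the CSV header only from the first part. A minimal sketch, assuming the part_{i} filenames from the snippet above (here with two tiny stand-in part files):

```python
import pandas as pd

# Create two small part files as stand-ins for the parallel output.
pd.DataFrame({'c': ['x', 'y']}).to_csv('part_0', index=False)
pd.DataFrame({'c': ['z']}).to_csv('part_1', index=False)

N = 2
# Concatenate the parts; skip the header line on every part after the first.
with open('merged.csv', 'w') as out:
    for i in range(N):
        with open('part_' + str(i)) as part:
            lines = part.readlines()
            out.writelines(lines if i == 0 else lines[1:])

print(open('merged.csv').read())
```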

