
I'm loading data into a list of pandas.DataFrame objects, and then I need to send this data to a DB. It takes ~2 min per iteration.

So I wanted to know if parallel processing is a good idea (no lock issues or anything like that?).

So I wanted to change it from:

for df in df_list:
    # Send a DF's batch to the DB
    print('Sending DF\'s data to DB')
    df.to_sql('ga_canaux', engine, if_exists='append', index=False)
    db.check_data()

to something I found about multiprocessing:

with multiprocessing.Pool(processes=4) as pool:
    results = pool.map_async(df.to_sql(???), df_list)
    results.wait()

How can I pass the parameters that df.to_sql needs through map_async?
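One common way to give a pool worker fixed extra arguments is `functools.partial`: freeze everything except the per-item argument, and `map_async` supplies each DataFrame. The sketch below is hypothetical: `send_batch` stands in for a wrapper around `df.to_sql(...)` and only formats a string so it runs without a database. Note that every frozen argument must itself be picklable, so a live engine cannot simply be frozen in.

```python
import functools
import multiprocessing


def send_batch(batch, table_name, if_exists):
    """Hypothetical stand-in for df.to_sql(table_name, engine, if_exists=if_exists).

    It only returns a summary so the sketch runs without a database.
    """
    return f"{len(batch)} rows -> {table_name} ({if_exists})"


def make_worker(table_name="ga_canaux", if_exists="append"):
    # functools.partial freezes the fixed keyword arguments; the pool then
    # passes only the varying one (each batch) as the first positional argument.
    return functools.partial(send_batch, table_name=table_name, if_exists=if_exists)


def send_all(batches, processes=4):
    with multiprocessing.Pool(processes=processes) as pool:
        async_result = pool.map_async(make_worker(), batches)
        # .get() blocks until every batch is done and re-raises any worker exception
        return async_result.get()


if __name__ == "__main__":
    # map_async keeps the input order in its results
    print(send_all([[1, 2, 3], [4, 5]]))
```

`Pool.starmap` is an alternative when each item is already a tuple of arguments, but the same picklability constraint applies to everything in those tuples.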

EDIT:

I tried passing N arguments like this:

pool = multiprocessing.Pool()
args = ((df, engine, db) for df in df_list)
results = pool.map(multiproc, args)  # map blocks and returns a plain list, so no .wait() is needed

but I get the error TypeError: can't pickle _thread._local objects
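That TypeError means something reachable from the arguments (most likely `engine` or `db`) holds a `threading.local` object, which pickle cannot serialise, so it cannot be shipped to a worker process. A minimal sketch for checking this before handing objects to a Pool; `FakeEngine` is a hypothetical stand-in that, like the object in the traceback, carries thread-local state.

```python
import pickle
import threading


class FakeEngine:
    """Hypothetical stand-in for a DB engine that keeps thread-local state."""

    def __init__(self, url):
        self.url = url
        self._local = threading.local()  # per-thread state: not picklable


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, i.e. could be sent to a Pool worker."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError, AttributeError):
        return False
    return True


if __name__ == "__main__":
    print(can_pickle(("a", 1)))                  # plain data: True
    print(can_pickle(FakeEngine("postgresql://example")))  # thread-local inside: False
```

The usual way around this is to pass only picklable data (DataFrames, a connection URL) and create the engine inside each worker process.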

EDIT2:

I changed the way I'm doing mp a bit and it kind of works (179s vs 732s with the same sample dataset). But I get an error when I try to read from the DB inside the pool.

import multiprocessing as mp

# Connect to the remote DB
global DB
DB = Database()
global ENGINE
ENGINE = DB.connect()

pool = mp.Pool(mp.cpu_count() - 1)
pool.map(multiproc, df_list)
pool.close()
pool.join()

def multiproc(df):
    print('Sending DF\'s data to DB')
    df.to_sql('ga_canaux', ENGINE, if_exists='append', index=False)
    DB.check_data()  # HERE: this read fails

Error:

(psycopg2.OperationalError) SSL SYSCALL error: EOF detected
 [SQL: 'SELECT COUNT(*) FROM ga_canaux'] (Background on this error at: http://sqlalche.me/e/e3q8)

EDIT 3: When I try it on a bigger sample, I get a DB timeout: psycopg2.DatabaseError: SSL SYSCALL error: Operation timed out

  • The params to df.to_sql(... are not passed through .map_async(...; you set them inside def my_function_to_sql. Change it to .map_async(my_function_to_sql, df_list). Note: all your DataFrames are going to be pickled, which costs time and memory. Commented Mar 1, 2019 at 15:55
  • Yep, I already made a function, but I can only pass one argument to it: the iterable values. Check my edit. Also, what do you mean by pickled? Commented Mar 1, 2019 at 16:00
  • Pickled means serialisation of data passed to or from a process. Your error indicates that you are trying to pass objects that can't be pickled; at that point you're out of luck. "can pass only one argument to it": your args look OK, but they are pointless, since you pass the same engine and db for every df. Commented Mar 1, 2019 at 16:10
  • Yes, same engine and db. OK for pickled; I've only been coding in Python for a few weeks and didn't know this word before. Commented Mar 1, 2019 at 18:32
  • Read using-multiprocessing-on-a-pandas-dataframe to get some ideas. Commented Mar 1, 2019 at 19:09
