I'm loading data into a list of pandas.DataFrame objects.
Then I need to send this data to a DB. It takes ~2 min per iteration.
So I wanted to know whether parallel processing is a good idea (no lock issues or anything like that?).
I want to go from this:
for df in df_list:
    # Send a DF's batch to the DB
    print('Sending DF\'s data to DB')
    df.to_sql('ga_canaux', engine, if_exists='append', index=False)
    db.check_data()
to something I found about multiprocessing:
with multiprocessing.Pool(processes=4) as pool:
    results = pool.map_async(df.to_sql(???), df_list)
    results.wait()
How can I pass the parameters I need for df.to_sql through map_async?
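One common way to bind extra arguments for map or map_async is functools.partial. The sketch below is only an illustration: send_batch is a hypothetical stand-in for the df.to_sql call, run_parallel is a made-up helper name, and it assumes every bound argument is picklable (which a live DB engine usually is not).

```python
import multiprocessing
from functools import partial


def send_batch(batch, table, if_exists):
    """Hypothetical stand-in for df.to_sql: report what would be written."""
    return (table, if_exists, len(batch))


# Bind the fixed keyword arguments once; the pool then supplies only `batch`.
worker = partial(send_batch, table="ga_canaux", if_exists="append")


def run_parallel(batches, processes=4):
    """Apply `worker` to every batch in a process pool and collect the results."""
    with multiprocessing.Pool(processes=processes) as pool:
        async_result = pool.map_async(worker, batches)
        return async_result.get()  # blocks until all batches are done
```

Calling run_parallel(df_list) from under an `if __name__ == "__main__":` guard would then return one result per batch, in input order.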
EDIT:
I tried passing N arguments like this:
pool = multiprocessing.Pool()
args = ((df, engine, db) for df in df_list)
results = pool.map(multiproc, args)
results.wait()
but I get the error TypeError: can't pickle _thread._local objects
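Pool sends every argument to the workers through pickle, so this error suggests that one of the objects in args (presumably engine or db) holds thread-local state. A small check like the following, where is_picklable is a hypothetical helper name, can show which objects are safe to pass:

```python
import pickle
import threading


def is_picklable(obj):
    """Return True if pickle.dumps accepts `obj` (Pool pickles its arguments)."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


# Thread-local storage, the kind of object named in the traceback, cannot be pickled.
print(is_picklable(threading.local()))  # False
# Plain data such as dicts, lists and strings can.
print(is_picklable({"table": "ga_canaux", "rows": [1, 2, 3]}))  # True
```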
EDIT 2:
I changed the way I'm doing multiprocessing a bit and it kind of works (179 s vs 732 s on the same sample dataset). But I get an error when I try to read from the DB inside the pool.
# Connect to the remote DB
global DB
DB = Database()
global ENGINE
ENGINE = DB.connect()
pool = mp.Pool(mp.cpu_count() - 1)
pool.map(multiproc, df_list)
pool.close()
pool.join()

def multiproc(df):
    print('Sending DF\'s data to DB')
    df.to_sql('ga_canaux', ENGINE, if_exists='append', index=False)
    DB.check_data()  # HERE
Error :
(psycopg2.OperationalError) SSL SYSCALL error: EOF detected
[SQL: 'SELECT COUNT(*) FROM ga_canaux'] (Background on this error at: http://sqlalche.me/e/e3q8)
EDIT 3:
When I try on a bigger sample, I get a DB timeout: psycopg2.DatabaseError: SSL SYSCALL error: Operation timed out
The df.to_sql(... parameters are not passed with .map_async(...; you define them inside of def my_function_to_sql. Change to .map_async(my_function_to_sql, df_list). Note: all your dataframes are going to be pickled, which costs time and memory.
pickled?
pickled means serialisation of data passed to or from a process. Your error indicates that you are trying to use objects that are not picklable. At this point you are off.
"can pass only one argument to it": Your args looks OK, but useless, as you are passing the same engine and db for every df.
engine and db, yes. OK for pickled, I've only been coding in Python for a few weeks and I didn't know this word before.
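Following the comments above, one pattern that avoids pickling the connection at all is to open it once per worker through the Pool initializer, so that only the data batches cross the process boundary. This is a rough sketch under loud assumptions: sqlite3 stands in for the PostgreSQL engine, lists of tuples stand in for the DataFrames, and the column names and the init_worker, send_batch and upload_all helpers are all hypothetical. With SQLAlchemy the analogous step would be calling create_engine inside the initializer. Sharing one connection across processes may also be related to the SSL SYSCALL errors, though that is not confirmed here.

```python
import multiprocessing
import sqlite3

_conn = None  # one connection per worker process, set by init_worker


def init_worker(db_path):
    """Runs once in each worker: open a connection owned by this process only."""
    global _conn
    _conn = sqlite3.connect(db_path, timeout=30)
    # Hypothetical schema; the real ga_canaux columns are not shown in the post.
    _conn.execute(
        "CREATE TABLE IF NOT EXISTS ga_canaux (canal TEXT, sessions INTEGER)"
    )
    _conn.commit()


def send_batch(rows):
    """Insert one batch with this process's connection, then return the row count."""
    with _conn:  # commits on success, rolls back on error
        _conn.executemany("INSERT INTO ga_canaux VALUES (?, ?)", rows)
    return _conn.execute("SELECT COUNT(*) FROM ga_canaux").fetchone()[0]


def upload_all(batches, db_path, processes=4):
    """Only the picklable db_path and the row batches are sent to the workers."""
    with multiprocessing.Pool(
        processes, initializer=init_worker, initargs=(db_path,)
    ) as pool:
        return pool.map(send_batch, batches)
```

upload_all would be called from under an `if __name__ == "__main__":` guard with a file path; the worker function takes a single argument, so no engine or db object needs to be passed through map.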