In Python, querying a very large database table with a single SELECT takes a long time. I have a table with 4,700,000 records, and fetching everything with SELECT * FROM MY_TABLE takes about 18 minutes. I want to save time by reading the table in chunks and running the chunk queries in parallel.
So, my code is:
import os
import time
import multiprocessing
import pandas as pd
import MySQLdb as mysql

if __name__ == '__main__':
    conn = mysql.connect(host='192.168.0.114',
                         user='root',
                         passwd='fit123456',
                         db='A_stock_day',
                         charset='utf8'
                         )
    limit = 100000
    offset = 0
    dfs = []
    print 'start.....'
    _s = time.time()
    while True:
        _query = 'SELECT * FROM A_stock_basic LIMIT %d OFFSET %d' % \
                 (limit, offset)
        dfs.append(pd.read_sql(_query, conn))
        offset += limit
        if len(dfs[-1]) < limit:
            break
    _e = time.time()
    print 'Time: ', _e - _s
    full_df = pd.concat(dfs)
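For reference, pandas can also do the chunking itself: pd.read_sql accepts a chunksize argument and then returns an iterator of DataFrames instead of one big frame. A minimal sketch of that variant, using the same query and connection as above (it is not faster by itself, but it avoids the manual LIMIT/OFFSET loop):

    # Let pandas chunk the result set: with chunksize set,
    # read_sql yields DataFrames of up to 100,000 rows each.
    chunks = pd.read_sql('SELECT * FROM A_stock_basic', conn, chunksize=100000)
    full_df = pd.concat(chunks)  # stitch the chunks back together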
But it still takes about 10 minutes. How can I parallelize it, so that many workers run at the same time and the total execution time drops to roughly the time of a single chunk? I have the multiprocessing code here:
def select(info):
    """Fetch one LIMIT/OFFSET chunk into info[3] and time the query."""
    limit, offset, conn = info[0], info[1], info[2]
    _query = 'SELECT * FROM A_stock_basic LIMIT %d OFFSET %d' % \
             (limit, offset)
    s = time.time()
    info[3].append(pd.read_sql(_query, conn))
    e = time.time()
    print 'time: ', e - s, ' pid: ', os.getpid()
if __name__ == '__main__':
    conn = mysql.connect(host='192.168.0.114',
                         user='root',
                         passwd='fit123456',
                         db='A_stock_day',
                         charset='utf8'
                         )
    dfs, p, pool = [], [], multiprocessing.Pool(7)
    info = [(1000000, 0, conn, dfs),
            (1000000, 1000000, conn, dfs),
            (1000000, 2000000, conn, dfs),
            (1000000, 3000000, conn, dfs),
            (1000000, 4000000, conn, dfs),
            (1000000, 5000000, conn, dfs),
            (1000000, 6000000, conn, dfs),
            ]
    for _i, _v in enumerate(info):
        print 'start....', _i
        _p = multiprocessing.Process(target=select, args=(_v, ))
        _p.start()
        _p.join()
    print 'The End'
As you can see, although it launches multiple processes, only one process reads the database at a time; the processes run one after another, so this is multiprocessing in name only, not parallel execution.
How can I implement truly parallel multiprocessing to save time? Thanks.
_p.join() makes the current process wait for its child to terminate, so your application runs its N processes one by one. Launch all the processes first, then join all of them. Like:

    processes = []
    for _i, _v in enumerate(info):
        print 'start....', _i
        _p = multiprocessing.Process(target=select, args=(_v, ))
        _p.start()
        processes.append(_p)
        # _p.join()

    # after the cycle
    for _p in processes:
        _p.join()
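Note that even with the joins fixed, two things in the original code still won't work across process boundaries: appending to the plain list dfs in a child is not visible in the parent, and a single MySQL connection cannot safely be shared by several processes. A minimal sketch of one way around both, using a Pool where each worker opens its own connection and returns its chunk (fetch_chunk is a hypothetical helper; table, credentials, and the 1,000,000-row chunk size are taken from the question; untested sketch):

    import multiprocessing
    import pandas as pd
    import MySQLdb as mysql

    def fetch_chunk(offset, limit=1000000):
        # Each worker opens its own connection; one connection
        # must not be shared between processes.
        conn = mysql.connect(host='192.168.0.114', user='root',
                             passwd='fit123456', db='A_stock_day',
                             charset='utf8')
        try:
            query = 'SELECT * FROM A_stock_basic LIMIT %d OFFSET %d' % \
                    (limit, offset)
            # The DataFrame is pickled and sent back to the parent.
            return pd.read_sql(query, conn)
        finally:
            conn.close()

    if __name__ == '__main__':
        offsets = [i * 1000000 for i in range(7)]
        pool = multiprocessing.Pool(7)
        dfs = pool.map(fetch_chunk, offsets)  # runs the 7 queries in parallel
        pool.close()
        pool.join()
        full_df = pd.concat(dfs)

Whether this actually speeds things up depends on the server: if the query is bound by MySQL's disk I/O or by network bandwidth rather than by per-query overhead, seven parallel readers may not beat one.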