
In Python, if the database is very large, a simple SELECT query takes a long time. I have a table with 4,700,000 records, and if I use SELECT * FROM MY_TABLE to fetch all of the data, it takes 18 minutes. By reading the table in chunks and running the queries in parallel, it should be possible to save time.

So, my code is:

import os
import time
import multiprocessing
import pandas as pd
import MySQLdb as mysql

if __name__ == '__main__':
    conn = mysql.connect(host='192.168.0.114',
                         user='root',
                         passwd='fit123456',
                         db='A_stock_day',
                         charset='utf8'
                        )
    limit = 100000
    offset = 0
    dfs = []
    print 'start.....'
    _s = time.time()
    while True:
        _query = 'SELECT * FROM A_stock_basic LIMIT %d OFFSET %d' %\
                (limit, offset)
        dfs.append(pd.read_sql(_query, conn))
        offset += limit
        if len(dfs[-1]) < limit:
            break
    _e = time.time()
    print 'Time: ', _e - _s
    full_df = pd.concat(dfs)

But it still takes about 10 minutes. How can I parallelize it, so that several workers query the database at the same time and the total execution time drops to roughly the time of a single chunk? I have the multiprocessing code here:

def select(info):
    """"""
    limit, offset, conn = info[0], info[1], info[2]
    _query = 'SELECT * FROM A_stock_basic LIMIT %d OFFSET %d' %\
            (limit, offset)
    s = time.time()
    info[3].append(pd.read_sql(_query, conn))
    e = time.time()
    print 'time: ', e - s, ' pid: ', os.getpid()

if __name__ == '__main__':
    conn = mysql.connect(host='192.168.0.114',
                         user='root',
                         passwd='fit123456',
                         db='A_stock_day',
                         charset='utf8'
                        )
    dfs, p, pool = [], [], multiprocessing.Pool(7)
    info = [(1000000, 0, conn, dfs),
            (1000000, 1000000, conn, dfs),
            (1000000, 2000000, conn, dfs),
            (1000000, 3000000, conn, dfs),
            (1000000, 4000000, conn, dfs),
            (1000000, 5000000, conn, dfs),
            (1000000, 6000000, conn, dfs),
           ]
    for _i, _v in enumerate(info):
        print 'start....', _i
        _p = multiprocessing.Process(target=select, args=(_v, ))
        _p.start()
        _p.join()
    print 'The End'

As you can see, although it launches multiple processes, only one process reads from the database at a time. So that is just multiprocessing in name, not parallel processing.

How can I implement truly parallel multiprocessing to save time? Thanks.

  • _p.join() makes the current process wait for the child to terminate, so your application runs its N processes one by one. Try launching all the processes first, then joining all of them, like:

        processes = []
        for _i, _v in enumerate(info):
            print 'start....', _i
            _p = multiprocessing.Process(target=select, args=(_v, ))
            _p.start()
            processes.append(_p)
            # _p.join()
        # after the cycle
        for _p in processes:
            _p.join()

    Commented Mar 16, 2016 at 7:21

1 Answer


In your loop

for _i, _v in enumerate(info):
    print 'start....', _i
    _p = multiprocessing.Process(target=select, args=(_v, ))
    _p.start()
    _p.join()

you're starting processes and then joining on them immediately. This means that your main process will never launch more than one additional subprocess (since as soon as it launches one, it will wait for that one to complete before continuing).

The most direct way to fix this would be something like:

processes = []
for _i, _v in enumerate(info):
    print 'start....', _i
    _p = multiprocessing.Process(target=select, args=(_v, ))
    _p.start()
    processes.append(_p)
for _p in processes:
    _p.join()

However, a better way would be to use the pool object that you already created. For this, the code should look something like

for _v in info:
    pool.apply(select, (_v,))

However, pool.apply blocks until each call returns, so even that still runs the chunks one at a time. I think you'd be happier making select return the data it gets (instead of appending it to a list: each child process appends to its own copy of dfs, so the parent never sees the results) and calling pool.map instead of pool.apply. This should help avoid some race conditions and shared memory problems that I think you'd otherwise run into.
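A minimal sketch of that approach, assuming each worker opens its own MySQL connection (a MySQLdb connection generally cannot be shared across forked processes) and reusing the host, credentials, and table from the question, could look like this:

import multiprocessing

import pandas as pd
import MySQLdb as mysql

def select(info):
    """Read one chunk of A_stock_basic and return it as a DataFrame."""
    limit, offset = info
    # Each worker opens (and closes) its own connection.
    conn = mysql.connect(host='192.168.0.114',
                         user='root',
                         passwd='fit123456',
                         db='A_stock_day',
                         charset='utf8')
    try:
        query = 'SELECT * FROM A_stock_basic LIMIT %d OFFSET %d' % (limit, offset)
        return pd.read_sql(query, conn)
    finally:
        conn.close()

if __name__ == '__main__':
    limit = 1000000
    info = [(limit, i * limit) for i in range(7)]   # (limit, offset) per chunk
    pool = multiprocessing.Pool(7)
    dfs = pool.map(select, info)    # list of DataFrames, one per chunk
    pool.close()
    pool.join()
    full_df = pd.concat(dfs)

pool.map pickles each returned DataFrame back to the parent, so the parent can simply pd.concat the chunks; whether this actually cuts the total time depends on whether the MySQL server and the network can serve several large scans at once.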

You can read more about these functions at https://docs.python.org/2/library/multiprocessing.html, although I expect you've already been there.


1 Comment

Thank you. But the MySQL connection gets lost inside the function. I think it is the shared memory problem.
