
My requirement is to save MySQL tables to CSV files. To run this in parallel, I want to use a thread pool that executes several calls of this to_csv function at the same time, so that many tables are dumped concurrently. Here is a reproduction of the issue:

import concurrent.futures
import csv

from sqlalchemy import create_engine, text

executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)

engine = create_engine('mysql+pymysql://user:passwd@host:3306/')
# A single Connection, created here and shared by every worker thread below.
conn = engine.connect()

db = 'db_enterprise_0'
table1 = 't_enterprise_0'
table2 = 't_enterprise_1'
table3 = 't_enterprise_2'
filename1 = '/data/test_threading1.csv'
filename2 = '/data/test_threading2.csv'
filename3 = '/data/test_threading3.csv'


def to_csv(db, table, filename, limit=None, delimiter=','):
    sql = f'select * from {db}.{table}'
    with open(filename, 'w', newline='') as file_:
        outcsv = csv.writer(file_, delimiter=delimiter, quotechar='"', quoting=csv.QUOTE_MINIMAL)
        proxy = conn.execution_options(stream_results=True) \
            .execution_options(net_write_timeout=3600) \
            .execution_options(max_allowed_packet=67108864).execute(text(sql))
        outcsv.writerow(proxy.keys())
        while True:
            batch = proxy.fetchmany(10000)  # 10,000 rows at a time
            if not batch:
                break
            outcsv.writerows(batch)


executor.submit(to_csv, db, table1, filename1)
executor.submit(to_csv, db, table2, filename2)
executor.submit(to_csv, db, table3, filename3)

The problem:

Oddly, it did not manage to write the data into the CSV files. Although the files were created, they were empty or contained only the header:

-rw-rw-r-- 1 user user 20 Sep 18 10:05 test_threading1.csv
-rw-rw-r-- 1 user user  0 Sep 18 10:05 test_threading2.csv
-rw-rw-r-- 1 user user  0 Sep 18 10:05 test_threading3.csv

What's the problem with my code? I suspect it has something to do with conn.execute(sql), but what exactly is the cause? Or is there another way to dump MySQL tables to CSV in parallel with SQLAlchemy?

Thanks in advance; I'd appreciate any advice.

  • Connections are not thread safe. Commented Sep 18, 2019 at 3:44
  • @IljaEverilä Hi Ilja, I only read tables and do nothing else; shouldn't that be thread-safe? Commented Sep 18, 2019 at 3:56
  • Some driver implementations might have thread safe connections that support concurrent execution, but the norm is that they are not. Also the SQLAlchemy Connection object is not thread safe as is, but requires that you guard access to it: docs.sqlalchemy.org/en/13/core/… Commented Sep 18, 2019 at 4:09
  • After reading your answer in stackoverflow.com/questions/51769299/…, could I use a connection pool with ThreadPoolExecutor to achieve this? Commented Sep 18, 2019 at 5:49
  • If using threads, you could and should just check out a new connection in each thread, in your case in each call to to_csv(). The engine and its connection pool are thread-safe and will handle the situation correctly, i.e. serve each thread its own connection. Commented Sep 18, 2019 at 10:10
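A minimal sketch of the per-call checkout described in the last comment. So that it runs without a MySQL server, it assumes the stdlib sqlite3 module as a stand-in for SQLAlchemy: each call to the hypothetical dump_table() opens its own connection, playing the role of `with engine.connect() as conn:` inside to_csv(). The names dump_table, dump_all, db_path and out_dir are illustrative and not from the original post. Calling result() on each future also re-raises any exception from a worker, which a bare executor.submit() with no result() call never surfaces.

```python
import csv
import os
import sqlite3
from concurrent.futures import ThreadPoolExecutor
from contextlib import closing


def dump_table(db_path, table, filename, batch_size=10000):
    """Write one table to CSV using a connection owned by this call only."""
    # Stand-in for `with engine.connect() as conn:`; no connection object
    # is ever shared between worker threads.
    with closing(sqlite3.connect(db_path)) as conn:
        # Table names come from our own list, not user input.
        cur = conn.execute(f'SELECT * FROM {table}')
        with open(filename, 'w', newline='') as f:
            outcsv = csv.writer(f)
            outcsv.writerow([col[0] for col in cur.description])  # header row
            while True:
                batch = cur.fetchmany(batch_size)  # fixed-size batches
                if not batch:
                    break
                outcsv.writerows(batch)
    return filename


def dump_all(db_path, tables, out_dir, max_workers=2, batch_size=10000):
    """Dump several tables concurrently, one CSV file per table."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(dump_table, db_path, table,
                            os.path.join(out_dir, f'{table}.csv'), batch_size)
            for table in tables
        ]
        # result() re-raises any exception raised inside a worker thread.
        return [future.result() for future in futures]
```

With SQLAlchemy, the shared object would be the engine rather than a Connection, and dump_table would call engine.connect() itself, exactly as the comment suggests.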

1 Answer


Following the guidance from @Ilja Everilä, I solved it with multiprocessing rather than multithreading.

Here is the code:

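import csv
from multiprocessing import Process

from sqlalchemy import create_engine, text

engine = create_engine('mysql+pymysql://user:passwd@host:3306/')

db = 'db_enterprise_0'
table1 = 't_enterprise_0'
table2 = 't_enterprise_1'
filename1 = '/data/test_threading1.csv'
filename2 = '/data/test_threading2.csv'


def run_in_process(db, table, filename, delimiter=','):
    # Discard any pooled connections inherited from the parent process, so this
    # child checks out fresh connections of its own (see the pooling docs below).
    engine.dispose()

    with open(filename, 'w', newline='') as file_, engine.connect() as conn:
        outcsv = csv.writer(file_, delimiter=delimiter, quotechar='"', quoting=csv.QUOTE_MINIMAL)
        sql = f'select * from {db}.{table}'
        proxy = conn.execute(text(sql))

        outcsv.writerow(proxy.keys())
        while True:
            batch = proxy.fetchmany(10000)  # 10,000 rows at a time
            if not batch:
                break
            outcsv.writerows(batch)


if __name__ == '__main__':
    p1 = Process(target=run_in_process, args=(db, table1, filename1))
    p2 = Process(target=run_in_process, args=(db, table2, filename2))
    p1.start()
    p2.start()
    # Wait for both child processes to finish.
    p1.join()
    p2.join()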

References:

https://docs.sqlalchemy.org/en/13/core/pooling.html#using-connection-pools-with-multiprocessing

Is connection pool in sqlalchemy thread-safe?
