I have three tables, each of which takes around 1 minute to query (i.e. 3 minutes in total), like this:

from my_utils import get_engine
import pandas as pd

def main():
    con1 = get_engine("table1")
    con2 = get_engine("table2")
    con3 = get_engine("table3")

    df1 = pd.read_sql(query1, con=con1)
    df2 = pd.read_sql(query2, con=con2)
    df3 = pd.read_sql(query3, con=con3)

main()

which I would like to be "asynchronized" (i.e. run concurrently).

I have thus tried the following (I'm rather new to using asyncio):

.
.
import asyncio

async def get_df1(query1):
    df1 = pd.read_sql(query1, con=con1)
    return df1

async def get_df2(query2):
    df2 = pd.read_sql(query2, con=con2)
    return df2

async def get_df3(query3):
    df3 = pd.read_sql(query3, con=con3)
    return df3

async def main():
    df1, df2, df3 = await asyncio.gather(get_df1(query1), get_df2(query2), get_df3(query3))

asyncio.run(main())

It runs, but it takes exactly the same time as the synchronous version.

Am I missing something?

  • You may want to look at this answer as well if the goal is just to execute queries in parallel and you are not wedded to asyncio coroutines: stackoverflow.com/questions/51426533/… Commented Jul 6, 2021 at 5:16

2 Answers

Switching between coroutines only occurs at an await, and since there are no awaits inside your get_df functions, your three queries will only ever execute sequentially. pd.read_sql is not natively async, so you'll have to wrap it with an executor to make an async version:

async def read_sql_async(stmt, con):
    # get_running_loop() is preferred over get_event_loop() inside a coroutine
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, pd.read_sql, stmt, con)

You'll then be able to await read_sql_async:

df1 = await read_sql_async(query1, con=con1)
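As an illustration (not from the answer itself), here is a self-contained sketch of the pattern, using a time.sleep stand-in for the blocking pd.read_sql call so it runs without a database:

```python
import asyncio
import time

def slow_read(query):
    # Stand-in for pd.read_sql: blocks for ~1 second like a slow query.
    time.sleep(1)
    return f"result of {query}"

async def read_sql_async(query):
    loop = asyncio.get_running_loop()
    # Run the blocking call in the default thread-pool executor.
    return await loop.run_in_executor(None, slow_read, query)

async def main():
    start = time.perf_counter()
    dfs = await asyncio.gather(
        read_sql_async("query1"),
        read_sql_async("query2"),
        read_sql_async("query3"),
    )
    return dfs, time.perf_counter() - start

results, elapsed = asyncio.run(main())
# The three 1-second "queries" overlap, so elapsed is ~1s rather than ~3s.
```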

7 Comments

That seems to do the trick! Although one of my queries is stalling (I don't know if there are many requests to the DB at the moment). But is there some issue in creating a loop in each function, i.e. if you copy your example 10 times, thus creating the loop in each function?
You can use a loop to set up each future before executing them all with gather (if that's what you mean?), e.g. if you have lists of queries and connections you could do results = await asyncio.gather(*(read_sql_async(query, con) for query, con in zip(queries, connections))). If that's a really long list, though, you would probably be better off using a thread pool to limit the number of queries running at once to a sensible number.
However if you had a loop within get_df1, the steps in the loop are still executed sequentially, they're just allowed to interleave with the other running coroutines.
Right now my calls to a PostgreSQL DB are increasing (by a lot). I have 6 calls (to three different databases in total); the MySQL and MSSQL ones work fine, but two of them are to PostgreSQL and they are insanely slow using async.
How do you pass the chunksize argument to the read_sql function in run_in_executor(None, pd.read_sql, stmt, con)? @CutePoison
|
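One way to address the chunksize question above (a sketch, not from the thread): run_in_executor only forwards positional arguments, so bind keyword arguments with functools.partial before handing the call to the executor. The fetch function here is a hypothetical stand-in for pd.read_sql:

```python
import asyncio
import functools

def fetch(stmt, con, chunksize=None):
    # Hypothetical stand-in for pd.read_sql; echoes its arguments.
    return (stmt, con, chunksize)

async def read_sql_async(stmt, con, **kwargs):
    loop = asyncio.get_running_loop()
    # run_in_executor accepts only positional args, so bind kwargs first.
    call = functools.partial(fetch, stmt, con, **kwargs)
    return await loop.run_in_executor(None, call)

result = asyncio.run(read_sql_async("SELECT 1", "my_con", chunksize=500))
```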
You can use the code below directly, without writing a wrapper (Python 3.9+):

await asyncio.to_thread(pd.read_sql, query, con)
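A runnable sketch of the same idea, again with a time.sleep stand-in for pd.read_sql so no database is needed. Note that asyncio.to_thread also forwards keyword arguments, which run_in_executor does not:

```python
import asyncio
import time

def slow_query(stmt, chunksize=None):
    # Stand-in for pd.read_sql: blocks like a slow database read.
    time.sleep(0.5)
    return (stmt, chunksize)

async def main():
    # to_thread wraps each blocking call; gather runs them concurrently.
    return await asyncio.gather(
        asyncio.to_thread(slow_query, "q1", chunksize=100),
        asyncio.to_thread(slow_query, "q2"),
    )

res = asyncio.run(main())
```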

