I have the following code, which reads data from a database (read_db) and writes it to a Parquet file (data.to_parquet). Both I/O operations take a while to run.

import logging

def main():
    id = 1
    while id < 1000:
        logging.info(f'reading - id: {id}')
        data = read_db(id)  # returns a dataframe

        logging.info(f'saving - id: {id}')
        data.to_parquet(f'{id}.parquet')
        logging.info(f'saved - id: {id}')

        id += 1
       

It's slow, so I want read_db(n+1) and to_parquet(n) to run concurrently. However, each step must still finish in id order: read_db(n+1) needs to run after read_db(n), and data.to_parquet(n+1) after data.to_parquet(n). Here is the asynchronous version:

import asyncio
import logging
from functools import partial, wraps

def async_wrap(f):
    @wraps(f)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        p = partial(f, *args, **kwargs)
        return await loop.run_in_executor(executor, p)
    return run

async def main():
    read_db_async = async_wrap(read_db)
    id = 1
    while id < 1000:
        logging.info(f'reading - id: {id}')
        data = await read_db_async(id)  # returns a dataframe

        logging.info(f'saving - id: {id}')
        to_parquet_async = async_wrap(data.to_parquet)
        await to_parquet_async(f'{id}.parquet')
        logging.info(f'saved - id: {id}')

        id += 1

asyncio.run(main())

I expected to see some of the logs out of order:

reading - id: 1
saving - id: 1      (saving 1 and reading 2 run in parallel)
reading - id: 2
saved - id: 1
saving - id: 2
reading - id: 3
saved - id: 2
.....

But the actual logs are the same as for the synchronous code:

reading - id: 1
saving - id: 1
saved - id: 1
reading - id: 2
saving - id: 2
saved - id: 2
reading - id: 3
.....
  • Does this answer your question? How to run tasks concurrently in asyncio? Commented Jan 27, 2021 at 0:19
  • No, that answer runs all tasks in parallel, which is what I need to avoid. I only need some steps to run in parallel. Commented Jan 27, 2021 at 0:55
  • You can run as many tasks in parallel as you like. It doesn't have to be all of them. Commented Jan 27, 2021 at 1:41
  • There's only one coroutine running in your solution, the one that started with main(). The await data.to_parquet(f'{id}.parquet') means the current coroutine will sleep until to_parquet finishes, so it won't start the next iteration before that. Check this question for a basic example Commented Jan 27, 2021 at 2:00
  • EDIT: I said "There's only one coroutine running" but that's not exactly accurate, since async calls create new coroutines. The thing is you are waiting for those new coroutines to finish to resume the original one. Commented Jan 27, 2021 at 2:13
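
To illustrate the point made in the comments above, here is a minimal sketch contrasting the two behaviours. The `step` helper and the short `asyncio.sleep` delays are hypothetical stand-ins for the real read and write calls, not part of the original code. Awaiting a coroutine directly suspends the caller until it finishes, whereas wrapping it in a task lets it progress while the caller does other work.

```python
import asyncio

async def step(name, delay, log):
    # Hypothetical stand-in for a slow I/O call.
    log.append(f"start {name}")
    await asyncio.sleep(delay)
    log.append(f"end {name}")

async def sequential():
    log = []
    # The caller is suspended here until "save 1" has finished,
    # so "read 2" cannot start any earlier.
    await step("save 1", 0.02, log)
    await step("read 2", 0.01, log)
    return log

async def overlapped():
    log = []
    # create_task schedules "save 1" without waiting for it.
    save = asyncio.create_task(step("save 1", 0.02, log))
    # "read 2" runs while "save 1" is still in flight.
    await step("read 2", 0.01, log)
    await save
    return log

if __name__ == "__main__":
    print(asyncio.run(sequential()))
    print(asyncio.run(overlapped()))
```

In the sequential variant the log entries come out strictly in program order, which matches the logs shown in the question.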

1 Answer

You could make read_db(n+1) and to_parquet(n) run concurrently by using gather or equivalent:

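async def main():
    read_db_async = async_wrap(read_db)
    prev_to_parquet = asyncio.sleep(0)  # no-op for the first iteration

    for id in range(1, 1000):
        # read this id while the previous dataframe is being written
        data, _ = await asyncio.gather(read_db_async(id), prev_to_parquet)
        to_parquet_async = async_wrap(data.to_parquet)
        # only creates the coroutine; it starts running in the next gather
        prev_to_parquet = to_parquet_async(f'{id}.parquet')

    await prev_to_parquet  # write the last dataframe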
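
For anyone who wants to try the pattern without a database, the following is a self-contained sketch of the same idea. The `read_db` and `to_parquet` functions here are hypothetical stand-ins that just sleep, the `events` list replaces the logging calls, and `async_wrap` is a simplified version of the wrapper from the question that always uses the default executor.

```python
import asyncio
import time
from functools import partial, wraps

def async_wrap(f):
    """Run the blocking callable f in the default thread pool executor."""
    @wraps(f)
    async def run(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, partial(f, *args, **kwargs))
    return run

events = []  # (event, id) tuples in the order they happened

def read_db(id):
    # Hypothetical stand-in for the real, slow database read.
    events.append(("reading", id))
    time.sleep(0.05)
    return {"id": id}

def to_parquet(data):
    # Hypothetical stand-in for data.to_parquet(...).
    events.append(("saving", data["id"]))
    time.sleep(0.05)
    events.append(("saved", data["id"]))

async def main(n):
    read_db_async = async_wrap(read_db)
    to_parquet_async = async_wrap(to_parquet)
    prev_to_parquet = asyncio.sleep(0)  # no-op for the first iteration
    for id in range(1, n + 1):
        # Read `id` while the previous result is being written.
        data, _ = await asyncio.gather(read_db_async(id), prev_to_parquet)
        prev_to_parquet = to_parquet_async(data)
    await prev_to_parquet  # write the last result

if __name__ == "__main__":
    asyncio.run(main(3))
    print(events)
```

Because each `gather` waits for both the current read and the previous write, reads stay in id order, writes stay in id order, and only read n+1 and write n overlap.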

4 Comments

I also need read_db(n+1) to run after read_db(n), and data.to_parquet(n+1) to run after data.to_parquet(n).
Very nice. Actually, multiple read_db(..) calls can run in parallel; they take most of the time. However, I cannot have more than 3 read_db() calls at once, because each one takes a lot of memory and there will not be enough if too much data is read into dataframes.
@ca9163d9 Sure, you can use the same principle to parallelize any number of operations. You can even use the previous version of the answer and add a Semaphore(3) to prevent too many parallel reads. I'll leave this version because it faithfully answers the question as asked (after the edit), and is simple enough to serve as a basis for more sophisticated use cases.
