Edit: I've updated the code below and it now works.

I'm trying to fetch some data from my Postgres db asynchronously through an asyncpg connection pool. My db contains about 100 different tables (one per city), and I'm trying to gather all the data into one DataFrame as fast as possible.

    import pandas as pd
    import asyncpg
    import asyncio
    from time import time


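    def make_t():
        # build one query per city table; the names are interpolated directly
        # into the SQL, so they must come from a trusted list
        lst = []
        for city in ['a', 'b', 'c']:
            sql = """
    SELECT
    '%s' AS city,
    MAX(starttime) AS max_ts
    FROM
    "table_%s"
    """
            lst.append(sql % (city, city))
        return tuple(lst)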


    async def get_data(pool, sql):
        start = time()
        async with pool.acquire() as conn:
            stmt = await conn.prepare(sql)
            columns = [a.name for a in stmt.get_attributes()]
            data = await stmt.fetch()
            print(f'Exec time: {time() - start}')
            return pd.DataFrame(data, columns=columns)


    async def main():
        dsn = 'postgres://user:[email protected]:5432/my_base'
        sqls = make_t()
        pool = await asyncpg.create_pool(dsn=dsn, max_size=50)
        start = time()

        # schedule one query per table; they run concurrently on the pool
        tasks = [asyncio.ensure_future(get_data(pool, sql)) for sql in sqls]

        # gather returns the results (DataFrames) in the same order as the tasks
        frames = await asyncio.gather(*tasks)
        await pool.close()

        # combine the per-city frames into a single DataFrame
        df = pd.concat(frames, ignore_index=True)

        print(f'total exec time: {time() - start} secs')
        print('exiting main')
        return df


    loop = asyncio.get_event_loop()
    df = loop.run_until_complete(main())
    loop.close()

    print('exiting program')

Python 3.6.5 :: Anaconda, Inc.

The original version gave me this error:

    Traceback (most recent call last):
      File "", line 319, in
      File "/Users/fixx/anaconda3/lib/python3.6/asyncio/base_events.py", line 468, in run_until_complete
        return future.result()
      File "", line 308, in main
      File "/Users/fixx/anaconda3/lib/python3.6/asyncio/tasks.py", line 594, in gather
        for arg in set(coros_or_futures):
    TypeError: unhashable type: 'list'

I can't figure out why. My SQL statements are in a tuple!

1 Answer

asyncio.gather accepts awaitables as individual positional arguments, but you are passing it a single list of tasks. Use the * operator to unpack the list when calling gather:

        tasks = await asyncio.gather(*tasks)
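
To illustrate the unpacking, here is a minimal sketch that runs without a database. The `fetch_city` coroutine is a hypothetical stand-in for a real query, and `asyncio.run` assumes Python 3.7 or newer (on 3.6, `loop.run_until_complete(main())` does the same job).

```python
import asyncio


async def fetch_city(city):
    # Hypothetical stand-in for a database query (not part of asyncpg).
    await asyncio.sleep(0)
    return city.upper()


async def main():
    coros = [fetch_city(city) for city in ("a", "b", "c")]
    # Unpack the list so that each coroutine becomes its own positional argument.
    # gather returns the results themselves, in the same order as its arguments.
    return await asyncio.gather(*coros)


results = asyncio.run(main())
print(results)
```

Note that the value returned by `gather` is a list of results, not a list of tasks, so there is no need to call `.result()` on its items.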

3 Comments

Thanks! I also spotted that. Now it throws another error that I can't find anything about:

    Traceback (most recent call last):
      File "<string>", line 321, in <module>
      File "*/anaconda3/lib/python3.6/asyncio/base_events.py", line 468, in run_until_complete
        return future.result()
      File "<string>", line 309, in main
      File "<string>", line 290, in get_data
    AttributeError: __aexit__

@Fixx Probably the return value of conn.cursor() is not a context manager usable in the async with statement.
Yep, that's the problem! Thanks! I've changed my function a bit so that it returns a df and works with the context manager.
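
For context on the error discussed in these comments: `async with` only works on objects that implement both `__aenter__` and `__aexit__`. The sketch below shows this with a hypothetical `FakeConnection` class (not an asyncpg type), and then shows that a plain object is rejected. It assumes Python 3.7 or newer for `asyncio.run`. The exception type depends on the Python version (AttributeError on older versions, TypeError on newer ones), so the sketch catches both.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a pooled connection; not an asyncpg class."""

    def __init__(self):
        self.events = []

    async def __aenter__(self):
        self.events.append("enter")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.events.append("exit")
        return False  # do not suppress exceptions raised in the body


async def use_connection():
    conn = FakeConnection()
    async with conn as c:
        c.events.append("body")
    return conn.events


async def plain_object_is_rejected():
    try:
        # A plain object implements neither __aenter__ nor __aexit__.
        async with object():
            pass
    except (AttributeError, TypeError):
        return True
    return False


events = asyncio.run(use_connection())
rejected = asyncio.run(plain_object_is_rejected())
print(events, rejected)
```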
