
Using the SQLAlchemy 1.4 async ORM with a Postgres backend on Python 3.7.

I am using an augmented Declarative Base with the SA ORM. The tables are not defined in models.py; instead they are created directly in the database by parsing a JSON file that contains all the table schemas. Because of this, I can't import the models at the top of the script with from models import ThisTable.

So to work with CRUD operations on the tables, I first retrieve them by reflecting the metadata.

In the 'usual' way, when importing all the tables at the top of the script, a query like this works:

result = await s.execute(select(func.sum(TableName.column)))
curr = result.all()

When I try to reflect the table and column objects from the MetaData in order to query them, this doesn't work. I get lots of errors like AttributeError: 'Table' object has no attribute 'func' and TypeError: 'Table' object is not callable.


def retrieve_table_obj(table):
    meta = MetaData()
    meta.reflect(bind=sync_engine)
    return meta.tables[table]

def retrieve_table_cols(table):
    table = retrieve_table_obj(table)
    return table.columns.keys()

async def reading(collection, modifications):
    table = retrieve_table_obj(collection)
    columns = retrieve_table_cols(collection)
    for c in columns:
        for f in modifications['fields']:
            if c in f:
                q = select(func.sum(table.c))

                result = await s.execute(q)
                curr = result.all()

asyncio.run(reading("users", {'fields': ["usage", "allowance"]}))

How can I query tables and columns in the database when they first have to be explicitly retrieved?

2 Comments

  • You could use the automap extension to build models via reflection. Commented Nov 30, 2021 at 18:10
  • Thanks for the automap suggestion! It works great with a sync engine, but I am working with an async engine and am having trouble getting automap to work with it, even when obtaining an engine connection and calling the function with conn.run_sync. Have you had success using automap with async engine instances? Commented Dec 1, 2021 at 0:16

1 Answer


The automap extension can be used to automatically reflect database tables to SQLAlchemy models. However, automap calls inspect() on the engine, which isn't supported on async engines; we can work around this by doing the automapping with a synchronous engine. Once the models have been mapped, they can be used with the async engine.

For example:

import asyncio

import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.automap import automap_base


sync_engine = sa.create_engine('postgresql:///test', echo=True, future=True)

Base = automap_base()
Base.prepare(sync_engine, reflect=True)


async def async_main(collection, modifications):
    engine = create_async_engine(
        "postgresql+asyncpg:///test",
        echo=True,
        future=True,
        connect_args={'ssl': False},
    )

    async_session = orm.sessionmaker(
        engine, class_=AsyncSession, future=True
    )

    async with async_session() as session:
        model = Base.classes[collection]
        matches = set(model.__mapper__.columns.keys()) & set(modifications['fields'])
        for m in matches:
            q = sa.select(sa.func.sum(getattr(model, m)))


            result = await session.execute(q)
            curr = result.all()
            for row in curr:
                print(row)
            print()

    # for AsyncEngine created in function scope, close and
    # clean-up pooled connections
    await engine.dispose()


asyncio.run(async_main("users", {'fields': ["usage", "allowance"]}))

If you don't need models, the existing code could be made more efficient by caching the MetaData object rather than recreating it on every call to retrieve_table_obj, and by replacing select(func.sum(table.c)) with select(sa.func.sum(getattr(table.c, c))) so the sum is applied to a specific column rather than to the whole column collection.
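For instance, a minimal sketch of that caching approach. An in-memory SQLite database and a made-up users table stand in for the Postgres setup here, purely so the snippet is self-contained; the pattern is the same with the real engine:

```python
import sqlalchemy as sa

# Illustrative stand-in for the Postgres engine.
sync_engine = sa.create_engine("sqlite://", future=True)

with sync_engine.begin() as conn:
    conn.execute(sa.text(
        "CREATE TABLE users (id INTEGER PRIMARY KEY, usage INTEGER, allowance INTEGER)"
    ))
    conn.execute(sa.text(
        "INSERT INTO users (usage, allowance) VALUES (10, 100), (15, 100)"
    ))

# Reflect once and cache the MetaData instead of rebuilding it per call.
metadata = sa.MetaData()
metadata.reflect(bind=sync_engine)

def retrieve_table_obj(table_name):
    return metadata.tables[table_name]

table = retrieve_table_obj("users")

with sync_engine.connect() as conn:
    # table.c[name] (or getattr(table.c, name)) picks out one column,
    # so the SUM is applied to that column rather than the collection.
    total = conn.execute(
        sa.select(sa.func.sum(table.c["usage"]))
    ).scalar_one()
    print(total)  # 25
```

The same cached metadata object can be shared across all CRUD helpers, so reflection only happens once at startup.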


5 Comments

Thanks for this! I am going to try to integrate this into my code and see how it goes. Just to clarify about your caching suggestion: Do you mean just making the call once and storing it in a list and then throughout the rest of the code pulling from the list rather than the database? Is a list the best data type in which to store the MetaData object? I much prefer the clarity and succinctness of select(func.sum(table.c)) rather than select(sa.func.sum(getattr(table.c, c))), so I would l like to preserve select(func.sum(table.c)) type of syntax wherever possible.
I would just keep the metadata object itself, otherwise a dict; searching a long list can be expensive. I don't see how table.c can work, but if you can make it work that's fine. table.c[column_name] would be another option.
Right, that makes sense! Thanks again
Regarding building up a query bit by bit with multiple conditions, if I have an update function:

    async def updater(table_name, where_condition, data):
        Table = models_cache.classes[table_name]
        async with session_maker() as session:
            query = update(Table).filter_by(**where_condition)
            query = query.values(**data)
            await session.execute(query)
            await session.commit()

    asyncio.run(updater('appuser', {'name': 'Cheshire'}, {'org': 'Wonderland'}))

I am not sure how to integrate your great example above with this.
For example, if we want a SQL query that looks something like: UPDATE appuser SET org = 'Wonderland' WHERE (name = 'Alice' OR name = 'Cheshire') AND (id > 0); and we call the function with parameters like: asyncio.run(updater('appuser', {'name': 'Cheshire'}, {'or': [{'name': 'Alice'}]}, {'and': [{'id': '> 0'}]}, {'org': 'Wonderland'})), how can we gradually build up a query line by line?
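A sketch of one way to build such a statement incrementally: each .where() call on a Core update() ANDs a further condition onto the statement, and or_() groups alternatives. The in-memory SQLite database and the appuser rows below are illustrative stand-ins, not the asker's actual schema:

```python
import sqlalchemy as sa

# Illustrative stand-in for the real database.
engine = sa.create_engine("sqlite://", future=True)

with engine.begin() as conn:
    conn.execute(sa.text(
        "CREATE TABLE appuser (id INTEGER PRIMARY KEY, name TEXT, org TEXT)"
    ))
    conn.execute(sa.text(
        "INSERT INTO appuser (name, org) "
        "VALUES ('Alice', 'x'), ('Cheshire', 'y'), ('Bob', 'z')"
    ))

metadata = sa.MetaData()
metadata.reflect(bind=engine)
appuser = metadata.tables["appuser"]

# Build the statement line by line; successive .where() calls are ANDed.
stmt = sa.update(appuser)
stmt = stmt.where(sa.or_(appuser.c.name == "Alice",
                         appuser.c.name == "Cheshire"))
stmt = stmt.where(appuser.c.id > 0)
stmt = stmt.values(org="Wonderland")

with engine.begin() as conn:
    conn.execute(stmt)
    rows = conn.execute(
        sa.select(appuser.c.name, appuser.c.org).order_by(appuser.c.id)
    ).all()
    print(rows)
    # [('Alice', 'Wonderland'), ('Cheshire', 'Wonderland'), ('Bob', 'z')]
```

The same chaining works on ORM update() against automapped models, and the statement can be executed with await session.execute(stmt) in the async session.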
