34

I am writing my first project in FastAPI and I am struggling a bit. In particular, I am not sure how I am supposed to use asyncpg connection pool in my app. Currently what I have goes like this

in db.py I have

pgpool = None


async def get_pool():
    global pgpool
    if not pgpool:
        pgpool = await asyncpg.create_pool(dsn='MYDB_DSN')
    return pgpool

and then in individual files I use the get_pool as a dependency.

@router.post("/user/", response_model=models.User, status_code=201)
async def create_user(user: models.UserCreate, pgpool = Depends(get_pool)):
    # ... do things ...

First, every endpoint I have uses the database, so it seems silly to add that dependency argument for every single function. Second, this seems like a roundabout way of doing things. I define a global, then I define a function that returns that global and then I inject the function. I am sure there is more natural way of going about it.

I have seen people suggest just adding whatever I need as a property to the app object

@app.on_event("startup")
async def startup():
    app.pool = await asyncpg.create_pool(dsn='MYDB_DSN')

but it doesn't work when I have multiple files with routers, I don't know how to access the app object from a router object.

What am I missing?

4 Answers 4

18

You can use an application factory pattern to setup your application.

To avoid using global or adding things directly to the app object you can create your own class Database to hold your connection pool.

To pass the connection pool to every route you can use a middleware and add the pool to request.state

Here's the example code:

import asyncio

import asyncpg
from fastapi import FastAPI, Request

class Database():

    async def create_pool(self):
        self.pool = await asyncpg.create_pool(dsn='MYDB_DSN')

def create_app():

    app = FastAPI()
    db = Database()

    @app.middleware("http")
    async def db_session_middleware(request: Request, call_next):
        request.state.pgpool = db.pool
        response = await call_next(request)
        return response

    @app.on_event("startup")
    async def startup():
        await db.create_pool()

    @app.on_event("shutdown")
    async def shutdown():
        # cleanup
        pass

    @app.get("/")
    async def hello(request: Request):
        print(request.state.pool)

    return app

app = create_app()
Sign up to request clarification or add additional context in comments.

10 Comments

well, in this case, I will have to add request: Request to every route instead of pool=Depends(get_pool), doesn't seem to save much hustle.
Ah... Well, you could create your pool in a module (e.g database.py) and import it directly from there.
FWIW the maintainers suggest something similar -- github.com/tiangolo/fastapi/issues/1800
Sorry, may I ask why you guys are using directly asyncpg instead of using the Databases wrapper that was in the documentation. Is it because it gives you more control to the underlying connection pool?
Guys, in the meanwhile I have been using Fast-API for almost a year on production. Databases is not a great wrapper after all. I had issues with getting the connection pooling setup correctly with it. This is why I ended up using native asyncpg as well, as it has a cleaner way to setup connection pooling. If you don't have a complex distributed system where too many servers connect to the same central database, then you might be ok using Databases wrapper. Otherwise if you need a more efficient connection pooling, you have to use asyncpg after all.
|
3

The way I do it is in db.py.

class Database:
    def __init__(self,user,password,host,database,port="5432"):
        self.user = user
        self.password = password
        self.host = host
        self.port = port
        self.database = database
        self._cursor = None

        self._connection_pool = None
        
    async def connect(self):
        if not self._connection_pool:
            try:
                self._connection_pool = await asyncpg.create_pool(
                    min_size=1,
                    max_size=20,
                    command_timeout=60,
                    host=self.host,
                    port=self.port,
                    user=self.user,
                    password=self.password,
                    database=self.database,
                    ssl="require"
                )
                logger.info("Database pool connectionn opened")

            except Exception as e:
                logger.exception(e)

    async def fetch_rows(self, query: str,*args):
        if not self._connection_pool:
            await self.connect()
        else:
            con = await self._connection_pool.acquire()
            try:
                result = await con.fetch(query,*args)
                return result
            except Exception as e:
                logger.exception(e)
            finally:
                await self._connection_pool.release(con)

    async def close(self):
        if not self._connection_pool:
            try:
                await self._connection_pool.close()
                logger.info("Database pool connection closed")
            except Exception as e:
                logger.exception(e)

Then in app

@app.on_event("startup")
async def startup_event():
    database_instance = db.Database(**db_arguments)
    await database_instance.connect()
    app.state.db = database_instance
    logger.info("Server Startup")

@app.on_event("shutdown")
async def shutdown_event():
    if not app.state.db:
        await app.state.db.close()
    logger.info("Server Shutdown")

Then you can get the db instance with request.app.state.db by passing in a request parameter in the routes.

1 Comment

If you read my question, I have tried this approach. There seems to be no way to access the main app object from APIRouter() routes.
1

you can use classmethod to avoid creating the database connection pool multiple times.

and use lifespan to setup the connection before the http method is called.

for example

db.py

import os
from typing import Annotated

from fastapi import Depends
from sqlmodel import Session, create_engine


class PostgresPool:
    pg_engine = None

    @classmethod
    def init(cls):
        DB_URI = os.environ.get("DB_URI", "")
        if cls.pg_engine is None:
            logging.info("create pg engine.")
            cls.pg_engine = create_engine(PG_URI, pool_size=5)

    @classmethod
    def close(cls):
        if cls.pg_engine is not None:
            cls.pg_engine.dispose()

    @classmethod
    def get_session(cls):
        with Session(cls.pg_engine) as session:
            yield session

SessionDep = Annotated[Session, Depends(PostgresPool.get_session)]

app.py

from contextlib import asynccontextmanager
from fastapi import FastAPI

from db import PostgresPool

@asynccontextmanager
async def lifespan(app: FastAPI):
    PostgresPool.init()
    yield
    PostgresPool.close()


app = FastAPI(lifespan=lifespan)

some_http_endpioint.py

from db import SessionDep

@router.get("/http/endpoint")
def http_endpoint(job_id: int, session: SessionDep):
    pass

2 Comments

with Session(cls.pg_engine) as session: where is pg_engine defined?
@Pynchia copy the wrong part of my code, it's now updated
-2

The info in your post allowed me to come up with this solution. A little digging in the class definitions and I was able to find a startup event which can hook async defs onto.

db.py

from asyncpg import create_pool, Pool

pgpool: Pool | None = None

async def get_pool():
    global pgpool
    if not pgpool:
        pgpool = await create_pool(dsn='MY_DSN')
    return pgpool

my_router.py

from fastapi import APIRouter
from asyncpg import Pool
from db import get_pool

router = APIRouter()
pgpool: Pool | None = None

@router.on_event("startup")
async def router_startup():
    global pgpool
    pgpool = await get_pool()

pgpool.acquire() will be available to async defs within my_router.py.

2 Comments

Interesting. I don't see the router startup event anywhere in the docs though, not sure how well supported this feature is.
you should not be using global for this. you should be using the FastAPI startup event and passing the pool through the context. using global can cause a lot of issues.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.