
I'm new to FastAPI and most async related functionality, so there's a chance that I am committing multiple faux pas at once; any help is appreciated.

I want to provide an API that exposes some functionality, say transforming a lowercase string to uppercase (for this MWE). Instead of computing every individual request immediately, I would like to fill up a buffer and only compute once that buffer is full. This might be because batching is cheaper (computationally or $-wise) and I am okay with the introduced delay.

I tried the following implementation:

from fastapi import FastAPI
import asyncio
from typing import Dict

N = 2

app = FastAPI()
app.state.requests = []
app.state.answers = []
app.state.state = "start"


@app.get("/upper/")
async def upper(x: str) -> Dict[str, str]:
    myid = len(app.state.requests)
    app.state.requests.append(x)

    if len(app.state.requests) == N:
        app.state.answers = []
        for i in range(len(app.state.requests)):
            app.state.answers.append(app.state.requests[i].upper())
        app.state.state = "computed"
    else:
        await block_compute()

    myanswer = app.state.answers[myid]

    if len(app.state.requests) == N:
        app.state.answers = []
        app.state.requests = []
        app.state.state = "start"

    else:
        await block_flush()

    return {"id": str(myid), "answer": myanswer}


async def block_compute() -> None:
    while app.state.state != "computed":
        await asyncio.sleep(0.1)


async def block_flush() -> None:
    while app.state.state == "computed":
        await asyncio.sleep(0.1)


If I now run poetry run uvicorn src.api_playground.mwe:app --reload, and visit

  1. http://127.0.0.1:8000/upper/?x=cat in one tab, the request just hangs (that's expected, since the buffer isn't full yet).

  2. Next I visit http://127.0.0.1:8000/upper/?x=dog and I get {"id":"1","answer":"DOG"}, which makes sense. But the first request is still waiting.

How would I implement this properly? Another concern of mine is that while clearing buffers, another request comes in and I have race conditions that yield problems. This seems too much of a hack to be the proper solution, but I am not sure where to look.

2 Answers


I gave this a little thought, and I think it is not completely impossible. It does go against the ASGI spec a little.

A little background first.

The ASGI spec (which Uvicorn implements, as do Starlette and, by extension, FastAPI) requires a webserver (Uvicorn) to call an application (e.g. Starlette or FastAPI), where the application must implement a signature like app(scope, receive, send). Here, scope holds all kinds of details about the request, send is an async callable that the application (Starlette/FastAPI) uses to send a response to the webserver, and receive is an async callable that the application can use to receive more incoming data from the webserver. For the latter, think of streaming a file, where chunks of bytes are sent from the webserver to the application.

So typically, you have defined your FastAPI app as app = FastAPI(). Now when a request arrives at Uvicorn, Uvicorn knows it can call app(scope=scope, receive=receive, send=send), and FastAPI knows what to do with the incoming request (the scope parameter) and returns the response to Uvicorn by calling await send(<some message>).

"Why is this relevant?" you might think. It's relevant because it's important to understand that by design, requests are handled as separate requests. All requests result in a call to app(scope=scope, receive=receive, send=send) which then applies logic to deal with the request. By now, we should be clear that batching requests is by no means a trivial question.

Middleware

One fun thing about ASGI is that you can chain applications together. That means that a webserver calls an ASGI app, which in turn calls another ASGI app, which in turn calls another ASGI app, and the response climbs back up the chain. This is exactly how middleware works in Starlette and FastAPI. Middlewares are basically ASGI applications that receive a request, do something with it, and then call another ASGI application (Starlette or FastAPI), which doesn't know the difference. Now, just so we are clear on this: this is still very much a per-request process by design. But, as an example, this is perfectly valid:

from fastapi import FastAPI

async def heavy_lifting_interceptor(scope, receive, send):
    await app(scope=scope, receive=receive, send=send)
    
app = FastAPI()

@app.get("/")
async def root():
    return {"hello":"world"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(heavy_lifting_interceptor, host="0.0.0.0", port=8000)

Note that Uvicorn is calling heavy_lifting_interceptor, which in turn is calling FastAPI with the exact same parameters.

Possible solution

FastAPI (or Starlette) cannot buffer requests, so any heavy computational work needs to happen before the request hits FastAPI. So I am thinking of an infinite task (say batcher()) that checks an asyncio.Queue. We also create some middleware that takes all incoming requests and puts them on the queue. It puts tuples on the Queue holding (scope, receive, send), where scope is a dict and receive and send are the async callables received from Uvicorn.

batcher() can wait for n items in the Queue or a certain amount of time (whichever comes first). An implementation of this could look like this. Then, batcher() takes the relevant information from the scope of all requests and does the heavy (batched) lifting. It also sends the requests (one by one, in a for loop) to the application (e.g. FastAPI) by calling await app(scope=scope, receive=receive, send=send).
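As a rough sketch of the "n items or a timeout, whichever comes first" wait (gather_batch is a made-up helper, not part of any library):

```python
import asyncio

async def gather_batch(queue: asyncio.Queue, n: int, max_wait: float) -> list:
    """Collect up to n items from the queue, returning earlier if max_wait
    seconds elapse after the first item arrived."""
    batch = [await queue.get()]  # block until at least one request exists
    loop = asyncio.get_running_loop()
    deadline = loop.time() + max_wait
    while len(batch) < n:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            # Wait for the next item, but never past the deadline.
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break
    return batch
```

batcher() would call this in a loop, run the heavy lifting on the returned batch, and then forward each (scope, receive, send) tuple to the inner app.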

Note that send must be changed, as it is still the same send callable from Uvicorn. You want it to return the response to batcher(), because you need to amend the response with the outcome of the heavy-lifting batch processing for that request. An implementation of that can be found here in the CORSMiddleware. The adjusted send adds the outcome of the heavy lifting and calls the original send (the one received originally from Uvicorn).

So, I don't think it's impossible, but it's certainly not trivial. Hope this clarifies things enough to get you going, though!

EDIT: Possible implementation

I was wondering if it was possible, and I think I figured it out. It was fun to build, but I am sure it is far from 'neat' or 'pythonic'. However, it works; the code below can be run as-is. If you fire requests at it you will see that it processes them in batches. This does (obviously) increase the response time! The approach is to capture all requests and have a separate processor do its magic, attaching the result of the heavy lifting to the request itself. Once the heavy lifting is done, the middleware calls the normal endpoint with the new request (result attached).

from fastapi import FastAPI, Request
import typing
import asyncio
import time
import logging

Scope = typing.MutableMapping[str, typing.Any]
Message = typing.MutableMapping[str, typing.Any]
Receive = typing.Callable[[], typing.Awaitable[Message]]
Send = typing.Callable[[Message], typing.Awaitable[None]]
RequestTuple = typing.Tuple[Scope, Receive, Send]

logger = logging.getLogger("uvicorn")

async def very_heavy_lifting(requests: dict[int,RequestTuple], batch_no) -> dict[int, RequestTuple]:
    #This mimics a heavy lifting function, takes a whole 3 seconds to process this batch
    logger.info(f"Heavy lifting for batch {batch_no} with {len(requests.keys())} requests")
    await asyncio.sleep(3)
    processed_requests: dict[int,RequestTuple] = {}
    for id, request in requests.items():
        request[0]["heavy_lifting_result"] = f"result of request {id} in batch {batch_no}"
        processed_requests[id] = (request[0], request[1], request[2])
    return processed_requests

class Batcher():
    def __init__(self, batch_max_size: int = 5, batch_max_seconds: int = 3) -> None:
        self.batch_max_size = batch_max_size
        self.batch_max_seconds = batch_max_seconds
        self.to_process: dict[int, RequestTuple] = {}
        self.processing: dict[int, RequestTuple] = {}
        self.processed: dict[int, RequestTuple] = {}
        self.batch_no = 1

    def start_batcher(self):
        # Must be called from within a running event loop (e.g. on app startup).
        self.batcher_task = asyncio.create_task(self._batcher())

    async def _batcher(self):
        while True:
            time_out = time.time() + self.batch_max_seconds
            while time.time() < time_out:
                if len(self.to_process) >= self.batch_max_size:
                    logger.info(
                        f"Batch {self.batch_no} is full "
                        f"(requests: {len(self.to_process)}, max allowed: {self.batch_max_size})"
                    )
                    await self.process_requests(self.batch_no)
                    self.batch_no += 1
                    break
                await asyncio.sleep(0)
            else:
                if len(self.to_process) > 0:
                    logger.info(f"Batch {self.batch_no} is over the time limit (requests: {len(self.to_process)})")
                    await self.process_requests(self.batch_no)
                    self.batch_no += 1
            await asyncio.sleep(0)

    async def process_requests(self, batch_no: int):
        logger.info(f"Start of processing batch {batch_no}...")
        for id, request in self.to_process.items():
            self.processing[id] = request
        self.to_process = {}
        processed_requests  = await very_heavy_lifting(self.processing, batch_no)
        self.processed = processed_requests
        self.processing = {}
        logger.info(f"Finished processing batch {batch_no}")

batcher = Batcher() 

class InterceptorMiddleware():
    def __init__(self, app) -> None:
        self.app = app
        self.request_id: int = 0

    async def __call__(self, scope: Scope, receive: Receive, send: Send):
        if scope["type"] != "http":  # pragma: no cover
            await self.app(scope, receive, send)
            return

        self.request_id += 1
        current_id = self.request_id
        batcher.to_process[self.request_id] = (scope, receive, send)
        logger.info(f"Added request {current_id} to batch {batcher.batch_no}.")
        while True:
            request = batcher.processed.get(current_id, None)
            if not request:
                await asyncio.sleep(0.5)
            else:
                logger.info(f"Request {current_id} was processed, forwarding to FastAPI endpoint..")
                batcher.processed.pop(current_id)
                await self.app(request[0], request[1], request[2])
                return  # done with this request; without this the loop would poll forever

app = FastAPI()

@app.on_event("startup")
async def startup_event():
    batcher.start_batcher()
    return

app.add_middleware(InterceptorMiddleware)

@app.get("/")
async def root(request: Request):
    return {"Return value": request["heavy_lifting_result"]}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

3 Comments

Thanks a lot! I will read through the references, but at least this already clarifies that I am not missing something that would be trivial :)
I updated the answer and created an example app :) was fun!
This post made my day! Thanks a ton for putting in the effort of coding up the runnable solution.

You can check the async-batcher project; it's a Python package that helps you easily process your HTTP/gRPC queries in batches, and it provides some ready-to-use batchers (Keras, Sklearn, DynamoDB, ScyllaDB, SQLAlchemy, ...). It has nice features like a max batch size, a max waiting time for a batch to be ready, and concurrent batches in case your process method takes longer than the max waiting time.

If you are not familiar with asyncio, or your batch processing method cannot be async, you can provide a normal method instead of a coroutine, and it will automatically run it in a separate thread to avoid blocking the asyncio event loop.

You can check the examples for more details: https://github.com/hussein-awala/async-batcher/tree/main/examples.

