I gave this a little thought, and I think it is not completely impossible, although it does bend the ASGI spec a little.
A little background first.
The ASGI spec (which Uvicorn implements, just as Starlette and, by extension, FastAPI) requires a webserver (Uvicorn) to call an application (e.g. Starlette or FastAPI), where the application must implement a signature like app(scope, receive, send). Here, scope holds all kinds of details about the request, send is an async callable that the application (Starlette/FastAPI) uses to send a response to the webserver, and receive is an async callable that the application can use to receive more incoming data from the webserver. For the latter, think of streaming a file, where chunks of bytes are sent from the webserver to the application.
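To make that interface concrete, here is a minimal, framework-free ASGI application (the handler name and response body are made up for illustration):

```python
# A minimal ASGI application: an async callable taking (scope, receive, send).
# This is exactly the shape Uvicorn expects, with no framework involved.
async def asgi_app(scope, receive, send):
    assert scope["type"] == "http"
    # A response is sent as ASGI messages: first the status and headers...
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"text/plain")],
    })
    # ...then the body.
    await send({"type": "http.response.body", "body": b"hello"})
```

You could pass `asgi_app` directly to `uvicorn.run()` and it would serve responses without Starlette or FastAPI in the picture at all.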
So typically, you have defined your FastAPI app as app = FastAPI(). Now when a request arrives at Uvicorn, Uvicorn knows it can call app(scope=scope, receive=receive, send=send); FastAPI knows what to do with the incoming request (the scope parameter) and returns the response to Uvicorn by calling await send(<some message>).
"Why is this relevant?" you might think. It's relevant because, by design, each request is handled separately: every request results in its own call to app(scope=scope, receive=receive, send=send), which then applies logic to deal with that one request. It should be clear by now that batching requests is by no means trivial.
Middleware
One fun thing about ASGI is that you can chain applications together. That means a webserver calls an ASGI app, which in turn calls another ASGI app, which in turn calls another ASGI app, and the response travels back up the chain. This is exactly how middleware works in Starlette and FastAPI: middleware is basically an ASGI application that receives a request, does something with it, and then calls another ASGI application (Starlette or FastAPI), which wouldn't know the difference. Just so we are clear: this is still very much a per-request process by design. But, as an example, this is perfectly valid:
```python
from fastapi import FastAPI

async def heavy_lifting_interceptor(scope, receive, send):
    await app(scope=scope, receive=receive, send=send)

app = FastAPI()

@app.get("/")
async def root():
    return {"hello": "world"}

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(heavy_lifting_interceptor, host="0.0.0.0", port=8000)
```
Note that Uvicorn is calling heavy_lifting_interceptor, which in turn is calling FastAPI with the exact same parameters.
Possible solution
FastAPI (or Starlette) cannot buffer requests, so any heavy computational work needs to happen before the request hits FastAPI. I am thinking of an infinite task (say batcher()) that checks an asyncio.Queue, plus some middleware that takes all incoming requests and puts them on the queue. It puts tuples (scope, receive, send) on the queue, where scope is a dict and receive and send are the async callables received from Uvicorn.
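A minimal sketch of such enqueueing middleware could look like this (the queue and class names are my own; nothing here is FastAPI-specific):

```python
import asyncio

# Queue holding raw ASGI triples for a separate batcher task to pick up.
request_queue: asyncio.Queue = asyncio.Queue()

class EnqueueMiddleware:
    def __init__(self, app) -> None:
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            # Pass non-HTTP traffic (e.g. lifespan events) straight through.
            await self.app(scope, receive, send)
            return
        # Instead of calling the inner application, park the raw ASGI
        # triple on the queue; the batcher decides when to process it.
        await request_queue.put((scope, receive, send))
```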
batcher() can wait for n items in the queue or a certain amount of time, whichever comes first. Then, batcher() takes the relevant information from the scope of all queued requests and does the heavy (batched) lifting. It also sends the requests (one by one, in a for loop) to the application (e.g. FastAPI) by calling await app(scope=scope, receive=receive, send=send).
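One way batcher() could implement "n items or t seconds, whichever comes first" on top of asyncio.Queue (the function name and defaults are illustrative, not from any library):

```python
import asyncio

async def collect_batch(queue: asyncio.Queue, max_size: int = 5,
                        max_seconds: float = 3.0) -> list:
    batch = [await queue.get()]  # block until at least one request arrives
    deadline = asyncio.get_running_loop().time() + max_seconds
    while len(batch) < max_size:
        remaining = deadline - asyncio.get_running_loop().time()
        if remaining <= 0:
            break  # time limit reached before the batch filled up
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break
    return batch
```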
Note that send must be changed, as it is still the same send callable from Uvicorn. You want the response to come back to batcher() first, because you need to amend the response with the outcome of the heavy-lifting batch processing for that request. Starlette's CORSMiddleware demonstrates this pattern: it wraps send so the wrapper can modify the response before calling the original send (the one originally received from Uvicorn). Here, the adjusted send would add the outcome of the heavy lifting.
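A hypothetical sketch of such a send wrapper, in the spirit of CORSMiddleware (the header name is made up for illustration):

```python
def wrap_send(send, heavy_result: str):
    # Returns a replacement send callable that intercepts the
    # response-start message, attaches the batch outcome as an extra
    # header, and forwards everything to the original Uvicorn send.
    async def wrapped_send(message):
        if message["type"] == "http.response.start":
            headers = list(message.get("headers", []))
            headers.append((b"x-heavy-lifting-result", heavy_result.encode()))
            message = {**message, "headers": headers}
        await send(message)  # hand off to the original send
    return wrapped_send
```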
So, I don't think it's impossible, but it's certainly not trivial. Hope this clarifies things enough to get you going!
EDIT: Possible implementation
I was wondering if it was possible, and I think I figured it out. It was fun to build, but I am sure it is far from 'neat' or 'pythonic'. However, it works, and the code below can be run as-is. If you fire requests at it, you will see it process them in batches. This does (obviously) increase the response time! The approach is to capture all requests, have a separate processor do its magic, and attach the result of the heavy lifting to the request itself. Once the heavy lifting is done, the middleware calls the normal endpoint with the amended request (result attached).
```python
from fastapi import FastAPI, Request
import typing
import asyncio
import time
import logging

Scope = typing.MutableMapping[str, typing.Any]
Message = typing.MutableMapping[str, typing.Any]
Receive = typing.Callable[[], typing.Awaitable[Message]]
Send = typing.Callable[[Message], typing.Awaitable[None]]
RequestTuple = typing.Tuple[Scope, Receive, Send]

logger = logging.getLogger("uvicorn")


async def very_heavy_lifting(requests: dict[int, RequestTuple], batch_no: int) -> dict[int, RequestTuple]:
    # This mimics a heavy-lifting function: it takes a whole 3 seconds to process the batch
    logger.info(f"Heavy lifting for batch {batch_no} with {len(requests)} requests")
    await asyncio.sleep(3)
    processed_requests: dict[int, RequestTuple] = {}
    for id, request in requests.items():
        # Attach the result to the request's scope so the endpoint can read it later
        request[0]["heavy_lifting_result"] = f"result of request {id} in batch {batch_no}"
        processed_requests[id] = (request[0], request[1], request[2])
    return processed_requests


class Batcher:
    def __init__(self, batch_max_size: int = 5, batch_max_seconds: int = 3) -> None:
        self.batch_max_size = batch_max_size
        self.batch_max_seconds = batch_max_seconds
        self.to_process: dict[int, RequestTuple] = {}
        self.processing: dict[int, RequestTuple] = {}
        self.processed: dict[int, RequestTuple] = {}
        self.batch_no = 1

    def start_batcher(self):
        self.batcher_task = asyncio.create_task(self._batcher())

    async def _batcher(self):
        while True:
            time_out = time.time() + self.batch_max_seconds
            while time.time() < time_out:
                if len(self.to_process) >= self.batch_max_size:
                    logger.info(f"Batch {self.batch_no} is full "
                                f"(requests: {len(self.to_process)}, max allowed: {self.batch_max_size})")
                    self.batch_no += 1
                    await self.process_requests(self.batch_no)
                    break
                await asyncio.sleep(0)
            else:
                # Time limit reached before the batch filled up
                if len(self.to_process) > 0:
                    logger.info(f"Batch {self.batch_no} is over the time limit (requests: {len(self.to_process)})")
                    self.batch_no += 1
                    await self.process_requests(self.batch_no)
            await asyncio.sleep(0)

    async def process_requests(self, batch_no: int):
        logger.info(f"Start of processing batch {batch_no}...")
        for id, request in self.to_process.items():
            self.processing[id] = request
        self.to_process = {}
        processed_requests = await very_heavy_lifting(self.processing, batch_no)
        self.processed = processed_requests
        self.processing = {}
        logger.info(f"Finished processing batch {batch_no}")


batcher = Batcher()


class InterceptorMiddleware:
    def __init__(self, app) -> None:
        self.app = app
        self.request_id: int = 0

    async def __call__(self, scope: Scope, receive: Receive, send: Send):
        if scope["type"] != "http":  # pragma: no cover
            await self.app(scope, receive, send)
            return
        self.request_id += 1
        current_id = self.request_id
        batcher.to_process[current_id] = (scope, receive, send)
        logger.info(f"Added request {current_id} to batch {batcher.batch_no}.")
        while True:
            request = batcher.processed.get(current_id, None)
            if not request:
                await asyncio.sleep(0.5)
            else:
                logger.info(f"Request {current_id} was processed, forwarding to FastAPI endpoint..")
                batcher.processed.pop(current_id)
                await self.app(request[0], request[1], request[2])
                return  # done with this request; without this, the loop would poll forever


app = FastAPI()


@app.on_event("startup")
async def startup_event():
    batcher.start_batcher()


app.add_middleware(InterceptorMiddleware)


@app.get("/")
async def root(request: Request):
    return {"Return value": request["heavy_lifting_result"]}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
```