Complete FastAPI Performance Tuning Guide: Build Scalable APIs with Async I/O, Connection Pools, Caching, and Rate Limiting
Summary (get the overall picture first)
- FastAPI performance is determined not only by your application code, but by the combination of Uvicorn/Gunicorn settings, DB connection pools, caching, rate limiting, and external storage.
- By using async I/O correctly and offloading blocking work to threads or background jobs, you can build APIs that handle high concurrency.
- For DBs and external APIs, use connection pools and caching so you don’t repeat the same heavy computation or data fetching over and over.
- Protect the system from sudden traffic spikes using rate limiting and queuing, balancing between “protecting the system” and “being reasonably generous to clients.”
- Finally, we’ll summarize a measurement and tuning roadmap so you can clearly see where to begin.
Who will benefit from reading this (concrete reader personas)
- Individual developers / learners
  - You’ve built a small FastAPI app and are worried about what will slow down as users increase.
  - You’ve heard “async makes it faster,” but aren’t sure what exactly you should be careful about.
- Backend engineers in small teams
  - You run FastAPI in production and have started to notice worse response times or timeouts during peak hours.
  - You want a structured overview of how to design DB connection pools, caching, and rate limiting.
- SaaS dev teams in the growth phase
  - You want to set the direction for a “scalable architecture” before traffic really takes off.
  - You need criteria for how far to push tuning while balancing performance, reliability, and cost.
1. Decide what you want to speed up first
Before blindly tuning, clarify “what you want to improve and how.” That reduces wasted effort.
1.1 Typical metrics
- Latency: response time per request (seconds, milliseconds)
- Throughput: requests processed per second (RPS)
- Concurrent connections: number of users/connections handled at the same time
- Error rate: proportion of timeouts and 5xx errors
In practice, what most directly affects UX is P95 latency (95% of requests complete within this time) and the error rate.
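If you log per-request durations (we will add a timing middleware for exactly that in section 7), computing P95 from those numbers takes only a few lines of Python; here is a minimal sketch with made-up sample values:

```python
# Minimal sketch: compute P95 latency from logged request durations (values are made up)
import statistics

durations_ms = [12, 15, 18, 22, 25, 31, 40, 55, 80, 120, 340]

# statistics.quantiles with n=20 returns 19 cut points; index 18 is the 95th percentile
p95 = statistics.quantiles(durations_ms, n=20)[18]
print(f"P95 latency: {p95:.1f} ms")
```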
1.2 Roughly classify “where it’s slow”
Bottlenecks in a FastAPI app typically fall into three broad categories:
- Application code (CPU-heavy work, algorithms, JSON conversion, etc.)
- External I/O (DB, external APIs, storage, email, etc.)
- Infrastructure settings (number of workers, number of pods, connection limits, timeouts, etc.)
In this article, we’ll walk through the mindset and concrete examples needed to improve all three of these areas in a balanced way.
2. Async I/O vs threads / processes
FastAPI is designed around async I/O (async / await). Used well, it handles high concurrency very efficiently. But that doesn’t mean “turn everything async and it’ll be faster.”
2.1 Basics of async functions
In FastAPI, an endpoint written as async def runs directly on the event loop, so any I/O you await inside it can be interleaved with work for other requests.
from fastapi import FastAPI
import httpx
app = FastAPI()
@app.get("/weather")
async def get_weather(city: str):
async with httpx.AsyncClient(timeout=2.0) as client:
r = await client.get(f"https://api.example.com/weather?city={city}")
return r.json()
Key points here:
- Use async-capable clients like httpx.AsyncClient
- While waiting on I/O, the event loop can process other requests
2.2 CPU-bound work should go to separate threads / processes
Loops over tens of thousands of items, image processing, encryption, and other CPU-heavy work won’t be faster just because you make them async. They can actually block the event loop and hurt performance.
With FastAPI, the standard practice is to offload CPU-bound work like this:
- Use run_in_threadpool to submit the work to a thread pool
- Offload to a background job system (Celery, etc.; see the sketch at the end of this section)
Here’s a simple example:
from fastapi import FastAPI
from starlette.concurrency import run_in_threadpool
app = FastAPI()
def heavy_calc(n: int) -> int:
s = 0
for i in range(n):
s += i * i
return s
@app.get("/heavy")
async def heavy_endpoint(n: int = 100_000_000):
result = await run_in_threadpool(heavy_calc, n)
return {"n": n, "result": result}
This way the event loop stays free and the CPU-heavy work runs in the thread pool.
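For jobs that are too heavy even for the thread pool, the second option listed above is a background job system such as Celery. The following is only a sketch of that pattern; the module layout and the Redis broker URL are assumptions for illustration:

```python
# app/tasks.py -- hypothetical module; the broker URL is a placeholder
from celery import Celery

celery_app = Celery("tasks", broker="redis://redis:6379/1")

@celery_app.task
def heavy_calc_task(n: int) -> int:
    # Same CPU-heavy loop as before, but running in a separate worker process
    s = 0
    for i in range(n):
        s += i * i
    return s

# app/routers/jobs.py
from fastapi import APIRouter
from app.tasks import heavy_calc_task

router = APIRouter()

@router.post("/jobs/heavy")
def enqueue_heavy(n: int = 10_000_000):
    # delay() enqueues the task and returns immediately with a task id
    result = heavy_calc_task.delay(n)
    return {"task_id": result.id}
```

The endpoint returns immediately with a task id, and the client polls for the result later, so API workers stay free for other requests.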
2.3 Choosing Uvicorn / Gunicorn worker counts
In production, it’s common to run Gunicorn + UvicornWorker rather than plain Uvicorn. A typical rule of thumb is:
- Number of workers: around #CPU cores × 2
- Concurrent requests per worker: can be quite high if the workload is mostly async I/O
However, this is just a rule of thumb. In reality, you should adjust after running load tests.
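As a concrete starting point, the rule of thumb above can be written down in a gunicorn.conf.py and then revised after load testing; the numbers below are placeholders, not recommendations:

```python
# gunicorn.conf.py -- a starting point to be tuned with load tests
import multiprocessing

# Rule of thumb from above: roughly 2 workers per CPU core
workers = multiprocessing.cpu_count() * 2
worker_class = "uvicorn.workers.UvicornWorker"
bind = "0.0.0.0:8000"

# Give slow requests a chance to finish, but don't let them hang forever
timeout = 30
graceful_timeout = 30
keepalive = 5
```

Start it with gunicorn app.main:app -c gunicorn.conf.py (assuming the app object lives in app/main.py).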
3. Designing DB connection pools (SQLAlchemy)
In FastAPI apps, the biggest bottleneck is often the relational database. Let’s use SQLAlchemy as an example to clarify the basics of connection pooling.
3.1 What is a connection pool?
A connection pool reuses connections to the DB.
- Opening a new connection every time → large overhead for connection setup, slower overall
- Reusing from a pool → subsequent queries can run immediately
Here’s a simple FastAPI + SQLAlchemy example:
# app/db.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "postgresql+psycopg://user:pass@db:5432/app"
engine = create_engine(
DATABASE_URL,
pool_size=10, # Number of connections kept ready
max_overflow=20, # Temporary extra connections allowed under peak load
pool_pre_ping=True, # Detect dead connections and reconnect
)
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
3.2 How to think about pool settings
- pool_size
  - Number of connections kept per application instance
  - Too large → can easily exceed DB connection limits
  - Too small → queueing increases and latency grows
- max_overflow
  - Number of extra connections allowed temporarily under peak load
  - Tune so you don’t overshoot the DB’s connection limit
- DB connection limit
  - e.g. PostgreSQL’s max_connections
  - Make sure (#API instances × pool_size × safety factor) doesn’t exceed this
For example:
- API instances: 3
- Per instance: pool_size=10, max_overflow=10
- Theoretical max connections: 3 × (10 + 10) = 60
In this case, the DB’s max_connections must be configured to be somewhat larger than 60 (with extra room for other services and admin connections).
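To check how much of the pool is actually in use at runtime, SQLAlchemy’s default QueuePool exposes a few counters. A small debug endpoint like the one below (the route path is an assumption, and you would restrict or remove it in production) makes it easy to compare reality against the arithmetic above:

```python
# app/routers/debug.py -- hypothetical debug route; lock it down or remove it in production
from fastapi import APIRouter
from app.db import engine

router = APIRouter()

@router.get("/debug/db-pool")
def db_pool_status():
    pool = engine.pool  # QueuePool by default
    return {
        "size": pool.size(),               # configured pool_size
        "checked_out": pool.checkedout(),  # connections currently in use
        "checked_in": pool.checkedin(),    # idle connections in the pool
        "overflow": pool.overflow(),       # connections open beyond pool_size
    }
```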
3.3 Manage session lifecycle properly
In FastAPI, it’s common practice to treat the DB session as a dependency and open/close it per request.
# app/deps.py
from app.db import SessionLocal
from sqlalchemy.orm import Session
def get_db() -> Session:
db = SessionLocal()
try:
yield db
finally:
db.close()
# app/routers/articles.py
from fastapi import APIRouter, Depends
from sqlalchemy import text
from sqlalchemy.orm import Session
from app.deps import get_db

router = APIRouter()

@router.get("/articles")
def list_articles(db: Session = Depends(get_db)):
    # SQLAlchemy 2.x requires raw SQL strings to be wrapped in text()
    rows = db.execute(text("SELECT id, title FROM articles")).mappings()
    return [dict(row) for row in rows]
Open a session per request, and always close it when done. Strictly following this “obvious” pattern is the first step to preventing connection leaks and performance degradation.
4. Caching strategies: in-memory, Redis, HTTP caching
Repeatedly performing the same computation or repeatedly fetching the same data from the DB is extremely wasteful for performance. Let’s look at the main caching strategies.
4.1 Where to cache?
Broadly, you have three options:
- Inside the app process (in-memory cache)
  - Libraries: functools.lru_cache, a plain dict, etc.
  - Cache is per process, not shared across instances
- External cache (Redis, etc.)
  - Shared across multiple instances
  - Can attach TTLs (expiry) for automatic refresh
- HTTP-level caching
  - Use ETag and Cache-Control headers to let browsers/CDNs cache responses
  - Effective for static content or data that changes infrequently
4.2 Simple in-memory cache example
For configuration values or external API responses that change rarely, in-memory cache is a good first step.
# app/services/configs.py
from functools import lru_cache
import httpx
@lru_cache(maxsize=128)
def fetch_remote_config() -> dict:
# In practice this would fetch config from an external service
r = httpx.get("https://config.example.com/app-config")
return r.json()
But keep in mind:
- In-memory cache is lost on deploy / restart
- It’s not shared between instances
Use it with those characteristics in mind.
4.3 Redis-based caching
For more robust caching, Redis is the classic choice.
# app/cache.py
import json
from typing import Any
from redis import asyncio as aioredis  # aioredis is deprecated; it now lives in redis-py

class RedisCache:
    def __init__(self, url: str):
        self._url = url
        self._redis: aioredis.Redis | None = None

    async def init(self):
        # redis.asyncio.from_url returns a client directly; no await needed
        self._redis = aioredis.from_url(self._url, encoding="utf-8", decode_responses=True)

    async def get_json(self, key: str) -> Any | None:
        assert self._redis is not None
        data = await self._redis.get(key)
        if data is None:
            return None
        return json.loads(data)

    async def set_json(self, key: str, value: Any, ttl: int):
        assert self._redis is not None
        await self._redis.set(key, json.dumps(value), ex=ttl)
# app/routers/rankings.py
from fastapi import APIRouter, Depends
from app.cache import RedisCache
from app.deps import get_db
router = APIRouter()
async def get_cache() -> RedisCache:
# In practice, you’d return an instance initialized at app startup
...
@router.get("/rankings")
async def get_rankings(cache: RedisCache = Depends(get_cache), db = Depends(get_db)):
cached = await cache.get_json("rankings:top10")
if cached is not None:
return {"source": "cache", "items": cached}
# Assume this runs a heavy aggregation query
rows = db.execute("SELECT ...").fetchall()
items = [dict(row) for row in rows]
await cache.set_json("rankings:top10", items, ttl=60) # cache for 60 seconds
return {"source": "db", "items": items}
You can gradually apply this pattern of “check cache → miss → compute → write to cache → return” to frequently-used endpoints.
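If that cache-aside boilerplate starts repeating across endpoints, it can be factored into a small decorator. Here is one possible sketch that reuses the RedisCache class above; using a single fixed cache key per function is a deliberate simplification:

```python
# app/cache_utils.py -- a sketch of a reusable cache-aside helper
import functools
from typing import Any, Awaitable, Callable

from app.cache import RedisCache

def cached_json(cache: RedisCache, key: str, ttl: int):
    """Cache the JSON-serializable result of an async function under a fixed key for ttl seconds."""
    def decorator(func: Callable[..., Awaitable[Any]]):
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            hit = await cache.get_json(key)
            if hit is not None:
                return hit
            result = await func(*args, **kwargs)
            await cache.set_json(key, result, ttl=ttl)
            return result
        return wrapper
    return decorator
```

You would then decorate an async “compute the ranking” function with @cached_json(cache, "rankings:top10", ttl=60) and simply call it from the endpoint.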
4.4 Using HTTP cache headers
For static JSON or data that updates, say, daily, HTTP caching is also very effective.
from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()

@app.get("/static-data")
def static_data():
    data = {"message": "Data that rarely changes"}
    headers = {"Cache-Control": "public, max-age=300"}  # cache for 5 minutes
    # JSONResponse serializes the dict properly; str(dict) would not be valid JSON
    return JSONResponse(content=data, headers=headers)
CDNs and browsers will cache responses according to these headers, significantly reducing load on your app.
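The ETag header mentioned in 4.1 complements Cache-Control: the client echoes the tag back via If-None-Match, and if the data hasn’t changed you can answer with an empty 304 instead of the full body. A minimal sketch:

```python
import hashlib
import json

from fastapi import FastAPI, Request, Response

app = FastAPI()

@app.get("/etag-data")
def etag_data(request: Request):
    data = {"message": "Data that rarely changes"}
    body = json.dumps(data)
    etag = hashlib.sha256(body.encode()).hexdigest()  # validator derived from the body

    # If the client already has this version, reply with 304 and no body
    if request.headers.get("if-none-match") == etag:
        return Response(status_code=304, headers={"ETag": etag})

    return Response(
        content=body,
        media_type="application/json",
        headers={"ETag": etag, "Cache-Control": "public, max-age=300"},
    )
```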
5. Protecting your service with rate limiting
When thinking about performance, it’s crucial not only to ask “how fast can we process requests?” but also “beyond what point do we stop trying to process more?” That’s the idea of rate limiting.
5.1 Why do we need rate limiting?
- Some buggy clients or bots might send a huge number of requests
- Trying to handle every request during unexpected traffic spikes can crash the entire system
- You may want to limit usage for free plans or certain users
In such cases, rate limiting acts as a safety device for system-wide stability.
5.2 Token bucket, in simple terms
A popular algorithm is the token bucket:
- Keep a “bucket” with a certain number of tokens
- Each request consumes 1 token
- Tokens are replenished at a constant rate (e.g. 5 tokens per second)
- If there are no tokens left, requests from that user get a 429 (Too Many Requests) response
If you implement this using Redis, you can enforce distributed rate limits across multiple instances.
5.3 Simple dependency example (in-memory)
For learning purposes, here is a simple in-memory implementation.
(For production, you should use an external store like Redis.)
# app/rate_limit.py
import time
from fastapi import HTTPException, status
class SimpleRateLimiter:
def __init__(self, capacity: int, refill_rate_per_sec: float):
self.capacity = capacity
self.refill_rate = refill_rate_per_sec
self.tokens = capacity
self.last_refill = time.monotonic()
def allow(self) -> bool:
now = time.monotonic()
elapsed = now - self.last_refill
refill = elapsed * self.refill_rate
if refill > 0:
self.tokens = min(self.capacity, self.tokens + refill)
self.last_refill = now
if self.tokens >= 1:
self.tokens -= 1
return True
return False
global_limiter = SimpleRateLimiter(capacity=10, refill_rate_per_sec=5)
def limit_global():
if not global_limiter.allow():
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Too many requests, please slow down.",
)
# app/main.py
from fastapi import FastAPI, Depends
from app.rate_limit import limit_global
app = FastAPI()
@app.get("/limited", dependencies=[Depends(limit_global)])
def limited_endpoint():
return {"ok": True}
In this example:
- You can burst up to 10 requests in a row
- After that, capacity recovers at a rate of 5 requests per second
In production, you’d usually need something more complex, such as per-IP or per-user limits and distributed limiting via Redis.
6. Don’t serve files directly from FastAPI
From a performance perspective, deciding “what not to do in FastAPI” is just as important. A prime example is serving large files.
6.1 Why not serve files directly from the app?
- API workers waste CPU and memory doing file I/O
- You won’t match the performance of dedicated storage / CDNs
- Using rate limiting and signed URLs gives you a simpler and more robust design
So the general pattern is:
- Store files on S3-compatible storage or cloud storage
- Let FastAPI only generate signed URLs for access
6.2 API example that just returns a signed URL
# app/storage.py
from datetime import datetime, timedelta
import hmac, hashlib, base64, urllib.parse
def generate_signed_url(object_key: str, expires_in: int = 300) -> str:
# In practice, you’d typically use the SDK of S3 or other cloud storage.
# This is a simplified illustration.
secret = b"secret-key"
expires_at = int((datetime.utcnow() + timedelta(seconds=expires_in)).timestamp())
signature_raw = f"{object_key}:{expires_at}".encode()
sig = hmac.new(secret, signature_raw, hashlib.sha256).digest()
sig_b64 = base64.urlsafe_b64encode(sig).decode().rstrip("=")
query = urllib.parse.urlencode({"expires": expires_at, "sig": sig_b64})
return f"https://cdn.example.com/files/{object_key}?{query}"
# app/routers/files.py
from fastapi import APIRouter, HTTPException
from app.storage import generate_signed_url
router = APIRouter()
@router.get("/files/{file_id}")
def get_file_url(file_id: str):
# In practice you’d look up object_key from a DB, etc.
object_key = f"{file_id}.pdf"
url = generate_signed_url(object_key, expires_in=300)
if not url:
raise HTTPException(404, "not found")
return {"url": url}
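If the files live on S3 or S3-compatible storage, the hand-rolled signing above is normally replaced by the SDK’s presigned URL helper. A sketch using boto3; the bucket name and region are placeholders:

```python
# app/storage_s3.py -- sketch using boto3; bucket and region are placeholders
import boto3

s3 = boto3.client("s3", region_name="ap-northeast-1")

def generate_signed_url_s3(object_key: str, expires_in: int = 300) -> str:
    # boto3 signs the URL; it stays valid for expires_in seconds
    return s3.generate_presigned_url(
        "get_object",
        Params={"Bucket": "my-app-files", "Key": object_key},
        ExpiresIn=expires_in,
    )
```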
With this design, FastAPI keeps acting as a lightweight API server, while heavy file serving is handed off to the CDN or storage service.
7. Measure and profile: don’t rely on intuition
The single most important rule for performance tuning is: measure, don’t guess.
7.1 Start with simple measurements
First, add a simple “timing middleware” and log processing time per endpoint.
# app/middleware/timing.py
import time
import logging
from starlette.middleware.base import BaseHTTPMiddleware
log = logging.getLogger("timing")
class TimingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
start = time.perf_counter()
response = await call_next(request)
dur_ms = int((time.perf_counter() - start) * 1000)
log.info("path=%s method=%s status=%s dur_ms=%d",
request.url.path, request.method, response.status_code, dur_ms)
return response
# app/main.py
from fastapi import FastAPI
from app.middleware.timing import TimingMiddleware
app = FastAPI()
app.add_middleware(TimingMiddleware)
Just visualizing these logs in Kibana, Cloud Logging, etc. lets you see which endpoints are particularly slow.
7.2 Use load testing tools
- Use locust, k6, wrk, etc. to run basic load tests (a minimal locust example follows at the end of this section)
- For example, measure “P95 latency when sending 100 requests/sec for 1 minute”
While doing that:
- Change worker counts and pool settings to find a well-balanced configuration
- Compare metrics before/after enabling caching or rate limiting
This way you can actually feel the impact of your changes.
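As one concrete option mentioned above, a minimal locustfile targeting the example endpoints from earlier sections might look like this:

```python
# locustfile.py -- minimal load test sketch for the example endpoints
from locust import HttpUser, between, task

class ApiUser(HttpUser):
    wait_time = between(0.1, 0.5)  # think time between requests per simulated user

    @task(3)
    def list_articles(self):
        self.client.get("/articles")

    @task(1)
    def rankings(self):
        self.client.get("/rankings")
```

Run it with locust -f locustfile.py --host http://localhost:8000, ramp up users from the web UI, and watch how P95 latency moves as you change worker counts, pool sizes, and caching.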
8. Sample mini-API with cache + rate limiting
Let’s combine a few pieces into a small sample API.
(You need Redis to actually run this, but the goal is to illustrate the idea.)
# app/cache.py (Redis cache)
import json
from typing import Any
from redis import asyncio as aioredis
class RedisCache:
def __init__(self, url: str):
self.url = url
self._redis: aioredis.Redis | None = None
async def init(self):
if self._redis is None:
self._redis = aioredis.from_url(self.url, encoding="utf-8", decode_responses=True)
async def get_json(self, key: str) -> Any | None:
await self.init()
data = await self._redis.get(key)
if not data:
return None
return json.loads(data)
async def set_json(self, key: str, value: Any, ttl: int):
await self.init()
await self._redis.set(key, json.dumps(value), ex=ttl)
# app/rate_limit.py (Simple Redis-based rate limiting)
import time
from fastapi import HTTPException, status
from redis import asyncio as aioredis
class RedisRateLimiter:
def __init__(self, url: str, prefix: str = "rate"):
self.url = url
self.prefix = prefix
self._redis: aioredis.Redis | None = None
async def init(self):
if self._redis is None:
self._redis = aioredis.from_url(self.url, encoding="utf-8", decode_responses=True)
async def allow(self, key: str, limit: int, window_sec: int) -> bool:
await self.init()
assert self._redis is not None
now = int(time.time())
window = now // window_sec
redis_key = f"{self.prefix}:{key}:{window}"
# Increment counter and set TTL
p = self._redis.pipeline()
p.incr(redis_key)
p.expire(redis_key, window_sec)
count, _ = await p.execute()
return int(count) <= limit
async def ensure(self, key: str, limit: int, window_sec: int):
ok = await self.allow(key, limit, window_sec)
if not ok:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Too many requests",
)
# app/main.py (API using cache + rate limiting)
from fastapi import FastAPI, Depends, Request
from app.cache import RedisCache
from app.rate_limit import RedisRateLimiter
app = FastAPI(title="Perf Demo")
cache = RedisCache("redis://redis:6379/0")
limiter = RedisRateLimiter("redis://redis:6379/0")
async def rate_limited(request: Request):
# Simple IP-based throttling (for production, consider X-Forwarded-For, etc.)
client_ip = request.client.host
await limiter.ensure(client_ip, limit=30, window_sec=60) # 30 req/min per IP
@app.get("/expensive", dependencies=[Depends(rate_limited)])
async def expensive_endpoint():
cached = await cache.get_json("expensive:result")
if cached is not None:
return {"source": "cache", "value": cached}
# In practice, this would be heavy computation or external API calls
value = {"answer": 42}
await cache.set_json("expensive:result", value, ttl=10)
return {"source": "fresh", "value": value}
This mini API behaves as follows:
- Each IP is limited to 30 requests per minute
- The first call returns "fresh", and subsequent calls within 10 seconds return "cache"
This gives you a feel for what a more realistic setup looks like.
9. Common pitfalls and how to address them
Finally, here’s a summary of common pitfalls in FastAPI performance tuning and simple countermeasures.
| Symptom | Cause | Countermeasure |
|---|---|---|
| Latency spikes when concurrency increases | External I/O is synchronous blocking, DB pool is undersized | Switch to async clients, tune pool size, introduce caching |
| Large variance in latency | Some endpoints are heavy / N+1 queries | Identify slow endpoints and optimize queries or add prefetching |
| DB hits connection limit and crashes | (#instances × pool_size) is too large | Balance DB max_connections and API pool settings |
| Whole system crashes on traffic spikes | No rate limiting, process tries to handle everything | Flatten load with rate limiting or queueing |
| File downloads are slow | API server directly serves large files | Move to CDN/object storage and only return signed URLs |
| Tuning seems to have no effect | No before/after measurement | Always benchmark or log timings to measure “change” |
10. Rollout roadmap (where to start)
Given everything so far, here’s a suggested step-by-step plan:
- Add a simple timing middleware and log which endpoints are slow.
- For the slow endpoints where external I/O is the bottleneck, improve them with async I/O and pool tuning.
- Add in-memory or Redis caching to heavy operations that are called repeatedly and monitor cache hit rates.
- Introduce rate limiting and draw a clear line for how much traffic you are willing to serve.
- Offload file serving and batch-like heavy tasks to CDNs, storage services, and background jobs (Celery, etc.).
- Use load testing tools to regularly check how P95 latency behaves under peak load and adjust infra settings accordingly.
As you iterate “measure → hypothesize → improve → re-measure,” you’ll develop an intuition for your FastAPI app’s quirks and the system’s limits.
Summary
- FastAPI performance is determined by the combination of async I/O, worker settings, DB connection pools, caching, rate limiting, and external storage.
- Start by measuring and visualizing logs to find where it’s slow, then tackle high-impact issues like external I/O and N+1 queries first.
- Connection pools, caching, and rate limiting are reusable patterns across many systems. Use the sample code as a starting point and gradually adapt it to your own projects.
- You don’t need to aim for perfection on day one. By steadily layering small measurements and improvements, FastAPI can become a scalable, reliable API server you can trust.
Thank you very much for reading this far.
At your own pace, step by step, let’s grow your FastAPI app into a service that is fast, stable, and easy to operate.
