green snake
Photo by Pixabay on Pexels.com
目次

FastAPIパフォーマンスチューニング完全ガイド:非同期I/O・接続プール・キャッシュ・レート制限でスケーラブルなAPIを作る


要約(先に全体の流れをつかむ)

  • FastAPIのパフォーマンスは、アプリコードだけでなく「Uvicorn/Gunicornの設定」「DB接続プール」「キャッシュ」「レート制限」「外部ストレージ」の組み合わせで決まります。
  • 非同期I/Oを正しく使い、ブロッキング処理はスレッドやバックグラウンドジョブに逃がすことで、同時接続数に強いAPIになります。
  • DBや外部APIは接続プールとキャッシュを活用し、同じ計算・取得を何度も繰り返さない設計が重要です。
  • レート制限やキューイングで、突発的なアクセス集中からシステムを守りつつ「ほどよくサービスする」バランスをとります。
  • 最後に、測定とチューニングのロードマップをまとめ、どこから手をつければ良いかを具体的に整理します。

誰が読んで得をするか(具体的な読者像)

  1. 個人開発・学習者さん

    • 小さなFastAPIアプリができたが、ユーザーが増えるとどこから遅くなりそうか不安。
    • 「非同期にすると速くなるらしいけど、何を気をつければいいの?」と感じている方。
  2. 小規模チームのバックエンドエンジニアさん

    • 本番でFastAPIを使っているが、ピーク時間帯のレスポンス悪化やタイムアウトが気になり始めている。
    • DB接続プールやキャッシュ、レート制限の設計をまとめて整理したい方。
  3. 成長中のSaaS開発チームさん

    • 将来のスケールを見越して、今のうちに「伸びるアーキテクチャ」の方向性を固めたい。
    • 性能・安定性・コストのバランスを考えながら、どこまでチューニングすべきか判断軸が欲しい方。

1. まず「何を速くしたいのか」を決める

やみくもにチューニングを始める前に、「何をどうしたいか」をはっきりさせておくと、無駄な作業が減ります。

1.1 代表的な指標

  • レイテンシ:1リクエストあたりの応答時間(秒、ミリ秒)
  • スループット:1秒あたりに処理できるリクエスト数(RPS)
  • 同時接続数:同時に捌けるユーザー・接続の数
  • エラー率:タイムアウトや500系エラーの割合

一般には、ユーザー体験に直結するのは「P95レイテンシ」(95%のリクエストがこの時間以内)とエラー率です。

1.2 「どこが遅いのか」をざっくり分類

FastAPIアプリでボトルネックになりやすいのは大きく次の3つです。

  1. アプリケーションコード(CPU処理・アルゴリズム・JSON変換など)
  2. 外部I/O(DB、外部API、ストレージ、メールなど)
  3. インフラ設定(ワーカー数、ポッド数、接続数上限、タイムアウトなど)

この記事では、1〜3をバランスよく改善するための「考え方」と「サンプル」を順に見ていきます。


2. 非同期I/Oとスレッド/プロセスの整理

FastAPIは非同期I/O(asyncawait)を前提とした設計で、うまく使うと同時接続に強くなります。ただ、「何でもかんでも非同期にすれば速くなる」というわけではありません。

2.1 非同期関数の基本

FastAPIでは、エンドポイントを async def で書くと、内部で非同期I/Oを使った処理をうまくスケジューリングしてくれます。

from fastapi import FastAPI
import httpx

app = FastAPI()

@app.get("/weather")
async def get_weather(city: str):
    async with httpx.AsyncClient(timeout=2.0) as client:
        r = await client.get(f"https://api.example.com/weather?city={city}")
    return r.json()

ここで重要なのは、

  • httpx.AsyncClient のように、非同期対応のクライアントを使うこと
  • I/O待ちの間にイベントループが他のリクエストを捌けること

です。

2.2 CPUバウンドな処理は別スレッド/プロセスに

数万件のループ処理や、画像変換・暗号化のようにCPUをたくさん使う処理は、非同期にしても速くなりません。むしろ、イベントループをブロックしてしまうので注意が必要です。

FastAPIでは、CPUバウンドな処理は次のような手段で逃がすのが定石です。

  • run_in_threadpool を使ってスレッドプールに投げる
  • バックグラウンドジョブ(Celeryなど)に回す

簡単な例を挙げると:

from fastapi import FastAPI
from starlette.concurrency import run_in_threadpool

app = FastAPI()

def heavy_calc(n: int) -> int:
    s = 0
    for i in range(n):
        s += i * i
    return s

@app.get("/heavy")
async def heavy_endpoint(n: int = 100_000_000):
    result = await run_in_threadpool(heavy_calc, n)
    return {"n": n, "result": result}

こうしておけば、イベントループは塞がらず、CPU処理はスレッド側で行われます。

2.3 Uvicorn/Gunicornワーカー数の考え方

本番では、Uvicorn単体ではなく「Gunicorn + UvicornWorker」構成で動かすことが多いです。このとき、よくある目安は:

  • ワーカー数:CPUコア数 × 2 前後
  • 1ワーカーあたりの同時処理数:非同期であればかなり多くても対応可

ただし、あくまで「目安」であり、実際には負荷試験をしてから調整するのが安全です。


3. DB接続プール(SQLAlchemy)の設計

FastAPIアプリで一番重いボトルネックになりがちなのがDB(RDB)です。ここでは、SQLAlchemyを例に「接続プール」の基本的な考え方を整理します。

3.1 接続プールとは

接続プールは、DBとの接続を再利用するための仕組みです。

  • 接続を毎回張り直す → 接続確立のオーバーヘッドが大きく、遅くなる
  • プールで再利用 → 二度目以降はすぐにクエリを投げられる

FastAPI + SQLAlchemy でのシンプルな例を見てみましょう。

# app/db.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "postgresql+psycopg://user:pass@db:5432/app"

engine = create_engine(
    DATABASE_URL,
    pool_size=10,          # 常にキープする接続数
    max_overflow=20,       # ピーク時に一時的に増やせる数
    pool_pre_ping=True,    # 死んだコネクションを検知して再接続
)

SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)

3.2 プール設定の考え方

  • pool_size

    • 1インスタンスあたりに常備する接続数
    • 大きすぎるとDB側の接続上限を超えやすい
    • 小さすぎると待ち行列が増えてレイテンシが伸びる
  • max_overflow

    • ピーク時に一時的に増やせる接続数
    • DBの接続上限と相談しながら、余裕を持たせすぎない
  • DB側の接続上限

    • 例:PostgreSQLの max_connections
    • 「APIインスタンス数 × pool_size × 安全係数」が、この値を超えないように設計

例えば:

  • APIインスタンス:3台
  • 1台あたり pool_size=10max_overflow=10
  • 理論最大接続数:3 × (10 + 10) = 60

この場合、DB側は max_connections をもう少し大きく設定しておく必要があります(他のサービスや管理ツール用の余裕も含めて)。

3.3 セッションのライフサイクルを適切に

FastAPIでは、DBセッションを依存性として扱い、リクエストごとに開いて閉じるのが一般的です。

# app/deps.py
from app.db import SessionLocal
from sqlalchemy.orm import Session

def get_db() -> Session:
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
# app/routers/articles.py
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app.deps import get_db

router = APIRouter()

@router.get("/articles")
def list_articles(db: Session = Depends(get_db)):
    return db.execute("SELECT id, title FROM articles").fetchall()

リクエストのたびにセッションを開き、処理が終わったら必ず閉じる。この「当たり前」を徹底することが、接続リークと性能劣化を防ぐ第一歩です。


4. キャッシュ戦略:インメモリ、Redis、HTTPキャッシュ

同じデータを何度も計算したり、何度もDBから読みに行くのは、パフォーマンス的に非常にもったいないです。ここでは、代表的なキャッシュ戦略を整理します。

4.1 「どこ」にキャッシュするか

大きく分けると3種類あります。

  1. アプリプロセス内(インメモリキャッシュ)

    • ライブラリ:functools.lru_cache、自前の辞書
    • プロセスごとのキャッシュなので、インスタンス間では共有されない
  2. 外部キャッシュ(Redisなど)

    • 複数インスタンスから共有可能
    • TTL(有効期限)をつけて自動的に更新できる
  3. HTTPレベルのキャッシュ

    • ETagCache-Control ヘッダを使って、ブラウザやCDNでキャッシュ
    • 静的コンテンツやあまり変化しないレスポンスに有効

4.2 インメモリキャッシュの簡単な例

設定値や変わりにくい外部APIのレスポンスは、まずはインメモリキャッシュから試してもよいです。

# app/services/configs.py
from functools import lru_cache
import httpx

@lru_cache(maxsize=128)
def fetch_remote_config() -> dict:
    # 実際には外部サービスから設定を取ってくる想定
    r = httpx.get("https://config.example.com/app-config")
    return r.json()

ただしインメモリは、「デプロイのたびに消える」「複数インスタンス間では共有されない」ことを理解したうえで使う必要があります。

4.3 Redisを使ったキャッシュのイメージ

より本格的なキャッシュを考えるなら、Redisを使うのが定番です。

# app/cache.py
import json
import aioredis
from typing import Any

class RedisCache:
    def __init__(self, url: str):
        self._url = url
        self._redis: aioredis.Redis | None = None

    async def init(self):
        self._redis = await aioredis.from_url(self._url, encoding="utf-8", decode_responses=True)

    async def get_json(self, key: str) -> Any | None:
        assert self._redis is not None
        data = await self._redis.get(key)
        if data is None:
            return None
        return json.loads(data)

    async def set_json(self, key: str, value: Any, ttl: int):
        assert self._redis is not None
        await self._redis.set(key, json.dumps(value), ex=ttl)
# app/routers/rankings.py
from fastapi import APIRouter, Depends
from app.cache import RedisCache
from app.deps import get_db

router = APIRouter()

async def get_cache() -> RedisCache:
    # 実際にはアプリ起動時にinitしておいたインスタンスを返す想定
    ...

@router.get("/rankings")
async def get_rankings(cache: RedisCache = Depends(get_cache), db = Depends(get_db)):
    cached = await cache.get_json("rankings:top10")
    if cached is not None:
        return {"source": "cache", "items": cached}

    # DBから重い集計を実行したと仮定
    rows = db.execute("SELECT ...").fetchall()
    items = [dict(row) for row in rows]

    await cache.set_json("rankings:top10", items, ttl=60)  # 60秒キャッシュ
    return {"source": "db", "items": items}

このように「キャッシュ → なければ計算 → キャッシュして返す」というパターンを、よく使うエンドポイントに少しずつ適用していきます。

4.4 HTTPキャッシュヘッダの活用

静的なJSONや、日次更新程度のデータであれば、HTTPキャッシュを使うのも有効です。

from fastapi import FastAPI, Response

app = FastAPI()

@app.get("/static-data")
def static_data():
    data = {"message": "変化が少ないデータ"}
    headers = {"Cache-Control": "public, max-age=300"}  # 5分キャッシュ
    return Response(content=str(data), media_type="application/json", headers=headers)

CDNやブラウザが賢くキャッシュしてくれるので、アプリ側の負荷をかなり減らせます。


5. レート制限でサービスを守る

パフォーマンスを考えるとき、「どれだけ速く処理するか」だけでなく「どれ以上は頑張らないか」を決めることもとても大切です。これがレート制限(Rate Limiting)の発想です。

5.1 なぜレート制限が必要?

  • 一部クライアントのバグやボットが、大量のリクエストを送ってくる
  • 予想外のアクセス集中が起きたときに、全部を頑張って捌こうとして全体が落ちてしまう
  • 無料プランや特定ユーザーに対して、使いすぎを防ぎたい

こうしたケースで、レート制限は「システム全体の安定性を守るための安全装置」になります。

5.2 トークンバケット方式の簡単なイメージ

代表的なアルゴリズムのひとつがトークンバケットです。

  • 「バケツ」に一定量のトークンをためておく
  • リクエスト1つにつきトークンを1枚使う
  • トークンは一定のレートで自動的に補充される(例:1秒に5枚)
  • トークンがなくなったら、そのユーザーからのリクエストは429(Too Many Requests)で断る

これをRedisなどで表現すると、複数インスタンスから共有のレート制限が可能になります。

5.3 簡易的な依存関数の例(インメモリ)

学習用として、まずはインメモリでの簡易実装例を挙げます。
(実運用ではRedisなど外部ストアを使うことをおすすめします)

# app/rate_limit.py
import time
from fastapi import HTTPException, status

class SimpleRateLimiter:
    def __init__(self, capacity: int, refill_rate_per_sec: float):
        self.capacity = capacity
        self.refill_rate = refill_rate_per_sec
        self.tokens = capacity
        self.last_refill = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        elapsed = now - self.last_refill
        refill = elapsed * self.refill_rate
        if refill > 0:
            self.tokens = min(self.capacity, self.tokens + refill)
            self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

global_limiter = SimpleRateLimiter(capacity=10, refill_rate_per_sec=5)

def limit_global():
    if not global_limiter.allow():
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Too many requests, please slow down.",
        )
# app/main.py
from fastapi import FastAPI, Depends
from app.rate_limit import limit_global

app = FastAPI()

@app.get("/limited", dependencies=[Depends(limit_global)])
def limited_endpoint():
    return {"ok": True}

この例だと、

  • 一度に10回までは連続でリクエスト可能
  • その後は、1秒あたり5回のペースで回復

というレート制限がかかります。

実運用では、IPアドレスやユーザーIDごとのレート制限、さらにRedisでの分散レート制限など、もう少し複雑な構成が必要になります。


6. ファイル配信はFastAPIから直接やらない

パフォーマンスの観点でとても大事なのが、「FastAPIでやらないこと」を決めることです。その代表例が「大きなファイル配信」です。

6.1 なぜアプリから直接配信しない方が良いか

  • APIワーカーがファイル読み書きに専念してしまい、CPUやメモリを無駄に使う
  • 高速なストレージ・CDNに比べてパフォーマンスが出ない
  • レート制限や署名付きURLで制御する方が、設計がシンプルになる

そのため、一般的には:

  • ファイル自体はS3互換ストレージやクラウドストレージに置く
  • FastAPIは「署名付きURLを発行する」役だけを担う

という構成が推奨されます。

6.2 署名付きURLを返すだけのAPI例

# app/storage.py
from datetime import datetime, timedelta
import hmac, hashlib, base64, urllib.parse

def generate_signed_url(object_key: str, expires_in: int = 300) -> str:
    # 実際にはS3やクラウドストレージのSDKを使うのが一般的です。
    # ここでは簡単なイメージ例に留めます。
    secret = b"secret-key"
    expires_at = int((datetime.utcnow() + timedelta(seconds=expires_in)).timestamp())
    signature_raw = f"{object_key}:{expires_at}".encode()
    sig = hmac.new(secret, signature_raw, hashlib.sha256).digest()
    sig_b64 = base64.urlsafe_b64encode(sig).decode().rstrip("=")
    query = urllib.parse.urlencode({"expires": expires_at, "sig": sig_b64})
    return f"https://cdn.example.com/files/{object_key}?{query}"
# app/routers/files.py
from fastapi import APIRouter, HTTPException
from app.storage import generate_signed_url

router = APIRouter()

@router.get("/files/{file_id}")
def get_file_url(file_id: str):
    # 実際にはDBからobject_keyを引くなど
    object_key = f"{file_id}.pdf"
    url = generate_signed_url(object_key, expires_in=300)
    if not url:
        raise HTTPException(404, "not found")
    return {"url": url}

こうすると、FastAPIは軽いAPIサーバとして動き続け、重たいファイル配信はCDNやストレージに任せられます。


7. 測定とプロファイル:直感に頼らない

パフォーマンスチューニングで一番大事なのは、「推測ではなく計測すること」です。

7.1 簡単な測定から始める

まずは、シンプルな「計測ミドルウェア」を入れて、エンドポイントごとの処理時間をログに出してみましょう。

# app/middleware/timing.py
import time
import logging
from starlette.middleware.base import BaseHTTPMiddleware

log = logging.getLogger("timing")

class TimingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        dur_ms = int((time.perf_counter() - start) * 1000)
        log.info("path=%s method=%s status=%s dur_ms=%d",
                 request.url.path, request.method, response.status_code, dur_ms)
        return response
# app/main.py
from fastapi import FastAPI
from app.middleware.timing import TimingMiddleware

app = FastAPI()
app.add_middleware(TimingMiddleware)

このログを Kibana や Cloud Logging などで眺めるだけでも、「どのエンドポイントが特に遅いか」が見えてきます。

7.2 負荷試験ツールの活用

  • locustk6wrk などを使って、簡単な負荷試験をする
  • 例えば「秒間100リクエストを1分間投げたときのP95レイテンシ」を測る

このとき、

  • ワーカー数や接続プールの設定を変えて、どの組み合わせが最もバランスが良いか
  • キャッシュやレート制限を入れたときにどう変わるか

を比較しながら調整していくと、改善の手応えがつかみやすくなります。


8. サンプル:キャッシュ+レート制限を入れたミニAPI

ここまでの要素を少しだけ組み合わせて、簡単なミニAPI例をまとめてみます。
(動作させるにはRedisなどが必要ですが、雰囲気をつかむためのサンプルです)

# app/cache.py(Redisキャッシュ)
import json
from typing import Any
from redis import asyncio as aioredis

class RedisCache:
    def __init__(self, url: str):
        self.url = url
        self._redis: aioredis.Redis | None = None

    async def init(self):
        if self._redis is None:
            self._redis = aioredis.from_url(self.url, encoding="utf-8", decode_responses=True)

    async def get_json(self, key: str) -> Any | None:
        await self.init()
        data = await self._redis.get(key)
        if not data:
            return None
        return json.loads(data)

    async def set_json(self, key: str, value: Any, ttl: int):
        await self.init()
        await self._redis.set(key, json.dumps(value), ex=ttl)
# app/rate_limit.py(Redisを使った簡易レート制限)
import time
from fastapi import HTTPException, status
from redis import asyncio as aioredis

class RedisRateLimiter:
    def __init__(self, url: str, prefix: str = "rate"):
        self.url = url
        self.prefix = prefix
        self._redis: aioredis.Redis | None = None

    async def init(self):
        if self._redis is None:
            self._redis = aioredis.from_url(self.url, encoding="utf-8", decode_responses=True)

    async def allow(self, key: str, limit: int, window_sec: int) -> bool:
        await self.init()
        assert self._redis is not None
        now = int(time.time())
        window = now // window_sec
        redis_key = f"{self.prefix}:{key}:{window}"
        # カウンタをインクリメントしてTTLを設定
        p = self._redis.pipeline()
        p.incr(redis_key)
        p.expire(redis_key, window_sec)
        count, _ = await p.execute()
        return int(count) <= limit

    async def ensure(self, key: str, limit: int, window_sec: int):
        ok = await self.allow(key, limit, window_sec)
        if not ok:
            raise HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail="Too many requests",
            )
# app/main.py(キャッシュ+レート制限を使うAPI)
from fastapi import FastAPI, Depends, Request
from app.cache import RedisCache
from app.rate_limit import RedisRateLimiter

app = FastAPI(title="Perf Demo")

cache = RedisCache("redis://redis:6379/0")
limiter = RedisRateLimiter("redis://redis:6379/0")

async def rate_limited(request: Request):
    # シンプルにIPアドレスで制限(本番ではX-Forwarded-For等の考慮が必要)
    client_ip = request.client.host
    await limiter.ensure(client_ip, limit=30, window_sec=60)  # 同一IPあたり1分30回まで

@app.get("/expensive", dependencies=[Depends(rate_limited)])
async def expensive_endpoint():
    cached = await cache.get_json("expensive:result")
    if cached is not None:
        return {"source": "cache", "value": cached}

    # 実際には重い計算や外部API呼び出しなど
    value = {"answer": 42}
    await cache.set_json("expensive:result", value, ttl=10)
    return {"source": "fresh", "value": value}

このミニAPIは、

  • IPごとに1分あたり30回までしか叩けない
  • 初回は「fresh」、その後10秒間は「cache」から高速に返る

という性質を持ちます。少し現実に近い雰囲気が見えてきたでしょうか。


9. よくある落とし穴とその対策

最後に、FastAPIのパフォーマンスチューニングでよくある落とし穴と、簡単な対策を一覧でまとめておきます。

症状 原因 対策
同時接続が増えると急に遅くなる 外部I/Oが同期ブロッキング、DB接続プール不足 非同期クライアントへ移行、プールサイズ調整、キャッシュ導入
レイテンシにバラつきが大きい 一部のエンドポイントが重い/N+1クエリ 遅いエンドポイントを特定し、クエリ最適化やプリフェッチを行う
DBの接続上限に達して落ちる インスタンス数×プールサイズが大きすぎ DBのmax_connectionsとAPI側のプール設定のバランスを取る
スパイクアクセスで全体が落ちる レート制限なしで無制限に処理 レート制限やキューイングで負荷を平準化
ファイルダウンロードが重い APIサーバが大容量ファイルを直接配信 CDN/オブジェクトストレージに移し、署名付きURLだけ返す
チューニングの効果が見えない 計測前後の比較をしていない ベンチマークやタイミングログで「変化」を必ず測る

10. 導入ロードマップ(どこから始めるか)

ここまで読んでいただいたうえで、実際に手を動かすなら、次のようなステップをおすすめします。

  1. 簡単なタイミングミドルウェアを入れて、どのエンドポイントが遅いかをログで可視化する。
  2. 遅いエンドポイントのうち、外部I/Oがボトルネックになっているものを「非同期I/O」や「接続プールの調整」で改善する。
  3. 繰り返し呼ばれる重い処理には、インメモリ or Redis キャッシュを導入し、キャッシュヒット率を観察する。
  4. レート制限を導入し、「どれ以上は頑張らないか」のラインを設計する。
  5. ファイル配信やバッチ系の重い処理を、CDN・ストレージ・バックグラウンドジョブ(Celeryなど)に切り出していく。
  6. 負荷試験ツールを使って、「ピーク時にP95レイテンシがどうなるか」を定期的に確認し、インフラ設定を調整する。

少しずつ「計測 → 仮説 → 改善 → 再計測」を回していくうちに、FastAPIアプリのクセや、システム全体の限界値が見えてきます。


まとめ

  • FastAPIのパフォーマンスは、「非同期I/O」「ワーカー設定」「DB接続プール」「キャッシュ」「レート制限」「外部ストレージ」といった複数の要素が組み合わさって決まります。
  • まずは、計測とログ可視化で「どこが遅いのか」を知り、外部I/OやN+1クエリなど、効果の大きいところから手を付けるのが効率的です。
  • 接続プール・キャッシュ・レート制限は、どれも「多くのシステムで再利用できるパターン」です。サンプルコードを参考に、自分のプロジェクト向けに少しずつ形を整えていってください。
  • いきなり完璧を目指す必要はまったくありません。小さな計測と改善を積み重ねていくことで、FastAPIは十分スケーラブルで頼れるAPIサーバになってくれます。

ここまで読んでくださって、本当にありがとうございます。
少しずつ、無理のないペースで、あなたのFastAPIアプリを「速くて、安定していて、扱いやすい」サービスへ育てていきましょう。


投稿者 greeden

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

日本語が含まれない投稿は無視されますのでご注意ください。(スパム対策)