green snake
Photo by Pixabay on Pexels.com

リアルタイムを武器にする:FastAPIで実装するWebSocket/SSE通知――接続管理、房(ルーム)、再接続、スケールアウト、監視


要約(インバーテッドピラミッド)

  • ユーザーへの即時反映はWebSocketまたはSSEで実現する。用途により選ぶ。
  • 1台での実装はWebSocketエンドポイントと接続管理(接続プール、房)だけで動く。
  • 複数インスタンスにスケールするなら、Redis Pub/Subなどでメッセージの横流しを行う。
  • 健全運用にはハートビート、バックプレッシャ、再接続、認証とスコープ、監視が必要。
  • NginxやCDNのタイムアウト、CORS/WebSocket設定、負荷試験まで含めて設計する。

誰が読んで得をするか

  • 学習者Aさん(学部4年)
    チャットや通知をリアルタイムにしたい。WebSocketとSSEの違いから最小構成を知りたい。
  • 小規模チームBさん(受託3名)
    管理ダッシュボードの数値を即時更新したい。接続管理と認証、房配信を安全に実装したい。
  • SaaS開発Cさん(スタートアップ)
    コンテナを増やしても通知が届く構成にしたい。Redisでの横連携、ハートビート、レート制限を整えたい。

アクセシビリティ評価

  • 章毎に短い段落と箇条書きで構造化。コードは固定幅、コメントは簡潔。
  • 初学者が迷いやすい選択(WebSocket vs SSE)を冒頭で整理。落とし穴と回避策を表にまとめる。
  • 総合レベル:AA相当。

1. 方式の選択:WebSocket or SSE?

  • WebSocket:双方向。チャット、共同編集、ゲームなどクライアント→サーバの頻繁な送信がある用途に向く。
  • SSE(Server-Sent Events):片方向(サーバ→クライアント)。ストック価格、通知バッジ、ジョブ進捗など配信中心に向く。HTTP/1.1で動き、プロキシ越えが容易。

判断ポイント

  • 双方向が必要ならWebSocket。配信中心でブラウザ互換と障害に強くしたいならSSE。
  • 大量同時接続はSSEの方がインフラ相性が良いケースがある(HTTP/2やCDN活用)。
  • モバイルアプリはWebSocketの採用が多い。

2. 最小のWebSocket(1台構成)

2.1 接続管理

# app/realtime/manager.py
from typing import Dict, Set
from fastapi import WebSocket

class ConnectionManager:
    def __init__(self):
        self.active: Set[WebSocket] = set()
        self.rooms: Dict[str, Set[WebSocket]] = {}

    async def connect(self, ws: WebSocket, room: str | None = None):
        await ws.accept()
        self.active.add(ws)
        if room:
            self.rooms.setdefault(room, set()).add(ws)

    def disconnect(self, ws: WebSocket):
        self.active.discard(ws)
        for r in list(self.rooms.values()):
            r.discard(ws)

    async def send_to_ws(self, ws: WebSocket, data: dict):
        await ws.send_json(data)

    async def broadcast_all(self, data: dict):
        for ws in list(self.active):
            try:
                await ws.send_json(data)
            except Exception:
                self.disconnect(ws)

    async def broadcast_room(self, room: str, data: dict):
        for ws in list(self.rooms.get(room, set())):
            try:
                await ws.send_json(data)
            except Exception:
                self.disconnect(ws)

2.2 ルーター

# app/realtime/ws.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query
from app.realtime.manager import ConnectionManager

router = APIRouter()
manager = ConnectionManager()

@router.websocket("/ws")
async def ws_endpoint(ws: WebSocket, room: str | None = Query(None)):
    await manager.connect(ws, room)
    try:
        while True:
            msg = await ws.receive_json()
            # 例:部屋にエコー
            if room:
                await manager.broadcast_room(room, {"echo": msg})
            else:
                await manager.send_to_ws(ws, {"echo": msg})
    except WebSocketDisconnect:
        manager.disconnect(ws)

ポイント

  • accept()は必ず最初に呼ぶ。
  • 切断時は全ての集合から除去。例外時にリークさせない。

3. 認証とスコープ

WebSocketはHTTPハンドシェイク後にアップグレードする。BearerトークンやCookieで認証し、房参加の権限チェックを行う。

# app/realtime/auth.py
from fastapi import WebSocket, HTTPException, status
from app.security.jwt import decode_token  # 以前の記事のJWTを再利用

async def authenticate_ws(ws: WebSocket):
    auth = ws.headers.get("authorization", "")
    if not auth.lower().startswith("bearer "):
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="missing token")
    token = auth.split(" ", 1)[1]
    data = decode_token(token)
    scopes = set(str(data.get("scope","")).split())
    return {"sub": data["sub"], "scopes": scopes}

適用例:

# app/realtime/ws.py(認証付)
from fastapi import Depends
from app.realtime.auth import authenticate_ws

@router.websocket("/ws/secure")
async def ws_secure(ws: WebSocket, user = Depends(authenticate_ws), room: str | None = Query(None)):
    await manager.connect(ws, room)
    try:
        await manager.send_to_ws(ws, {"hello": user["sub"]})
        while True:
            msg = await ws.receive_json()
            if "articles:write" in user["scopes"]:
                await manager.broadcast_room(room or "public", {"by": user["sub"], "msg": msg})
            else:
                await manager.send_to_ws(ws, {"error": "insufficient scope"})
    except WebSocketDisconnect:
        manager.disconnect(ws)

4. ハートビートと再接続

  • サーバ→クライアントへ一定間隔でpingを送り、応答が無い接続を切る。
  • クライアントは指数バックオフで再接続。再接続時に最後に受け取ったイベントIDを送ると抜けを埋められる(SSEはLast-Event-IDが標準)。

サーバ側例:

# app/realtime/heartbeat.py
import asyncio, json
from app.realtime.manager import ConnectionManager

async def heartbeat(manager: ConnectionManager, interval=30):
    while True:
        await asyncio.sleep(interval)
        await manager.broadcast_all({"type": "ping"})

起動時にバックグラウンドで走らせる(startupイベントなど)。


5. バックプレッシャとサイズ制限

  • 送信キューを採用し、一定長を超えたら古い更新を間引く。
  • 1メッセージの最大サイズ、秒間送信数を制限し、悪意あるクライアントから守る。

簡易キュー例:

# app/realtime/queue.py
import asyncio
from collections import deque

class SendQueue:
    def __init__(self, maxlen=1000):
        self.q = deque(maxlen=maxlen)
        self.cv = asyncio.Condition()

    async def put(self, item):
        async with self.cv:
            self.q.append(item)
            self.cv.notify()

    async def consume(self):
        while True:
            async with self.cv:
                while not self.q:
                    await self.cv.wait()
                item = self.q.popleft()
            yield item

6. スケールアウト:Redis Pub/Subで横連携

複数インスタンス間の配信は、アプリ内の接続集合だけでは届かない。Redis Pub/Subを挟み、メッセージを「全インスタンスへ中継」する。

6.1 構成

  • 各インスタンス:ローカル接続へ配信+Redisの特定チャネルを購読。
  • 発行時:ローカルにも送りつつ、Redisにpublish

6.2 実装例(async版)

# app/realtime/bus.py
from redis import asyncio as aioredis
from typing import Callable

class RedisBus:
    def __init__(self, url: str, channel: str):
        self.url, self.channel = url, channel
        self.redis: aioredis.Redis | None = None

    async def start(self, on_message: Callable[[dict], None]):
        self.redis = aioredis.from_url(self.url, encoding="utf-8", decode_responses=True)
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(self.channel)
        async for m in pubsub.listen():
            if m.get("type") == "message":
                on_message(eval(m["data"]))  # 実運用はJSONに

    async def publish(self, msg: dict):
        if not self.redis:
            self.redis = aioredis.from_url(self.url, encoding="utf-8", decode_responses=True)
        await self.redis.publish(self.channel, repr(msg))

6.3 連携ポイント

  • broadcast_roomで送る前にbus.publish({"room": room, "data": payload})
  • on_messagemanager.broadcast_room(m["room"], m["data"]) を呼ぶ。
  • これでどのインスタンスからでも同じ部屋に届く。

7. SSE(片方向配信)の最小実装

# app/realtime/sse.py
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse
import asyncio, json, time

router = APIRouter()

async def event_stream(queue):
    while True:
        data = await queue.get()
        yield f"id: {int(time.time()*1000)}\n"
        yield "event: message\n"
        yield f"data: {json.dumps(data)}\n\n"

@router.get("/sse")
async def sse_endpoint(request: Request):
    queue = asyncio.Queue()
    # どこかでqueueへput。ここではデモ的に1秒ごとに送るタスクを起動してもよい
    async def ticker():
        while True:
            await asyncio.sleep(1)
            await queue.put({"now": time.time()})
    asyncio.create_task(ticker())
    headers = {
        "Cache-Control": "no-cache",
        "Content-Type": "text/event-stream",
        "Connection": "keep-alive",
    }
    return StreamingResponse(event_stream(queue), headers=headers)

ポイント

  • 1接続=1レスポンスストリーム。data:で1行、空行で区切る。
  • 再接続時はLast-Event-IDで追いつく仕組みを用意すると良い。

8. Nginx/プロキシの設定とタイムアウト

  • WebSocket:Upgrade/Connection ヘッダを必ず中継。proxy_read_timeout を十分に長く。
  • SSE:中継でバッファされないようX-Accel-Buffering: no相当を設定可能なら付与。
  • client_max_body_size は上り方向のサイズに応じて調整。
  • HTTP/2でSSEを多重化できると接続数が節約できる。

9. 監視と運用

  • メトリクス:同時接続数、接続寿命、再接続率、平均メッセージサイズ、秒間送受信数、ドロップ数。
  • ログ:接続/切断、認証失敗、房参加/離脱、キュー溢れ。
  • 負荷試験:autocannonwrk+自作クライアントで同時接続とメッセージの両方を検証。

10. よくある落とし穴と回避策

症状 原因 対策
スケールすると届かない インスタンス間連携なし Redis Pub/Subやメッセージブローカーで横流し
途中で切れる プロキシのアイドルタイムアウト proxy_read_timeout延長、心拍送信
メモリが増え続ける 切断漏れ・キュー無限伸長 例外時のdisconnect、最大長付きキュー
権限逸脱 房参加の認可なし 房参加時にスコープ検査、トークン検証
再接続の嵐 バックオフ未実装 クライアントに指数バックオフ、サーバは接続上限と429

11. 例:一気通貫のミニアプリ

# app/main.py
from fastapi import FastAPI
from app.realtime import ws as ws_router
from app.realtime import sse as sse_router

app = FastAPI(title="Realtime Demo")
app.include_router(ws_router.router, prefix="/realtime")
app.include_router(sse_router.router, prefix="/realtime")

@app.get("/health")
def health():
    return {"ok": True}

12. セキュリティ

  • 認証:ハンドシェイクでJWT/Cookieを検査。房はスコープ必須。
  • 入力検証:クライアントからのメッセージJSONは必ずスキーマ検証。
  • レート制限:接続数と送信頻度をIP/ユーザー単位で制御。
  • 機密情報:ログにペイロードやトークンを出さない。
  • CORS/Origin:SSEはCORS、WebSocketはOriginチェックを実装。

13. 段階導入ロードマップ

  1. 単一インスタンスで接続管理と房を実装。
  2. 認証とスコープ。ハートビートと再接続仕様を決める。
  3. Redis Pub/Sub連携でスケールアウト。
  4. バックプレッシャ、レート制限、監視メトリクスを追加。
  5. プロキシ設定・負荷試験・ダッシュボード整備。

参考リンク


まとめ

  • まず要件に合わせてWebSocketかSSEを選ぶ。双方向はWebSocket、配信中心はSSE。
  • 単一インスタンスでは接続プールと房、認証、心拍で十分動く。
  • スケール時はRedis Pub/Subで横連携し、バックプレッシャとレート制限で安定性を確保。
  • 監視とプロキシ設定まで含めた「運用視点」で設計すると、リアルタイムは武器になる。

投稿者 greeden

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

日本語が含まれない投稿は無視されますのでご注意ください。(スパム対策)