リアルタイムを武器にする:FastAPIで実装するWebSocket/SSE通知――接続管理、房(ルーム)、再接続、スケールアウト、監視
要約(インバーテッドピラミッド)
- ユーザーへの即時反映はWebSocketまたはSSEで実現する。用途により選ぶ。
- 1台での実装は
WebSocketエンドポイントと接続管理(接続プール、房)だけで動く。 - 複数インスタンスにスケールするなら、Redis Pub/Subなどでメッセージの横流しを行う。
- 健全運用にはハートビート、バックプレッシャ、再接続、認証とスコープ、監視が必要。
- NginxやCDNのタイムアウト、CORS/WebSocket設定、負荷試験まで含めて設計する。
誰が読んで得をするか
- 学習者Aさん(学部4年)
チャットや通知をリアルタイムにしたい。WebSocketとSSEの違いから最小構成を知りたい。 - 小規模チームBさん(受託3名)
管理ダッシュボードの数値を即時更新したい。接続管理と認証、房配信を安全に実装したい。 - SaaS開発Cさん(スタートアップ)
コンテナを増やしても通知が届く構成にしたい。Redisでの横連携、ハートビート、レート制限を整えたい。
アクセシビリティ評価
- 章毎に短い段落と箇条書きで構造化。コードは固定幅、コメントは簡潔。
- 初学者が迷いやすい選択(WebSocket vs SSE)を冒頭で整理。落とし穴と回避策を表にまとめる。
- 総合レベル:AA相当。
1. 方式の選択:WebSocket or SSE?
- WebSocket:双方向。チャット、共同編集、ゲームなどクライアント→サーバの頻繁な送信がある用途に向く。
- SSE(Server-Sent Events):片方向(サーバ→クライアント)。ストック価格、通知バッジ、ジョブ進捗など配信中心に向く。HTTP/1.1で動き、プロキシ越えが容易。
判断ポイント
- 双方向が必要ならWebSocket。配信中心でブラウザ互換と障害に強くしたいならSSE。
- 大量同時接続はSSEの方がインフラ相性が良いケースがある(HTTP/2やCDN活用)。
- モバイルアプリはWebSocketの採用が多い。
2. 最小のWebSocket(1台構成)
2.1 接続管理
# app/realtime/manager.py
from typing import Dict, Set
from fastapi import WebSocket
class ConnectionManager:
def __init__(self):
self.active: Set[WebSocket] = set()
self.rooms: Dict[str, Set[WebSocket]] = {}
async def connect(self, ws: WebSocket, room: str | None = None):
await ws.accept()
self.active.add(ws)
if room:
self.rooms.setdefault(room, set()).add(ws)
def disconnect(self, ws: WebSocket):
self.active.discard(ws)
for r in list(self.rooms.values()):
r.discard(ws)
async def send_to_ws(self, ws: WebSocket, data: dict):
await ws.send_json(data)
async def broadcast_all(self, data: dict):
for ws in list(self.active):
try:
await ws.send_json(data)
except Exception:
self.disconnect(ws)
async def broadcast_room(self, room: str, data: dict):
for ws in list(self.rooms.get(room, set())):
try:
await ws.send_json(data)
except Exception:
self.disconnect(ws)
2.2 ルーター
# app/realtime/ws.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query
from app.realtime.manager import ConnectionManager
router = APIRouter()
manager = ConnectionManager()
@router.websocket("/ws")
async def ws_endpoint(ws: WebSocket, room: str | None = Query(None)):
await manager.connect(ws, room)
try:
while True:
msg = await ws.receive_json()
# 例:部屋にエコー
if room:
await manager.broadcast_room(room, {"echo": msg})
else:
await manager.send_to_ws(ws, {"echo": msg})
except WebSocketDisconnect:
manager.disconnect(ws)
ポイント
accept()は必ず最初に呼ぶ。- 切断時は全ての集合から除去。例外時にリークさせない。
3. 認証とスコープ
WebSocketはHTTPハンドシェイク後にアップグレードする。BearerトークンやCookieで認証し、房参加の権限チェックを行う。
# app/realtime/auth.py
from fastapi import WebSocket, HTTPException, status
from app.security.jwt import decode_token # 以前の記事のJWTを再利用
async def authenticate_ws(ws: WebSocket):
auth = ws.headers.get("authorization", "")
if not auth.lower().startswith("bearer "):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="missing token")
token = auth.split(" ", 1)[1]
data = decode_token(token)
scopes = set(str(data.get("scope","")).split())
return {"sub": data["sub"], "scopes": scopes}
適用例:
# app/realtime/ws.py(認証付)
from fastapi import Depends
from app.realtime.auth import authenticate_ws
@router.websocket("/ws/secure")
async def ws_secure(ws: WebSocket, user = Depends(authenticate_ws), room: str | None = Query(None)):
await manager.connect(ws, room)
try:
await manager.send_to_ws(ws, {"hello": user["sub"]})
while True:
msg = await ws.receive_json()
if "articles:write" in user["scopes"]:
await manager.broadcast_room(room or "public", {"by": user["sub"], "msg": msg})
else:
await manager.send_to_ws(ws, {"error": "insufficient scope"})
except WebSocketDisconnect:
manager.disconnect(ws)
4. ハートビートと再接続
- サーバ→クライアントへ一定間隔で
pingを送り、応答が無い接続を切る。 - クライアントは指数バックオフで再接続。再接続時に最後に受け取ったイベントIDを送ると抜けを埋められる(SSEは
Last-Event-IDが標準)。
サーバ側例:
# app/realtime/heartbeat.py
import asyncio, json
from app.realtime.manager import ConnectionManager
async def heartbeat(manager: ConnectionManager, interval=30):
while True:
await asyncio.sleep(interval)
await manager.broadcast_all({"type": "ping"})
起動時にバックグラウンドで走らせる(startupイベントなど)。
5. バックプレッシャとサイズ制限
- 送信キューを採用し、一定長を超えたら古い更新を間引く。
- 1メッセージの最大サイズ、秒間送信数を制限し、悪意あるクライアントから守る。
簡易キュー例:
# app/realtime/queue.py
import asyncio
from collections import deque
class SendQueue:
def __init__(self, maxlen=1000):
self.q = deque(maxlen=maxlen)
self.cv = asyncio.Condition()
async def put(self, item):
async with self.cv:
self.q.append(item)
self.cv.notify()
async def consume(self):
while True:
async with self.cv:
while not self.q:
await self.cv.wait()
item = self.q.popleft()
yield item
6. スケールアウト:Redis Pub/Subで横連携
複数インスタンス間の配信は、アプリ内の接続集合だけでは届かない。Redis Pub/Subを挟み、メッセージを「全インスタンスへ中継」する。
6.1 構成
- 各インスタンス:ローカル接続へ配信+Redisの特定チャネルを購読。
- 発行時:ローカルにも送りつつ、Redisに
publish。
6.2 実装例(async版)
# app/realtime/bus.py
from redis import asyncio as aioredis
from typing import Callable
class RedisBus:
def __init__(self, url: str, channel: str):
self.url, self.channel = url, channel
self.redis: aioredis.Redis | None = None
async def start(self, on_message: Callable[[dict], None]):
self.redis = aioredis.from_url(self.url, encoding="utf-8", decode_responses=True)
pubsub = self.redis.pubsub()
await pubsub.subscribe(self.channel)
async for m in pubsub.listen():
if m.get("type") == "message":
on_message(eval(m["data"])) # 実運用はJSONに
async def publish(self, msg: dict):
if not self.redis:
self.redis = aioredis.from_url(self.url, encoding="utf-8", decode_responses=True)
await self.redis.publish(self.channel, repr(msg))
6.3 連携ポイント
broadcast_roomで送る前にbus.publish({"room": room, "data": payload})。on_messageはmanager.broadcast_room(m["room"], m["data"])を呼ぶ。- これでどのインスタンスからでも同じ部屋に届く。
7. SSE(片方向配信)の最小実装
# app/realtime/sse.py
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse
import asyncio, json, time
router = APIRouter()
async def event_stream(queue):
while True:
data = await queue.get()
yield f"id: {int(time.time()*1000)}\n"
yield "event: message\n"
yield f"data: {json.dumps(data)}\n\n"
@router.get("/sse")
async def sse_endpoint(request: Request):
queue = asyncio.Queue()
# どこかでqueueへput。ここではデモ的に1秒ごとに送るタスクを起動してもよい
async def ticker():
while True:
await asyncio.sleep(1)
await queue.put({"now": time.time()})
asyncio.create_task(ticker())
headers = {
"Cache-Control": "no-cache",
"Content-Type": "text/event-stream",
"Connection": "keep-alive",
}
return StreamingResponse(event_stream(queue), headers=headers)
ポイント
- 1接続=1レスポンスストリーム。
data:で1行、空行で区切る。 - 再接続時は
Last-Event-IDで追いつく仕組みを用意すると良い。
8. Nginx/プロキシの設定とタイムアウト
- WebSocket:
Upgrade/Connectionヘッダを必ず中継。proxy_read_timeoutを十分に長く。 - SSE:中継でバッファされないよう
X-Accel-Buffering: no相当を設定可能なら付与。 client_max_body_sizeは上り方向のサイズに応じて調整。- HTTP/2でSSEを多重化できると接続数が節約できる。
9. 監視と運用
- メトリクス:同時接続数、接続寿命、再接続率、平均メッセージサイズ、秒間送受信数、ドロップ数。
- ログ:接続/切断、認証失敗、房参加/離脱、キュー溢れ。
- 負荷試験:
autocannonやwrk+自作クライアントで同時接続とメッセージの両方を検証。
10. よくある落とし穴と回避策
| 症状 | 原因 | 対策 |
|---|---|---|
| スケールすると届かない | インスタンス間連携なし | Redis Pub/Subやメッセージブローカーで横流し |
| 途中で切れる | プロキシのアイドルタイムアウト | proxy_read_timeout延長、心拍送信 |
| メモリが増え続ける | 切断漏れ・キュー無限伸長 | 例外時のdisconnect、最大長付きキュー |
| 権限逸脱 | 房参加の認可なし | 房参加時にスコープ検査、トークン検証 |
| 再接続の嵐 | バックオフ未実装 | クライアントに指数バックオフ、サーバは接続上限と429 |
11. 例:一気通貫のミニアプリ
# app/main.py
from fastapi import FastAPI
from app.realtime import ws as ws_router
from app.realtime import sse as sse_router
app = FastAPI(title="Realtime Demo")
app.include_router(ws_router.router, prefix="/realtime")
app.include_router(sse_router.router, prefix="/realtime")
@app.get("/health")
def health():
return {"ok": True}
12. セキュリティ
- 認証:ハンドシェイクでJWT/Cookieを検査。房はスコープ必須。
- 入力検証:クライアントからのメッセージJSONは必ずスキーマ検証。
- レート制限:接続数と送信頻度をIP/ユーザー単位で制御。
- 機密情報:ログにペイロードやトークンを出さない。
- CORS/Origin:SSEはCORS、WebSocketはOriginチェックを実装。
13. 段階導入ロードマップ
- 単一インスタンスで接続管理と房を実装。
- 認証とスコープ。ハートビートと再接続仕様を決める。
- Redis Pub/Sub連携でスケールアウト。
- バックプレッシャ、レート制限、監視メトリクスを追加。
- プロキシ設定・負荷試験・ダッシュボード整備。
参考リンク
- FastAPI
- Starlette
- Redis
- MDN
- Nginx
まとめ
- まず要件に合わせてWebSocketかSSEを選ぶ。双方向はWebSocket、配信中心はSSE。
- 単一インスタンスでは接続プールと房、認証、心拍で十分動く。
- スケール時はRedis Pub/Subで横連携し、バックプレッシャとレート制限で安定性を確保。
- 監視とプロキシ設定まで含めた「運用視点」で設計すると、リアルタイムは武器になる。
