Weaponizing Real Time: WebSocket/SSE Notifications with FastAPI — Connection Management, Rooms, Reconnection, Scale-Out, and Observability


Summary (Inverted Pyramid)

  • Immediate UI updates are delivered via WebSocket or SSE. Choose per use case.
  • For a single instance, a WebSocket endpoint plus connection management (connection pool, rooms) is enough.
  • To scale across multiple instances, fan out messages via Redis Pub/Sub or a similar broker.
  • Healthy operations require heartbeats, backpressure, reconnection, auth & scopes, and monitoring.
  • Design end-to-end, including Nginx/CDN timeouts, CORS/WebSocket config, and load testing.

Who Will Benefit

  • Learner A (Senior undergrad)
    Wants real-time chat/notifications. Needs differences between WebSocket and SSE and the minimal setup.
  • Small Team B (3-person agency)
    Wants a management dashboard with instant updates. Needs safe implementation of connection management, auth, and room broadcasting.
  • SaaS Developer C (startup)
    Wants notifications to arrive even as containers scale out. Needs Redis fan-out, heartbeats, and rate limiting.

Accessibility Assessment

  • Structured with short paragraphs and bullet points per chapter. Code uses monospaced font; comments are concise.
  • Disambiguates early choices beginners often stumble on (WebSocket vs SSE). Pitfalls and mitigations summarized in a table.
  • Overall level: roughly AA.

1. Choosing the Approach: WebSocket or SSE?

  • WebSocket: bidirectional. Best for use cases with frequent client → server sends such as chat, collaborative editing, games.
  • SSE (Server-Sent Events): unidirectional (server → client). Best when broadcasting dominates: stock prices, notification badges, job progress. Works over HTTP/1.1 and traverses proxies easily.

Decision points

  • If you need bidirectional messaging, use WebSocket. If the workload is broadcast-centric and you want plain-HTTP compatibility plus the automatic reconnection built into the browser's EventSource, choose SSE.
  • For very large concurrent connections, SSE can be a better infra fit in some cases (leveraging HTTP/2 and CDNs).
  • Mobile apps tend to adopt WebSocket.

2. Minimal WebSocket (Single-Instance)

2.1 Connection Management

# app/realtime/manager.py
from typing import Dict, Set
from fastapi import WebSocket

class ConnectionManager:
    def __init__(self):
        self.active: Set[WebSocket] = set()
        self.rooms: Dict[str, Set[WebSocket]] = {}

    async def connect(self, ws: WebSocket, room: str | None = None):
        await ws.accept()
        self.active.add(ws)
        if room:
            self.rooms.setdefault(room, set()).add(ws)

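    def disconnect(self, ws: WebSocket):
        self.active.discard(ws)
        # Remove the socket from every room and drop rooms that become empty
        for name in list(self.rooms):
            self.rooms[name].discard(ws)
            if not self.rooms[name]:
                del self.rooms[name]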

    async def send_to_ws(self, ws: WebSocket, data: dict):
        await ws.send_json(data)

    async def broadcast_all(self, data: dict):
        for ws in list(self.active):
            try:
                await ws.send_json(data)
            except Exception:
                self.disconnect(ws)

    async def broadcast_room(self, room: str, data: dict):
        for ws in list(self.rooms.get(room, set())):
            try:
                await ws.send_json(data)
            except Exception:
                self.disconnect(ws)
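
The broadcast loops above send to one socket at a time, so a single slow client delays everyone behind it. One possible refinement, sketched here with only the standard library, sends concurrently with a per-socket timeout and reports which sockets failed. The function name `broadcast_concurrently` and the 5-second default are assumptions for illustration; the sockets only need an async `send_json` method, as FastAPI's `WebSocket` has.

```python
import asyncio
from typing import Any, Iterable


async def broadcast_concurrently(
    sockets: Iterable[Any], data: dict, timeout: float = 5.0
) -> list[Any]:
    """Send `data` to all sockets in parallel; return those that failed or timed out."""
    targets = list(sockets)

    async def _send(ws: Any) -> None:
        # A slow or dead peer raises TimeoutError instead of blocking the others
        await asyncio.wait_for(ws.send_json(data), timeout)

    results = await asyncio.gather(
        *(_send(ws) for ws in targets), return_exceptions=True
    )
    return [ws for ws, r in zip(targets, results) if isinstance(r, BaseException)]
```

A manager method could call this and then run `self.disconnect(ws)` for each returned socket.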

2.2 Router

# app/realtime/ws.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query
from app.realtime.manager import ConnectionManager

router = APIRouter()
manager = ConnectionManager()

@router.websocket("/ws")
async def ws_endpoint(ws: WebSocket, room: str | None = Query(None)):
    await manager.connect(ws, room)
    try:
        while True:
            msg = await ws.receive_json()
            # Example: echo to the room
            if room:
                await manager.broadcast_room(room, {"echo": msg})
            else:
                await manager.send_to_ws(ws, {"echo": msg})
    except WebSocketDisconnect:
        pass  # normal client disconnect
    finally:
        # Runs on disconnects and on unexpected errors, so the socket never leaks
        manager.disconnect(ws)

Key points

  • Always call accept() first.
  • On disconnect, remove from all sets. Don’t leak on exceptions.

3. Authentication and Scopes

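A WebSocket connection starts as an ordinary HTTP request and is then upgraded, so the handshake can carry credentials. Authenticate with a Bearer token or cookie, and check authorization before letting a client join a room. Note that the browser WebSocket API cannot set custom headers such as Authorization, so browser clients usually rely on cookies or pass the token another way (for example, a query parameter).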

# app/realtime/auth.py
from fastapi import WebSocket, WebSocketException, status
from app.security.jwt import decode_token  # Reuse your previous JWT implementation

async def authenticate_ws(ws: WebSocket):
    auth = ws.headers.get("authorization", "")
    if not auth.lower().startswith("bearer "):
        # HTTPException is meant for HTTP routes; WebSocket routes raise WebSocketException
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION, reason="missing token")
    token = auth.split(" ", 1)[1]
    data = decode_token(token)
    # "scope" is a space-separated string, e.g. "articles:read articles:write"
    scopes = set(str(data.get("scope", "")).split())
    return {"sub": data["sub"], "scopes": scopes}

Usage example:

# app/realtime/ws.py (with auth)
from fastapi import Depends
from app.realtime.auth import authenticate_ws

@router.websocket("/ws/secure")
async def ws_secure(ws: WebSocket, user = Depends(authenticate_ws), room: str | None = Query(None)):
    room = room or "public"  # join the default room so its broadcasts reach this client
    await manager.connect(ws, room)
    try:
        await manager.send_to_ws(ws, {"hello": user["sub"]})
        while True:
            msg = await ws.receive_json()
            if "articles:write" in user["scopes"]:
                await manager.broadcast_room(room, {"by": user["sub"], "msg": msg})
            else:
                await manager.send_to_ws(ws, {"error": "insufficient scope"})
    except WebSocketDisconnect:
        pass  # normal client disconnect
    finally:
        manager.disconnect(ws)

4. Heartbeats and Reconnection

  • Send periodic ping server → client and drop connections that don’t respond.
  • Clients reconnect with exponential backoff. On reconnect, send the last received event ID to fill gaps (SSE standardizes this via Last-Event-ID).
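
The client-side reconnection delay can be sketched as exponential backoff with full jitter, so that many clients dropped at the same moment do not all return at once. The function name, the 1-second base, and the 30-second cap are assumptions; the logic is shown in Python for consistency with the rest of the article, although a browser client would implement the same arithmetic in JavaScript.

```python
import random
from typing import Callable


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 30.0,
    rng: Callable[[], float] = random.random,
) -> float:
    """Seconds to wait before reconnect attempt `attempt` (0-based).

    Full jitter: a random value between 0 and min(cap, base * 2**attempt).
    """
    ceiling = min(cap, base * (2 ** attempt))
    return rng() * ceiling
```

Reset `attempt` to 0 after a connection has stayed up for a while, so a brief outage does not leave the client on long delays.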

Server-side example:

# app/realtime/heartbeat.py
import asyncio
from app.realtime.manager import ConnectionManager

async def heartbeat(manager: ConnectionManager, interval=30):
    while True:
        await asyncio.sleep(interval)
        await manager.broadcast_all({"type": "ping"})

Run this as a background task started when the app starts (for example, in the lifespan handler or startup event), and cancel it on shutdown.
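
The heartbeat above only sends pings; a failed send removes the connection, but a peer that silently stops answering is not detected. One way to cover that, sketched under the assumption that clients reply with a `{"type": "pong"}` message and that each connection has a string ID, is to record when each connection last answered. `LivenessTracker` and its method names are hypothetical.

```python
import time
from typing import Callable


class LivenessTracker:
    """Remember when each connection last answered a ping."""

    def __init__(self, timeout: float = 90.0, clock: Callable[[], float] = time.monotonic):
        self.timeout = timeout
        self.clock = clock
        self.last_seen: dict[str, float] = {}

    def touch(self, conn_id: str) -> None:
        # Call on connect and whenever a pong (or any message) arrives
        self.last_seen[conn_id] = self.clock()

    def forget(self, conn_id: str) -> None:
        self.last_seen.pop(conn_id, None)

    def stale(self) -> list[str]:
        # Connections silent for longer than `timeout` seconds
        now = self.clock()
        return [cid for cid, seen in self.last_seen.items() if now - seen > self.timeout]
```

The heartbeat loop could call `stale()` after each ping round and close the listed connections. A timeout of roughly two to three ping intervals tolerates one lost pong.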


5. Backpressure and Size Limits

  • Use a send queue; when it exceeds a threshold, drop stale updates.
  • Enforce per-message max size and per-second send limits to protect against abusive clients.

Simple queue example:

# app/realtime/queue.py
import asyncio
from collections import deque

class SendQueue:
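    def __init__(self, maxlen=1000):
        # A bounded deque discards its oldest item once maxlen is reached,
        # which is what drops stale updates when the consumer falls behind
        self.q = deque(maxlen=maxlen)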
        self.cv = asyncio.Condition()

    async def put(self, item):
        async with self.cv:
            self.q.append(item)
            self.cv.notify()

    async def consume(self):
        while True:
            async with self.cv:
                while not self.q:
                    await self.cv.wait()
                item = self.q.popleft()
            yield item
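
The queue covers the outbound side. For the per-second send limits mentioned above, a token bucket is one common choice: it allows short bursts while capping the sustained rate. This is a minimal sketch; the class name, parameters, and the injectable clock (used to make it testable) are assumptions.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow `rate` messages per second on average, with bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: float, clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.tokens = capacity
        self.updated = clock()

    def allow(self, cost: float = 1.0) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, never above capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

Keeping one bucket per connection or per user and rejecting (or closing) when `allow()` returns False protects the receive loop from a flooding client.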

6. Scale-Out: Cross-Instance Fan-Out with Redis Pub/Sub

Local connection sets won’t reach clients connected to other instances. Insert Redis Pub/Sub to “relay” messages to every instance.

6.1 Architecture

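  • Each instance: subscribe to a Redis channel and deliver incoming messages to its local connections.
  • When publishing: publish to Redis and let each instance's subscription handle local delivery. If you also send locally before publishing, the originating instance must skip its own messages, or its clients receive duplicates.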

6.2 Example Implementation (async)

# app/realtime/bus.py
import json
from typing import Awaitable, Callable
from redis import asyncio as aioredis

class RedisBus:
    def __init__(self, url: str, channel: str):
        self.url, self.channel = url, channel
        self.redis: aioredis.Redis | None = None

    def _client(self) -> aioredis.Redis:
        # Create the client lazily and share it between start() and publish()
        if self.redis is None:
            self.redis = aioredis.from_url(self.url, encoding="utf-8", decode_responses=True)
        return self.redis

    async def start(self, on_message: Callable[[dict], Awaitable[None]]):
        pubsub = self._client().pubsub()
        await pubsub.subscribe(self.channel)
        async for m in pubsub.listen():
            if m.get("type") == "message":
                # JSON instead of eval(): never execute data received from the network
                await on_message(json.loads(m["data"]))

    async def publish(self, msg: dict):
        await self._client().publish(self.channel, json.dumps(msg))

6.3 Integration Points

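  • To send to a room, call bus.publish({"room": room, "data": payload}) instead of calling broadcast_room directly.
  • In on_message, call manager.broadcast_room(m["room"], m["data"]). The publishing instance receives its own message through the subscription, so its local clients are covered too.
  • Now any instance can reach the same room.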

7. Minimal SSE (One-Way Broadcast)

# app/realtime/sse.py
import asyncio, json, time
from fastapi import APIRouter
from fastapi.responses import StreamingResponse

router = APIRouter()

async def event_stream(queue: asyncio.Queue):
    while True:
        data = await queue.get()
        yield f"id: {int(time.time()*1000)}\n"
        yield "event: message\n"
        yield f"data: {json.dumps(data)}\n\n"

@router.get("/sse")
async def sse_endpoint():
    queue: asyncio.Queue = asyncio.Queue()
    # Put items into the queue elsewhere; for demo, start a 1s ticker task
    async def ticker():
        while True:
            await asyncio.sleep(1)
            await queue.put({"now": time.time()})
    task = asyncio.create_task(ticker())

    async def stream():
        try:
            async for chunk in event_stream(queue):
                yield chunk
        finally:
            task.cancel()  # stop the demo ticker once the stream ends (e.g. client disconnect)

    headers = {"Cache-Control": "no-cache", "Connection": "keep-alive"}
    # media_type sets Content-Type: text/event-stream
    return StreamingResponse(stream(), media_type="text/event-stream", headers=headers)

Key points

  • One connection = one response stream. Use data: per line and separate events with a blank line.
  • On reconnect, support catching up via Last-Event-ID if possible.

8. Nginx/Proxy Settings and Timeouts

  • WebSocket: ensure Upgrade / Connection headers are passed through. Set a sufficiently long proxy_read_timeout.
  • SSE: avoid proxy buffering; add X-Accel-Buffering: no (or equivalent) when possible.
  • Adjust client_max_body_size to your upstream payload needs.
  • Multiplexing SSE over HTTP/2 can reduce connection counts.
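
As a rough illustration of the settings above, an Nginx fragment might look like the following. The upstream name `app_upstream` and the one-hour timeouts are assumptions, and the paths follow the /realtime prefix used in the mini app later in this article; adjust all of them to your deployment.

```nginx
# WebSocket: forward the upgrade handshake and keep idle connections open
location /realtime/ws {
    proxy_pass http://app_upstream;          # hypothetical upstream name
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_set_header Host $host;
    proxy_read_timeout 3600s;                # longer than the heartbeat interval
}

# SSE: disable buffering so events reach the client immediately
location /realtime/sse {
    proxy_pass http://app_upstream;
    proxy_http_version 1.1;
    proxy_set_header Connection "";
    proxy_set_header Host $host;
    proxy_buffering off;
    proxy_read_timeout 3600s;
}
```

With heartbeats every 30 seconds, the read timeout mainly acts as a safety net, since regular traffic keeps the connection from going idle.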

9. Monitoring and Operations

  • Metrics: concurrent connections, connection lifetime, reconnection rate, avg message size, msgs/sec in/out, drops.
  • Logs: connect/disconnect, auth failures, room join/leave, queue overflows.
  • Load testing: validate both concurrent connections and message flow. HTTP tools such as autocannon or wrk cover the plain HTTP endpoints; long-lived WebSocket and SSE connections need custom clients.
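
The metrics listed above can start as simple in-process counters before being exported to a monitoring system. This sketch assumes hypothetical hook names (`on_connect`, `on_disconnect`, `on_send`, `on_drop`) that the connection manager would call; it is not tied to any metrics library.

```python
from dataclasses import dataclass, field


@dataclass
class RealtimeMetrics:
    active: int = 0
    total_connects: int = 0
    messages_out: int = 0
    bytes_out: int = 0
    drops: int = 0
    lifetimes: list[float] = field(default_factory=list)

    def on_connect(self) -> None:
        self.active += 1
        self.total_connects += 1

    def on_disconnect(self, lifetime_s: float) -> None:
        self.active -= 1
        self.lifetimes.append(lifetime_s)

    def on_send(self, size_bytes: int) -> None:
        self.messages_out += 1
        self.bytes_out += size_bytes

    def on_drop(self) -> None:
        self.drops += 1

    def snapshot(self) -> dict:
        # Averages fall back to 0.0 when nothing has been recorded yet
        return {
            "active_connections": self.active,
            "total_connects": self.total_connects,
            "avg_message_bytes": self.bytes_out / self.messages_out if self.messages_out else 0.0,
            "avg_lifetime_s": sum(self.lifetimes) / len(self.lifetimes) if self.lifetimes else 0.0,
            "drops": self.drops,
        }
```

In a long-running service the `lifetimes` list should be bounded or replaced by a histogram, since it grows with every disconnect.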

10. Common Pitfalls and Fixes

Symptom | Cause | Mitigation
--- | --- | ---
Messages lost at scale | No cross-instance relay | Redis Pub/Sub or a message broker for fan-out
Abrupt disconnects | Proxy idle timeouts | Extend proxy_read_timeout, send heartbeats
Memory keeps growing | Disconnect leaks or unbounded queues | Ensure disconnect on exceptions; cap queues
Privilege escalation | No authorization for room joins | Enforce scope checks on join; validate tokens
Reconnection storms | No backoff | Client exponential backoff; server-side connection caps and 429

11. Example: End-to-End Mini App

# app/main.py
from fastapi import FastAPI
from app.realtime import ws as ws_router
from app.realtime import sse as sse_router

app = FastAPI(title="Realtime Demo")
app.include_router(ws_router.router, prefix="/realtime")
app.include_router(sse_router.router, prefix="/realtime")

@app.get("/health")
def health():
    return {"ok": True}

12. Security

  • Auth: verify JWT/Cookie during handshake. Rooms require scopes.
  • Input validation: always schema-validate JSON from clients.
  • Rate limiting: control connection counts and send frequency per IP/user.
  • Secrets: don’t log payloads or tokens.
  • CORS/Origin: enable CORS for SSE; implement Origin checks for WebSocket.

13. Phased Rollout Roadmap

  1. Implement connection management and rooms on a single instance.
  2. Add auth & scopes; define heartbeat and reconnection behavior.
  3. Integrate Redis Pub/Sub for scale-out.
  4. Add backpressure, rate limits, and monitoring metrics.
  5. Tune proxy settings, run load tests, and build dashboards.

Takeaways

  • Start by choosing WebSocket vs SSE based on requirements: WebSocket for bidirectional, SSE for broadcast-centric.
  • For a single instance, a connection pool with rooms, auth, and heartbeats is enough to run reliably.
  • At scale, use Redis Pub/Sub for cross-instance fan-out, and enforce backpressure and rate limiting for stability.
  • Design from an operational perspective, including observability and proxy settings, and real time becomes a competitive weapon.

By greeden
