Icono del sitio IT&ライフハックブログ|学びと実践のためのアイデア集

Armando el Tiempo Real como Arma: Notificaciones WebSocket/SSE con FastAPI — Gestión de Conexiones, Salas, Reconexión, Escalado y Observabilidad

green snake

Photo by Pixabay on Pexels.com

Armando el Tiempo Real como Arma: Notificaciones WebSocket/SSE con FastAPI — Gestión de Conexiones, Salas, Reconexión, Escalado y Observabilidad


Resumen (Pirámide invertida)

  • Las actualizaciones inmediatas de UI se entregan vía WebSocket o SSE. Elige según el caso de uso.
  • Para una sola instancia, un endpoint WebSocket más gestión de conexiones (pool de conexiones, salas) es suficiente.
  • Para escalar entre múltiples instancias, difunde (fan-out) mensajes mediante Redis Pub/Sub o similar.
  • Operaciones sanas requieren latidos (heartbeats), contraflujo (backpressure), reconexión, autenticación & scopes, y monitoreo.
  • Diseña extremo a extremo, incluyendo timeouts de Nginx/CDN, configuración CORS/WebSocket y pruebas de carga.

A quién beneficiará

  • Aprendiz A (último año de grado)
    Quiere chat/notificaciones en tiempo real. Necesita diferencias entre WebSocket y SSE y el mínimo necesario.
  • Equipo pequeño B (agencia de 3 personas)
    Quiere un dashboard de gestión con actualizaciones instantáneas. Necesita implementación segura de gestión de conexiones, auth y difusión a salas.
  • Desarrollador SaaS C (startup)
    Quiere que las notificaciones lleguen incluso cuando los contenedores escalen. Necesita fan-out con Redis, heartbeats y limitación de tasa.

Evaluación de accesibilidad

  • Estructurado con párrafos cortos y listas por capítulo. El código usa fuente monoespaciada; los comentarios son concisos.
  • Desambiguación temprana de elecciones en las que principiantes suelen tropezar (WebSocket vs SSE). Trampas y mitigaciones resumidas en una tabla.
  • Nivel general: aproximadamente AA.

1. Elegir el enfoque: ¿WebSocket o SSE?

  • WebSocket: bidireccional. Ideal para casos con envíos frecuentes cliente → servidor como chat, edición colaborativa, juegos.
  • SSE (Server-Sent Events): unidireccional (servidor → cliente). Ideal cuando domina la difusión: precios de acciones, badges de notificación, progreso de jobs. Funciona sobre HTTP/1.1 y atraviesa proxies fácilmente.

Puntos de decisión

  • Si necesitas bidireccionalidad, usa WebSocket. Si el foco es difusión y quieres amplio soporte de navegador y resiliencia, elige SSE.
  • Para conexiones concurrentes muy grandes, SSE puede encajar mejor en infra en algunos casos (aprovechando HTTP/2 y CDNs).
  • Las apps móviles tienden a adoptar WebSocket.

2. WebSocket mínimo (instancia única)

2.1 Gestión de conexiones

# app/realtime/manager.py
from typing import Dict, Set
from fastapi import WebSocket

class ConnectionManager:
    def __init__(self):
        self.active: Set[WebSocket] = set()
        self.rooms: Dict[str, Set[WebSocket]] = {}

    async def connect(self, ws: WebSocket, room: str | None = None):
        await ws.accept()
        self.active.add(ws)
        if room:
            self.rooms.setdefault(room, set()).add(ws)

    def disconnect(self, ws: WebSocket):
        self.active.discard(ws)
        for r in list(self.rooms.values()):
            r.discard(ws)

    async def send_to_ws(self, ws: WebSocket, data: dict):
        await ws.send_json(data)

    async def broadcast_all(self, data: dict):
        for ws in list(self.active):
            try:
                await ws.send_json(data)
            except Exception:
                self.disconnect(ws)

    async def broadcast_room(self, room: str, data: dict):
        for ws in list(self.rooms.get(room, set())):
            try:
                await ws.send_json(data)
            except Exception:
                self.disconnect(ws)

2.2 Router

# app/realtime/ws.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query
from app.realtime.manager import ConnectionManager

router = APIRouter()
manager = ConnectionManager()

@router.websocket("/ws")
async def ws_endpoint(ws: WebSocket, room: str | None = Query(None)):
    await manager.connect(ws, room)
    try:
        while True:
            msg = await ws.receive_json()
            # Ejemplo: eco a la sala
            if room:
                await manager.broadcast_room(room, {"echo": msg})
            else:
                await manager.send_to_ws(ws, {"echo": msg})
    except WebSocketDisconnect:
        manager.disconnect(ws)

Puntos clave

  • Llama siempre a accept() primero.
  • En desconexión, elimina de todos los conjuntos. No filtres conexiones en excepción.

3. Autenticación y scopes

La mejora (upgrade) a WebSocket ocurre tras el handshake HTTP. Autentica usando un Bearer token o Cookie y verifica autorización para participación en salas.

# app/realtime/auth.py
from fastapi import WebSocket, HTTPException, status
from app.security.jwt import decode_token  # Reutiliza tu implementación JWT

async def authenticate_ws(ws: WebSocket):
    auth = ws.headers.get("authorization", "")
    if not auth.lower().startswith("bearer "):
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="missing token")
    token = auth.split(" ", 1)[1]
    data = decode_token(token)
    scopes = set(str(data.get("scope","")).split())
    return {"sub": data["sub"], "scopes": scopes}

Ejemplo de uso:

# app/realtime/ws.py (con auth)
from fastapi import Depends
from app.realtime.auth import authenticate_ws

@router.websocket("/ws/secure")
async def ws_secure(ws: WebSocket, user = Depends(authenticate_ws), room: str | None = Query(None)):
    await manager.connect(ws, room)
    try:
        await manager.send_to_ws(ws, {"hello": user["sub"]})
        while True:
            msg = await ws.receive_json()
            if "articles:write" in user["scopes"]:
                await manager.broadcast_room(room or "public", {"by": user["sub"], "msg": msg})
            else:
                await manager.send_to_ws(ws, {"error": "insufficient scope"})
    except WebSocketDisconnect:
        manager.disconnect(ws)

4. Latidos (heartbeats) y reconexión

  • Envía ping periódicos servidor → cliente y corta conexiones que no respondan.
  • Los clientes se reconectan con backoff exponencial. Al reconectar, envía el último ID de evento recibido para rellenar huecos (SSE lo estandariza con Last-Event-ID).

Ejemplo en servidor:

# app/realtime/heartbeat.py
import asyncio, json
from app.realtime.manager import ConnectionManager

async def heartbeat(manager: ConnectionManager, interval=30):
    while True:
        await asyncio.sleep(interval)
        await manager.broadcast_all({"type": "ping"})

Ejecuta esto en una tarea de fondo (p. ej., en el evento startup de la app).


5. Contraflujo (backpressure) y límites de tamaño

  • Usa una cola de envío; cuando supere un umbral, descarta actualizaciones obsoletas.
  • Aplica tamaño máximo por mensaje y límites de envíos por segundo para protegerte de clientes abusivos.

Ejemplo simple de cola:

# app/realtime/queue.py
import asyncio
from collections import deque

class SendQueue:
    def __init__(self, maxlen=1000):
        self.q = deque(maxlen=maxlen)
        self.cv = asyncio.Condition()

    async def put(self, item):
        async with self.cv:
            self.q.append(item)
            self.cv.notify()

    async def consume(self):
        while True:
            async with self.cv:
                while not self.q:
                    await self.cv.wait()
                item = self.q.popleft()
            yield item

6. Escalado: Difusión entre instancias con Redis Pub/Sub

Los conjuntos de conexiones locales no alcanzan clientes conectados a otras instancias. Inserta Redis Pub/Sub para “retransmitir” mensajes a cada instancia.

6.1 Arquitectura

  • Cada instancia: entrega a conexiones locales y se suscribe a un canal de Redis.
  • Al publicar: envía localmente y publish a Redis.

6.2 Implementación de ejemplo (async)

# app/realtime/bus.py
from redis import asyncio as aioredis
from typing import Callable
import json

class RedisBus:
    def __init__(self, url: str, channel: str):
        self.url, self.channel = url, channel
        self.redis: aioredis.Redis | None = None

    async def start(self, on_message: Callable[[dict], None]):
        self.redis = aioredis.from_url(self.url, encoding="utf-8", decode_responses=True)
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(self.channel)
        async for m in pubsub.listen():
            if m.get("type") == "message":
                on_message(json.loads(m["data"]))

    async def publish(self, msg: dict):
        if not self.redis:
            self.redis = aioredis.from_url(self.url, encoding="utf-8", decode_responses=True)
        await self.redis.publish(self.channel, json.dumps(msg))

6.3 Puntos de integración

  • Antes de broadcast_room, llama bus.publish({"room": room, "data": payload}).
  • En on_message, llama manager.broadcast_room(m["room"], m["data"]).
  • Ahora cualquier instancia puede alcanzar la misma sala.

7. SSE mínimo (difusión unidireccional)

# app/realtime/sse.py
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse
import asyncio, json, time

router = APIRouter()

async def event_stream(queue):
    while True:
        data = await queue.get()
        yield f"id: {int(time.time()*1000)}\n"
        yield "event: message\n"
        yield f"data: {json.dumps(data)}\n\n"

@router.get("/sse")
async def sse_endpoint(request: Request):
    queue = asyncio.Queue()
    # Inserta ítems en la cola en otro lugar; para demo, inicia un ticker de 1s
    async def ticker():
        while True:
            await asyncio.sleep(1)
            await queue.put({"now": time.time()})
    asyncio.create_task(ticker())
    headers = {
        "Cache-Control": "no-cache",
        "Content-Type": "text/event-stream",
        "Connection": "keep-alive",
    }
    return StreamingResponse(event_stream(queue), headers=headers)

Puntos clave

  • Una conexión = un flujo de respuesta. Usa data: por línea y separa eventos con una línea en blanco.
  • En reconexión, soporta “catch-up” vía Last-Event-ID si es posible.

8. Ajustes de Nginx/proxy y timeouts

  • WebSocket: asegúrate de que los encabezados Upgrade / Connection se propaguen. Configura un proxy_read_timeout suficientemente largo.
  • SSE: evita el buffering del proxy; agrega X-Accel-Buffering: no (o equivalente) cuando sea posible.
  • Ajusta client_max_body_size a tus necesidades de payload upstream.
  • Multiplexar SSE sobre HTTP/2 puede reducir el número de conexiones.

9. Monitoreo y operaciones

  • Métricas: conexiones concurrentes, vida útil de conexión, tasa de reconexión, tamaño medio de mensaje, msgs/seg in/out, descartes.
  • Logs: conectar/desconectar, fallos de auth, entrar/salir de salas, desbordes de colas.
  • Pruebas de carga: valida conexiones concurrentes y flujo de mensajes con autocannon o wrk más clientes a medida.

10. Errores comunes y soluciones

Síntoma Causa Mitigación
Pérdida de mensajes al escalar Sin relé entre instancias Redis Pub/Sub o broker de mensajes para fan-out
Desconexiones abruptas Timeouts inactivos del proxy Amplía proxy_read_timeout, envía heartbeats
Crecimiento continuo de memoria Fugas en desconexión o colas sin límite Asegura desconexión en excepciones; limita colas
Escalada de privilegios Sin autorización para unirse a salas Aplica comprobación de scopes al unirse; valida tokens
Tormentas de reconexión Sin backoff Backoff exponencial en cliente; límites y 429 en servidor

11. Ejemplo: Mini app extremo a extremo

# app/main.py
from fastapi import FastAPI
from app.realtime import ws as ws_router
from app.realtime import sse as sse_router

app = FastAPI(title="Realtime Demo")
app.include_router(ws_router.router, prefix="/realtime")
app.include_router(sse_router.router, prefix="/realtime")

@app.get("/health")
def health():
    return {"ok": True}

12. Seguridad

  • Auth: verifica JWT/Cookie durante el handshake. Las salas requieren scopes.
  • Validación de entrada: valida siempre con esquemas el JSON de los clientes.
  • Limitación de tasa: controla cuentas de conexiones y frecuencia de envío por IP/usuario.
  • Secretos: no registres payloads ni tokens.
  • CORS/Origen: habilita CORS para SSE; implementa comprobación de Origin para WebSocket.

13. Hoja de ruta de despliegue por fases

  1. Implementa gestión de conexiones y salas en una sola instancia.
  2. Añade auth & scopes; define heartbeat y comportamiento de reconexión.
  3. Integra Redis Pub/Sub para escalado.
  4. Añade backpressure, rate limits y métricas de monitoreo.
  5. Ajusta el proxy, ejecuta pruebas de carga y crea dashboards.

Referencias


Conclusiones

  • Comienza eligiendo WebSocket vs SSE según requisitos: WebSocket para bidireccional, SSE para difusión.
  • En instancia única, un pool de conexiones + salas, auth y heartbeats funcionará con fiabilidad.
  • A escala, usa Redis Pub/Sub para fan-out entre instancias, y aplica backpressure y rate limiting para estabilidad.
  • Diseña desde la operativa—incluyendo observabilidad y configuración de proxies—y el tiempo real se convierte en un arma competitiva.
Salir de la versión móvil