green snake
Photo by Pixabay on Pexels.com

Armando el Tiempo Real como Arma: Notificaciones WebSocket/SSE con FastAPI — Gestión de Conexiones, Salas, Reconexión, Escalado y Observabilidad


Resumen (Pirámide invertida)

  • Las actualizaciones inmediatas de UI se entregan vía WebSocket o SSE. Elige según el caso de uso.
  • Para una sola instancia, un endpoint WebSocket más gestión de conexiones (pool de conexiones, salas) es suficiente.
  • Para escalar entre múltiples instancias, difunde (fan-out) mensajes mediante Redis Pub/Sub o similar.
  • Operaciones sanas requieren latidos (heartbeats), contraflujo (backpressure), reconexión, autenticación & scopes, y monitoreo.
  • Diseña extremo a extremo, incluyendo timeouts de Nginx/CDN, configuración CORS/WebSocket y pruebas de carga.

A quién beneficiará

  • Aprendiz A (último año de grado)
    Quiere chat/notificaciones en tiempo real. Necesita diferencias entre WebSocket y SSE y el mínimo necesario.
  • Equipo pequeño B (agencia de 3 personas)
    Quiere un dashboard de gestión con actualizaciones instantáneas. Necesita implementación segura de gestión de conexiones, auth y difusión a salas.
  • Desarrollador SaaS C (startup)
    Quiere que las notificaciones lleguen incluso cuando los contenedores escalen. Necesita fan-out con Redis, heartbeats y limitación de tasa.

Evaluación de accesibilidad

  • Estructurado con párrafos cortos y listas por capítulo. El código usa fuente monoespaciada; los comentarios son concisos.
  • Desambiguación temprana de elecciones en las que principiantes suelen tropezar (WebSocket vs SSE). Trampas y mitigaciones resumidas en una tabla.
  • Nivel general: aproximadamente AA.

1. Elegir el enfoque: ¿WebSocket o SSE?

  • WebSocket: bidireccional. Ideal para casos con envíos frecuentes cliente → servidor como chat, edición colaborativa, juegos.
  • SSE (Server-Sent Events): unidireccional (servidor → cliente). Ideal cuando domina la difusión: precios de acciones, badges de notificación, progreso de jobs. Funciona sobre HTTP/1.1 y atraviesa proxies fácilmente.

Puntos de decisión

  • Si necesitas bidireccionalidad, usa WebSocket. Si el foco es difusión y quieres amplio soporte de navegador y resiliencia, elige SSE.
  • Para conexiones concurrentes muy grandes, SSE puede encajar mejor en infra en algunos casos (aprovechando HTTP/2 y CDNs).
  • Las apps móviles tienden a adoptar WebSocket.

2. WebSocket mínimo (instancia única)

2.1 Gestión de conexiones

# app/realtime/manager.py
from typing import Dict, Set
from fastapi import WebSocket

class ConnectionManager:
    def __init__(self):
        self.active: Set[WebSocket] = set()
        self.rooms: Dict[str, Set[WebSocket]] = {}

    async def connect(self, ws: WebSocket, room: str | None = None):
        await ws.accept()
        self.active.add(ws)
        if room:
            self.rooms.setdefault(room, set()).add(ws)

    def disconnect(self, ws: WebSocket):
        self.active.discard(ws)
        for r in list(self.rooms.values()):
            r.discard(ws)

    async def send_to_ws(self, ws: WebSocket, data: dict):
        await ws.send_json(data)

    async def broadcast_all(self, data: dict):
        for ws in list(self.active):
            try:
                await ws.send_json(data)
            except Exception:
                self.disconnect(ws)

    async def broadcast_room(self, room: str, data: dict):
        for ws in list(self.rooms.get(room, set())):
            try:
                await ws.send_json(data)
            except Exception:
                self.disconnect(ws)

2.2 Router

# app/realtime/ws.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query
from app.realtime.manager import ConnectionManager

router = APIRouter()
manager = ConnectionManager()

@router.websocket("/ws")
async def ws_endpoint(ws: WebSocket, room: str | None = Query(None)):
    await manager.connect(ws, room)
    try:
        while True:
            msg = await ws.receive_json()
            # Ejemplo: eco a la sala
            if room:
                await manager.broadcast_room(room, {"echo": msg})
            else:
                await manager.send_to_ws(ws, {"echo": msg})
    except WebSocketDisconnect:
        manager.disconnect(ws)

Puntos clave

  • Llama siempre a accept() primero.
  • En desconexión, elimina de todos los conjuntos. No filtres conexiones en excepción.

3. Autenticación y scopes

La mejora (upgrade) a WebSocket ocurre tras el handshake HTTP. Autentica usando un Bearer token o Cookie y verifica autorización para participación en salas.

# app/realtime/auth.py
from fastapi import WebSocket, HTTPException, status
from app.security.jwt import decode_token  # Reutiliza tu implementación JWT

async def authenticate_ws(ws: WebSocket):
    auth = ws.headers.get("authorization", "")
    if not auth.lower().startswith("bearer "):
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="missing token")
    token = auth.split(" ", 1)[1]
    data = decode_token(token)
    scopes = set(str(data.get("scope","")).split())
    return {"sub": data["sub"], "scopes": scopes}

Ejemplo de uso:

# app/realtime/ws.py (con auth)
from fastapi import Depends
from app.realtime.auth import authenticate_ws

@router.websocket("/ws/secure")
async def ws_secure(ws: WebSocket, user = Depends(authenticate_ws), room: str | None = Query(None)):
    await manager.connect(ws, room)
    try:
        await manager.send_to_ws(ws, {"hello": user["sub"]})
        while True:
            msg = await ws.receive_json()
            if "articles:write" in user["scopes"]:
                await manager.broadcast_room(room or "public", {"by": user["sub"], "msg": msg})
            else:
                await manager.send_to_ws(ws, {"error": "insufficient scope"})
    except WebSocketDisconnect:
        manager.disconnect(ws)

4. Latidos (heartbeats) y reconexión

  • Envía ping periódicos servidor → cliente y corta conexiones que no respondan.
  • Los clientes se reconectan con backoff exponencial. Al reconectar, envía el último ID de evento recibido para rellenar huecos (SSE lo estandariza con Last-Event-ID).

Ejemplo en servidor:

# app/realtime/heartbeat.py
import asyncio, json
from app.realtime.manager import ConnectionManager

async def heartbeat(manager: ConnectionManager, interval=30):
    while True:
        await asyncio.sleep(interval)
        await manager.broadcast_all({"type": "ping"})

Ejecuta esto en una tarea de fondo (p. ej., en el evento startup de la app).


5. Contraflujo (backpressure) y límites de tamaño

  • Usa una cola de envío; cuando supere un umbral, descarta actualizaciones obsoletas.
  • Aplica tamaño máximo por mensaje y límites de envíos por segundo para protegerte de clientes abusivos.

Ejemplo simple de cola:

# app/realtime/queue.py
import asyncio
from collections import deque

class SendQueue:
    def __init__(self, maxlen=1000):
        self.q = deque(maxlen=maxlen)
        self.cv = asyncio.Condition()

    async def put(self, item):
        async with self.cv:
            self.q.append(item)
            self.cv.notify()

    async def consume(self):
        while True:
            async with self.cv:
                while not self.q:
                    await self.cv.wait()
                item = self.q.popleft()
            yield item

6. Escalado: Difusión entre instancias con Redis Pub/Sub

Los conjuntos de conexiones locales no alcanzan clientes conectados a otras instancias. Inserta Redis Pub/Sub para “retransmitir” mensajes a cada instancia.

6.1 Arquitectura

  • Cada instancia: entrega a conexiones locales y se suscribe a un canal de Redis.
  • Al publicar: envía localmente y publish a Redis.

6.2 Implementación de ejemplo (async)

# app/realtime/bus.py
from redis import asyncio as aioredis
from typing import Callable
import json

class RedisBus:
    def __init__(self, url: str, channel: str):
        self.url, self.channel = url, channel
        self.redis: aioredis.Redis | None = None

    async def start(self, on_message: Callable[[dict], None]):
        self.redis = aioredis.from_url(self.url, encoding="utf-8", decode_responses=True)
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(self.channel)
        async for m in pubsub.listen():
            if m.get("type") == "message":
                on_message(json.loads(m["data"]))

    async def publish(self, msg: dict):
        if not self.redis:
            self.redis = aioredis.from_url(self.url, encoding="utf-8", decode_responses=True)
        await self.redis.publish(self.channel, json.dumps(msg))

6.3 Puntos de integración

  • Antes de broadcast_room, llama bus.publish({"room": room, "data": payload}).
  • En on_message, llama manager.broadcast_room(m["room"], m["data"]).
  • Ahora cualquier instancia puede alcanzar la misma sala.

7. SSE mínimo (difusión unidireccional)

# app/realtime/sse.py
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse
import asyncio, json, time

router = APIRouter()

async def event_stream(queue):
    while True:
        data = await queue.get()
        yield f"id: {int(time.time()*1000)}\n"
        yield "event: message\n"
        yield f"data: {json.dumps(data)}\n\n"

@router.get("/sse")
async def sse_endpoint(request: Request):
    queue = asyncio.Queue()
    # Inserta ítems en la cola en otro lugar; para demo, inicia un ticker de 1s
    async def ticker():
        while True:
            await asyncio.sleep(1)
            await queue.put({"now": time.time()})
    asyncio.create_task(ticker())
    headers = {
        "Cache-Control": "no-cache",
        "Content-Type": "text/event-stream",
        "Connection": "keep-alive",
    }
    return StreamingResponse(event_stream(queue), headers=headers)

Puntos clave

  • Una conexión = un flujo de respuesta. Usa data: por línea y separa eventos con una línea en blanco.
  • En reconexión, soporta “catch-up” vía Last-Event-ID si es posible.

8. Ajustes de Nginx/proxy y timeouts

  • WebSocket: asegúrate de que los encabezados Upgrade / Connection se propaguen. Configura un proxy_read_timeout suficientemente largo.
  • SSE: evita el buffering del proxy; agrega X-Accel-Buffering: no (o equivalente) cuando sea posible.
  • Ajusta client_max_body_size a tus necesidades de payload upstream.
  • Multiplexar SSE sobre HTTP/2 puede reducir el número de conexiones.

9. Monitoreo y operaciones

  • Métricas: conexiones concurrentes, vida útil de conexión, tasa de reconexión, tamaño medio de mensaje, msgs/seg in/out, descartes.
  • Logs: conectar/desconectar, fallos de auth, entrar/salir de salas, desbordes de colas.
  • Pruebas de carga: valida conexiones concurrentes y flujo de mensajes con autocannon o wrk más clientes a medida.

10. Errores comunes y soluciones

Síntoma Causa Mitigación
Pérdida de mensajes al escalar Sin relé entre instancias Redis Pub/Sub o broker de mensajes para fan-out
Desconexiones abruptas Timeouts inactivos del proxy Amplía proxy_read_timeout, envía heartbeats
Crecimiento continuo de memoria Fugas en desconexión o colas sin límite Asegura desconexión en excepciones; limita colas
Escalada de privilegios Sin autorización para unirse a salas Aplica comprobación de scopes al unirse; valida tokens
Tormentas de reconexión Sin backoff Backoff exponencial en cliente; límites y 429 en servidor

11. Ejemplo: Mini app extremo a extremo

# app/main.py
from fastapi import FastAPI
from app.realtime import ws as ws_router
from app.realtime import sse as sse_router

app = FastAPI(title="Realtime Demo")
app.include_router(ws_router.router, prefix="/realtime")
app.include_router(sse_router.router, prefix="/realtime")

@app.get("/health")
def health():
    return {"ok": True}

12. Seguridad

  • Auth: verifica JWT/Cookie durante el handshake. Las salas requieren scopes.
  • Validación de entrada: valida siempre con esquemas el JSON de los clientes.
  • Limitación de tasa: controla cuentas de conexiones y frecuencia de envío por IP/usuario.
  • Secretos: no registres payloads ni tokens.
  • CORS/Origen: habilita CORS para SSE; implementa comprobación de Origin para WebSocket.

13. Hoja de ruta de despliegue por fases

  1. Implementa gestión de conexiones y salas en una sola instancia.
  2. Añade auth & scopes; define heartbeat y comportamiento de reconexión.
  3. Integra Redis Pub/Sub para escalado.
  4. Añade backpressure, rate limits y métricas de monitoreo.
  5. Ajusta el proxy, ejecuta pruebas de carga y crea dashboards.

Referencias


Conclusiones

  • Comienza eligiendo WebSocket vs SSE según requisitos: WebSocket para bidireccional, SSE para difusión.
  • En instancia única, un pool de conexiones + salas, auth y heartbeats funcionará con fiabilidad.
  • A escala, usa Redis Pub/Sub para fan-out entre instancias, y aplica backpressure y rate limiting para estabilidad.
  • Diseña desde la operativa—incluyendo observabilidad y configuración de proxies—y el tiempo real se convierte en un arma competitiva.

por greeden

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

日本語が含まれない投稿は無視されますのでご注意ください。(スパム対策)