Armando el Tiempo Real como Arma: Notificaciones WebSocket/SSE con FastAPI — Gestión de Conexiones, Salas, Reconexión, Escalado y Observabilidad
Resumen (Pirámide invertida)
- Las actualizaciones inmediatas de UI se entregan vía WebSocket o SSE. Elige según el caso de uso.
- Para una sola instancia, un endpoint
WebSocketmás gestión de conexiones (pool de conexiones, salas) es suficiente. - Para escalar entre múltiples instancias, difunde (fan-out) mensajes mediante Redis Pub/Sub o similar.
- Operaciones sanas requieren latidos (heartbeats), contraflujo (backpressure), reconexión, autenticación & scopes, y monitoreo.
- Diseña extremo a extremo, incluyendo timeouts de Nginx/CDN, configuración CORS/WebSocket y pruebas de carga.
A quién beneficiará
- Aprendiz A (último año de grado)
Quiere chat/notificaciones en tiempo real. Necesita diferencias entre WebSocket y SSE y el mínimo necesario. - Equipo pequeño B (agencia de 3 personas)
Quiere un dashboard de gestión con actualizaciones instantáneas. Necesita implementación segura de gestión de conexiones, auth y difusión a salas. - Desarrollador SaaS C (startup)
Quiere que las notificaciones lleguen incluso cuando los contenedores escalen. Necesita fan-out con Redis, heartbeats y limitación de tasa.
Evaluación de accesibilidad
- Estructurado con párrafos cortos y listas por capítulo. El código usa fuente monoespaciada; los comentarios son concisos.
- Desambiguación temprana de elecciones en las que principiantes suelen tropezar (WebSocket vs SSE). Trampas y mitigaciones resumidas en una tabla.
- Nivel general: aproximadamente AA.
1. Elegir el enfoque: ¿WebSocket o SSE?
- WebSocket: bidireccional. Ideal para casos con envíos frecuentes cliente → servidor como chat, edición colaborativa, juegos.
- SSE (Server-Sent Events): unidireccional (servidor → cliente). Ideal cuando domina la difusión: precios de acciones, badges de notificación, progreso de jobs. Funciona sobre HTTP/1.1 y atraviesa proxies fácilmente.
Puntos de decisión
- Si necesitas bidireccionalidad, usa WebSocket. Si el foco es difusión y quieres amplio soporte de navegador y resiliencia, elige SSE.
- Para conexiones concurrentes muy grandes, SSE puede encajar mejor en infra en algunos casos (aprovechando HTTP/2 y CDNs).
- Las apps móviles tienden a adoptar WebSocket.
2. WebSocket mínimo (instancia única)
2.1 Gestión de conexiones
# app/realtime/manager.py
from typing import Dict, Set
from fastapi import WebSocket
class ConnectionManager:
def __init__(self):
self.active: Set[WebSocket] = set()
self.rooms: Dict[str, Set[WebSocket]] = {}
async def connect(self, ws: WebSocket, room: str | None = None):
await ws.accept()
self.active.add(ws)
if room:
self.rooms.setdefault(room, set()).add(ws)
def disconnect(self, ws: WebSocket):
self.active.discard(ws)
for r in list(self.rooms.values()):
r.discard(ws)
async def send_to_ws(self, ws: WebSocket, data: dict):
await ws.send_json(data)
async def broadcast_all(self, data: dict):
for ws in list(self.active):
try:
await ws.send_json(data)
except Exception:
self.disconnect(ws)
async def broadcast_room(self, room: str, data: dict):
for ws in list(self.rooms.get(room, set())):
try:
await ws.send_json(data)
except Exception:
self.disconnect(ws)
2.2 Router
# app/realtime/ws.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query
from app.realtime.manager import ConnectionManager
router = APIRouter()
manager = ConnectionManager()
@router.websocket("/ws")
async def ws_endpoint(ws: WebSocket, room: str | None = Query(None)):
await manager.connect(ws, room)
try:
while True:
msg = await ws.receive_json()
# Ejemplo: eco a la sala
if room:
await manager.broadcast_room(room, {"echo": msg})
else:
await manager.send_to_ws(ws, {"echo": msg})
except WebSocketDisconnect:
manager.disconnect(ws)
Puntos clave
- Llama siempre a
accept()primero. - En desconexión, elimina de todos los conjuntos. No filtres conexiones en excepción.
3. Autenticación y scopes
La mejora (upgrade) a WebSocket ocurre tras el handshake HTTP. Autentica usando un Bearer token o Cookie y verifica autorización para participación en salas.
# app/realtime/auth.py
from fastapi import WebSocket, HTTPException, status
from app.security.jwt import decode_token # Reutiliza tu implementación JWT
async def authenticate_ws(ws: WebSocket):
auth = ws.headers.get("authorization", "")
if not auth.lower().startswith("bearer "):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="missing token")
token = auth.split(" ", 1)[1]
data = decode_token(token)
scopes = set(str(data.get("scope","")).split())
return {"sub": data["sub"], "scopes": scopes}
Ejemplo de uso:
# app/realtime/ws.py (con auth)
from fastapi import Depends
from app.realtime.auth import authenticate_ws
@router.websocket("/ws/secure")
async def ws_secure(ws: WebSocket, user = Depends(authenticate_ws), room: str | None = Query(None)):
await manager.connect(ws, room)
try:
await manager.send_to_ws(ws, {"hello": user["sub"]})
while True:
msg = await ws.receive_json()
if "articles:write" in user["scopes"]:
await manager.broadcast_room(room or "public", {"by": user["sub"], "msg": msg})
else:
await manager.send_to_ws(ws, {"error": "insufficient scope"})
except WebSocketDisconnect:
manager.disconnect(ws)
4. Latidos (heartbeats) y reconexión
- Envía
pingperiódicos servidor → cliente y corta conexiones que no respondan. - Los clientes se reconectan con backoff exponencial. Al reconectar, envía el último ID de evento recibido para rellenar huecos (SSE lo estandariza con
Last-Event-ID).
Ejemplo en servidor:
# app/realtime/heartbeat.py
import asyncio, json
from app.realtime.manager import ConnectionManager
async def heartbeat(manager: ConnectionManager, interval=30):
while True:
await asyncio.sleep(interval)
await manager.broadcast_all({"type": "ping"})
Ejecuta esto en una tarea de fondo (p. ej., en el evento startup de la app).
5. Contraflujo (backpressure) y límites de tamaño
- Usa una cola de envío; cuando supere un umbral, descarta actualizaciones obsoletas.
- Aplica tamaño máximo por mensaje y límites de envíos por segundo para protegerte de clientes abusivos.
Ejemplo simple de cola:
# app/realtime/queue.py
import asyncio
from collections import deque
class SendQueue:
def __init__(self, maxlen=1000):
self.q = deque(maxlen=maxlen)
self.cv = asyncio.Condition()
async def put(self, item):
async with self.cv:
self.q.append(item)
self.cv.notify()
async def consume(self):
while True:
async with self.cv:
while not self.q:
await self.cv.wait()
item = self.q.popleft()
yield item
6. Escalado: Difusión entre instancias con Redis Pub/Sub
Los conjuntos de conexiones locales no alcanzan clientes conectados a otras instancias. Inserta Redis Pub/Sub para “retransmitir” mensajes a cada instancia.
6.1 Arquitectura
- Cada instancia: entrega a conexiones locales y se suscribe a un canal de Redis.
- Al publicar: envía localmente y
publisha Redis.
6.2 Implementación de ejemplo (async)
# app/realtime/bus.py
from redis import asyncio as aioredis
from typing import Callable
import json
class RedisBus:
def __init__(self, url: str, channel: str):
self.url, self.channel = url, channel
self.redis: aioredis.Redis | None = None
async def start(self, on_message: Callable[[dict], None]):
self.redis = aioredis.from_url(self.url, encoding="utf-8", decode_responses=True)
pubsub = self.redis.pubsub()
await pubsub.subscribe(self.channel)
async for m in pubsub.listen():
if m.get("type") == "message":
on_message(json.loads(m["data"]))
async def publish(self, msg: dict):
if not self.redis:
self.redis = aioredis.from_url(self.url, encoding="utf-8", decode_responses=True)
await self.redis.publish(self.channel, json.dumps(msg))
6.3 Puntos de integración
- Antes de
broadcast_room, llamabus.publish({"room": room, "data": payload}). - En
on_message, llamamanager.broadcast_room(m["room"], m["data"]). - Ahora cualquier instancia puede alcanzar la misma sala.
7. SSE mínimo (difusión unidireccional)
# app/realtime/sse.py
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse
import asyncio, json, time
router = APIRouter()
async def event_stream(queue):
while True:
data = await queue.get()
yield f"id: {int(time.time()*1000)}\n"
yield "event: message\n"
yield f"data: {json.dumps(data)}\n\n"
@router.get("/sse")
async def sse_endpoint(request: Request):
queue = asyncio.Queue()
# Inserta ítems en la cola en otro lugar; para demo, inicia un ticker de 1s
async def ticker():
while True:
await asyncio.sleep(1)
await queue.put({"now": time.time()})
asyncio.create_task(ticker())
headers = {
"Cache-Control": "no-cache",
"Content-Type": "text/event-stream",
"Connection": "keep-alive",
}
return StreamingResponse(event_stream(queue), headers=headers)
Puntos clave
- Una conexión = un flujo de respuesta. Usa
data:por línea y separa eventos con una línea en blanco. - En reconexión, soporta “catch-up” vía
Last-Event-IDsi es posible.
8. Ajustes de Nginx/proxy y timeouts
- WebSocket: asegúrate de que los encabezados
Upgrade/Connectionse propaguen. Configura unproxy_read_timeoutsuficientemente largo. - SSE: evita el buffering del proxy; agrega
X-Accel-Buffering: no(o equivalente) cuando sea posible. - Ajusta
client_max_body_sizea tus necesidades de payload upstream. - Multiplexar SSE sobre HTTP/2 puede reducir el número de conexiones.
9. Monitoreo y operaciones
- Métricas: conexiones concurrentes, vida útil de conexión, tasa de reconexión, tamaño medio de mensaje, msgs/seg in/out, descartes.
- Logs: conectar/desconectar, fallos de auth, entrar/salir de salas, desbordes de colas.
- Pruebas de carga: valida conexiones concurrentes y flujo de mensajes con
autocannonowrkmás clientes a medida.
10. Errores comunes y soluciones
| Síntoma | Causa | Mitigación |
|---|---|---|
| Pérdida de mensajes al escalar | Sin relé entre instancias | Redis Pub/Sub o broker de mensajes para fan-out |
| Desconexiones abruptas | Timeouts inactivos del proxy | Amplía proxy_read_timeout, envía heartbeats |
| Crecimiento continuo de memoria | Fugas en desconexión o colas sin límite | Asegura desconexión en excepciones; limita colas |
| Escalada de privilegios | Sin autorización para unirse a salas | Aplica comprobación de scopes al unirse; valida tokens |
| Tormentas de reconexión | Sin backoff | Backoff exponencial en cliente; límites y 429 en servidor |
11. Ejemplo: Mini app extremo a extremo
# app/main.py
from fastapi import FastAPI
from app.realtime import ws as ws_router
from app.realtime import sse as sse_router
app = FastAPI(title="Realtime Demo")
app.include_router(ws_router.router, prefix="/realtime")
app.include_router(sse_router.router, prefix="/realtime")
@app.get("/health")
def health():
return {"ok": True}
12. Seguridad
- Auth: verifica JWT/Cookie durante el handshake. Las salas requieren scopes.
- Validación de entrada: valida siempre con esquemas el JSON de los clientes.
- Limitación de tasa: controla cuentas de conexiones y frecuencia de envío por IP/usuario.
- Secretos: no registres payloads ni tokens.
- CORS/Origen: habilita CORS para SSE; implementa comprobación de Origin para WebSocket.
13. Hoja de ruta de despliegue por fases
- Implementa gestión de conexiones y salas en una sola instancia.
- Añade auth & scopes; define heartbeat y comportamiento de reconexión.
- Integra Redis Pub/Sub para escalado.
- Añade backpressure, rate limits y métricas de monitoreo.
- Ajusta el proxy, ejecuta pruebas de carga y crea dashboards.
Referencias
- FastAPI
- Starlette
- Redis
- MDN
- Nginx
Conclusiones
- Comienza eligiendo WebSocket vs SSE según requisitos: WebSocket para bidireccional, SSE para difusión.
- En instancia única, un pool de conexiones + salas, auth y heartbeats funcionará con fiabilidad.
- A escala, usa Redis Pub/Sub para fan-out entre instancias, y aplica backpressure y rate limiting para estabilidad.
- Diseña desde la operativa—incluyendo observabilidad y configuración de proxies—y el tiempo real se convierte en un arma competitiva.
