green snake
Photo by Pixabay on Pexels.com
目次

La guía completa del procesamiento en segundo plano con FastAPI × Celery/Redis

Cómo separar el trabajo pesado de tu API para mantener los servicios estables


Introducción (lo que podrás hacer después de leer)

Este artículo explica cuidadosamente cómo añadir procesamiento en segundo plano y trabajos asíncronos tipo batch a una Web API basada en FastAPI.

  • Devolver una respuesta a las solicitudes de los usuarios rápidamente, mientras las tareas que consumen tiempo se ejecutan entre bastidores
  • Convertir en asíncronas de forma segura tareas como envío de correos, generación de informes, procesamiento de imágenes o emisión de grandes volúmenes de solicitudes a APIs externas
  • Construir la configuración clásica de tres piezas con Celery + Redis: FastAPI (servidor API), workers y un message broker

Aprenderás todo el flujo con código de ejemplo.


A quién beneficia esto (perfiles de lectores específicos)

Desarrolladores en solitario / aprendices

  • Quieres disparar tareas que tardan de varios segundos a minutos (p. ej., “correo de confirmación de registro” o “generación de informe PDF”) desde FastAPI
  • Has usado BackgroundTasks, pero aún no has probado una configuración asíncrona de estilo producción con procesos separados
  • Has oído hablar de Redis y Celery, pero todavía no tienes claro “¿cómo encajan entre sí?”

→ Al leer esto, practicarás el flujo estándar y probado de trabajos en segundo plano usando un worker de Celery y un broker Redis.

Ingenieros backend en equipos pequeños

  • Estás construyendo sistemas internos o un SaaS pequeño/mediano con FastAPI y no sabes bien cómo diseñar “tareas pesadas” o “tareas que deben reintentarse”
  • Los timeouts en APIs síncronas y las ralentizaciones en picos empiezan a molestarte
  • Quieres un patrón compartido de equipo: “A partir de aquí, lo mandamos a la cola de trabajos.”

→ Al aprender la separación de responsabilidades entre FastAPI y Celery, además de patrones de reintento/planificación, obtendrás una visión sólida de un diseño operativo amigable.

Equipos SaaS / startups

  • Tu producto tiene muchas cargas escalables: procesamiento de imágenes, envíos masivos, agregación de datos, etc.
  • Quieres empezar con una configuración simple de workers, manteniendo en mente futuros microservicios o arquitectura dirigida por eventos
  • Ya estás pensando en escalar workers y monitorear colas

→ Usando la división de roles “FastAPI = API síncrona”, “Celery = worker asíncrono”, “Redis = broker de colas”, puedes construir una base que escale paso a paso.


Notas de accesibilidad (consideraciones de legibilidad)

  • El artículo está estructurado como: visión general → por qué procesamiento en segundo plano → roles de Celery/Redis → implementación → manejo de errores y reintentos → scheduling → consejos operativos → roadmap.
  • Los términos especializados (broker, worker, job, task, etc.) se explican brevemente en su primera aparición y luego se usan de forma consistente para reducir carga cognitiva.
  • Los bloques de código se mantienen cortos, con comentarios mínimos esenciales para reducir carga visual.
  • Los párrafos son intencionalmente cortos y se usan viñetas para apoyar una lectura a tu propio ritmo.

En conjunto, la estructura busca una experiencia clara paso a paso y un diseño de texto mindful de legibilidad estilo WCAG AA.


1. Por qué es necesario el procesamiento en segundo plano

Aclaremos por qué deberías separar el trabajo pesado del API en primer lugar.

1.1 ¿Qué pasa si pones trabajo pesado directamente dentro del API?

Considera un endpoint como este:

  • Un usuario envía un formulario
  • El servidor genera un informe PDF (10 segundos)
  • Llama a un proveedor externo de correo (más espera de red)
  • Devuelve el resultado

Si intentas completar todo esto dentro de una sola solicitud HTTP, probablemente verás:

  • Timeouts del lado del cliente
  • Workers de Uvicorn bloqueados por tareas pesadas, obligando a otras solicitudes a esperar
  • Latencia o fallos temporales de servicios externos dañando directamente la experiencia del usuario

1.2 La idea central del procesamiento en segundo plano

Aquí entra el procesamiento en segundo plano:

  1. La solicitud HTTP devuelve rápido con “Trabajo aceptado/registrado.”
  2. El trabajo pesado real lo ejecuta un proceso separado (un worker) que toma trabajos desde una cola.
  3. Los resultados y el progreso se consultan después mediante otra API, una notificación o un panel.

En otras palabras:

  • FastAPI = la recepcionista en la entrada
  • Celery worker = el personal que trabaja en la trastienda
  • Redis (broker) = el lugar donde se guarda la “lista de tareas”

Separar responsabilidades así mejora tolerancia a fallos y escalabilidad.


2. Entender Celery y Redis (roles de alto nivel)

Aquí va la combinación típica: Celery + Redis.

2.1 ¿Qué es Celery?

Celery es una cola distribuida de tareas/trabajos en Python.

  • Ejecuta “tasks” (funciones) de forma asíncrona en procesos separados
  • Usa un message broker (Redis, RabbitMQ, etc.) para encolar y desencolar tareas
  • Proporciona reintentos, scheduling (trabajos periódicos), almacenamiento de resultados (result backend) y más

La documentación oficial de FastAPI también muestra ejemplos de integración con Celery.

2.2 ¿Qué es Redis?

Redis es un datastore en memoria que también se usa a menudo como message broker.

  • Para Celery, funciona como “el lugar donde se almacenan las colas de tareas”
  • Ligero, rápido y fácil de ejecutar en local, en contenedores o en la nube

RabbitMQ también es común, pero Redis es más simple para empezar, así que es una gran primera opción.


3. Configuración del entorno: ejecutar FastAPI + Celery + Redis

Ahora montemos un entorno ejecutable.

3.1 Paquetes necesarios

Ejemplo:

pip install "fastapi[standard]" celery[redis] redis
  • fastapi[standard]: FastAPI más Uvicorn y otros básicos
  • celery[redis]: núcleo de Celery con soporte Redis
  • redis: cliente Redis para Python

(Ajusta versiones según tu proyecto.)

3.2 Iniciar Redis (desarrollo local)

Docker facilita Redis en local:

docker run -d --name redis -p 6379:6379 redis:7

Ahora Redis está disponible en localhost:6379.


4. Construir una configuración mínima de FastAPI + Celery

Definamos la app de FastAPI y la app de Celery.

4.1 Estructura de proyecto de ejemplo

project/
  app/
    __init__.py
    main.py          # Servidor API de FastAPI
    celery_app.py    # Definición de la app de Celery
    tasks.py         # Tareas en segundo plano
  requirements.txt

4.2 Definir la app de Celery

# app/celery_app.py
from celery import Celery

celery_app = Celery(
    "worker",
    broker="redis://localhost:6379/0",      # broker (Redis)
    backend="redis://localhost:6379/1",     # result backend (opcional)
)

celery_app.conf.update(
    task_routes={
        "app.tasks.send_email": {"queue": "emails"},
        "app.tasks.generate_report": {"queue": "reports"},
    },
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
)

Esto usa localhost por simplicidad; en producción deberías leer estas URLs desde variables de entorno o clases de settings.

4.3 Definir tareas

# app/tasks.py
from time import sleep
from app.celery_app import celery_app

@celery_app.task(name="app.tasks.send_email")
def send_email(to: str, subject: str, body: str) -> str:
    # La implementación real enviaría un correo
    sleep(5)  # fingimos que esto es costoso
    return f"Email sent to {to}"

@celery_app.task(name="app.tasks.generate_report")
def generate_report(user_id: int) -> str:
    # Agregación pesada o generación de PDF
    sleep(10)
    return f"Report generated for user {user_id}"

Puntos clave:

  • Cualquier función decorada con @celery_app.task se convierte en una “task”
  • Llamar a send_email.delay(...) no ejecuta de inmediato: encola la tarea

5. Encolar tareas desde FastAPI

Ahora registra tareas desde endpoints de FastAPI.

5.1 Definición de la app FastAPI

# app/main.py
from fastapi import FastAPI
from pydantic import BaseModel
from app.tasks import send_email, generate_report

app = FastAPI(title="FastAPI + Celery Example")

class EmailRequest(BaseModel):
    to: str
    subject: str
    body: str

@app.post("/emails")
def create_email(req: EmailRequest):
    task = send_email.delay(req.to, req.subject, req.body)
    return {"task_id": task.id}

@app.post("/reports/{user_id}")
def create_report(user_id: int):
    task = generate_report.delay(user_id)
    return {"task_id": task.id}

La solicitud HTTP devuelve solo un task_id rápidamente, y el worker se encarga de la ejecución.

5.2 API para consultar resultados de tareas (opcional)

Si configuras un result backend (lo apuntamos a Redis arriba), puedes consultar el estado de la tarea:

from app.celery_app import celery_app

@app.get("/tasks/{task_id}")
def get_task_status(task_id: str):
    result = celery_app.AsyncResult(task_id)
    return {
        "task_id": task_id,
        "status": result.status,      # PENDING / STARTED / SUCCESS / FAILURE ...
        "result": result.result,      # valor devuelto en éxito, info de excepción en fallo
    }

Flujo típico de frontend:

  1. Llamar a /emails o /reports/{user_id} y recibir task_id
  2. Hacer polling a /tasks/{task_id} para seguir el estado

6. Arrancar workers y confirmar que funciona

A estas alturas tienes:

  • FastAPI (punto de entrada HTTP)
  • Celery worker (ejecutor en segundo plano)
  • Redis (broker)

Ahora ejecútalo.

6.1 Iniciar FastAPI

uvicorn app.main:app --reload

6.2 Iniciar el worker de Celery

Desde la raíz del proyecto:

celery -A app.celery_app.celery_app worker --loglevel=info

-A apunta a la ruta de importación de tu app de Celery.
Aquí, celery_app está definido en app/celery_app.py.

6.3 Prueba rápida

Desde otra terminal:

curl -X POST "http://127.0.0.1:8000/emails" \
  -H "Content-Type: application/json" \
  -d '{"to": "test@example.com", "subject": "Hello", "body": "Hi"}'

Deberías recibir inmediatamente:

{"task_id": "xxxxxxxx-xxxx-...."}

Y en los logs del worker verás algo como “Email sent to …”.

También puedes comprobar el estado vía GET /tasks/{task_id}.


7. Reintentos, timeouts y otros controles ante fallos

En sistemas reales, los fallos temporales y errores de red son inevitables. Celery soporta reintentos y límites de tiempo.

7.1 Ejemplo de reintento automático

# app/tasks.py (extracto)
from celery import shared_task
import requests

@shared_task(
    bind=True,
    max_retries=5,
    default_retry_delay=10,  # segundos
)
def send_notification(self, endpoint: str, payload: dict) -> str:
    try:
        r = requests.post(endpoint, json=payload, timeout=5)
        r.raise_for_status()
    except requests.RequestException as exc:
        raise self.retry(exc=exc)
    return "ok"
  • max_retries: número máximo de reintentos
  • default_retry_delay: intervalo de reintento (segundos)
  • self.retry(): programa el siguiente reintento re-encolando

7.2 Límites de tiempo de tareas

Usa time_limit y soft_time_limit para acotar la ejecución:

@celery_app.task(
    name="app.tasks.heavy_task",
    time_limit=60,
    soft_time_limit=50,
)
def heavy_task():
    ...

Esto ayuda a evitar tareas desbocadas que consuman workers y perjudiquen otros trabajos.


8. Scheduling (trabajos periódicos)

Para “generar un informe diario a medianoche” o “ejecutar agregación cada 5 minutos”, usa Celery Beat.

8.1 Configurar el calendario de beat

# app/celery_app.py (añadir beat schedule)
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "generate-daily-reports": {
        "task": "app.tasks.generate_report",
        "schedule": crontab(hour=0, minute=0),
        "args": (1,),  # ejemplo: user_id=1
    },
}

8.2 Iniciar Beat

Ejecuta el scheduler como un proceso separado:

celery -A app.celery_app.celery_app beat --loglevel=info

Luego:

  • Beat encola tareas según el calendario
  • Los workers las consumen y ejecutan

9. Cuándo usar BackgroundTasks de FastAPI vs Celery

FastAPI ofrece una función ligera de background: BackgroundTasks.

from fastapi import BackgroundTasks

def send_email_sync(to: str):
    ...

@app.post("/signup")
def signup(..., background_tasks: BackgroundTasks):
    background_tasks.add_task(send_email_sync, user.email)
    return {"status": "ok"}

Es muy conveniente, pero:

  • Se ejecuta en el mismo proceso que FastAPI
  • Si el proceso muere, las tareas se pierden
  • No tiene funciones avanzadas de reintento/scheduling

Celery, en cambio:

  • Ejecuta tareas en procesos/contendores separados
  • Las tareas permanecen en la cola aunque un worker muera temporalmente
  • Soporta reintentos, scheduling, almacenamiento de resultados y más

Regla simple:

  • Tareas cortas donde el fallo no es crítico → BackgroundTasks
  • Tareas pesadas o que requieren ejecución/reintento confiable → Celery (cola de trabajos)

10. Consejos de diseño orientados a operaciones (Ops)

Algunos puntos prácticos para operar Celery en sistemas reales.

10.1 Logging y monitoreo

  • Agrega logs de éxito/fallo por separado como “logs del worker”, no solo logs de FastAPI
  • Rastrea tasas de fallo y duración promedio como métricas para encontrar cuellos de botella
  • Para tareas clave, define umbrales de alerta como “3 fallos consecutivos”

10.2 Diseño de colas

  • Separa colas por naturaleza de tarea (p. ej., emails, reports, default)
  • Asigna diferentes números de workers por cola para priorizar trabajo crítico
  • Considera backpressure: si la cola crece demasiado, añade throttling del lado del API o límites de aceptación

10.3 Experiencia de usuario ante errores

  • Decide cómo comunicar fallos a los usuarios (notificaciones, reintentos automáticos, mensajes de soporte)
  • Diseña flujos end-to-end como “correo cuando termine” o “actualizar estado en dashboard” junto con el API

11. Hoja de ruta de adopción (paso a paso)

No tienes que adoptar todo de golpe. Aquí tienes una ruta gradual:

  1. Empieza asíncronizando trabajo ligero con BackgroundTasks

    • Por ejemplo, el envío de correos.
  2. Ejecuta Celery + Redis en local

    • Construye un PoC pequeño basado en la configuración mínima de este artículo.
  3. Mueve una tarea pesada a Celery

    • Empieza con generación de informes o agregación pesada.
  4. Añade APIs de estado de tareas y dashboards

    • Usa task_id para mostrar progreso/resultados.
  5. Introduce reintentos, separación de colas y scheduling donde haga falta

    • Aplica funciones avanzadas solo a las tareas que realmente lo necesiten.
  6. Despliega en producción (Docker / orquestación)

    • Separa contenedores: FastAPI, workers de Celery, Redis; gestiona con Docker Compose o Kubernetes.

12. Resumen

  • Poner trabajo pesado dentro de FastAPI puede causar fácilmente retrasos, timeouts y caídas de throughput.
  • Combinar Celery + Redis hace sencillo mover trabajo pesado o crítico por reintentos a procesos de workers separados.
  • La división básica es: FastAPI = encolar + respuesta rápida, Celery = ejecutar de forma confiable en segundo plano.
  • Piensa en BackgroundTasks para “tareas ligeras” y en Celery para “tareas que requieren fiabilidad y escala”, y migra gradualmente.
  • No necesitas apuntar a un gran sistema de colas inmediatamente. Empieza extrayendo una tarea, y tu servicio puede crecer hacia una estructura más resiliente: más difícil de tumbar y menos propensa a atascarse.

Gracias por leer hasta aquí.
Espero que tus “workers confiables entre bastidores” vayan asumiendo cada vez más carga en tu app FastAPI—un paso firme a la vez.


por greeden

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

日本語が含まれない投稿は無視されますのでご注意ください。(スパム対策)