green snake
Photo by Pixabay on Pexels.com
目次

重い処理はAPIから外す!FastAPI×Celery×Redisで実運用ジョブ基盤――非同期処理・リトライ・スケジューリング・監視まで


✅ 最初に要約(インバーテッドピラミッド)

  • この記事でできること
    大きな画像変換・レポート生成・外部API連携など時間のかかる処理を、FastAPI本体から切り離して非同期ジョブ基盤に任せます。具体的には Celery(ワーカー)Redis(メッセージブローカー & 結果ストア) を組み合わせ、ジョブ投入→実行→結果取得→リトライ→スケジュール実行までを一気通貫で構築します。
  • 主なトピック
    1. なぜバックグラウンドタスクだけでは足りないのか(スケール・信頼性)
    2. Celery×Redis の最小構成とDocker Composeでの起動
    3. ジョブ定義(@app.task)とリトライ/指数バックオフ/ジッタ
    4. 結果取得プログレス通知冪等性(重複実行対策)
    5. Celery Beat による定期実行・時刻指定(ETA/カウントダウン)
    6. DBセッションの扱い・トランザクション設計・監視(Flower)
    7. セキュリティ(入力検証・署名・可視化制限)とテスト戦略
  • 得られる効果
    • APIレスポンスを即時返却、重い処理は裏で安全に実行
    • ゼロに近いダウンタイムでスケール(ワーカー水平増殖)
    • 失敗時も自動再試行手動再投入が可能になり、運用が安定

🎯 対象読者(具体像)

  • 学生エンジニアAさん(22歳)
    画像解析APIが数十秒かかってタイムアウトしがち。APIは軽く、処理は後ろで堅実に回したい。
  • 小規模チームBさん(受託3名)
    顧客レポートを毎朝6時に一括生成、失敗したら自動で再試行したい。進捗も見える化したい。
  • SaaS開発Cさん(スタートアップ)
    外部API連携の一時的な失敗(429/5xx)が多い。指数バックオフ冪等性を取り入れて障害に強くしたい。

1. なぜCelery?――BackgroundTasksだけでは足りない場面

FastAPIのBackgroundTasks軽い後処理(ログ追記・メール送信など)に最適ですが、以下の課題があります。

  1. プロセスに紐づく:アプリプロセスの再起動でタスクが失われる可能性。
  2. 水平スケール困難:複数インスタンスで重複実行順序保証が難しい。
  3. 再試行や監視が貧弱:指数バックオフ、永続化、可視化、スケジューリングが不足。

Celeryは、タスクを**キュー(Redisなど)**に投げ、**別プロセス(ワーカー)**で実行します。

  • メリット:永続化・再試行・スケジュール・監視・水平スケール・結果保存。
  • 構成:FastAPI(プロデューサ)→ Redis(ブローカー)→ Celery Worker(コンシューマ)。

要点まとめ

  • 長時間/不安定I/O/定期実行=Celeryの出番
  • プロセス外に出すことで耐障害性スケールが手に入る

2. 最小構成(Docker Composeで一発起動)

2.1 ディレクトリ例

fastapi-celery/
├─ app/
│  ├─ main.py          # FastAPI本体(ジョブ投入API)
│  ├─ tasks.py         # Celeryアプリ & タスク定義
│  ├─ settings.py      # pydantic-settings
│  └─ db.py            # (任意)DBセッション
├─ worker/
│  └─ worker.sh        # ワーカー起動スクリプト
├─ beat/
│  └─ beat.sh          # スケジューラ起動スクリプト(任意)
├─ docker-compose.yml
└─ Dockerfile

2.2 環境変数(例)

# .env(読み込みは任意)
APP_NAME=My FastAPI with Celery
REDIS_URL=redis://redis:6379/0
CELERY_BROKER_URL=redis://redis:6379/1
CELERY_RESULT_BACKEND=redis://redis:6379/2

メモ:ブローカー(キュー)・結果バックエンドは分けると運用が安定します。


3. 設定とアプリ本体(FastAPI)

3.1 設定(pydantic-settings)

# app/settings.py
from pydantic import Field
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    app_name: str = "My FastAPI with Celery"
    redis_url: str = "redis://redis:6379/0"
    celery_broker_url: str = "redis://redis:6379/1"
    celery_result_backend: str = "redis://redis:6379/2"
    log_level: str = "info"

    class Config:
        env_file = ".env"
        extra = "ignore"

def get_settings() -> Settings:
    return Settings()

3.2 Celeryアプリ & タスク定義

# app/tasks.py
from celery import Celery
from celery.utils.log import get_task_logger
from app.settings import get_settings
import time
import random

settings = get_settings()
celery_app = Celery(
    "my_celery_app",
    broker=settings.celery_broker_url,
    backend=settings.celery_result_backend,
)
logger = get_task_logger(__name__)

# 例:重い計算・外部API呼び出しを想定
@celery_app.task(
    bind=True,                      # self(タスクコンテキスト)を使える
    autoretry_for=(Exception,),     # 例外時に自動リトライ
    retry_backoff=True,             # 指数バックオフ(2,4,8,...秒)
    retry_jitter=True,              # ジッタ(揺らぎ)でスパイク回避
    retry_kwargs={"max_retries": 5} # 最大リトライ回数
)
def long_task(self, payload: dict) -> dict:
    logger.info("Start long_task with payload=%s", payload)
    # 擬似的に不安定な外部I/O
    if random.random() < 0.2:
        raise RuntimeError("Transient error")
    time.sleep(5)  # 重い処理を仮定
    result = {"ok": True, "input": payload, "summary": "job-done"}
    logger.info("Finish long_task: %s", result)
    return result

3.3 FastAPIからジョブ投入・結果確認

# app/main.py
from fastapi import FastAPI, HTTPException
from app.settings import get_settings
from app.tasks import celery_app, long_task

app = FastAPI(title="FastAPI + Celery")

@app.get("/")
def root():
    return {"app": get_settings().app_name}

@app.post("/jobs", status_code=202)
def enqueue_job(payload: dict):
    # 1) 非同期でジョブ投入
    async_result = long_task.delay(payload)  # 直ちにIDが返る
    return {"task_id": async_result.id, "status": "queued"}

@app.get("/jobs/{task_id}")
def get_job_status(task_id: str):
    # 2) ステータス/結果の取得
    async_result = celery_app.AsyncResult(task_id)
    if async_result.state == "PENDING":
        return {"state": "PENDING"}
    if async_result.state == "FAILURE":
        # 失敗内容(本番では詳細を隠すルールを決めておく)
        return {"state": "FAILURE", "error": str(async_result.info)}
    if async_result.state == "SUCCESS":
        return {"state": "SUCCESS", "result": async_result.result}
    # 他:RETRY, STARTED など
    return {"state": async_result.state}

要点まとめ

  • APIは即時202task_idだけ返す
  • クライアントはポーリングまたはWebSocket/通知で進捗取得
  • 失敗時は自動リトライ(指数バックオフ+ジッタ)を標準化

4. Dockerfile & Composeで“いつでもどこでも”起動

4.1 Dockerfile(アプリ+ワーカー共用)

# Dockerfile
FROM python:3.11-slim

ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1

RUN useradd -m appuser
WORKDIR /app

COPY requirements.txt /app/
RUN pip install --upgrade pip && pip install -r requirements.txt

COPY app /app/app
COPY worker/worker.sh /app/worker.sh
COPY beat/beat.sh /app/beat.sh
RUN chmod +x /app/*.sh && chown -R appuser:appuser /app
USER appuser

EXPOSE 8000
CMD ["bash","-lc","uvicorn app.main:app --host 0.0.0.0 --port 8000"]

4.2 docker-compose.yml

version: "3.9"
services:
  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
    restart: unless-stopped

  app:
    build: .
    image: fastapi-celery:latest
    environment:
      APP_NAME: "My FastAPI with Celery"
      CELERY_BROKER_URL: "redis://redis:6379/1"
      CELERY_RESULT_BACKEND: "redis://redis:6379/2"
    depends_on: [redis]
    ports: ["8000:8000"]
    restart: unless-stopped

  worker:
    image: fastapi-celery:latest
    command: ["bash","/app/worker.sh"]
    environment:
      CELERY_BROKER_URL: "redis://redis:6379/1"
      CELERY_RESULT_BACKEND: "redis://redis:6379/2"
    depends_on: [redis]
    restart: unless-stopped

  beat:
    image: fastapi-celery:latest
    command: ["bash","/app/beat.sh"]
    environment:
      CELERY_BROKER_URL: "redis://redis:6379/1"
      CELERY_RESULT_BACKEND: "redis://redis:6379/2"
    depends_on: [redis]
    restart: unless-stopped

4.3 ワーカー/ビート起動スクリプト

# worker/worker.sh
#!/usr/bin/env bash
set -euo pipefail
exec celery -A app.tasks.celery_app worker --loglevel=INFO --concurrency=2
# beat/beat.sh
#!/usr/bin/env bash
set -euo pipefail
exec celery -A app.tasks.celery_app beat --loglevel=INFO

要点まとめ

  • app(API)とworker(実行)を分離 → スケール粒度が細かくなる
  • beat(スケジューラ)は任意。定期実行が必要なら起動

5. スケジュール実行(Celery Beat)と時刻指定

5.1 定期実行(毎朝6時)

# app/tasks.py(追記)
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "daily-report-6am": {
        "task": "app.tasks.long_task",
        "schedule": crontab(hour=6, minute=0),  # 毎日6:00
        "args": ({"kind": "daily-report"},),
    }
}

5.2 ETA・カウントダウン

# 10分後に実行
async_result = long_task.apply_async(args=({"uid": 123},), countdown=600)

# 指定時刻に実行(UTC基準)
from datetime import datetime, timedelta, timezone
eta = datetime.now(timezone.utc) + timedelta(minutes=30)
async_result = long_task.apply_async(args=({"uid": 123},), eta=eta)

要点まとめ

  • Beatで定期実行、ETA/カウントダウンで遅延実行
  • タイムゾーンはUTCで統一するとトラブルが少ない

6. 冪等性と重複実行対策(実運用のキモ)

非同期処理は同じタスクが複数回実行される可能性があります(リトライ・ネットワーク断・クライアント再送)。
対策の基本は**冪等性(同じ入力なら結果も同じ)**です。

6.1 冪等キー×Redisロック

# app/tasks.py(冪等キー対応の例)
import redis
r = redis.Redis.from_url(settings.redis_url)

@celery_app.task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_jitter=True)
def import_user(self, user_id: int):
    lock_key = f"lock:import_user:{user_id}"
    # SETNXでロック(有効期限は60秒)
    if not r.set(lock_key, "1", nx=True, ex=60):
        return {"skipped": True, "reason": "duplicate"}
    try:
        # ここでユーザー取り込み(外部API→DB)
        ...
        return {"ok": True}
    finally:
        r.delete(lock_key)

6.2 受信側の重複排除が基本

  • クライアント(API呼び出し側)に冪等キー(ヘッダやペイロード)を持たせ、サーバ側で見たことあるか判定。
  • DB更新はアップサート一意制約二重登録を防止。

要点まとめ

  • ロック一意制約アップサート二重実行に強く
  • 外部API呼び出しは再送に耐える設計にする

7. 進捗通知と部分結果(ユーザー体験アップ)

7.1 タスク内でのプログレス更新

# タスク内で状態を送る(バックエンドが必要)
self.update_state(state="PROGRESS", meta={"percent": 30})

クライアントは /jobs/{id}state="PROGRESS"meta を取得できます。

7.2 WebSocket/通知連携(設計ヒント)

  • WebSockettask_id単位の部屋(ルーム)に入れて、update_stateのたびに通知
  • 通知ハブ:Redis Pub/Subやメッセージブローカー経由でフロントにpush
  • 大規模化:通知は別サービス(通知マイクロサービス)に分離。

要点まとめ

  • update_stateプログレスを出し、UXを高める
  • WebSocket通知でポーリング削減、体感速度UP

8. DBセッションの扱い(requestスコープを持ち込まない)

タスクはAPIリクエストの外側で走ります。リクエストスコープのDBセッションを使い回すのはNG。

# app/db.py(例)
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# タスク用のセッションファクトリ(アプリと同設定でOK)
engine = create_engine("sqlite:///./app.db", connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
# app/tasks.py(DBを使うタスク例)
from app.db import SessionLocal

@celery_app.task
def do_db_work(item_id: int):
    db = SessionLocal()
    try:
        # ここでクエリ・更新
        ...
        db.commit()
    except:
        db.rollback()
        raise
    finally:
        db.close()

要点まとめ

  • タスクは自分のセッション自分で開いて閉じる
  • 例外時はロールバック、finallyで必ずクローズ

9. 監視:Flowerで見える化(タスク一覧・成功率・所要時間)

FlowerはCelery用の監視ツール。タスクの状態・履歴・実行時間がダッシュボードで確認できます。

9.1 導入(サービス追加例)

  flower:
    image: mher/flower:latest
    command: ["flower", "--broker=redis://redis:6379/1", "--port=5555"]
    ports: ["5555:5555"]
    depends_on: [redis]
    restart: unless-stopped

注意:本番では認証ネットワーク制限を必ず設定。ログやメタ情報の扱いにも配慮します。

要点まとめ

  • Flowerで可視化運用便利機能(再実行など)
  • 外部公開する際はアクセス制御を厳格に

10. セキュリティ実務(入力検証・署名・可視化制限)

  1. 入力検証:FastAPI側でスキーマ検証(Pydantic)を徹底。タスクに渡すデータは最小限
  2. 署名:ジョブ投入APIに認証/認可を必須化。内部からのみ発火させたい場合はネットワーク境界で制御。
  3. 情報漏えい/jobs/{id} でのエラー詳細はマスク。内部用にはログで追えるよう構造化ログに。
  4. Flower公開:必ず認証or VPN配下
  5. リソース制限:ワーカーにコンカレンシーメモリ上限。巨大入力はサイズ上限(API/Nginx)で制御。

要点まとめ

  • 最小権限・最小情報の原則
  • 監視UIの外部公開は原則禁止(必要なら認証)

11. テスト戦略(落ちない非同期を育てる)

11.1 単体テスト:関数としてのタスク

def test_long_task_unit(mocker):
    from app.tasks import long_task
    mocker.patch("time.sleep")  # 待機をスキップ
    result = long_task.run({"x": 1})  # run()で同期実行
    assert result["ok"] is True

11.2 統合テスト:エンドツーエンド

  • Redisをテスト用に起動し、delay()AsyncResult.get(timeout=...)実際に回す
  • ただし遅くなるため、重要なフローだけに限定。その他はモックで代替。

11.3 リトライ/バックオフ検証

  • autoretry_forをテストでは無効化し、例外が上がることを確認。
  • 逆にE2Eでは実際にリトライが走ることをログで検証。

要点まとめ

  • 単体はrun()で速く、E2Eは最小限で確実に
  • リトライは両面(有効/無効)で検証

12. パフォーマンス/スケールの目安

  • ワーカー並列度(--concurrency:CPU×2 + 1 を起点。I/O待ちが多いなら増やす。
  • キュー分離:重い処理・軽い処理でキューを分けるqueue="heavy" など)。
  • 優先度:キュー優先度や別ワーカーSLAを守る。
  • 事前ウォームアップ:大量ジョブの前にコネクション/キャッシュを温める。
  • バックプレッシャ:キュー長が閾値超過で受け付け制限またはGraceful Degradation(解像度落とし等)。

要点まとめ

  • キュー分離と並列度調整でピークをさばく
  • 受け付け側(API)にも制御を入れて守る

13. 失敗パターンと回避策(よくある落とし穴)

症状 原因 回避策
タスクが二重に動く リトライ/再送/重複投入 冪等キーロック一意制約アップサート
セッション閉じ忘れ タスク内でDBセッションを共有 自前で開閉・例外でrollback・finallyでclose
度重なる外部API失敗 スロットリング未対応 指数バックオフ+ジッタレート制御キャッシュ
Flowerが外から丸見え 認証無しで公開 認証/Firewall/VPN・非公開前提
進捗が取れない update_state未使用/Backend無し 結果バックエンドを設定・update_statemeta送信

14. 応用トピック(必要に応じて段階的に)

  • 結果の期限:Redisバックエンドの結果TTLを設定してメモリ節約。
  • チェイン/グループ:複数タスクを順次/並列に組み合わせる(chain, group)。
  • キャンセルrevoke(task_id, terminate=True)強制停止(副作用に注意)。
  • 署名付きタスク:共通引数を持つシグネチャlong_task.s(payload))でパイプライン化。
  • 複数ブローカー:冗長化や用途別分離(通知系/重計算系)で運用を安定化。

15. サンプル:一気通貫の最小起動手順(再掲)

# 1) 依存インストール(requirements.txt 例)
# fastapi uvicorn[standard] celery redis pydantic pydantic-settings

# 2) コンテナ起動
docker compose up -d --build

# 3) ジョブ投入
curl -X POST http://localhost:8000/jobs -H "Content-Type: application/json" \
  -d '{"user_id": 123, "action": "report"}'
# => {"task_id":"<id>","status":"queued"}

# 4) ステータス確認
curl http://localhost:8000/jobs/<id>
# => PENDING / PROGRESS / SUCCESS / FAILURE

16. 読者別インパクト(より丁寧に)

  • 個人開発者:APIが軽くなり、失敗時の再試行も自動。開発〜デプロイがシンプルに。
  • 小規模チームキュー分離ワーカー増減で負荷を吸収。定期実行で手作業バッチがゼロに。
  • 成長中SaaS冪等性監視権限を標準化し、障害時の復旧時間を短縮。SLAを守りやすくなる。

17. まとめ(今日から“重い処理は外に”♡)

  • FastAPIのレスポンスは軽く速く、重い処理はCelery×Redisへ。
  • 非同期実行・再試行・スケジュール・可視化が最小コストで手に入ります。
  • 実運用では冪等性DBセッションの自前管理Flowerの非公開を徹底。
  • まずはこの記事の雛形を動かし、徐々にキュー分離進捗通知Backoff監視を足していきましょう。
    わたしもずっと応援しています。小さく始めて、着実に“壊れない”非同期基盤を育てていきましょうね♡

付録A:サンプルrequirements.txt

fastapi
uvicorn[standard]
celery
redis
pydantic
pydantic-settings

投稿者 greeden

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

日本語が含まれない投稿は無視されますのでご注意ください。(スパム対策)