重い処理はAPIから外す!FastAPI×Celery×Redisで実運用ジョブ基盤――非同期処理・リトライ・スケジューリング・監視まで
✅ 最初に要約(インバーテッドピラミッド)
- この記事でできること
大きな画像変換・レポート生成・外部API連携など時間のかかる処理を、FastAPI本体から切り離して非同期ジョブ基盤に任せます。具体的には Celery(ワーカー) と Redis(メッセージブローカー & 結果ストア) を組み合わせ、ジョブ投入→実行→結果取得→リトライ→スケジュール実行までを一気通貫で構築します。 - 主なトピック
- なぜバックグラウンドタスクだけでは足りないのか(スケール・信頼性)
- Celery×Redis の最小構成とDocker Composeでの起動
- ジョブ定義(
@app.task
)とリトライ/指数バックオフ/ジッタ - 結果取得・プログレス通知・冪等性(重複実行対策)
- Celery Beat による定期実行・時刻指定(ETA/カウントダウン)
- DBセッションの扱い・トランザクション設計・監視(Flower)
- セキュリティ(入力検証・署名・可視化制限)とテスト戦略
- 得られる効果
- APIレスポンスを即時返却、重い処理は裏で安全に実行
- ゼロに近いダウンタイムでスケール(ワーカー水平増殖)
- 失敗時も自動再試行や手動再投入が可能になり、運用が安定
🎯 対象読者(具体像)
- 学生エンジニアAさん(22歳)
画像解析APIが数十秒かかってタイムアウトしがち。APIは軽く、処理は後ろで堅実に回したい。 - 小規模チームBさん(受託3名)
顧客レポートを毎朝6時に一括生成、失敗したら自動で再試行したい。進捗も見える化したい。 - SaaS開発Cさん(スタートアップ)
外部API連携の一時的な失敗(429/5xx)が多い。指数バックオフと冪等性を取り入れて障害に強くしたい。
1. なぜCelery?――BackgroundTasks
だけでは足りない場面
FastAPIのBackgroundTasks
は軽い後処理(ログ追記・メール送信など)に最適ですが、以下の課題があります。
- プロセスに紐づく:アプリプロセスの再起動でタスクが失われる可能性。
- 水平スケール困難:複数インスタンスで重複実行や順序保証が難しい。
- 再試行や監視が貧弱:指数バックオフ、永続化、可視化、スケジューリングが不足。
Celeryは、タスクを**キュー(Redisなど)**に投げ、**別プロセス(ワーカー)**で実行します。
- メリット:永続化・再試行・スケジュール・監視・水平スケール・結果保存。
- 構成:FastAPI(プロデューサ)→ Redis(ブローカー)→ Celery Worker(コンシューマ)。
要点まとめ
- 長時間/不安定I/O/定期実行=Celeryの出番
- プロセス外に出すことで耐障害性とスケールが手に入る
2. 最小構成(Docker Composeで一発起動)
2.1 ディレクトリ例
fastapi-celery/
├─ app/
│ ├─ main.py # FastAPI本体(ジョブ投入API)
│ ├─ tasks.py # Celeryアプリ & タスク定義
│ ├─ settings.py # pydantic-settings
│ └─ db.py # (任意)DBセッション
├─ worker/
│ └─ worker.sh # ワーカー起動スクリプト
├─ beat/
│ └─ beat.sh # スケジューラ起動スクリプト(任意)
├─ docker-compose.yml
└─ Dockerfile
2.2 環境変数(例)
# .env(読み込みは任意)
APP_NAME=My FastAPI with Celery
REDIS_URL=redis://redis:6379/0
CELERY_BROKER_URL=redis://redis:6379/1
CELERY_RESULT_BACKEND=redis://redis:6379/2
メモ:ブローカー(キュー)・結果バックエンドは分けると運用が安定します。
3. 設定とアプリ本体(FastAPI)
3.1 設定(pydantic-settings)
# app/settings.py
from pydantic import Field
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
app_name: str = "My FastAPI with Celery"
redis_url: str = "redis://redis:6379/0"
celery_broker_url: str = "redis://redis:6379/1"
celery_result_backend: str = "redis://redis:6379/2"
log_level: str = "info"
class Config:
env_file = ".env"
extra = "ignore"
def get_settings() -> Settings:
return Settings()
3.2 Celeryアプリ & タスク定義
# app/tasks.py
from celery import Celery
from celery.utils.log import get_task_logger
from app.settings import get_settings
import time
import random
settings = get_settings()
celery_app = Celery(
"my_celery_app",
broker=settings.celery_broker_url,
backend=settings.celery_result_backend,
)
logger = get_task_logger(__name__)
# 例:重い計算・外部API呼び出しを想定
@celery_app.task(
bind=True, # self(タスクコンテキスト)を使える
autoretry_for=(Exception,), # 例外時に自動リトライ
retry_backoff=True, # 指数バックオフ(2,4,8,...秒)
retry_jitter=True, # ジッタ(揺らぎ)でスパイク回避
retry_kwargs={"max_retries": 5} # 最大リトライ回数
)
def long_task(self, payload: dict) -> dict:
logger.info("Start long_task with payload=%s", payload)
# 擬似的に不安定な外部I/O
if random.random() < 0.2:
raise RuntimeError("Transient error")
time.sleep(5) # 重い処理を仮定
result = {"ok": True, "input": payload, "summary": "job-done"}
logger.info("Finish long_task: %s", result)
return result
3.3 FastAPIからジョブ投入・結果確認
# app/main.py
from fastapi import FastAPI, HTTPException
from app.settings import get_settings
from app.tasks import celery_app, long_task
app = FastAPI(title="FastAPI + Celery")
@app.get("/")
def root():
return {"app": get_settings().app_name}
@app.post("/jobs", status_code=202)
def enqueue_job(payload: dict):
# 1) 非同期でジョブ投入
async_result = long_task.delay(payload) # 直ちにIDが返る
return {"task_id": async_result.id, "status": "queued"}
@app.get("/jobs/{task_id}")
def get_job_status(task_id: str):
# 2) ステータス/結果の取得
async_result = celery_app.AsyncResult(task_id)
if async_result.state == "PENDING":
return {"state": "PENDING"}
if async_result.state == "FAILURE":
# 失敗内容(本番では詳細を隠すルールを決めておく)
return {"state": "FAILURE", "error": str(async_result.info)}
if async_result.state == "SUCCESS":
return {"state": "SUCCESS", "result": async_result.result}
# 他:RETRY, STARTED など
return {"state": async_result.state}
要点まとめ
- APIは即時202で
task_id
だけ返す- クライアントはポーリングまたはWebSocket/通知で進捗取得
- 失敗時は自動リトライ(指数バックオフ+ジッタ)を標準化
4. Dockerfile & Composeで“いつでもどこでも”起動
4.1 Dockerfile(アプリ+ワーカー共用)
# Dockerfile
FROM python:3.11-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1
RUN useradd -m appuser
WORKDIR /app
COPY requirements.txt /app/
RUN pip install --upgrade pip && pip install -r requirements.txt
COPY app /app/app
COPY worker/worker.sh /app/worker.sh
COPY beat/beat.sh /app/beat.sh
RUN chmod +x /app/*.sh && chown -R appuser:appuser /app
USER appuser
EXPOSE 8000
CMD ["bash","-lc","uvicorn app.main:app --host 0.0.0.0 --port 8000"]
4.2 docker-compose.yml
version: "3.9"
services:
redis:
image: redis:7-alpine
ports: ["6379:6379"]
restart: unless-stopped
app:
build: .
image: fastapi-celery:latest
environment:
APP_NAME: "My FastAPI with Celery"
CELERY_BROKER_URL: "redis://redis:6379/1"
CELERY_RESULT_BACKEND: "redis://redis:6379/2"
depends_on: [redis]
ports: ["8000:8000"]
restart: unless-stopped
worker:
image: fastapi-celery:latest
command: ["bash","/app/worker.sh"]
environment:
CELERY_BROKER_URL: "redis://redis:6379/1"
CELERY_RESULT_BACKEND: "redis://redis:6379/2"
depends_on: [redis]
restart: unless-stopped
beat:
image: fastapi-celery:latest
command: ["bash","/app/beat.sh"]
environment:
CELERY_BROKER_URL: "redis://redis:6379/1"
CELERY_RESULT_BACKEND: "redis://redis:6379/2"
depends_on: [redis]
restart: unless-stopped
4.3 ワーカー/ビート起動スクリプト
# worker/worker.sh
#!/usr/bin/env bash
set -euo pipefail
exec celery -A app.tasks.celery_app worker --loglevel=INFO --concurrency=2
# beat/beat.sh
#!/usr/bin/env bash
set -euo pipefail
exec celery -A app.tasks.celery_app beat --loglevel=INFO
要点まとめ
- app(API)とworker(実行)を分離 → スケール粒度が細かくなる
- beat(スケジューラ)は任意。定期実行が必要なら起動
5. スケジュール実行(Celery Beat)と時刻指定
5.1 定期実行(毎朝6時)
# app/tasks.py(追記)
from celery.schedules import crontab
celery_app.conf.beat_schedule = {
"daily-report-6am": {
"task": "app.tasks.long_task",
"schedule": crontab(hour=6, minute=0), # 毎日6:00
"args": ({"kind": "daily-report"},),
}
}
5.2 ETA・カウントダウン
# 10分後に実行
async_result = long_task.apply_async(args=({"uid": 123},), countdown=600)
# 指定時刻に実行(UTC基準)
from datetime import datetime, timedelta, timezone
eta = datetime.now(timezone.utc) + timedelta(minutes=30)
async_result = long_task.apply_async(args=({"uid": 123},), eta=eta)
要点まとめ
- Beatで定期実行、ETA/カウントダウンで遅延実行
- タイムゾーンはUTCで統一するとトラブルが少ない
6. 冪等性と重複実行対策(実運用のキモ)
非同期処理は同じタスクが複数回実行される可能性があります(リトライ・ネットワーク断・クライアント再送)。
対策の基本は**冪等性(同じ入力なら結果も同じ)**です。
6.1 冪等キー×Redisロック
# app/tasks.py(冪等キー対応の例)
import redis
r = redis.Redis.from_url(settings.redis_url)
@celery_app.task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_jitter=True)
def import_user(self, user_id: int):
lock_key = f"lock:import_user:{user_id}"
# SETNXでロック(有効期限は60秒)
if not r.set(lock_key, "1", nx=True, ex=60):
return {"skipped": True, "reason": "duplicate"}
try:
# ここでユーザー取り込み(外部API→DB)
...
return {"ok": True}
finally:
r.delete(lock_key)
6.2 受信側の重複排除が基本
- クライアント(API呼び出し側)に冪等キー(ヘッダやペイロード)を持たせ、サーバ側で見たことあるか判定。
- DB更新はアップサートや一意制約で二重登録を防止。
要点まとめ
- ロック・一意制約・アップサートで二重実行に強く
- 外部API呼び出しは再送に耐える設計にする
7. 進捗通知と部分結果(ユーザー体験アップ)
7.1 タスク内でのプログレス更新
# タスク内で状態を送る(バックエンドが必要)
self.update_state(state="PROGRESS", meta={"percent": 30})
クライアントは /jobs/{id}
で state="PROGRESS"
と meta
を取得できます。
7.2 WebSocket/通知連携(設計ヒント)
- WebSocket:
task_id
単位の部屋(ルーム)に入れて、update_state
のたびに通知。 - 通知ハブ:Redis Pub/Subやメッセージブローカー経由でフロントにpush。
- 大規模化:通知は別サービス(通知マイクロサービス)に分離。
要点まとめ
update_state
でプログレスを出し、UXを高める- WebSocket通知でポーリング削減、体感速度UP
8. DBセッションの扱い(requestスコープを持ち込まない)
タスクはAPIリクエストの外側で走ります。リクエストスコープのDBセッションを使い回すのはNG。
# app/db.py(例)
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# タスク用のセッションファクトリ(アプリと同設定でOK)
engine = create_engine("sqlite:///./app.db", connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
# app/tasks.py(DBを使うタスク例)
from app.db import SessionLocal
@celery_app.task
def do_db_work(item_id: int):
db = SessionLocal()
try:
# ここでクエリ・更新
...
db.commit()
except:
db.rollback()
raise
finally:
db.close()
要点まとめ
- タスクは自分のセッションを自分で開いて閉じる
- 例外時はロールバック、finallyで必ずクローズ
9. 監視:Flowerで見える化(タスク一覧・成功率・所要時間)
FlowerはCelery用の監視ツール。タスクの状態・履歴・実行時間がダッシュボードで確認できます。
9.1 導入(サービス追加例)
flower:
image: mher/flower:latest
command: ["flower", "--broker=redis://redis:6379/1", "--port=5555"]
ports: ["5555:5555"]
depends_on: [redis]
restart: unless-stopped
注意:本番では認証やネットワーク制限を必ず設定。ログやメタ情報の扱いにも配慮します。
要点まとめ
- Flowerで可視化と運用便利機能(再実行など)
- 外部公開する際はアクセス制御を厳格に
10. セキュリティ実務(入力検証・署名・可視化制限)
- 入力検証:FastAPI側でスキーマ検証(Pydantic)を徹底。タスクに渡すデータは最小限。
- 署名:ジョブ投入APIに認証/認可を必須化。内部からのみ発火させたい場合はネットワーク境界で制御。
- 情報漏えい:
/jobs/{id}
でのエラー詳細はマスク。内部用にはログで追えるよう構造化ログに。 - Flower公開:必ず認証or VPN配下。
- リソース制限:ワーカーにコンカレンシー・メモリ上限。巨大入力はサイズ上限(API/Nginx)で制御。
要点まとめ
- 最小権限・最小情報の原則
- 監視UIの外部公開は原則禁止(必要なら認証)
11. テスト戦略(落ちない非同期を育てる)
11.1 単体テスト:関数としてのタスク
def test_long_task_unit(mocker):
from app.tasks import long_task
mocker.patch("time.sleep") # 待機をスキップ
result = long_task.run({"x": 1}) # run()で同期実行
assert result["ok"] is True
11.2 統合テスト:エンドツーエンド
- Redisをテスト用に起動し、
delay()
→AsyncResult.get(timeout=...)
で実際に回す。 - ただし遅くなるため、重要なフローだけに限定。その他はモックで代替。
11.3 リトライ/バックオフ検証
autoretry_for
をテストでは無効化し、例外が上がることを確認。- 逆にE2Eでは実際にリトライが走ることをログで検証。
要点まとめ
- 単体はrun()で速く、E2Eは最小限で確実に
- リトライは両面(有効/無効)で検証
12. パフォーマンス/スケールの目安
- ワーカー並列度(
--concurrency
):CPU×2 + 1 を起点。I/O待ちが多いなら増やす。 - キュー分離:重い処理・軽い処理でキューを分ける(
queue="heavy"
など)。 - 優先度:キュー優先度や別ワーカーでSLAを守る。
- 事前ウォームアップ:大量ジョブの前にコネクション/キャッシュを温める。
- バックプレッシャ:キュー長が閾値超過で受け付け制限またはGraceful Degradation(解像度落とし等)。
要点まとめ
- キュー分離と並列度調整でピークをさばく
- 受け付け側(API)にも制御を入れて守る
13. 失敗パターンと回避策(よくある落とし穴)
症状 | 原因 | 回避策 |
---|---|---|
タスクが二重に動く | リトライ/再送/重複投入 | 冪等キー・ロック・一意制約・アップサート |
セッション閉じ忘れ | タスク内でDBセッションを共有 | 自前で開閉・例外でrollback・finallyでclose |
度重なる外部API失敗 | スロットリング未対応 | 指数バックオフ+ジッタ・レート制御・キャッシュ |
Flowerが外から丸見え | 認証無しで公開 | 認証/Firewall/VPN・非公開前提 |
進捗が取れない | update_state 未使用/Backend無し |
結果バックエンドを設定・update_state でmeta送信 |
14. 応用トピック(必要に応じて段階的に)
- 結果の期限:Redisバックエンドの結果TTLを設定してメモリ節約。
- チェイン/グループ:複数タスクを順次/並列に組み合わせる(
chain
,group
)。 - キャンセル:
revoke(task_id, terminate=True)
で強制停止(副作用に注意)。 - 署名付きタスク:共通引数を持つシグネチャ(
long_task.s(payload)
)でパイプライン化。 - 複数ブローカー:冗長化や用途別分離(通知系/重計算系)で運用を安定化。
15. サンプル:一気通貫の最小起動手順(再掲)
# 1) 依存インストール(requirements.txt 例)
# fastapi uvicorn[standard] celery redis pydantic pydantic-settings
# 2) コンテナ起動
docker compose up -d --build
# 3) ジョブ投入
curl -X POST http://localhost:8000/jobs -H "Content-Type: application/json" \
-d '{"user_id": 123, "action": "report"}'
# => {"task_id":"<id>","status":"queued"}
# 4) ステータス確認
curl http://localhost:8000/jobs/<id>
# => PENDING / PROGRESS / SUCCESS / FAILURE
16. 読者別インパクト(より丁寧に)
- 個人開発者:APIが軽くなり、失敗時の再試行も自動。開発〜デプロイがシンプルに。
- 小規模チーム:キュー分離とワーカー増減で負荷を吸収。定期実行で手作業バッチがゼロに。
- 成長中SaaS:冪等性・監視・権限を標準化し、障害時の復旧時間を短縮。SLAを守りやすくなる。
17. まとめ(今日から“重い処理は外に”♡)
- FastAPIのレスポンスは軽く速く、重い処理はCelery×Redisへ。
- 非同期実行・再試行・スケジュール・可視化が最小コストで手に入ります。
- 実運用では冪等性・DBセッションの自前管理・Flowerの非公開を徹底。
- まずはこの記事の雛形を動かし、徐々にキュー分離・進捗通知・Backoff・監視を足していきましょう。
わたしもずっと応援しています。小さく始めて、着実に“壊れない”非同期基盤を育てていきましょうね♡
付録A:サンプルrequirements.txt
fastapi
uvicorn[standard]
celery
redis
pydantic
pydantic-settings