green snake
Photo by Pixabay on Pexels.com
目次

FastAPIで実務的なバックグラウンド処理:BackgroundTasksとCeleryでつくるジョブキュー設計ガイド


要約(最初にざっくり全体像)

  • ユーザーを待たせたくない処理(メール送信・画像処理・レポート生成など)は、HTTPレスポンスとは別にバックグラウンドで処理するのが基本方針です。
  • FastAPI標準の BackgroundTasks は「同じプロセス内でレスポンス後にちょっとした処理を動かす」のに向いています。
  • 一方、重い処理や時間のかかるバッチ、再試行(リトライ)やキュー管理が必要な場合は、分散タスクキュー Celery を組み合わせると安定します。
  • 本記事では、BackgroundTasks と Celery の両方を FastAPI に組み込み、ユースケースごとに「どちらを選べばいいか」を具体的に整理します。
  • 最後に、テストや監視、再試行、スケールなど運用面での注意点・パターンをまとめ、段階的な導入ロードマップもご用意します。

誰が読んで得をするか(具体的なイメージ)

  • 個人開発・学習者の方
    「ファイルアップロードのあとにサムネイルを作りたいけれど、レスポンスはすぐ返したい」「メール送信のたびにレスポンスが遅くなる…」と悩んでいる。
    BackgroundTasks で簡単なバックグラウンド処理を実装し、どこまでならこれで十分なのかが分かります。

  • 小規模チームのバックエンドエンジニアさん
    PDFレポート生成や外部API連携など、時間のかかる処理が増え、APIレスポンスと混ざってつらくなってきた。
    → Celery を FastAPI と組み合わせ、ジョブキュー+ワーカー構成に移行するイメージがつかめます。

  • SaaS開発チーム・スタートアップの方
    1日に何万件ものメール・通知・バッチ処理があり、適切な再試行や監視が必要になってきた。
    → Celery の基本構成・監視(Flowerなど)・リトライ設計・スケーリングの考え方をざっくり掴めます。


アクセシビリティ評価(読みやすさ・配慮)

  • 情報構造
    まず概念と違いをざっくり説明し、そのあとに BackgroundTasks → Celery の順で深掘りし、最後に比較表とロードマップを置く「逆三角形」の構成にしています。
  • 用語と文体
    専門用語は初出で短く説明し、その後は用語を統一しています。文体はです・ます調で、柔らかめだけれど落ち着いたトーンを心がけています。
  • コードブロック
    コードはすべて固定幅ブロックで示し、コメントは短く整理しました。視線が迷わないように空行も多めです。
  • 想定読者
    少しPythonやFastAPIに触れたことがある方を前提にしつつ、段階的に読めるようセクションを独立させています。

全体として、多様な読者が読み進めやすい AA 相当のアクセシビリティを意識しています。


1. なぜバックグラウンド処理が必要なのか

まず、「そもそもなぜバックグラウンド処理が必要なのか」を整理しておきますね。

1.1 HTTPリクエストと「待ち時間」の問題

通常、FastAPIのエンドポイントは次のような流れで動きます。

  1. クライアントがリクエストを送る
  2. サーバ側で処理する
  3. 処理がおわったらレスポンスを返す

ここで、もし重い処理が混ざると…

  • ユーザーが秒単位で待たされる
  • Webブラウザ側でリクエストタイムアウトになる
  • 同時アクセスが増えたときにワーカーが詰まってしまう

といった問題を引き起こします。

1.2 バックグラウンド処理が向いているもの

代表的な例を挙げると、次のような処理はバックグラウンドに追い出してしまうと幸福度が上がります。

  • メール通知、Slack通知、プッシュ通知の送信
  • 画像のサムネイル生成、PDF生成、動画のトランスコード
  • 大きなCSV・Excelの集計やレポート作成
  • 外部APIへの複数回の問い合わせ(レート制限に配慮しつつ順次実行)
  • 大量データのインポート・エクスポート

ユーザーは「処理を受け付けました」という結果さえ早く返ってくればよく、それ以外は裏側でじっくり進めてよいケースがほとんどです。


2. FastAPI標準の BackgroundTasks を使ってみる

まずは FastAPI が標準で提供している BackgroundTasks から見ていきます。これは「レスポンスを返したあとに、同じプロセス内でちょっとした処理を動かす仕組み」です。

2.1 いちばんシンプルな例:メール通知

# app/main.py
from fastapi import FastAPI, BackgroundTasks

app = FastAPI(title="BackgroundTasks sample")

def send_email(to: str, subject: str, body: str) -> None:
    # 実際にはメールサーバへの接続・送信処理を書く
    print(f"[MAIL] to={to}, subject={subject}, body={body}")

@app.post("/users/{user_id}/welcome")
async def send_welcome(user_id: int, background_tasks: BackgroundTasks):
    # ここではユーザー取得などは省略
    email = f"user{user_id}@example.com"

    # レスポンス送信後にメールを送るよう登録
    background_tasks.add_task(
        send_email,
        to=email,
        subject="ようこそ!",
        body="ご登録ありがとうございます。",
    )
    return {"status": "ok", "message": "登録を受け付けました"}

このエンドポイントでは、

  • HTTPレスポンスはすぐに返す
  • そのあと、同じプロセス内で send_email 関数が呼ばれる

という流れになります。

2.2 非同期関数も登録できる

BackgroundTasks には def 関数だけでなく async def 関数も登録できます。

import httpx
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

async def notify_webhook(payload: dict) -> None:
    async with httpx.AsyncClient(timeout=5.0) as client:
        await client.post("https://example.com/webhook", json=payload)

@app.post("/events")
async def create_event(background_tasks: BackgroundTasks):
    # 何かイベントを保存したと仮定
    background_tasks.add_task(notify_webhook, {"type": "created"})
    return {"status": "accepted"}

非同期関数を登録した場合も、実行タイミングは「レスポンス送信後」ですが、FastAPI / Starlette 側がイベントループでうまく処理してくれます。

2.3 仕組みをざっくり理解しておく

BackgroundTasks は、あくまで「同じアプリプロセス内で、レスポンス後に実行する関数のリスト」と思っておくと分かりやすいです。

  • 別プロセスや別マシンには飛びません
  • キューやリトライはありません
  • ワーカーが死んだらタスクも消えます

ですので、「少し時間のかかる軽い処理を、レスポンスのあとにちょっと回したい」という用途に向いています。


3. BackgroundTasks の設計ポイントと限界

便利な BackgroundTasks ですが、向き・不向きがあります。ここを整理しておくと、どこからCeleryのような「本気のジョブキュー」に切り替えるべきか判断しやすくなります。

3.1 向いているケース

  • 実行時間が数秒〜十数秒程度までの軽い処理
  • 実行回数がそこまで多くない(1分あたり数十〜数百程度)
  • REST API本体と同じインスタンス数で処理して問題ない
  • 処理が失敗しても致命ではない、もしくはログだけで十分

具体例:

  • ユーザー登録後の1通だけのメール送信
  • 軽いログ送信、監査レコードの追加
  • 小さい画像のサムネイル生成

3.2 向いていないケース(注意したいところ)

  • 大量のタスクをさばく必要がある(例:数万〜数十万単位)
  • 数分〜数十分かかるような重い処理
  • 再試行(リトライ)や遅延実行、スケジュール実行が必要
  • タスクの状態(成功・失敗・進捗)を後から参照したい

なぜかというと、

  • プロセスが落ちればタスクも消える
  • 実行中のタスク数が増えると、FastAPIのワーカーが圧迫される
  • 状態管理やリトライを自分で実装するのはかなり大変

だからです。

3.3 よくある落とし穴と対策

  1. CPUが重い処理を入れてしまう
    → イベントループをブロックしてしまい、他のリクエストが遅くなります。
    → CPUバウンドな処理は別プロセス(Celeryなど)に逃がすのが安全です。

  2. 例外処理をしていない
    → タスク内で例外が起きると、そのままログにだけ出て、呼び出し元からは見えにくいです。
    → タスク内でログをきちんと残し、必要なら通知を送るようにしておきましょう。

  3. 大量のタスクを突っ込んでしまう
    → レスポンスは返りますが、裏で処理しきれなくなり、メモリやCPUが徐々に圧迫されます。
    → 「このくらいの量ならOK」という上限を決め、本格的に重くなりそうなら Celery へ移行するタイミングです。


4. 分散タスクキュー Celery の基本

BackgroundTasks では物足りなくなったときに登場するのが Celery です。Celery は Python の代表的な分散タスクキューで、多くの実プロダクトで使われています。

4.1 Celeryの構成要素

Celeryはざっくり言うと、次の3つで動きます。

  1. Broker(ブローカー)
    タスクのキューを管理する役割。Redis・RabbitMQ などがよく使われます。

  2. Worker(ワーカー)
    ブローカーからタスクを取り出して実行するプロセス。複数台に分散できます。

  3. Result Backend(結果ストア)
    タスクの実行結果や状態を保存する場所。Redis・DBなどが利用できます。

FastAPIは「タスクを登録する入口」の役割を担い、実際の処理は別プロセス(Celeryワーカー)が行うイメージです。

4.2 典型的なディレクトリ構成

myapp/
├─ app/
│  ├─ main.py        # FastAPI
│  ├─ celery_app.py  # Celeryアプリ定義
│  ├─ tasks.py       # Celeryタスク定義
│  └─ ...
├─ docker-compose.yml
└─ requirements.txt

この構成で、FastAPIとCeleryワーカーを別コンテナとして起動することが多いです。


5. FastAPI + Celery の最小構成を組んでみる

ここから、シンプルな例で FastAPI と Celery の連携を体験してみますね。

5.1 Celeryアプリの定義

# app/celery_app.py
from celery import Celery

celery_app = Celery(
    "myapp",
    broker="redis://redis:6379/0",      # docker-compose前提の例
    backend="redis://redis:6379/1",
)

celery_app.conf.update(
    task_track_started=True,
    result_expires=3600,  # 1時間で結果を破棄
)

ここでは Redis をブローカー兼バックエンドにしています。

5.2 タスク定義

# app/tasks.py
import time
from app.celery_app import celery_app

@celery_app.task(name="tasks.long_add")
def long_add(x: int, y: int) -> int:
    # わざと時間のかかる処理に見立てる
    time.sleep(10)
    return x + y

@celery_app.task デコレータでタスクとして登録し、delayapply_async で呼び出せるようになります。

5.3 FastAPIからタスクを発行する

# app/main.py
from fastapi import FastAPI
from pydantic import BaseModel
from app.tasks import long_add

app = FastAPI(title="FastAPI + Celery sample")

class AddRequest(BaseModel):
    x: int
    y: int

@app.post("/jobs/add")
def enqueue_add(req: AddRequest):
    # Celeryタスクとして投入
    async_result = long_add.delay(req.x, req.y)
    return {"task_id": async_result.id}

@app.get("/jobs/{task_id}")
def get_result(task_id: str):
    result = long_add.AsyncResult(task_id)
    if not result.ready():
        return {"task_id": task_id, "status": result.status}
    if result.failed():
        return {"task_id": task_id, "status": "FAILURE"}
    return {
        "task_id": task_id,
        "status": "SUCCESS",
        "result": result.result,
    }

この例では、

  1. /jobs/add{"x": 1, "y": 2} をPOSTすると、タスクがキューに積まれ、タスクIDが返ってきます。
  2. /jobs/{task_id} をGETすると、進捗や結果を確認できます。

5.4 docker-composeでまとめて起動するイメージ

実務では、RedisやCeleryワーカーも含めて docker-compose でまとめて起動することが多いです。

# docker-compose.yml(イメージ)
version: "3.9"
services:
  api:
    build: .
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000
    depends_on:
      - redis
    ports:
      - "8000:8000"

  worker:
    build: .
    command: celery -A app.celery_app.celery_app worker --loglevel=INFO
    depends_on:
      - redis

  redis:
    image: redis:7
    ports:
      - "6379:6379"

このように、FastAPIとCeleryワーカーを別コンテナ・別プロセスとして動かすことで、

  • APIレスポンスと重い処理を切り離せる
  • ワーカーを水平スケールしやすい

といったメリットが得られます。


6. BackgroundTasks と Celery の比較

ここまでで、FastAPI標準の BackgroundTasks と、分散タスクキューである Celery をざっと見てきました。ここで、一度整理のために比較表にしてみます。

観点 BackgroundTasks Celery
実行場所 FastAPIアプリと同じプロセス 別プロセス(ワーカー)、別ホストも可
依存コンポーネント なし(FastAPIのみ) Broker(Redis / RabbitMQ)、ワーカー
実行タイミング HTTPレスポンス送信後 キューに投入後、ワーカーが空き次第実行
リトライ・遅延実行 手作りが必要 標準でサポート(retry, countdownなど)
タスク状態管理 自前で実装が必要 タスクID、状態、結果を管理可能
スケーリング APIワーカーに依存 ワーカーを独立して増減可能
向いている処理 軽い処理、数秒程度、少量 重い処理、大量のタスク、バッチ的処理

ざっくり言うと、

  • まずは BackgroundTasks で始める
  • 負荷や要件が増えてきたら Celery へ移行(もしくは併用)

というのが現実的なステップかなと思います。


7. Celeryの少し実務寄りな機能たち

せっかくなので、Celeryならではの便利機能を少しだけ味見しておきましょう。

7.1 リトライ(再試行)

外部APIやメールサーバは、たまに一時的なエラーを返すことがあります。Celeryなら、タスク側で簡単にリトライを指定できます。

# app/tasks.py
from celery import shared_task
from app.celery_app import celery_app
import httpx

@celery_app.task(bind=True, max_retries=3, default_retry_delay=10)
def send_webhook(self, url: str, payload: dict):
    try:
        resp = httpx.post(url, json=payload, timeout=5.0)
        resp.raise_for_status()
    except Exception as exc:
        # 失敗したら10秒後に再試行(最大3回)
        raise self.retry(exc=exc)

7.2 遅延実行・スケジュール実行

「5分後に実行したい」「毎日0時に実行したい」といったニーズも簡単に扱えます。

# 5分後に実行
send_webhook.apply_async(
    args=["https://example.com/webhook", {"foo": "bar"}],
    countdown=300,
)

定期実行については、celery beat やサードパーティのスケジューラを組み合わせて使うことが多いです。

7.3 監視・管理ツール(Flowerなど)

タスクの実行状況をWebで可視化するツールとして、Flowerがよく使われます。

  • 実行中・待ちのタスク一覧
  • 成功・失敗の履歴
  • ワーカーの状態

などをひと目で確認できるので、本番運用でCeleryを使うならぜひ導入を検討したいところです。


8. テストとローカル開発での工夫

バックグラウンド処理が増えると、テストやローカル開発の体験にも影響が出てきます。ここでは、BackgroundTasks と Celery のそれぞれで、テストしやすくするための小さな工夫をご紹介します。

8.1 BackgroundTasks のテスト

BackgroundTasks は単純に「関数を登録しているだけ」なので、ユニットテストでは「タスク関数自体」を直接テストするのがおすすめです。

# app/tasks_local.py
def write_audit_log(user_id: int, action: str) -> None:
    ...

# app/main.py
from fastapi import BackgroundTasks

@app.post("/do-something")
async def do_something(background_tasks: BackgroundTasks):
    # 本体処理は省略
    background_tasks.add_task(write_audit_log, 123, "do_something")
    return {"ok": True}

テストでは

  • write_audit_log に対するユニットテストでロジックを確認する
  • エンドポイントのテストでは、BackgroundTasks がちゃんと呼ばれるかだけを軽く確認する

という2段構えが現実的です。

8.2 Celeryタスクのテスト(eagerモード)

Celeryには task_always_eager という設定があり、これをオンにすると「ワーカーを使わず、即座にローカルでタスクを実行する」モードになります。

# tests/conftest.py など
from app.celery_app import celery_app

def pytest_configure():
    celery_app.conf.update(task_always_eager=True)

こうしておくと、テスト実行中は

res = long_add.delay(1, 2)
assert res.result == 3

のように、同期的に結果を確認できます(実稼働では task_always_eager=False に戻すことを忘れないようにしてください)。


9. 運用で気をつけたいポイント(Celery寄り)

Celeryを本番で運用する場合、次のようなポイントを事前に決めておくとトラブルが減ります。

9.1 タスクの「粒度」と「冪等性」

  • 1つのタスクが長時間(数十分〜数時間)かかるような設計は避け、できるだけ細かいタスクに分割する
  • タスクは「同じものが二度実行されても大丈夫」な冪等性を意識する(外部APIの二重実行に注意)

冪等性を意識しておくと、リトライや障害復旧時に「どこまで進んだか」を判断しやすくなります。

9.2 再試行・タイムアウト・デッドレター

  • リトライ回数と間隔(例:最大3回、指数バックオフ)を決める
  • タスクの最大実行時間(タイムアウト)を設定し、異常に長い処理を自動的に止める
  • 何度リトライしても失敗するタスクを「デッドレター(特別なキュー)」に送る仕組みを作る

ここはシステム全体のSLO(どれくらいの確率で成功していてほしいか)とも関係する部分なので、チーム内で合意しておくと安心です。

9.3 ログと監視

  • 重要なタスクは開始・成功・失敗のログをきちんと出す(JSONログがおすすめ)
  • 花形のタスク・ワーカー数・失敗率などをダッシュボードに可視化する
  • 失敗率が急増したときにアラートを飛ばす

FastAPI本体のログ・メトリクスと合わせて設計しておくと、障害時の原因特定がかなり楽になります。


10. 対象読者別の導入パターン

ここまでの内容を踏まえて、「自分の場合はどこから始めればいいか」をもう一度整理してみますね。

10.1 個人開発・学習者の方

  • ステップ1:BackgroundTasks を使って、メール送信や軽い処理をレスポンス後に回してみる
  • ステップ2:処理時間の長いものや、数が多くなってきたら Celery のサンプル構成をローカルで試す
  • ステップ3:docker-compose で FastAPI + Celery + Redis の3つをまとめて動かしてみる

この時点で、「何をどこまで自分で管理する必要があるのか」がかなりクリアになるはずです。

10.2 小規模チームのバックエンドエンジニアさん

  • Step 1:既存のAPIで、BackgroundTasks に逃がせそうな処理がどこか洗い出す
  • Step 2:重い処理・大量処理がある箇所を中心に、Celeryワーカー単位で切り出す
  • Step 3:ジョブキューの設計(キュー名や優先度)と監査ログを整え、Flowerなどで可視化

「まずは一部の機能だけCeleryへ」という段階導入が現実的です。いきなりすべてを移行する必要はありません。

10.3 SaaS開発チーム・スタートアップの方

  • タスク種別ごとにキューを分ける(例:emails, reports, integrations
  • 各キューごとにワーカー数・リソースを調整し、優先度をコントロールする
  • 監視・アラート・ダッシュボードを整え、「どのタスクで詰まっているか」をすぐに把握できる状態を目指す

ここまで来ると、「FastAPI本体は軽く・シンプルに」「重い仕事は全部Celeryへ」という、分かりやすい役割分担になってきます。


11. 導入ロードマップ(少しずつで大丈夫です)

最後に、これからバックグラウンド処理を導入・改善していくためのロードマップをまとめておきます。

  1. BackgroundTasks で軽い処理(メール送信・Webhook通知など)をレスポンス後に回す
  2. 処理時間と件数を計測し、「どの処理がボトルネックか」を把握する
  3. 重くなっている処理を Celeryタスクとして切り出し、ローカルのdocker-composeで動かしてみる
  4. タスクIDと状態確認API(/jobs/{task_id} のような形)を用意し、フロントや他サービスと連携できるようにする
  5. リトライ・タイムアウト・冪等性・監視を整え、本番トラフィックを徐々にCelery経由に切り替えていく
  6. 必要に応じてキューの分割・ワーカーのスケール・Flowerなどの監視ツールを導入する

「全部いきなり」は本当に大変なので、まずは BackgroundTasks から、小さく一歩ずつ進めていくのがおすすめです。


参考リンク(公式ドキュメント・解説記事)

※内容は記事執筆時点のものです。最新版は各サイトでご確認ください。


まとめ

  • ユーザーを待たせたくない処理は、できるだけバックグラウンドに逃がすのが基本です。
  • FastAPI標準の BackgroundTasks は、軽い処理をレスポンス後に実行するのにとても手軽で便利ですが、重い処理や大量のタスクには向きません。
  • 大規模・高負荷・再試行や監視が必要な処理では、分散タスクキュー Celery と組み合わせることで、安定した運用がしやすくなります。
  • まずは BackgroundTasks から始め、必要になったら Celery を導入する段階的なアプローチが、実務でも現実的で安心です。

少し長くなってしまいましたが、「どの処理をどこまでバックグラウンドに追い出すか」「どこからCeleryを使うか」の判断材料になっていたらうれしいです。
ゆっくり一歩ずつで大丈夫ですので、まずは身近なメール送信やサムネイル作成から、バックグラウンド処理を試してみてくださいね。


投稿者 greeden

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

日本語が含まれない投稿は無視されますのでご注意ください。(スパム対策)