サイトアイコン IT & ライフハックブログ|学びと実践のためのアイデア集

FastAPI×Celery/Redisバックグラウンド処理完全ガイド:重い処理をAPIから切り離してサービスを安定させる方法

green snake

Photo by Pixabay on Pexels.com

FastAPI×Celery/Redisバックグラウンド処理完全ガイド:重い処理をAPIから切り離してサービスを安定させる方法


はじめに(この記事でできるようになること)

本記事では、FastAPIを使ったWeb APIに「バックグラウンド処理」「バッチ的な非同期ジョブ」を組み込む方法を、丁寧に解説します。

  • ユーザーからのリクエストにはすばやくレスポンスを返しつつ、時間のかかる処理は裏側で進める
  • メール送信・レポート生成・画像加工・外部APIへの大量リクエストなどを、安全に非同期化する
  • Celery+Redisを使って、「FastAPI本体」「ワーカー」「メッセージブローカー」の三つ組みを構成する

といったことを、サンプルコード付きで理解していきます。


誰が読んで得をするか(具体的な読者像)

個人開発・学習者さん

  • 「登録完了メール」や「PDFレポート生成」など、数秒〜数分かかる処理をFastAPIから呼びたい
  • BackgroundTasks ぐらいは知っているけれど、プロセスを分けた本格的な非同期処理はまだ使ったことがない
  • RedisやCeleryの名前は聞いたことがあるけれど、「どう組み合わせるの?」がふわっとしている

→ この記事を読み進めることで、CeleryワーカーとRedisブローカーを使った「王道のバックグラウンド処理」の流れを、一通り体験できます。

小規模チームのバックエンドエンジニアさん

  • 社内システムや小〜中規模SaaSをFastAPIで開発していて、「重い処理」や「再試行が大事な処理」をどう設計するか悩んでいる
  • 同期APIのタイムアウトや、ピーク時のスローダウンが気になり始めている
  • チームで「ここから先はジョブキューに投げよう」という共通パターンを作りたい

→ FastAPIとCeleryの責務分離、リトライ・スケジューリングの基本パターンを押さえることで、運用しやすい構成のイメージが掴めます。

SaaS開発チーム・スタートアップの皆さま

  • 画像処理・バッチ送信・データ集計など、スケールする処理が多いプロダクトを手がけている
  • マイクロサービス化やイベント駆動を見据えつつ、まずはシンプルなワーカー構成から始めたい
  • 将来的には、ワーカーのスケールアウトやキューモニタリングも視野に入れている

→ 「FastAPI=同期API」「Celery=非同期ワーカー」「Redis=キューブローカー」という役割分担をベースに、段階的にスケールさせていくための足場作りに役立つと思います。


アクセシビリティ評価(読みやすさの観点)

  • 記事構成は、「全体像 → なぜバックグラウンド処理が必要か → Celery/Redisの役割 → 実装例 → エラーハンドリングとリトライ → スケジューリング → 運用のコツ → ロードマップ」という流れで整理しています。
  • 専門用語(ブローカー、ワーカー、ジョブ、タスクなど)は、初出時に短く説明し、その後は同じ用語を繰り返すことで認知負荷を下げています。
  • コードブロックは1つあたりを短めに保ち、コメントも必要最低限にして、視覚的な負担が増えないようにしています。
  • 読者が自分のペースで読み進められるよう、段落はやや短めに区切り、箇条書きも活用しています。

全体として、多様な利用者にとって読みやすく、理解のステップが追いやすいよう、WCAG AA相当のテキスト構造を意識しています。


1. なぜバックグラウンド処理が必要なのか

まずは、「そもそもなぜAPIから処理を切り離す必要があるのか」を整理しておきましょう。

1.1 APIに重い処理を直接書くと何が起きるか

たとえば、次のようなエンドポイントを考えてみます。

  • ユーザーがフォームを送信する
  • サーバ側でPDFレポートを生成(10秒)
  • 外部メールサービスに送信依頼(+ネットワーク待ち)
  • 結果を返す

この流れを1つのHTTPリクエストの中で完結させると、

  • クライアント側のタイムアウトに引っかかる
  • Uvicornのワーカーが「重い処理」で塞がり、他のリクエストが待たされる
  • 外部サービスの遅延や一時的なエラーに、ユーザー体験が直撃する

といった問題が発生しやすくなります。

1.2 バックグラウンド処理の基本的な考え方

そこで登場するのが「バックグラウンド処理」の発想です。

  1. HTTPリクエストでは、「ジョブを登録しました」とだけすばやく返す
  2. 実際の重い処理は、別プロセス(ワーカー)がキューからジョブを受け取って実行する
  3. 結果や進捗は、後から別のAPI・通知・ダッシュボードなどで参照する

このように、

  • FastAPI本体=入り口の受付係
  • Celeryワーカー=裏側で作業するスタッフ
  • Redisなどのブローカー=「やることリスト」を置く場所

と役割を分けることで、耐障害性とスケーラビリティを高めることができます。


2. CeleryとRedisの役割をざっくり理解する

ここからは、よく使われる組み合わせ「Celery+Redis」について、ざっくりと役割を整理します。

2.1 Celeryとは

Celeryは、Python製の分散タスクキュー/ジョブキューです。

  • 「タスク」と呼ばれる関数を、別プロセスで非同期に実行してくれる
  • メッセージブローカー(RedisやRabbitMQなど)を使って、タスクをキューに積んだり取り出したりする
  • リトライやスケジューリング(定期実行)、結果の保存(結果バックエンド)など、多くの機能を備えています

FastAPI公式ドキュメントでも、Celeryとの連携例が紹介されています。

2.2 Redisとは

Redisは、インメモリのデータストアであり、メッセージブローカーとしても使われます。

  • Celeryにとって、「タスクのキューを保存する場所」として機能
  • 軽量かつ高速で、ローカル・コンテナ・クラウドなど様々な場所で動かせる

もちろんRabbitMQなど他のブローカーを使うこともできますが、Redisは導入がシンプルなので、まずはRedisから始めるのがおすすめです。


3. 開発環境の準備:FastAPI+Celery+Redisを動かす

ここからは、実際に動かせる形でセットアップしていきます。

3.1 必要なパッケージ

例として、次のようなパッケージを使います。

pip install "fastapi[standard]" celery[redis] redis
  • fastapi[standard]:FastAPI本体+Uvicornなどの基本ツール
  • celery[redis]:Celery本体とRedisサポート
  • redis:Python用Redisクライアント

(バージョンはプロジェクトに合わせて調整してください)

3.2 Redisの起動(ローカル開発)

ローカル開発では、Dockerを使うと手軽です。

docker run -d --name redis -p 6379:6379 redis:7

これで、localhost:6379 でRedisに接続できるようになります。


4. FastAPI側とCelery側の最小構成を作る

次に、FastAPIアプリとCeleryアプリを定義していきます。

4.1 プロジェクト構成の例

project/
  app/
    __init__.py
    main.py          # FastAPI本体
    celery_app.py    # Celeryアプリ定義
    tasks.py         # バックグラウンドタスク
  requirements.txt

4.2 Celeryアプリの定義

# app/celery_app.py
from celery import Celery

celery_app = Celery(
    "worker",
    broker="redis://localhost:6379/0",      # ブローカー(Redis)
    backend="redis://localhost:6379/1",     # 結果バックエンド(任意)
)

celery_app.conf.update(
    task_routes={
        "app.tasks.send_email": {"queue": "emails"},
        "app.tasks.generate_report": {"queue": "reports"},
    },
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
)

ここでは単純に localhost を使っていますが、本番では環境変数や設定クラスからURLを読み込むようにします。

4.3 タスクの定義

# app/tasks.py
from time import sleep
from app.celery_app import celery_app

@celery_app.task(name="app.tasks.send_email")
def send_email(to: str, subject: str, body: str) -> str:
    # 実際にはメール送信ロジック
    sleep(5)  # 処理が重いと仮定
    # 成功したらログなどを記録するイメージ
    return f"Email sent to {to}"

@celery_app.task(name="app.tasks.generate_report")
def generate_report(user_id: int) -> str:
    # 重い集計やPDF生成など
    sleep(10)
    return f"Report generated for user {user_id}"

ポイントは、

  • @celery_app.task デコレータを付けた関数が「タスク」として扱われる
  • send_email.delay(...) のように呼び出すと、即時ではなく「キューに登録」される

という点です。


5. FastAPIからタスクをキューに投げる

次に、FastAPI側のエンドポイントからタスクを登録してみます。

5.1 FastAPI本体の定義

# app/main.py
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from app.tasks import send_email, generate_report

app = FastAPI(title="FastAPI + Celery Example")

class EmailRequest(BaseModel):
    to: str
    subject: str
    body: str

@app.post("/emails")
def create_email(req: EmailRequest):
    # Celeryタスクを非同期でキューへ
    task = send_email.delay(req.to, req.subject, req.body)
    return {"task_id": task.id}

@app.post("/reports/{user_id}")
def create_report(user_id: int):
    task = generate_report.delay(user_id)
    return {"task_id": task.id}

HTTPリクエストに対しては、すぐにtask_idだけ返し, 実際の処理はCeleryワーカーに任せる形になっています。

5.2 タスクの結果を問い合わせるAPI(任意)

結果バックエンドを設定している場合(先ほどbackendにRedisを指定しました)、タスクの状態を問い合わせることもできます。

from app.celery_app import celery_app

@app.get("/tasks/{task_id}")
def get_task_status(task_id: str):
    result = celery_app.AsyncResult(task_id)
    return {
        "task_id": task_id,
        "status": result.status,      # PENDING / STARTED / SUCCESS / FAILURE など
        "result": result.result,      # 完了時は戻り値、失敗時は例外情報
    }

フロントエンドからは、

  1. /emails/reports/{user_id}task_idを取得
  2. /tasks/{task_id} をポーリングして状態を確認

という流れで進捗を見られるようになります。


6. ワーカーの起動と動作確認

ここまでで、

  • FastAPIアプリ(HTTP入口)
  • Celeryワーカー(バックグラウンド処理本体)
  • Redis(キューブローカー)

が揃いました。次は実際に動かしてみましょう。

6.1 FastAPIの起動

uvicorn app.main:app --reload

6.2 Celeryワーカーの起動

プロジェクトルートで、次のコマンドを実行します。

celery -A app.celery_app.celery_app worker --loglevel=info

-A には「Celeryアプリのインポートパス」を指定します。
ここでは app/celery_app.pycelery_app を定義しているので、このような形になります。

6.3 動作確認

別のターミナルから、

curl -X POST "http://127.0.0.1:8000/emails" \
  -H "Content-Type: application/json" \
  -d '{"to": "test@example.com", "subject": "Hello", "body": "Hi"}'

と呼ぶと、即座に

{"task_id": "xxxxxxxx-xxxx-...."}

のようなレスポンスが返り、ワーカー側のログに「Email sent to …」といったメッセージが表示されるはずです。

/tasks/{task_id} にGETリクエストを送れば、タスクの状態も確認できます。


7. 失敗時のリトライ・タイムアウトなどの制御

実務では、外部サービスの一時的な失敗やネットワークエラーが付きものです。
Celeryのタスクには、リトライやタイムアウトなどの設定を行うことができます。

7.1 自動リトライの例

# app/tasks.py(抜粋)
from celery import shared_task
import requests

@shared_task(
    bind=True,
    max_retries=5,
    default_retry_delay=10,  # 秒
)
def send_notification(self, endpoint: str, payload: dict) -> str:
    try:
        r = requests.post(endpoint, json=payload, timeout=5)
        r.raise_for_status()
    except requests.RequestException as exc:
        # self.retry でリトライをスケジュール
        raise self.retry(exc=exc)
    return "ok"
  • max_retries:最大リトライ回数
  • default_retry_delay:リトライ間隔(秒)
  • self.retry():次のリトライをキューに登録します

7.2 タスクの実行時間制限

time_limitsoft_time_limit を使うと、タスクの最大実行時間も制御できます。

@celery_app.task(
    name="app.tasks.heavy_task",
    time_limit=60,       # 秒
    soft_time_limit=50,
)
def heavy_task():
    # 時間のかかる処理
    ...

時間制限を決めておくことで、暴走したタスクからワーカーを守り、他のタスクへの影響を減らすことができます。


8. 定期実行(スケジューリング)を行う

「毎日0時にレポートを生成」「5分ごとに集計タスクを走らせる」といった定期実行も、Celeryビート(スケジューラ)で実現できます。

8.1 Celeryビートの設定

# app/celery_app.py(ビート設定を追加)
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "generate-daily-reports": {
        "task": "app.tasks.generate_report",
        "schedule": crontab(hour=0, minute=0),
        "args": (1,),  # 例としてuser_id=1のレポート
    },
}

8.2 ビートの起動

ワーカーとは別に、スケジューラ用プロセスを起動します。

celery -A app.celery_app.celery_app beat --loglevel=info

この構成で、

  • beatプロセスがスケジュールに従ってタスクをキューに登録
  • workerプロセスがキューからタスクを取り出して実行

という流れが自動的に回り続けます。


9. FastAPIのBackgroundTasksとの違いと使い分け

FastAPIにも BackgroundTasks という軽量なバックグラウンド機能があります。

from fastapi import BackgroundTasks

def send_email_sync(to: str):
    # 同期のメール送信処理
    ...

@app.post("/signup")
def signup(..., background_tasks: BackgroundTasks):
    # ユーザー登録処理...
    background_tasks.add_task(send_email_sync, user.email)
    return {"status": "ok"}

これはとても使いやすいのですが、

  • FastAPIの同一プロセス内で実行される
  • プロセスが落ちるとタスクも失われる
  • リトライやスケジューリングなどの高度な機能はない

という特徴があります。

一方、Celeryは

  • 別プロセス(別コンテナ)でタスクを実行
  • キューに乗ったタスクは、ワーカーが一時的に死んでも残り続ける
  • リトライ・スケジュール・結果保存など、豊富な制御が可能

という違いがあります。

目安としては:

  • 「ごく短時間で終わり、失敗しても痛手が少ない処理」→ BackgroundTasks
  • 「重く、失敗時のリトライや確実な実行が大事な処理」→ Celeryなどのジョブキュー

といった使い分けを意識すると、設計が整理しやすくなります。


10. 運用を意識した設計のポイント

最後に、実務でCeleryを運用していく際に意識したいポイントを、いくつか挙げておきます。

10.1 ログと監視

  • タスクの成功・失敗ログは、FastAPIとは別に「ワーカーのログ」として集約する
  • タスクの失敗率や平均実行時間をメトリクスとして観測すると、ボトルネックの特定に役立つ
  • 一部のタスクについては、「このエラーが3回連続で発生したらアラート」といった閾値を設定しておく

10.2 キューの設計

  • メール・レポート・集計など、性質の異なるタスクはキューを分ける(emails / reports / defaultなど)
  • キューごとにワーカーの数を変えることで、重要度の高いタスクにリソースを優先的に割り当てる
  • バックプレッシャーを意識し、あまりにタスクが溜まっている場合はAPI側で受付制限をすることも検討する

10.3 エラー時のユーザー体験

  • タスクが失敗したときに、ユーザーにどう伝えるか(通知・リトライ・お問い合わせ誘導など)
  • 「処理が完了したらメールを送る」「ダッシュボードのステータスを更新する」といった設計を、API側と合わせて考える

11. 導入ロードマップ(少しずつ段階を踏むために)

一気にすべてを導入する必要はありません。段階的に進めるためのステップ案をまとめます。

  1. BackgroundTasksで軽い処理から非同期化する

    • まずはメール送信など、本当に軽いものをFastAPI内のバックグラウンド処理に移してみる。
  2. Celery+Redisをローカルで動かしてみる

    • 本記事の「最小構成」を真似して、小さなPoCプロジェクトを作る。
  3. 1種類のタスクをCeleryに移行する

    • レポート作成や重い集計など、時間のかかる処理を1つだけCeleryタスク化してみる。
  4. タスク状態を問い合わせるAPIやダッシュボードを整える

    • task_idベースで状態確認を行い、ユーザーに進捗や結果を見えるようにする。
  5. リトライ・キュー分け・スケジューリングなどを追加していく

    • サービスの成長に合わせて、必要なタスクにだけ少しずつ高度な機能を適用。
  6. 本番環境(Dockerやオーケストレーション)に展開する

    • FastAPIコンテナ・Celeryワーカーコンテナ・Redisコンテナを分け、Docker ComposeやKubernetesで運用する。

12. まとめ

  • FastAPIだけで重い処理を抱え込むと、レスポンスの遅延やタイムアウト、全体のスループット低下につながりやすくなります。
  • Celery+Redisを組み合わせることで、重い処理や失敗時にリトライしたい処理を別プロセスのワーカーに任せる構成を簡単に構築できます。
  • FastAPIは「タスクをキューに登録してすばやくレスポンスを返す窓口」として、Celeryは「裏側で淡々と処理をこなすエンジン」として役割分担するのが基本です。
  • BackgroundTasks は「軽い処理」、Celery は「より信頼性やスケールが必要な処理」と考え、段階的に使い分けていくと移行がスムーズです。
  • いきなり大規模なジョブキューシステムを目指す必要はありません。まずは1つのタスクから少しずつ切り出していくことで、無理なくサービスを「落ちにくく、重くなりにくい」構成へと育てていけます。

ここまで読んでくださって、ありがとうございます。
あなたのFastAPIアプリの裏側で、静かにたくさんの仕事をこなしてくれる「頼れるワーカーさんたち」が育っていきますように。
どうぞ、あせらず一歩ずつ、試してみてくださいね。


モバイルバージョンを終了