FastAPI×Celery/Redisバックグラウンド処理完全ガイド:重い処理をAPIから切り離してサービスを安定させる方法
はじめに(この記事でできるようになること)
本記事では、FastAPIを使ったWeb APIに「バックグラウンド処理」「バッチ的な非同期ジョブ」を組み込む方法を、丁寧に解説します。
- ユーザーからのリクエストにはすばやくレスポンスを返しつつ、時間のかかる処理は裏側で進める
- メール送信・レポート生成・画像加工・外部APIへの大量リクエストなどを、安全に非同期化する
- Celery+Redisを使って、「FastAPI本体」「ワーカー」「メッセージブローカー」の三つ組みを構成する
といったことを、サンプルコード付きで理解していきます。
誰が読んで得をするか(具体的な読者像)
個人開発・学習者さん
- 「登録完了メール」や「PDFレポート生成」など、数秒〜数分かかる処理をFastAPIから呼びたい
BackgroundTasksぐらいは知っているけれど、プロセスを分けた本格的な非同期処理はまだ使ったことがない- RedisやCeleryの名前は聞いたことがあるけれど、「どう組み合わせるの?」がふわっとしている
→ この記事を読み進めることで、CeleryワーカーとRedisブローカーを使った「王道のバックグラウンド処理」の流れを、一通り体験できます。
小規模チームのバックエンドエンジニアさん
- 社内システムや小〜中規模SaaSをFastAPIで開発していて、「重い処理」や「再試行が大事な処理」をどう設計するか悩んでいる
- 同期APIのタイムアウトや、ピーク時のスローダウンが気になり始めている
- チームで「ここから先はジョブキューに投げよう」という共通パターンを作りたい
→ FastAPIとCeleryの責務分離、リトライ・スケジューリングの基本パターンを押さえることで、運用しやすい構成のイメージが掴めます。
SaaS開発チーム・スタートアップの皆さま
- 画像処理・バッチ送信・データ集計など、スケールする処理が多いプロダクトを手がけている
- マイクロサービス化やイベント駆動を見据えつつ、まずはシンプルなワーカー構成から始めたい
- 将来的には、ワーカーのスケールアウトやキューモニタリングも視野に入れている
→ 「FastAPI=同期API」「Celery=非同期ワーカー」「Redis=キューブローカー」という役割分担をベースに、段階的にスケールさせていくための足場作りに役立つと思います。
アクセシビリティ評価(読みやすさの観点)
- 記事構成は、「全体像 → なぜバックグラウンド処理が必要か → Celery/Redisの役割 → 実装例 → エラーハンドリングとリトライ → スケジューリング → 運用のコツ → ロードマップ」という流れで整理しています。
- 専門用語(ブローカー、ワーカー、ジョブ、タスクなど)は、初出時に短く説明し、その後は同じ用語を繰り返すことで認知負荷を下げています。
- コードブロックは1つあたりを短めに保ち、コメントも必要最低限にして、視覚的な負担が増えないようにしています。
- 読者が自分のペースで読み進められるよう、段落はやや短めに区切り、箇条書きも活用しています。
全体として、多様な利用者にとって読みやすく、理解のステップが追いやすいよう、WCAG AA相当のテキスト構造を意識しています。
1. なぜバックグラウンド処理が必要なのか
まずは、「そもそもなぜAPIから処理を切り離す必要があるのか」を整理しておきましょう。
1.1 APIに重い処理を直接書くと何が起きるか
たとえば、次のようなエンドポイントを考えてみます。
- ユーザーがフォームを送信する
- サーバ側でPDFレポートを生成(10秒)
- 外部メールサービスに送信依頼(+ネットワーク待ち)
- 結果を返す
この流れを1つのHTTPリクエストの中で完結させると、
- クライアント側のタイムアウトに引っかかる
- Uvicornのワーカーが「重い処理」で塞がり、他のリクエストが待たされる
- 外部サービスの遅延や一時的なエラーに、ユーザー体験が直撃する
といった問題が発生しやすくなります。
1.2 バックグラウンド処理の基本的な考え方
そこで登場するのが「バックグラウンド処理」の発想です。
- HTTPリクエストでは、「ジョブを登録しました」とだけすばやく返す
- 実際の重い処理は、別プロセス(ワーカー)がキューからジョブを受け取って実行する
- 結果や進捗は、後から別のAPI・通知・ダッシュボードなどで参照する
このように、
- FastAPI本体=入り口の受付係
- Celeryワーカー=裏側で作業するスタッフ
- Redisなどのブローカー=「やることリスト」を置く場所
と役割を分けることで、耐障害性とスケーラビリティを高めることができます。
2. CeleryとRedisの役割をざっくり理解する
ここからは、よく使われる組み合わせ「Celery+Redis」について、ざっくりと役割を整理します。
2.1 Celeryとは
Celeryは、Python製の分散タスクキュー/ジョブキューです。
- 「タスク」と呼ばれる関数を、別プロセスで非同期に実行してくれる
- メッセージブローカー(RedisやRabbitMQなど)を使って、タスクをキューに積んだり取り出したりする
- リトライやスケジューリング(定期実行)、結果の保存(結果バックエンド)など、多くの機能を備えています
FastAPI公式ドキュメントでも、Celeryとの連携例が紹介されています。
2.2 Redisとは
Redisは、インメモリのデータストアであり、メッセージブローカーとしても使われます。
- Celeryにとって、「タスクのキューを保存する場所」として機能
- 軽量かつ高速で、ローカル・コンテナ・クラウドなど様々な場所で動かせる
もちろんRabbitMQなど他のブローカーを使うこともできますが、Redisは導入がシンプルなので、まずはRedisから始めるのがおすすめです。
3. 開発環境の準備:FastAPI+Celery+Redisを動かす
ここからは、実際に動かせる形でセットアップしていきます。
3.1 必要なパッケージ
例として、次のようなパッケージを使います。
pip install "fastapi[standard]" celery[redis] redis
fastapi[standard]:FastAPI本体+Uvicornなどの基本ツールcelery[redis]:Celery本体とRedisサポートredis:Python用Redisクライアント
(バージョンはプロジェクトに合わせて調整してください)
3.2 Redisの起動(ローカル開発)
ローカル開発では、Dockerを使うと手軽です。
docker run -d --name redis -p 6379:6379 redis:7
これで、localhost:6379 でRedisに接続できるようになります。
4. FastAPI側とCelery側の最小構成を作る
次に、FastAPIアプリとCeleryアプリを定義していきます。
4.1 プロジェクト構成の例
project/
app/
__init__.py
main.py # FastAPI本体
celery_app.py # Celeryアプリ定義
tasks.py # バックグラウンドタスク
requirements.txt
4.2 Celeryアプリの定義
# app/celery_app.py
from celery import Celery
celery_app = Celery(
"worker",
broker="redis://localhost:6379/0", # ブローカー(Redis)
backend="redis://localhost:6379/1", # 結果バックエンド(任意)
)
celery_app.conf.update(
task_routes={
"app.tasks.send_email": {"queue": "emails"},
"app.tasks.generate_report": {"queue": "reports"},
},
task_serializer="json",
result_serializer="json",
accept_content=["json"],
)
ここでは単純に localhost を使っていますが、本番では環境変数や設定クラスからURLを読み込むようにします。
4.3 タスクの定義
# app/tasks.py
from time import sleep
from app.celery_app import celery_app
@celery_app.task(name="app.tasks.send_email")
def send_email(to: str, subject: str, body: str) -> str:
# 実際にはメール送信ロジック
sleep(5) # 処理が重いと仮定
# 成功したらログなどを記録するイメージ
return f"Email sent to {to}"
@celery_app.task(name="app.tasks.generate_report")
def generate_report(user_id: int) -> str:
# 重い集計やPDF生成など
sleep(10)
return f"Report generated for user {user_id}"
ポイントは、
@celery_app.taskデコレータを付けた関数が「タスク」として扱われるsend_email.delay(...)のように呼び出すと、即時ではなく「キューに登録」される
という点です。
5. FastAPIからタスクをキューに投げる
次に、FastAPI側のエンドポイントからタスクを登録してみます。
5.1 FastAPI本体の定義
# app/main.py
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from app.tasks import send_email, generate_report
app = FastAPI(title="FastAPI + Celery Example")
class EmailRequest(BaseModel):
to: str
subject: str
body: str
@app.post("/emails")
def create_email(req: EmailRequest):
# Celeryタスクを非同期でキューへ
task = send_email.delay(req.to, req.subject, req.body)
return {"task_id": task.id}
@app.post("/reports/{user_id}")
def create_report(user_id: int):
task = generate_report.delay(user_id)
return {"task_id": task.id}
HTTPリクエストに対しては、すぐにtask_idだけ返し, 実際の処理はCeleryワーカーに任せる形になっています。
5.2 タスクの結果を問い合わせるAPI(任意)
結果バックエンドを設定している場合(先ほどbackendにRedisを指定しました)、タスクの状態を問い合わせることもできます。
from app.celery_app import celery_app
@app.get("/tasks/{task_id}")
def get_task_status(task_id: str):
result = celery_app.AsyncResult(task_id)
return {
"task_id": task_id,
"status": result.status, # PENDING / STARTED / SUCCESS / FAILURE など
"result": result.result, # 完了時は戻り値、失敗時は例外情報
}
フロントエンドからは、
/emailsや/reports/{user_id}でtask_idを取得/tasks/{task_id}をポーリングして状態を確認
という流れで進捗を見られるようになります。
6. ワーカーの起動と動作確認
ここまでで、
- FastAPIアプリ(HTTP入口)
- Celeryワーカー(バックグラウンド処理本体)
- Redis(キューブローカー)
が揃いました。次は実際に動かしてみましょう。
6.1 FastAPIの起動
uvicorn app.main:app --reload
6.2 Celeryワーカーの起動
プロジェクトルートで、次のコマンドを実行します。
celery -A app.celery_app.celery_app worker --loglevel=info
-A には「Celeryアプリのインポートパス」を指定します。
ここでは app/celery_app.py に celery_app を定義しているので、このような形になります。
6.3 動作確認
別のターミナルから、
curl -X POST "http://127.0.0.1:8000/emails" \
-H "Content-Type: application/json" \
-d '{"to": "test@example.com", "subject": "Hello", "body": "Hi"}'
と呼ぶと、即座に
{"task_id": "xxxxxxxx-xxxx-...."}
のようなレスポンスが返り、ワーカー側のログに「Email sent to …」といったメッセージが表示されるはずです。
/tasks/{task_id} にGETリクエストを送れば、タスクの状態も確認できます。
7. 失敗時のリトライ・タイムアウトなどの制御
実務では、外部サービスの一時的な失敗やネットワークエラーが付きものです。
Celeryのタスクには、リトライやタイムアウトなどの設定を行うことができます。
7.1 自動リトライの例
# app/tasks.py(抜粋)
from celery import shared_task
import requests
@shared_task(
bind=True,
max_retries=5,
default_retry_delay=10, # 秒
)
def send_notification(self, endpoint: str, payload: dict) -> str:
try:
r = requests.post(endpoint, json=payload, timeout=5)
r.raise_for_status()
except requests.RequestException as exc:
# self.retry でリトライをスケジュール
raise self.retry(exc=exc)
return "ok"
max_retries:最大リトライ回数default_retry_delay:リトライ間隔(秒)self.retry():次のリトライをキューに登録します
7.2 タスクの実行時間制限
time_limit や soft_time_limit を使うと、タスクの最大実行時間も制御できます。
@celery_app.task(
name="app.tasks.heavy_task",
time_limit=60, # 秒
soft_time_limit=50,
)
def heavy_task():
# 時間のかかる処理
...
時間制限を決めておくことで、暴走したタスクからワーカーを守り、他のタスクへの影響を減らすことができます。
8. 定期実行(スケジューリング)を行う
「毎日0時にレポートを生成」「5分ごとに集計タスクを走らせる」といった定期実行も、Celeryビート(スケジューラ)で実現できます。
8.1 Celeryビートの設定
# app/celery_app.py(ビート設定を追加)
from celery.schedules import crontab
celery_app.conf.beat_schedule = {
"generate-daily-reports": {
"task": "app.tasks.generate_report",
"schedule": crontab(hour=0, minute=0),
"args": (1,), # 例としてuser_id=1のレポート
},
}
8.2 ビートの起動
ワーカーとは別に、スケジューラ用プロセスを起動します。
celery -A app.celery_app.celery_app beat --loglevel=info
この構成で、
- beatプロセスがスケジュールに従ってタスクをキューに登録
- workerプロセスがキューからタスクを取り出して実行
という流れが自動的に回り続けます。
9. FastAPIのBackgroundTasksとの違いと使い分け
FastAPIにも BackgroundTasks という軽量なバックグラウンド機能があります。
from fastapi import BackgroundTasks
def send_email_sync(to: str):
# 同期のメール送信処理
...
@app.post("/signup")
def signup(..., background_tasks: BackgroundTasks):
# ユーザー登録処理...
background_tasks.add_task(send_email_sync, user.email)
return {"status": "ok"}
これはとても使いやすいのですが、
- FastAPIの同一プロセス内で実行される
- プロセスが落ちるとタスクも失われる
- リトライやスケジューリングなどの高度な機能はない
という特徴があります。
一方、Celeryは
- 別プロセス(別コンテナ)でタスクを実行
- キューに乗ったタスクは、ワーカーが一時的に死んでも残り続ける
- リトライ・スケジュール・結果保存など、豊富な制御が可能
という違いがあります。
目安としては:
- 「ごく短時間で終わり、失敗しても痛手が少ない処理」→
BackgroundTasks - 「重く、失敗時のリトライや確実な実行が大事な処理」→ Celeryなどのジョブキュー
といった使い分けを意識すると、設計が整理しやすくなります。
10. 運用を意識した設計のポイント
最後に、実務でCeleryを運用していく際に意識したいポイントを、いくつか挙げておきます。
10.1 ログと監視
- タスクの成功・失敗ログは、FastAPIとは別に「ワーカーのログ」として集約する
- タスクの失敗率や平均実行時間をメトリクスとして観測すると、ボトルネックの特定に役立つ
- 一部のタスクについては、「このエラーが3回連続で発生したらアラート」といった閾値を設定しておく
10.2 キューの設計
- メール・レポート・集計など、性質の異なるタスクはキューを分ける(
emails/reports/defaultなど) - キューごとにワーカーの数を変えることで、重要度の高いタスクにリソースを優先的に割り当てる
- バックプレッシャーを意識し、あまりにタスクが溜まっている場合はAPI側で受付制限をすることも検討する
10.3 エラー時のユーザー体験
- タスクが失敗したときに、ユーザーにどう伝えるか(通知・リトライ・お問い合わせ誘導など)
- 「処理が完了したらメールを送る」「ダッシュボードのステータスを更新する」といった設計を、API側と合わせて考える
11. 導入ロードマップ(少しずつ段階を踏むために)
一気にすべてを導入する必要はありません。段階的に進めるためのステップ案をまとめます。
-
BackgroundTasksで軽い処理から非同期化する- まずはメール送信など、本当に軽いものをFastAPI内のバックグラウンド処理に移してみる。
-
Celery+Redisをローカルで動かしてみる
- 本記事の「最小構成」を真似して、小さなPoCプロジェクトを作る。
-
1種類のタスクをCeleryに移行する
- レポート作成や重い集計など、時間のかかる処理を1つだけCeleryタスク化してみる。
-
タスク状態を問い合わせるAPIやダッシュボードを整える
task_idベースで状態確認を行い、ユーザーに進捗や結果を見えるようにする。
-
リトライ・キュー分け・スケジューリングなどを追加していく
- サービスの成長に合わせて、必要なタスクにだけ少しずつ高度な機能を適用。
-
本番環境(Dockerやオーケストレーション)に展開する
- FastAPIコンテナ・Celeryワーカーコンテナ・Redisコンテナを分け、Docker ComposeやKubernetesで運用する。
12. まとめ
- FastAPIだけで重い処理を抱え込むと、レスポンスの遅延やタイムアウト、全体のスループット低下につながりやすくなります。
- Celery+Redisを組み合わせることで、重い処理や失敗時にリトライしたい処理を別プロセスのワーカーに任せる構成を簡単に構築できます。
- FastAPIは「タスクをキューに登録してすばやくレスポンスを返す窓口」として、Celeryは「裏側で淡々と処理をこなすエンジン」として役割分担するのが基本です。
BackgroundTasksは「軽い処理」、Celery は「より信頼性やスケールが必要な処理」と考え、段階的に使い分けていくと移行がスムーズです。- いきなり大規模なジョブキューシステムを目指す必要はありません。まずは1つのタスクから少しずつ切り出していくことで、無理なくサービスを「落ちにくく、重くなりにくい」構成へと育てていけます。
ここまで読んでくださって、ありがとうございます。
あなたのFastAPIアプリの裏側で、静かにたくさんの仕事をこなしてくれる「頼れるワーカーさんたち」が育っていきますように。
どうぞ、あせらず一歩ずつ、試してみてくださいね。
