green snake
Photo by Pixabay on Pexels.com

はじめての本格セキュリティ設計:FastAPIの認証・認可ガイド――JWT/OAuth2、Cookieセッション、RBAC/スコープ、CSRF対策、実務の落とし穴


要約(全体像)

  • パスワードはハッシュ化して保存し、アクセストークンは短寿命、リフレッシュトークンは長寿命+安全な保管が原則。
  • シングルページアプリ(SPA)やモバイルにはJWT(Bearer)、ブラウザ中心のWebにはCookie+CSRF対策が相性良い。
  • 認可はロール(RBAC)スコープで分離し、FastAPIのSecurityDependsで判定を組み込む。
  • セッション固定化、トークン再利用、XSS、CSRF、弱いパスワードなど、よくある脅威に対して具体的な対策を実装する。
  • テストとローテーション、鍵管理、監査ログまで含めて、運用で破綻しない仕組みにまとめる。

誰が読んで得をするか

  • 学習者Aさん(卒研・個人開発):フォーム認証やJWTの最小構成を確実に動かしたい。
  • 小規模チームBさん(受託3名):APIと管理画面のCookie運用に迷い、CSRFとSameSiteの設計を固めたい。
  • SaaS開発Cさん(スタートアップ):RBAC+スコープで権限境界を明確にし、トークン回転と失効を運用にのせたい。

1. 基本方針:認証・認可・セッションの分解

  • 認証(Authentication):誰かを確かめる(パスワード、SAML/OIDC、APIキーなど)
  • 認可(Authorization):何ができるかを決める(RBAC=役割、スコープ=機能単位)
  • セッション:認証済み状態を継続するための仕組み(JWT、Cookie、サーバセッション)

判断ポイント

  • フロントがSPAやモバイル中心ならJWT(アクセストークンは短寿命、リフレッシュは安全な保存)。
  • 同一オリジン中心のWebならCookieセッションが自然。HttpOnly/Secure/SameSiteCSRF対策を必ず。
  • 権限はロールで大枠、APIエンドポイントにはスコープで細粒度のガード。

2. ユーザーモデルとパスワードハッシュ

2.1 モデル例(SQLAlchemy 2.x)

# app/models.py
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, Boolean
from typing import Optional

class Base(DeclarativeBase): pass

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    password_hash: Mapped[str] = mapped_column(String(255))
    is_active: Mapped[bool] = mapped_column(default=True)
    role: Mapped[str] = mapped_column(String(50), default="user")  # admin/user など

2.2 ハッシュ化と検証

# app/security/passwords.py
from passlib.context import CryptContext

pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto")

def hash_password(plain: str) -> str:
    return pwd_ctx.hash(plain)

def verify_password(plain: str, hashed: str) -> bool:
    return pwd_ctx.verify(plain, hashed)

判断ポイント

  • 平文保存は禁止。bcryptargon2など遅延性を持つハッシュを使用。
  • パスワードポリシー(長さ・文字種・漏洩チェック)をバリデーションへ。

3. JWT(Bearer)方式の最小実装

3.1 設定と鍵管理

# app/core/settings.py
from pydantic_settings import BaseSettings
from pydantic import Field

class Settings(BaseSettings):
    jwt_secret: str = Field(..., description="HS256秘密鍵")
    jwt_issuer: str = "example-api"
    access_token_expires_minutes: int = 15
    refresh_token_expires_days: int = 30
    class Config:
        env_file = ".env"
        extra = "ignore"

def get_settings():
    return Settings()

3.2 発行・検証(PyJWT/JOSE系を利用)

# app/security/jwt.py
from datetime import datetime, timedelta, timezone
import jwt  # pyjwt
from typing import Any, Dict
from app.core.settings import get_settings

ALGO = "HS256"

def create_token(sub: str, scope: str, expires_delta: timedelta, token_type: str) -> str:
    s = get_settings()
    now = datetime.now(timezone.utc)
    payload: Dict[str, Any] = {
        "iss": s.jwt_issuer,
        "sub": sub,
        "scope": scope,           # "articles:read users:write" などスペース区切り
        "type": token_type,       # "access" or "refresh"
        "iat": int(now.timestamp()),
        "exp": int((now + expires_delta).timestamp())
    }
    return jwt.encode(payload, s.jwt_secret, algorithm=ALGO)

def decode_token(token: str) -> Dict[str, Any]:
    s = get_settings()
    return jwt.decode(token, s.jwt_secret, algorithms=[ALGO], options={"require": ["exp","sub","type"]})

3.3 認証ルーター(ログイン/リフレッシュ)

# app/routers/auth.py
from fastapi import APIRouter, HTTPException, Depends, status
from pydantic import BaseModel
from datetime import timedelta
from app.security.passwords import verify_password
from app.security.jwt import create_token, decode_token
from app.core.settings import get_settings

router = APIRouter(prefix="/auth", tags=["auth"])

class LoginRequest(BaseModel):
    email: str
    password: str

class TokenPair(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"

# 仮のDB参照関数
def get_user_by_email(email: str):
    # 実装は省略。password_hashを返すユーザーを取得
    ...

@router.post("/login", response_model=TokenPair)
def login(payload: LoginRequest):
    user = get_user_by_email(payload.email)
    if not user or not verify_password(payload.password, user.password_hash) or not user.is_active:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid credentials")

    s = get_settings()
    access = create_token(
        sub=str(user.id),
        scope="articles:read users:read",  # 動的に組み立て可
        expires_delta=timedelta(minutes=s.access_token_expires_minutes),
        token_type="access"
    )
    refresh = create_token(
        sub=str(user.id),
        scope="refresh",
        expires_delta=timedelta(days=s.refresh_token_expires_days),
        token_type="refresh"
    )
    return TokenPair(access_token=access, refresh_token=refresh)

class RefreshRequest(BaseModel):
    refresh_token: str

@router.post("/refresh", response_model=TokenPair)
def refresh(req: RefreshRequest):
    data = decode_token(req.refresh_token)
    if data.get("type") != "refresh":
        raise HTTPException(401, "invalid token type")
    s = get_settings()
    access = create_token(sub=data["sub"], scope="articles:read users:read",
                          expires_delta=timedelta(minutes=s.access_token_expires_minutes), token_type="access")
    # リフレッシュトークンの回転(再発行+旧トークン失効)を運用では推奨
    new_refresh = create_token(sub=data["sub"], scope="refresh",
                               expires_delta=timedelta(days=s.refresh_token_expires_days), token_type="refresh")
    return TokenPair(access_token=access, refresh_token=new_refresh)

判断ポイント

  • アクセストークンは短寿命、リフレッシュは回転(ローテーション)ブラックリストで流出時に無効化可能に。
  • スコープは空白区切りで列挙。リソース×動詞(articles:read)の命名に統一感を。

4. 認可:RBACとスコープの組み合わせ

4.1 デコーダとスコープ検査

# app/security/deps.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Sequence
from app.security.jwt import decode_token
from app.models import User

bearer = HTTPBearer(auto_error=False)

def get_current_user(creds: HTTPAuthorizationCredentials = Depends(bearer)) -> User:
    if creds is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="missing token")
    data = decode_token(creds.credentials)
    # DBからUser取得(省略)、active確認
    ...
    return user

def require_scopes(required: Sequence[str]):
    def checker(creds: HTTPAuthorizationCredentials = Depends(bearer)):
        if creds is None:
            raise HTTPException(401, "missing token")
        data = decode_token(creds.credentials)
        token_scopes = set(str(data.get("scope","")).split())
        if not set(required).issubset(token_scopes):
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="insufficient scope")
        return data
    return checker

4.2 ルートに適用

# app/routers/articles.py
from fastapi import APIRouter, Depends
from app.security.deps import get_current_user, require_scopes

router = APIRouter(prefix="/articles", tags=["articles"])

@router.get("", dependencies=[Depends(require_scopes(["articles:read"]))])
def list_articles():
    return [{"id": 1, "title": "Hello"}]

@router.post("", dependencies=[Depends(require_scopes(["articles:write"]))])
def create_article():
    return {"ok": True}

判断ポイント

  • ルート側には必要スコープだけを書くと読みやすい。
  • ロール→スコープのマッピングは発行時に付与するか、API側で変換する。

5. Cookieセッション+CSRF(二重送信トークン)

ブラウザ中心のWebは、HttpOnlyなセッションクッキーが使いやすい。状態はサーバ側に保存しても、JWTをクッキーに入れてもよい(HttpOnly/Secure/SameSiteが必須)。

5.1 Cookieに保存(例:アクセストークン)

# app/routers/web_auth.py
from fastapi import APIRouter, Response, HTTPException, Depends
from pydantic import BaseModel
from datetime import timedelta
from app.security.passwords import verify_password
from app.security.jwt import create_token, decode_token
from app.core.settings import get_settings

router = APIRouter(prefix="/web", tags=["web-auth"])

class WebLogin(BaseModel):
    email: str
    password: str

@router.post("/login")
def web_login(payload: WebLogin, response: Response):
    user = ...  # 省略
    if not user or not verify_password(payload.password, user.password_hash):
        raise HTTPException(401, "invalid credentials")
    s = get_settings()
    access = create_token(str(user.id), "articles:read", timedelta(minutes=s.access_token_expires_minutes), "access")
    # `HttpOnly`でJSから読み取れず、`Secure`でHTTPS限定、SameSiteは用途に応じて設定
    response.set_cookie("access_token", access, httponly=True, secure=True, samesite="Lax", max_age=60*15, path="/")
    # CSRF用の`XSRF-TOKEN`を別Cookieで配る(JSはこれを読んでヘッダに載せる)
    response.set_cookie("XSRF-TOKEN", "random-csrf", httponly=False, secure=True, samesite="Lax", path="/")
    return {"ok": True}

5.2 CSRF検証(Double Submit Cookie)

# app/middlewares/csrf.py
from fastapi import Request, HTTPException, status

async def verify_csrf(request: Request):
    if request.method in ("POST","PUT","PATCH","DELETE"):
        cookie = request.cookies.get("XSRF-TOKEN")
        header = request.headers.get("X-CSRF-TOKEN")
        if not cookie or not header or cookie != header:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="csrf failed")

適用例:

# app/routers/protected_web.py
from fastapi import APIRouter, Depends
from app.middlewares.csrf import verify_csrf

router = APIRouter(prefix="/web-api", dependencies=[Depends(verify_csrf)])

@router.post("/profile")
def update_profile():
    return {"ok": True}

判断ポイント

  • HttpOnlyはアクセストークンの読み取り防止に有効だが、CSRF対策は別途必要。
  • SameSiteはLaxが無難。クロスサイト要件があるならNone+HTTPS必須。

6. リフレッシュ運用と失効

  • 盗まれたリフレッシュトークン対策として、回転(再発行)直前のトークン無効化を管理テーブルで行う。
  • ユーザーの明示ログアウト時は、現在のアクセストークンのjti(トークンID)やリフレッシュブラックリストへ。

サンプル(概念):

# app/security/revoke.py
# revoked_tokens (jtiやサイン用ハッシュ)をストアしておき、decode後に照合して拒否

判断ポイント

  • RedisなどのTTL付きストアで管理すると実装が楽。
  • 回転時のトークン再利用検知(旧refreshで再度アクセス)をアラートに。

7. 監査ログとレート制限

  • 重要イベント(ログイン成功/失敗、パスワード変更、権限変更、トークン再発行)を監査ログに構造化出力。
  • ログイン試行はレート制限(IP×ユーザー)と遅延でブルートフォースを抑制。

例:先日のキャッシュ記事で紹介したトークンバケットを転用。


8. テスト戦略(セキュリティ観点)

  • パスワード検証の単体テスト、弱いパスワードの拒否。
  • /auth/login→保護APIのE2E。期限切れ・改ざん・別スコープのテスト。
  • CSRF:Cookieとヘッダの不一致で403になるか。
  • 失効:ブラックリストに載せたトークンが拒否されるか。
  • 監査ログ:イベントが漏れなく記録されるか。

9. よくある落とし穴と対策

症状 原因 対策
トークンがJSで盗まれる localStorage保管・XSS HttpOnly Cookie、CSP、入力サニタイズ、XSS監査
CSRFが通る Cookieのみ認証、CSRF対策不足 Double Submit CookieやSameSite、Origin/Referer検証
スコープが拡張される 文字列パース不備 スコープは集合で扱い、厳密比較
リフレッシュ乱用 長寿命+回転なし 回転+ブラックリスト、デバイスごとにID・失効
パスが漏れる ログにアクセストークン出力 認証ヘッダはマスク、PIIは出さない

10. ひな形:一気通貫の最小アプリ

# app/main.py
from fastapi import FastAPI
from app.routers import auth, articles, web_auth, protected_web

app = FastAPI(title="Secure API")

# JWT(Bearer)向けAPI
app.include_router(auth.router)
app.include_router(articles.router)

# Web(Cookie+CSRF)向けAPI
app.include_router(web_auth.router)
app.include_router(protected_web.router)

@app.get("/health")
def health():
    return {"ok": True}

11. 導入ロードマップ

  1. パスワードハッシュとユーザーモデル、/auth/loginJWT最小を通す。
  2. スコープとrequire_scopes保護ルートを整備。
  3. リフレッシュ回転、ブラックリスト、監査ログ、レート制限を追加。
  4. Web向けにCookie+CSRF導入。HttpOnly/Secure/SameSiteの適用確認。
  5. 鍵ローテーション、環境分離、監査のダッシュボード化で運用にのせる。

12. 参考リンク


まとめ

  • JWTは短寿命アクセストークン回転可能なリフレッシュの二段構え。ロールとスコープで認可を明確化し、ルートに必要最小の条件だけを書くと保守しやすい。
  • WebではCookie+CSRFが有効。HttpOnly/Secure/SameSiteの三点セットと、二重送信トークンで防御層を重ねる。
  • 失効、レート制限、監査ログ、鍵とトークンのローテーションを運用へ組み込み、テストで継続的に安全性を確認する。今日から小さく導入し、確実に安全域を広げていきましょう。

投稿者 greeden

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

日本語が含まれない投稿は無視されますのでご注意ください。(スパム対策)