green snake
Photo by Pixabay on Pexels.com

A Beginner’s Guide to Serious Security Design with FastAPI: Authentication & Authorization — JWT/OAuth2, Cookie Sessions, RBAC/Scopes, CSRF Protection, and Real-World Pitfalls


Summary (Overview)

  • Store passwords as hashes; keep access tokens short-lived and refresh tokens long-lived + stored safely as a rule of thumb.
  • For single-page apps (SPAs) or mobile, use JWT (Bearer); for browser-centric web apps, Cookies + CSRF protection are a good match.
  • Separate authorization into roles (RBAC) and scopes, and embed decisions using FastAPI’s Security / Depends.
  • Implement concrete countermeasures against common threats: session fixation, token replay, XSS, CSRF, weak passwords, etc.
  • Include tests, rotation, key management, and audit logs to build an operations-proof system.

Who Benefits

  • Learner A (thesis / solo project): wants a minimal, reliable setup for form auth and JWT.
  • Small Team B (3-person contract dev): unsure about Cookie operations for API and admin UI; wants to finalize CSRF and SameSite design.
  • SaaS Team C (startup): wants clear boundaries with RBAC + scopes, and to run token rotation and revocation in production.

Accessibility Assessment

  • Key points are organized with headings and bullet lists; code uses monospaced blocks with short comments for readability.
  • Technical terms include a brief explanation on first use. Decision points at the end of sections help screen-reader users follow the flow.
  • Overall level: roughly AA.

1. Fundamental Policy: Decomposing Authentication, Authorization, and Session

  • Authentication: verify who the user is (passwords, SAML/OIDC, API keys, etc.)
  • Authorization: determine what they can do (RBAC = roles, scopes = functional units)
  • Session: a mechanism to persist authenticated state (JWT, Cookies, server sessions)

Decision points

  • If the front end is SPA/mobile-centric, choose JWT (short-lived access; store refresh securely).
  • If it’s mainly same-origin web, Cookie sessions are natural. Always apply HttpOnly/Secure/SameSite and CSRF protection.
  • Use roles for broad boundaries; protect API endpoints with scopes for fine-grained guards.

2. User Model and Password Hashing

2.1 Model Example (SQLAlchemy 2.x)

# app/models.py
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, Boolean
from typing import Optional

class Base(DeclarativeBase): pass

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    password_hash: Mapped[str] = mapped_column(String(255))
    is_active: Mapped[bool] = mapped_column(default=True)
    role: Mapped[str] = mapped_column(String(50), default="user")  # e.g., admin/user

2.2 Hashing and Verification

# app/security/passwords.py
from passlib.context import CryptContext

pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto")

def hash_password(plain: str) -> str:
    return pwd_ctx.hash(plain)

def verify_password(plain: str, hashed: str) -> bool:
    return pwd_ctx.verify(plain, hashed)

Decision points

  • Never store plaintext. Use intentionally slow hashes like bcrypt or argon2.
  • Enforce password policy (length, character classes, breach checking) in validation.

3. Minimal JWT (Bearer) Implementation

3.1 Settings and Key Management

# app/core/settings.py
from pydantic_settings import BaseSettings
from pydantic import Field

class Settings(BaseSettings):
    jwt_secret: str = Field(..., description="HS256 secret key")
    jwt_issuer: str = "example-api"
    access_token_expires_minutes: int = 15
    refresh_token_expires_days: int = 30
    class Config:
        env_file = ".env"
        extra = "ignore"

def get_settings():
    return Settings()

3.2 Issuance & Verification (using PyJWT / JOSE-type libs)

# app/security/jwt.py
from datetime import datetime, timedelta, timezone
import jwt  # pyjwt
from typing import Any, Dict
from app.core.settings import get_settings

ALGO = "HS256"

def create_token(sub: str, scope: str, expires_delta: timedelta, token_type: str) -> str:
    s = get_settings()
    now = datetime.now(timezone.utc)
    payload: Dict[str, Any] = {
        "iss": s.jwt_issuer,
        "sub": sub,
        "scope": scope,           # e.g., "articles:read users:write" (space-delimited)
        "type": token_type,       # "access" or "refresh"
        "iat": int(now.timestamp()),
        "exp": int((now + expires_delta).timestamp())
    }
    return jwt.encode(payload, s.jwt_secret, algorithm=ALGO)

def decode_token(token: str) -> Dict[str, Any]:
    s = get_settings()
    return jwt.decode(token, s.jwt_secret, algorithms=[ALGO], options={"require": ["exp","sub","type"]})

3.3 Auth Router (Login / Refresh)

# app/routers/auth.py
from fastapi import APIRouter, HTTPException, Depends, status
from pydantic import BaseModel
from datetime import timedelta
from app.security.passwords import verify_password
from app.security.jwt import create_token, decode_token
from app.core.settings import get_settings

router = APIRouter(prefix="/auth", tags=["auth"])

class LoginRequest(BaseModel):
    email: str
    password: str

class TokenPair(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"

# Temporary DB lookup function
def get_user_by_email(email: str):
    # Implementation omitted. Return a user that includes password_hash
    ...

@router.post("/login", response_model=TokenPair)
def login(payload: LoginRequest):
    user = get_user_by_email(payload.email)
    if not user or not verify_password(payload.password, user.password_hash) or not user.is_active:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid credentials")

    s = get_settings()
    access = create_token(
        sub=str(user.id),
        scope="articles:read users:read",  # assemble dynamically as needed
        expires_delta=timedelta(minutes=s.access_token_expires_minutes),
        token_type="access"
    )
    refresh = create_token(
        sub=str(user.id),
        scope="refresh",
        expires_delta=timedelta(days=s.refresh_token_expires_days),
        token_type="refresh"
    )
    return TokenPair(access_token=access, refresh_token=refresh)

class RefreshRequest(BaseModel):
    refresh_token: str

@router.post("/refresh", response_model=TokenPair)
def refresh(req: RefreshRequest):
    data = decode_token(req.refresh_token)
    if data.get("type") != "refresh":
        raise HTTPException(401, "invalid token type")
    s = get_settings()
    access = create_token(sub=data["sub"], scope="articles:read users:read",
                          expires_delta=timedelta(minutes=s.access_token_expires_minutes), token_type="access")
    # In production, rotate refresh tokens (reissue + invalidate the previous one)
    new_refresh = create_token(sub=data["sub"], scope="refresh",
                               expires_delta=timedelta(days=s.refresh_token_expires_days), token_type="refresh")
    return TokenPair(access_token=access, refresh_token=new_refresh)

Decision points

  • Access tokens are short-lived; refresh tokens should support rotation and blacklisting so leaks can be invalidated.
  • Scopes are space-delimited. Keep a consistent naming scheme like resource:verb (articles:read).

4. Authorization: Combining RBAC and Scopes

4.1 Decoder and Scope Checking

# app/security/deps.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Sequence
from app.security.jwt import decode_token
from app.models import User

bearer = HTTPBearer(auto_error=False)

def get_current_user(creds: HTTPAuthorizationCredentials = Depends(bearer)) -> User:
    if creds is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="missing token")
    data = decode_token(creds.credentials)
    # Fetch User from DB (omitted) and verify active status
    ...
    return user

def require_scopes(required: Sequence[str]):
    def checker(creds: HTTPAuthorizationCredentials = Depends(bearer)):
        if creds is None:
            raise HTTPException(401, "missing token")
        data = decode_token(creds.credentials)
        token_scopes = set(str(data.get("scope","")).split())
        if not set(required).issubset(token_scopes):
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="insufficient scope")
        return data
    return checker

4.2 Apply to Routes

# app/routers/articles.py
from fastapi import APIRouter, Depends
from app.security.deps import get_current_user, require_scopes

router = APIRouter(prefix="/articles", tags=["articles"])

@router.get("", dependencies=[Depends(require_scopes(["articles:read"]))])
def list_articles():
    return [{"id": 1, "title": "Hello"}]

@router.post("", dependencies=[Depends(require_scopes(["articles:write"]))])
def create_article():
    return {"ok": True}

Decision points

  • Routes remain readable if you declare only the required scopes there.
  • Map roles → scopes when issuing tokens, or translate roles to scopes server-side.

5. Cookie Sessions + CSRF (Double-Submit Token)

For browser-centric web apps, an HttpOnly session cookie is convenient. You can store state server-side or put a JWT in a cookie (but always use HttpOnly/Secure/SameSite).

5.1 Store in a Cookie (e.g., Access Token)

# app/routers/web_auth.py
from fastapi import APIRouter, Response, HTTPException, Depends
from pydantic import BaseModel
from datetime import timedelta
from app.security.passwords import verify_password
from app.security.jwt import create_token, decode_token
from app.core.settings import get_settings

router = APIRouter(prefix="/web", tags=["web-auth"])

class WebLogin(BaseModel):
    email: str
    password: str

@router.post("/login")
def web_login(payload: WebLogin, response: Response):
    user = ...  # omitted
    if not user or not verify_password(payload.password, user.password_hash):
        raise HTTPException(401, "invalid credentials")
    s = get_settings()
    access = create_token(str(user.id), "articles:read", timedelta(minutes=s.access_token_expires_minutes), "access")
    # `HttpOnly` prevents JS reads; `Secure` restricts to HTTPS; set SameSite per use case
    response.set_cookie("access_token", access, httponly=True, secure=True, samesite="Lax", max_age=60*15, path="/")
    # Distribute an `XSRF-TOKEN` in a separate cookie (JS reads this and sends it in a header)
    response.set_cookie("XSRF-TOKEN", "random-csrf", httponly=False, secure=True, samesite="Lax", path="/")
    return {"ok": True}

5.2 CSRF Verification (Double-Submit Cookie)

# app/middlewares/csrf.py
from fastapi import Request, HTTPException, status

async def verify_csrf(request: Request):
    if request.method in ("POST","PUT","PATCH","DELETE"):
        cookie = request.cookies.get("XSRF-TOKEN")
        header = request.headers.get("X-CSRF-TOKEN")
        if not cookie or not header or cookie != header:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="csrf failed")

Example application:

# app/routers/protected_web.py
from fastapi import APIRouter, Depends
from app.middlewares.csrf import verify_csrf

router = APIRouter(prefix="/web-api", dependencies=[Depends(verify_csrf)])

@router.post("/profile")
def update_profile():
    return {"ok": True}

Decision points

  • HttpOnly helps prevent token reading but does not prevent CSRF—protect that separately.
  • SameSite="Lax" is a sensible default. For cross-site needs, use None + HTTPS.

6. Refresh Operations and Revocation

  • To address stolen refresh tokens, implement rotation (reissue) and invalidate the previously issued token via a management table.
  • On explicit logout, add the current access token’s jti (token ID) and refresh tokens to a blacklist.

Sample (conceptual):

# app/security/revoke.py
# Keep a store of revoked_tokens (by jti or signing hash); after decode, check and reject if present

Decision points

  • Using a TTL store like Redis simplifies implementation.
  • Detect refresh token reuse (old refresh used again after rotation) and alert.

7. Audit Logging and Rate Limiting

  • Output structured audit logs for key events (login success/failure, password change, permission change, token reissue).
  • Rate-limit login attempts (IP × user) and add delay to suppress brute force.

Example: reuse a token-bucket implementation from your caching toolkit.


8. Test Strategy (Security Perspective)

  • Unit tests for password verification and weak-password rejection.
  • E2E: /auth/login → protected API; test expiration, tampering, and wrong-scope cases.
  • CSRF: ensure cookie/header mismatch returns 403.
  • Revocation: tokens on the blacklist must be rejected.
  • Audit logs: verify events are recorded without gaps.

9. Common Pitfalls and Countermeasures

Symptom Cause Countermeasure
Tokens stolen by JS localStorage storage + XSS HttpOnly cookies, CSP, input sanitization, XSS audits
CSRF succeeds Cookie-only auth, missing CSRF defense Double-submit cookie, SameSite, Origin/Referer checks
Scopes get expanded Loose string parsing Treat scopes as a set and compare strictly
Refresh overuse Long lifetime + no rotation Rotation + blacklist; per-device IDs & revocation
Paths leak Access tokens printed in logs Mask auth headers; omit PII

10. Boilerplate: End-to-End Minimal App

# app/main.py
from fastapi import FastAPI
from app.routers import auth, articles, web_auth, protected_web

app = FastAPI(title="Secure API")

# APIs for JWT (Bearer)
app.include_router(auth.router)
app.include_router(articles.router)

# APIs for Web (Cookie + CSRF)
app.include_router(web_auth.router)
app.include_router(protected_web.router)

@app.get("/health")
def health():
    return {"ok": True}

11. Adoption Roadmap

  1. Implement password hashing and the user model; get minimal JWT working at /auth/login.
  2. Protect endpoints with scopes via require_scopes.
  3. Add refresh rotation, blacklist, audit logs, and rate limiting.
  4. Introduce Cookie + CSRF for web; verify HttpOnly/Secure/SameSite.
  5. Operationalize key rotation, environment separation, and audit dashboards.

12. References


Conclusion

  • With JWT, use short-lived access tokens plus rotatable refresh tokens. Clarify authorization with roles and scopes; declaring only minimal requirements at the route level improves maintainability.
  • On the web, Cookies + CSRF are effective. Apply the trio HttpOnly/Secure/SameSite, and layer defenses with the double-submit token.
  • Bake revocation, rate limiting, audit logs, and key/token rotation into operations, and continuously verify safety with tests. Start small today and steadily expand your secure zone.

By greeden

Leave a Reply

Your email address will not be published. Required fields are marked *

日本語が含まれない投稿は無視されますのでご注意ください。(スパム対策)