Bỏ qua

Bài 8: Authentication — Google OAuth2 + JWT + HMAC

Project reference: admin/src/auth.py, admin/src/services/auth_service.py, admin/src/services/signed_url.py, admin/src/routers/auth.py


1. Tổng quan Flow Auth

Browser                    Admin API              Google
   │                           │                     │
   │── click "Login Google" ──►│                     │
   │                           │── redirect ────────►│
   │◄── redirect ──────────────│◄── code + state ────│
   │                           │── exchange code ────►│
   │                           │◄── access_token ────│
   │                           │── get userinfo ─────►│
   │                           │◄── email, name ─────│
   │                           │                     │
   │                           │ check email trong users table
   │                           │ issue JWT → set cookie
   │◄── kcds_session cookie ───│
   │                           │
   │── GET /api/audio ─────────│ (cookie gửi tự động)
   │                           │ verify JWT → fetch role từ DB → OK
   │◄── 200 data ──────────────│

2. OAuth2 — Authorization Code Flow

Thực ra OAuth2 không phải là giao thức xác thực người dùng — nó là authorization protocol: Google chỉ xác nhận với app rằng “người này đồng ý cho bạn đọc email của họ”, không hơn không kém.

Step 1: Redirect đến Google

# admin/src/routers/auth.py
import secrets
from urllib.parse import urlencode

@router.get("/google/login")
async def google_login(response: Response) -> RedirectResponse:
    # state = random token để verify callback là từ Google (chống CSRF)
    state = secrets.token_urlsafe(32)

    # Lưu state vào cookie (HttpOnly, short TTL)
    response.set_cookie("oauth_state", state, max_age=600, httponly=True)

    params = {
        "client_id": GOOGLE_CLIENT_ID,
        "redirect_uri": GOOGLE_REDIRECT_URI,
        "response_type": "code",
        "scope": "openid email profile",
        "state": state,
    }
    return RedirectResponse(f"https://accounts.google.com/o/oauth2/auth?{urlencode(params)}")

Step 2: Google Callback

@router.get("/google/callback")
async def google_callback(
    code: str,          # authorization code từ Google
    state: str,         # state từ Google — phải match với cookie
    request: Request,
    response: Response,
    session: AsyncSession = Depends(get_session),
    svc: AuthService = Depends(get_auth_service),
) -> RedirectResponse:
    # Verify state (CSRF protection)
    stored_state = request.cookies.get("oauth_state")
    if not stored_state or not hmac.compare_digest(state, stored_state):
        return RedirectResponse("/?login_error=invalid_state")

    try:
        # Exchange code → access_token (server-to-server, không qua browser)
        token = await _exchange_code(code)

        # Dùng access_token lấy thông tin user từ Google
        userinfo = await _get_userinfo(token)
        email = userinfo["email"]

        # Check email trong whitelist (users table)
        user = await svc.login_or_reject(session, email, userinfo)

        # Issue JWT
        jwt_token = create_session(email)

        # Set HttpOnly cookie
        response.set_cookie(
            SESSION_COOKIE_NAME,
            jwt_token,
            httponly=True,        # JavaScript không đọc được
            samesite="lax",       # gửi theo same-site navigation (xem note bên dưới)
            secure=COOKIE_SECURE, # chỉ HTTPS (production)
            max_age=JWT_TTL_SEC,
        )
        return RedirectResponse("/")

    except UserNotWhitelisted:
        return RedirectResponse("/?login_error=not_whitelisted")
    except UserDisabled:
        return RedirectResponse("/?login_error=disabled")

3. JWT — JSON Web Token

JWT là signed token — server ký bằng secret, client giữ token, server verify signature mỗi request.

Cấu trúc JWT

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9   ← Header (base64)
.eyJzdWIiOiJ1c2VyQGV4YW1wbGUuY29tIiwiZXhwIjoxNzE0MDAwMDAwfQ==  ← Payload (base64)
.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c  ← Signature (HMAC-SHA256)

Payload (sau decode):

{
  "sub": "user@example.com",   // subject = email
  "exp": 1714000000             // expiry unix timestamp
}

JWT payload không được mã hoá — bất kỳ ai có token đều có thể base64 decode và đọc nội dung. Phần duy nhất được bảo vệ là signature. Vì vậy, tuyệt đối không đặt thông tin nhạy cảm như role hay password vào JWT payload.

Issue JWT

# admin/src/services/auth_service.py
import jwt
from src.config import JWT_SECRET, JWT_TTL_SEC

def create_session(email: str) -> str:
    """Tạo JWT token cho session."""
    payload = {
        "sub": email,                               # subject
        "exp": time.time() + JWT_TTL_SEC,           # expiry (7 ngày)
    }
    return jwt.encode(payload, JWT_SECRET, algorithm="HS256")

def verify_session(token: str) -> str | None:
    """Verify token và trả về email. None nếu invalid/expired."""
    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
        return payload.get("sub")
    except jwt.ExpiredSignatureError:
        return None  # token hết hạn
    except jwt.InvalidTokenError:
        return None  # token bị sửa hoặc sai format

Verify mỗi Request

# admin/src/auth.py
async def get_current_user(
    cookie_token: str | None = Cookie(default=None, alias="kcds_session"),
    session: AsyncSession = Depends(get_session),
    user_repo: UserRepository = Depends(get_user_repo),
) -> dict:
    if not cookie_token:
        raise HTTPException(401, "Chưa đăng nhập")

    # 1. Verify JWT signature + expiry
    email = verify_session(cookie_token)
    if not email:
        raise HTTPException(401, "Phiên đăng nhập hết hạn")

    # 2. Fetch role FRESH từ DB (không tin vào JWT payload)
    # → đổi role/disable user có hiệu lực NGAY, không cần re-login
    user = await user_repo.get_by_email(session, email)
    if user is None:
        raise HTTPException(401, "Tài khoản không tồn tại")
    if not user.get("enabled"):
        raise HTTPException(401, "Tài khoản đã bị vô hiệu hoá")

    return user  # {"email": ..., "role": ..., "enabled": ...}

Tại sao không lưu role vào JWT?

Tình huống: Admin hạ quyền user A từ sub_admin → user lúc 10:00
Nếu role trong JWT: user A tiếp tục có quyền sub_admin đến khi JWT hết hạn (7 ngày)
Nếu fetch từ DB: user A mất quyền ngay request tiếp theo

Cookie (HttpOnly) Bearer Token (Header)
Lưu ở đâu Browser tự quản lý localStorage / sessionStorage
JavaScript access ❌ Không (HttpOnly) ✅ Có
Gửi tự động ✅ Theo mọi same-site request ❌ Phải set header thủ công
XSS risk Thấp (JS không đọc được) Cao (JS đọc được)
CSRF risk Cao (cần SameSite + CSRF token) Thấp

Project dùng HttpOnly Cookie vì admin panel là web app trên browser. SameSite=Lax giảm thiểu CSRF risk mà không cần CSRF token riêng.

Tại sao Lax chứ không phải Strict?

Mode Gửi cookie khi... Vấn đề
Strict Chỉ khi request phát sinh từ chính site đó User click link tới admin panel từ email/Slack/Google search → cookie không gửi → thấy login page dù đang có session hợp lệ. UX khựng.
Lax (project dùng) Same-site + top-level navigation (click link, type URL) Đủ chặn CSRF phổ biến (POST/DELETE từ site khác không gửi cookie), đồng thời giữ UX mượt khi user điều hướng từ nguồn ngoài.
None Mọi request cross-site Phải kèm Secure=true. Chỉ dùng khi thực sự cần embed cookie-auth API vào iframe/app bên thứ 3.

Tóm gọn: Lax là sweet spot cho web app có link share được (email notification, Slack, bookmark). Strict chỉ hợp banking/high-security khi UX ít quan trọng hơn isolation tuyệt đối.

Frontend không cần biết JWT tồn tại:

// admin/src/ui/src/modules/auth.js
export async function initAuth({ onAuthed, onUnauthed }) {
    // Cookie được gửi tự động bởi browser
    const res = await fetch("/api/auth/me", { credentials: "include" });

    if (!res.ok) {
        onUnauthed();   // cookie missing/expired → show login
        return;
    }

    _me = (await res.json()).data;  // chỉ biết role qua API, không đọc JWT trực tiếp
    onAuthed();
}

5. HMAC Signed URL — Auth cho Audio Stream

HTML5 <audio src="..."> không cho phép set custom headers (Authorization). Cần cơ chế auth khác cho stream endpoint.

Giải pháp: Signed URL với HMAC

# admin/src/services/signed_url.py
import hashlib
import hmac
import time
from src.config import AUDIO_STREAM_SECRET, AUDIO_STREAM_TTL_SEC

def sign_audio_url(audio_id: int, ttl_sec: int | None = None) -> tuple[str, int]:
    """Tạo signed URL có TTL."""
    expires = int(time.time()) + (ttl_sec or AUDIO_STREAM_TTL_SEC)  # 30 phút
    sig = _hmac(audio_id, expires)
    # URL embed signature + expiry làm query params
    return f"/api/audio/{audio_id}/stream?sig={sig}&expires={expires}", expires

def verify_audio_sig(audio_id: int, sig: str, expires: int) -> bool:
    """Verify signature. False nếu hết hạn hoặc sai."""
    if expires < int(time.time()):
        return False  # hết TTL
    expected = _hmac(audio_id, expires)
    # compare_digest: constant-time comparison, chống timing attack
    return hmac.compare_digest(sig, expected)

def _hmac(audio_id: int, expires: int) -> str:
    msg = f"{audio_id}:{expires}".encode()  # message = id:expires
    return hmac.new(
        AUDIO_STREAM_SECRET.encode(),
        msg,
        hashlib.sha256,
    ).hexdigest()

Flow:

1. Frontend: POST /api/audio/42/stream-url (cookie auth)
   ← {url: "/api/audio/42/stream?sig=abc123&expires=1714000000"}

2. Frontend: <audio src="/api/audio/42/stream?sig=abc123&expires=1714000000">
   Browser gửi GET request (không có cookie, không có header)

3. Server: verify_audio_sig(42, "abc123", 1714000000)
   → HMAC(secret, "42:1714000000") == "abc123"? → stream file

Tại sao HMAC an toàn? - AUDIO_STREAM_SECRET chỉ server biết - Attacker không thể giả mạo sig vì không có secret - URL có TTL 30 phút → hết hạn tự vô hiệu - compare_digest tránh timing attack (so sánh character by character, không dừng sớm khi sai)


6. RBAC — Role-Based Access Control

# admin/src/roles.py
ROLE_ADMIN = "admin"
ROLE_SUB_ADMIN = "sub_admin"
ROLE_USER = "user"
EDITOR_ROLES = (ROLE_ADMIN, ROLE_SUB_ADMIN)

# admin/src/auth.py
def require_user(user=Depends(get_current_user)):
    """Bất kỳ user authenticated."""
    return user

def require_editor(user=Depends(get_current_user)):
    """admin hoặc sub_admin — mutation content."""
    if user["role"] not in EDITOR_ROLES:
        raise HTTPException(403, "Chỉ quản trị/phó quản trị mới thực hiện được")
    return user

def require_admin(user=Depends(get_current_user)):
    """Chỉ admin — user management."""
    if user["role"] != ROLE_ADMIN:
        raise HTTPException(403, "Chỉ quản trị mới thực hiện được")
    return user
# Áp dụng tại router level
@router.get("", dependencies=[Depends(require_user)])       # user thường xem được
@router.post("", dependencies=[Depends(require_editor)])    # chỉ editor upload
@router.delete("/{id}", dependencies=[Depends(require_editor)])  # chỉ editor xóa

Single Admin constraint — Database Level

Chỉ được có 1 admin. Enforce bằng partial unique index trong SQLite:

-- admin/src/db/schema.sql
CREATE UNIQUE INDEX IF NOT EXISTS idx_users_single_admin
    ON users((CASE WHEN role='admin' THEN 1 END));
-- Index này chỉ index rows có role='admin'
-- UNIQUE → không thể có 2 rows cùng value '1' trong index
-- → database tự chặn, không cần application logic

Resign — Atomic Role Swap

# admin/src/repositories/user_repo.py
async def swap_admin(
    self,
    session: AsyncSession,
    *,
    old_admin: str,
    old_new_role: str,
    successor: str,
) -> None:
    """Atomic: hạ old_admin TRƯỚC, nâng successor SAU.

    Thứ tự bắt buộc: nếu nâng successor trước → tạm thời có 2 admin →
    vi phạm `idx_users_single_admin` → SA raise IntegrityError → rollback.
    """
    await session.execute(
        update(users).where(users.c.email == old_admin).values(role=old_new_role)
    )
    # Sau bước trên: 0 admin trong DB

    await session.execute(
        update(users).where(users.c.email == successor).values(role="admin")
    )
    # Sau bước trên: 1 admin trong DB

    await session.commit()   # commit cả 2 UPDATE trong 1 transaction
    # Nếu UPDATE thứ 2 raise (vì bất kỳ lý do gì) → commit không chạy → rollback auto

📌 Cả 2 execute() share cùng 1 auto-begun transaction của SQLAlchemy 2.0. Không dùng async with session.begin(): wrapper vì session có thể đã có TX từ các SELECT trước trong service (SA raise InvalidRequestError nếu begin() lồng nhau).


7. Security Best Practices từ Project

1. Secrets trong env, không hardcode:

# ✅ Đúng
JWT_SECRET = os.getenv("JWT_SECRET", "")

# ❌ Sai
JWT_SECRET = "my-secret-key-123"

2. Fail-fast khi thiếu secret:

# admin/src/config.py
def validate():
    required = [
        ("JWT_SECRET", JWT_SECRET),
        ("AUDIO_STREAM_SECRET", AUDIO_STREAM_SECRET),
        ("GOOGLE_CLIENT_ID", GOOGLE_CLIENT_ID),
    ]
    missing = [name for name, val in required if not val]
    if missing:
        raise RuntimeError(f"Thiếu env vars bắt buộc: {missing}")

3. Constant-time comparison cho secrets:

# ✅ Đúng — tránh timing attack
hmac.compare_digest(provided_sig, expected_sig)

# ❌ Sai — dừng sớm khi gặp ký tự sai
provided_sig == expected_sig

4. HttpOnly cookie — JavaScript không đọc được:

response.set_cookie(
    "kcds_session",
    jwt_token,
    httponly=True,   # ← quan trọng
    samesite="lax",
    secure=True,     # chỉ HTTPS
)

5. State parameter trong OAuth — chống CSRF:

state = secrets.token_urlsafe(32)  # random, không đoán được
# Lưu state → gửi đến Google → Google trả về → verify phải match


Nguyên lý tổng quát

Pattern trong bài Nguyên lý (bài 0 + security) Chuyển giao
OAuth2 state param Defense in Depth — CSRF protection bằng random token Mọi OAuth flow (GitHub, Facebook, Apple) đều cần state
JWT không chứa role, fetch từ DB Principle of Least Privilege + revoke ngay Trade-off: stateless vs fresh data
HttpOnly + SameSite=Lax Defense in Depth — giảm XSS + CSRF surface Áp dụng cho mọi web session cookie
HMAC signed URL có TTL Least Privilege — URL chỉ valid cho 1 resource trong 30 phút Pattern chung của S3 presigned URL, CloudFront signed URL
hmac.compare_digest Fail Secure — constant-time compare chống timing attack Bất kỳ so sánh secret nào (HMAC, password hash, token)
secrets.token_urlsafe Don't roll your own crypto — dùng stdlib Không bao giờ dùng random.random() cho security
config.validate() secret bắt buộc Fail Fast — app không start khi thiếu secret Security bug muộn = breach, fail fast tốt hơn
Partial unique index cho admin Fail Secure at DB level — DB enforce invariant Push invariant xuống layer thấp nhất có thể

Chuyển giao: các nguyên lý bảo mật nền tảng

Bài này dùng 5 nguyên lý bảo mật phổ quát — áp dụng được với mọi auth system:

  1. Defense in Depth — nhiều layer phòng thủ, không dựa vào 1 cái
  2. Least Privilege — token/role chỉ có quyền tối thiểu cần thiết, TTL ngắn
  3. Fail Secure — khi lỗi, mặc định reject (không cho phép)
  4. Don't Trust Input — mọi input từ user (kể cả từ cookie, header, URL) đều untrusted
  5. Don't Roll Your Own Crypto — dùng library đã được peer-review (jwt, bcrypt, passlib, cryptography)

Trade-off đáng suy nghĩ

JWT vs Session Cookie: Bài 10 §4 có checklist. Tóm tắt:

  • JWT: stateless, scale ngang dễ, khó revoke
  • Session: revoke instant, scale cần session store shared (Redis)

Project KCDS chọn JWT nhưng fetch role từ DB mỗi request — pragmatic compromise.

Access token + Refresh token: Production app nghiêm chỉnh có cả 2 (access 15 phút, refresh 30 ngày). Project này chỉ có 1 token 7 ngày — đủ cho MVP, không đủ cho scale.

Security checklist khi code auth

  • Secret đủ dài (≥ 32 bytes random)?
  • Secret trong env, không hardcode?
  • Password hash bằng bcrypt/argon2 (không MD5/SHA256 thuần)?
  • Token có TTL không?
  • Cookie HttpOnly + Secure + SameSite?
  • Constant-time compare cho HMAC/token?
  • State parameter cho OAuth?
  • Fail-fast khi thiếu secret?
  • Log auth failure (nhưng không log token/password)?
  • Rate limit login endpoint?

Bài tập áp dụng

Implement auth đơn giản với JWT cho blog API:

# auth.py
import hashlib
import hmac
import jwt
import time
import secrets

SECRET = "your-secret-key"  # trong production: lấy từ env
TTL = 3600  # 1 tiếng

def create_token(user_id: int) -> str:
    return jwt.encode(
        {"sub": user_id, "exp": time.time() + TTL},
        SECRET, algorithm="HS256"
    )

def verify_token(token: str) -> int | None:
    try:
        payload = jwt.decode(token, SECRET, algorithms=["HS256"])
        return payload["sub"]
    except jwt.InvalidTokenError:
        return None

# FastAPI dependency
async def get_current_user(
    authorization: str | None = Header(default=None),
    db: Session = Depends(get_db),
) -> dict:
    if not authorization or not authorization.startswith("Bearer "):
        raise HTTPException(401)
    token = authorization.removeprefix("Bearer ")
    user_id = verify_token(token)
    if not user_id:
        raise HTTPException(401, "Token invalid")
    user = await db.get_user(user_id)
    if not user:
        raise HTTPException(401)
    return user

Câu hỏi tự kiểm tra: 1. Tại sao role không đặt trong JWT payload? Ưu/nhược của từng cách? 2. hmac.compare_digest() khác == chỗ nào? Timing attack là gì? 3. Tại sao phải hạ quyền mình TRƯỚC khi nâng quyền successor? 4. SameSite=Lax bảo vệ khỏi loại tấn công nào?