Bài 8: Authentication — Google OAuth2 + JWT + HMAC¶
Project reference:
admin/src/auth.py,admin/src/services/auth_service.py,admin/src/services/signed_url.py,admin/src/routers/auth.py
1. Tổng quan Flow Auth¶
Browser Admin API Google
│ │ │
│── click "Login Google" ──►│ │
│ │── redirect ────────►│
│◄── redirect ──────────────│◄── code + state ────│
│ │── exchange code ────►│
│ │◄── access_token ────│
│ │── get userinfo ─────►│
│ │◄── email, name ─────│
│ │ │
│ │ check email trong users table
│ │ issue JWT → set cookie
│◄── kcds_session cookie ───│
│ │
│── GET /api/audio ─────────│ (cookie gửi tự động)
│ │ verify JWT → fetch role từ DB → OK
│◄── 200 data ──────────────│
2. OAuth2 — Authorization Code Flow¶
Thực ra OAuth2 không phải là giao thức xác thực người dùng — nó là authorization protocol: Google chỉ xác nhận với app rằng “người này đồng ý cho bạn đọc email của họ”, không hơn không kém.
Step 1: Redirect đến Google¶
# admin/src/routers/auth.py
import secrets
from urllib.parse import urlencode
@router.get("/google/login")
async def google_login(response: Response) -> RedirectResponse:
# state = random token để verify callback là từ Google (chống CSRF)
state = secrets.token_urlsafe(32)
# Lưu state vào cookie (HttpOnly, short TTL)
response.set_cookie("oauth_state", state, max_age=600, httponly=True)
params = {
"client_id": GOOGLE_CLIENT_ID,
"redirect_uri": GOOGLE_REDIRECT_URI,
"response_type": "code",
"scope": "openid email profile",
"state": state,
}
return RedirectResponse(f"https://accounts.google.com/o/oauth2/auth?{urlencode(params)}")
Step 2: Google Callback¶
@router.get("/google/callback")
async def google_callback(
code: str, # authorization code từ Google
state: str, # state từ Google — phải match với cookie
request: Request,
response: Response,
session: AsyncSession = Depends(get_session),
svc: AuthService = Depends(get_auth_service),
) -> RedirectResponse:
# Verify state (CSRF protection)
stored_state = request.cookies.get("oauth_state")
if not stored_state or not hmac.compare_digest(state, stored_state):
return RedirectResponse("/?login_error=invalid_state")
try:
# Exchange code → access_token (server-to-server, không qua browser)
token = await _exchange_code(code)
# Dùng access_token lấy thông tin user từ Google
userinfo = await _get_userinfo(token)
email = userinfo["email"]
# Check email trong whitelist (users table)
user = await svc.login_or_reject(session, email, userinfo)
# Issue JWT
jwt_token = create_session(email)
# Set HttpOnly cookie
response.set_cookie(
SESSION_COOKIE_NAME,
jwt_token,
httponly=True, # JavaScript không đọc được
samesite="lax", # gửi theo same-site navigation (xem note bên dưới)
secure=COOKIE_SECURE, # chỉ HTTPS (production)
max_age=JWT_TTL_SEC,
)
return RedirectResponse("/")
except UserNotWhitelisted:
return RedirectResponse("/?login_error=not_whitelisted")
except UserDisabled:
return RedirectResponse("/?login_error=disabled")
3. JWT — JSON Web Token¶
JWT là signed token — server ký bằng secret, client giữ token, server verify signature mỗi request.
Cấu trúc JWT¶
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9 ← Header (base64)
.eyJzdWIiOiJ1c2VyQGV4YW1wbGUuY29tIiwiZXhwIjoxNzE0MDAwMDAwfQ== ← Payload (base64)
.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c ← Signature (HMAC-SHA256)
Payload (sau decode):
JWT payload không được mã hoá — bất kỳ ai có token đều có thể base64 decode và đọc nội dung. Phần duy nhất được bảo vệ là signature. Vì vậy, tuyệt đối không đặt thông tin nhạy cảm như role hay password vào JWT payload.
Issue JWT¶
# admin/src/services/auth_service.py
import jwt
from src.config import JWT_SECRET, JWT_TTL_SEC
def create_session(email: str) -> str:
"""Tạo JWT token cho session."""
payload = {
"sub": email, # subject
"exp": time.time() + JWT_TTL_SEC, # expiry (7 ngày)
}
return jwt.encode(payload, JWT_SECRET, algorithm="HS256")
def verify_session(token: str) -> str | None:
"""Verify token và trả về email. None nếu invalid/expired."""
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
return payload.get("sub")
except jwt.ExpiredSignatureError:
return None # token hết hạn
except jwt.InvalidTokenError:
return None # token bị sửa hoặc sai format
Verify mỗi Request¶
# admin/src/auth.py
async def get_current_user(
cookie_token: str | None = Cookie(default=None, alias="kcds_session"),
session: AsyncSession = Depends(get_session),
user_repo: UserRepository = Depends(get_user_repo),
) -> dict:
if not cookie_token:
raise HTTPException(401, "Chưa đăng nhập")
# 1. Verify JWT signature + expiry
email = verify_session(cookie_token)
if not email:
raise HTTPException(401, "Phiên đăng nhập hết hạn")
# 2. Fetch role FRESH từ DB (không tin vào JWT payload)
# → đổi role/disable user có hiệu lực NGAY, không cần re-login
user = await user_repo.get_by_email(session, email)
if user is None:
raise HTTPException(401, "Tài khoản không tồn tại")
if not user.get("enabled"):
raise HTTPException(401, "Tài khoản đã bị vô hiệu hoá")
return user # {"email": ..., "role": ..., "enabled": ...}
Tại sao không lưu role vào JWT?
Tình huống: Admin hạ quyền user A từ sub_admin → user lúc 10:00
Nếu role trong JWT: user A tiếp tục có quyền sub_admin đến khi JWT hết hạn (7 ngày)
Nếu fetch từ DB: user A mất quyền ngay request tiếp theo
4. Cookie vs Bearer Token¶
| Cookie (HttpOnly) | Bearer Token (Header) | |
|---|---|---|
| Lưu ở đâu | Browser tự quản lý | localStorage / sessionStorage |
| JavaScript access | ❌ Không (HttpOnly) | ✅ Có |
| Gửi tự động | ✅ Theo mọi same-site request | ❌ Phải set header thủ công |
| XSS risk | Thấp (JS không đọc được) | Cao (JS đọc được) |
| CSRF risk | Cao (cần SameSite + CSRF token) | Thấp |
Project dùng HttpOnly Cookie vì admin panel là web app trên browser. SameSite=Lax giảm thiểu CSRF risk mà không cần CSRF token riêng.
Tại sao Lax chứ không phải Strict?¶
| Mode | Gửi cookie khi... | Vấn đề |
|---|---|---|
Strict |
Chỉ khi request phát sinh từ chính site đó | User click link tới admin panel từ email/Slack/Google search → cookie không gửi → thấy login page dù đang có session hợp lệ. UX khựng. |
Lax (project dùng) |
Same-site + top-level navigation (click link, type URL) | Đủ chặn CSRF phổ biến (POST/DELETE từ site khác không gửi cookie), đồng thời giữ UX mượt khi user điều hướng từ nguồn ngoài. |
None |
Mọi request cross-site | Phải kèm Secure=true. Chỉ dùng khi thực sự cần embed cookie-auth API vào iframe/app bên thứ 3. |
Tóm gọn: Lax là sweet spot cho web app có link share được (email notification, Slack, bookmark). Strict chỉ hợp banking/high-security khi UX ít quan trọng hơn isolation tuyệt đối.
Frontend không cần biết JWT tồn tại:
// admin/src/ui/src/modules/auth.js
export async function initAuth({ onAuthed, onUnauthed }) {
// Cookie được gửi tự động bởi browser
const res = await fetch("/api/auth/me", { credentials: "include" });
if (!res.ok) {
onUnauthed(); // cookie missing/expired → show login
return;
}
_me = (await res.json()).data; // chỉ biết role qua API, không đọc JWT trực tiếp
onAuthed();
}
5. HMAC Signed URL — Auth cho Audio Stream¶
HTML5 <audio src="..."> không cho phép set custom headers (Authorization). Cần cơ chế auth khác cho stream endpoint.
Giải pháp: Signed URL với HMAC
# admin/src/services/signed_url.py
import hashlib
import hmac
import time
from src.config import AUDIO_STREAM_SECRET, AUDIO_STREAM_TTL_SEC
def sign_audio_url(audio_id: int, ttl_sec: int | None = None) -> tuple[str, int]:
"""Tạo signed URL có TTL."""
expires = int(time.time()) + (ttl_sec or AUDIO_STREAM_TTL_SEC) # 30 phút
sig = _hmac(audio_id, expires)
# URL embed signature + expiry làm query params
return f"/api/audio/{audio_id}/stream?sig={sig}&expires={expires}", expires
def verify_audio_sig(audio_id: int, sig: str, expires: int) -> bool:
"""Verify signature. False nếu hết hạn hoặc sai."""
if expires < int(time.time()):
return False # hết TTL
expected = _hmac(audio_id, expires)
# compare_digest: constant-time comparison, chống timing attack
return hmac.compare_digest(sig, expected)
def _hmac(audio_id: int, expires: int) -> str:
msg = f"{audio_id}:{expires}".encode() # message = id:expires
return hmac.new(
AUDIO_STREAM_SECRET.encode(),
msg,
hashlib.sha256,
).hexdigest()
Flow:
1. Frontend: POST /api/audio/42/stream-url (cookie auth)
← {url: "/api/audio/42/stream?sig=abc123&expires=1714000000"}
2. Frontend: <audio src="/api/audio/42/stream?sig=abc123&expires=1714000000">
Browser gửi GET request (không có cookie, không có header)
3. Server: verify_audio_sig(42, "abc123", 1714000000)
→ HMAC(secret, "42:1714000000") == "abc123"? → stream file
Tại sao HMAC an toàn?
- AUDIO_STREAM_SECRET chỉ server biết
- Attacker không thể giả mạo sig vì không có secret
- URL có TTL 30 phút → hết hạn tự vô hiệu
- compare_digest tránh timing attack (so sánh character by character, không dừng sớm khi sai)
6. RBAC — Role-Based Access Control¶
# admin/src/roles.py
ROLE_ADMIN = "admin"
ROLE_SUB_ADMIN = "sub_admin"
ROLE_USER = "user"
EDITOR_ROLES = (ROLE_ADMIN, ROLE_SUB_ADMIN)
# admin/src/auth.py
def require_user(user=Depends(get_current_user)):
"""Bất kỳ user authenticated."""
return user
def require_editor(user=Depends(get_current_user)):
"""admin hoặc sub_admin — mutation content."""
if user["role"] not in EDITOR_ROLES:
raise HTTPException(403, "Chỉ quản trị/phó quản trị mới thực hiện được")
return user
def require_admin(user=Depends(get_current_user)):
"""Chỉ admin — user management."""
if user["role"] != ROLE_ADMIN:
raise HTTPException(403, "Chỉ quản trị mới thực hiện được")
return user
# Áp dụng tại router level
@router.get("", dependencies=[Depends(require_user)]) # user thường xem được
@router.post("", dependencies=[Depends(require_editor)]) # chỉ editor upload
@router.delete("/{id}", dependencies=[Depends(require_editor)]) # chỉ editor xóa
Single Admin constraint — Database Level¶
Chỉ được có 1 admin. Enforce bằng partial unique index trong SQLite:
-- admin/src/db/schema.sql
CREATE UNIQUE INDEX IF NOT EXISTS idx_users_single_admin
ON users((CASE WHEN role='admin' THEN 1 END));
-- Index này chỉ index rows có role='admin'
-- UNIQUE → không thể có 2 rows cùng value '1' trong index
-- → database tự chặn, không cần application logic
Resign — Atomic Role Swap¶
# admin/src/repositories/user_repo.py
async def swap_admin(
self,
session: AsyncSession,
*,
old_admin: str,
old_new_role: str,
successor: str,
) -> None:
"""Atomic: hạ old_admin TRƯỚC, nâng successor SAU.
Thứ tự bắt buộc: nếu nâng successor trước → tạm thời có 2 admin →
vi phạm `idx_users_single_admin` → SA raise IntegrityError → rollback.
"""
await session.execute(
update(users).where(users.c.email == old_admin).values(role=old_new_role)
)
# Sau bước trên: 0 admin trong DB
await session.execute(
update(users).where(users.c.email == successor).values(role="admin")
)
# Sau bước trên: 1 admin trong DB
await session.commit() # commit cả 2 UPDATE trong 1 transaction
# Nếu UPDATE thứ 2 raise (vì bất kỳ lý do gì) → commit không chạy → rollback auto
📌 Cả 2
execute()share cùng 1 auto-begun transaction của SQLAlchemy 2.0. Không dùngasync with session.begin():wrapper vì session có thể đã có TX từ các SELECT trước trong service (SA raiseInvalidRequestErrornếubegin()lồng nhau).
7. Security Best Practices từ Project¶
1. Secrets trong env, không hardcode:
2. Fail-fast khi thiếu secret:
# admin/src/config.py
def validate():
required = [
("JWT_SECRET", JWT_SECRET),
("AUDIO_STREAM_SECRET", AUDIO_STREAM_SECRET),
("GOOGLE_CLIENT_ID", GOOGLE_CLIENT_ID),
]
missing = [name for name, val in required if not val]
if missing:
raise RuntimeError(f"Thiếu env vars bắt buộc: {missing}")
3. Constant-time comparison cho secrets:
# ✅ Đúng — tránh timing attack
hmac.compare_digest(provided_sig, expected_sig)
# ❌ Sai — dừng sớm khi gặp ký tự sai
provided_sig == expected_sig
4. HttpOnly cookie — JavaScript không đọc được:
response.set_cookie(
"kcds_session",
jwt_token,
httponly=True, # ← quan trọng
samesite="lax",
secure=True, # chỉ HTTPS
)
5. State parameter trong OAuth — chống CSRF:
state = secrets.token_urlsafe(32) # random, không đoán được
# Lưu state → gửi đến Google → Google trả về → verify phải match
Nguyên lý tổng quát¶
| Pattern trong bài | Nguyên lý (bài 0 + security) | Chuyển giao |
|---|---|---|
| OAuth2 state param | Defense in Depth — CSRF protection bằng random token | Mọi OAuth flow (GitHub, Facebook, Apple) đều cần state |
| JWT không chứa role, fetch từ DB | Principle of Least Privilege + revoke ngay | Trade-off: stateless vs fresh data |
| HttpOnly + SameSite=Lax | Defense in Depth — giảm XSS + CSRF surface | Áp dụng cho mọi web session cookie |
| HMAC signed URL có TTL | Least Privilege — URL chỉ valid cho 1 resource trong 30 phút | Pattern chung của S3 presigned URL, CloudFront signed URL |
hmac.compare_digest |
Fail Secure — constant-time compare chống timing attack | Bất kỳ so sánh secret nào (HMAC, password hash, token) |
secrets.token_urlsafe |
Don't roll your own crypto — dùng stdlib | Không bao giờ dùng random.random() cho security |
config.validate() secret bắt buộc |
Fail Fast — app không start khi thiếu secret | Security bug muộn = breach, fail fast tốt hơn |
| Partial unique index cho admin | Fail Secure at DB level — DB enforce invariant | Push invariant xuống layer thấp nhất có thể |
Chuyển giao: các nguyên lý bảo mật nền tảng¶
Bài này dùng 5 nguyên lý bảo mật phổ quát — áp dụng được với mọi auth system:
- Defense in Depth — nhiều layer phòng thủ, không dựa vào 1 cái
- Least Privilege — token/role chỉ có quyền tối thiểu cần thiết, TTL ngắn
- Fail Secure — khi lỗi, mặc định reject (không cho phép)
- Don't Trust Input — mọi input từ user (kể cả từ cookie, header, URL) đều untrusted
- Don't Roll Your Own Crypto — dùng library đã được peer-review (jwt, bcrypt, passlib, cryptography)
Trade-off đáng suy nghĩ¶
JWT vs Session Cookie: Bài 10 §4 có checklist. Tóm tắt:
- JWT: stateless, scale ngang dễ, khó revoke
- Session: revoke instant, scale cần session store shared (Redis)
Project KCDS chọn JWT nhưng fetch role từ DB mỗi request — pragmatic compromise.
Access token + Refresh token: Production app nghiêm chỉnh có cả 2 (access 15 phút, refresh 30 ngày). Project này chỉ có 1 token 7 ngày — đủ cho MVP, không đủ cho scale.
Security checklist khi code auth¶
- Secret đủ dài (≥ 32 bytes random)?
- Secret trong env, không hardcode?
- Password hash bằng bcrypt/argon2 (không MD5/SHA256 thuần)?
- Token có TTL không?
- Cookie HttpOnly + Secure + SameSite?
- Constant-time compare cho HMAC/token?
- State parameter cho OAuth?
- Fail-fast khi thiếu secret?
- Log auth failure (nhưng không log token/password)?
- Rate limit login endpoint?
Bài tập áp dụng¶
Implement auth đơn giản với JWT cho blog API:
# auth.py
import hashlib
import hmac
import jwt
import time
import secrets
SECRET = "your-secret-key" # trong production: lấy từ env
TTL = 3600 # 1 tiếng
def create_token(user_id: int) -> str:
return jwt.encode(
{"sub": user_id, "exp": time.time() + TTL},
SECRET, algorithm="HS256"
)
def verify_token(token: str) -> int | None:
try:
payload = jwt.decode(token, SECRET, algorithms=["HS256"])
return payload["sub"]
except jwt.InvalidTokenError:
return None
# FastAPI dependency
async def get_current_user(
authorization: str | None = Header(default=None),
db: Session = Depends(get_db),
) -> dict:
if not authorization or not authorization.startswith("Bearer "):
raise HTTPException(401)
token = authorization.removeprefix("Bearer ")
user_id = verify_token(token)
if not user_id:
raise HTTPException(401, "Token invalid")
user = await db.get_user(user_id)
if not user:
raise HTTPException(401)
return user
Câu hỏi tự kiểm tra:
1. Tại sao role không đặt trong JWT payload? Ưu/nhược của từng cách?
2. hmac.compare_digest() khác == chỗ nào? Timing attack là gì?
3. Tại sao phải hạ quyền mình TRƯỚC khi nâng quyền successor?
4. SameSite=Lax bảo vệ khỏi loại tấn công nào?