Bỏ qua

Bài 2: FastAPI + ASGI + Async/Await

Project reference: admin/src/main.py, admin/src/routers/audio.py


1. WSGI vs ASGI — Tại sao cần async?

WSGI (cũ — Flask, Django)

Request → Thread 1 → query DB (thread BLOCK, chờ)
Request → Thread 2 → query DB (thread BLOCK, chờ)
Request → Thread 3 → query DB (thread BLOCK, chờ)

Mỗi request chiếm 1 thread. Khi thread chờ I/O (DB, file, network), nó không làm gì được — lãng phí. Muốn xử lý 100 concurrent request → cần 100 thread → tốn RAM.

ASGI (mới — FastAPI, Starlette)

Request → coroutine → await DB query (coroutine SUSPENDED, event loop chuyển sang request khác)
Request → coroutine → await DB query (coroutine SUSPENDED, event loop chuyển sang request khác)
→ DB xong → event loop resume coroutine → trả response

1 thread, 1 event loop, N coroutine chạy concurrently (không phải parallel). Khi await, coroutine trả quyền kiểm soát về event loop — event loop chạy coroutine khác.

Ví dụ thực tế trong project:

# admin/src/routers/audio.py
@router.get("")
async def list_audio(
    q: str | None = None,
    svc: AudioService = Depends(get_audio_service),
    session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
    # await → coroutine suspended, event loop phục vụ request khác
    # trong khi SQLite đang chạy query
    return {"data": await svc.list_all(session, q=q)}

Nếu list_all mất 50ms, thread không bị block — 50ms đó event loop xử lý được hàng chục request khác.


2. FastAPI App + Lifespan

# admin/src/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # ── STARTUP: chạy một lần khi app bắt đầu ──
    config.validate()           # kiểm tra env vars bắt buộc — fail-fast
    Path(AUDIO_DIR).mkdir(parents=True, exist_ok=True)
    await init_db()             # tạo bảng nếu chưa có

    yield                       # ← app đang chạy, phục vụ request

    # ── SHUTDOWN: chạy một lần khi app tắt ──
    # (cleanup resources, close connections...)

app = FastAPI(title="Khí Công Dưỡng Sinh – Admin", lifespan=lifespan)

Pattern fail-fast: config.validate() raise RuntimeError ngay khi thiếu env bắt buộc. App không bao giờ start ở trạng thái thiếu cấu hình — tránh lỗi xảy ra muộn lúc runtime.


3. Router — Tổ chức endpoint

Khai báo router

# admin/src/routers/audio.py
from fastapi import APIRouter

router = APIRouter()
stream_router = APIRouter()  # router riêng cho stream (khác auth policy)

@router.get("")                      # GET /api/audio
async def list_audio(...): ...

@router.post("/{id}/stream-url")     # POST /api/audio/{id}/stream-url
async def get_stream_url(...): ...

Mount vào app

# admin/src/main.py
app.include_router(audio.router,        prefix="/api/audio",  tags=["audio"])
app.include_router(audio.stream_router, prefix="/api/audio",  tags=["audio-stream"])
app.include_router(schedule.router,     prefix="/api/schedule", tags=["schedule"])
# ...

Mỗi file router là một module độc lập. Khi cần thêm feature mới, chỉ tạo file router mới rồi mount vào main.py — không phải sửa code cũ.


4. Path & Query Parameters

@router.get("")
async def list_audio(
    # Query params — tự động parse từ URL ?category_id=1&q=thiền
    category_id: int | None = None,   # /audio?category_id=1
    q: str | None = None,             # /audio?q=thiền
    month: int | None = None,
    year: int | None = None,
) -> dict[str, Any]:
    ...

@router.post("/{id}/stream-url")
async def get_stream_url(
    id: int,                          # Path param — /audio/42/stream-url
) -> dict[str, Any]:
    ...

FastAPI xử lý phần còn lại tự động: ép kiểu ("42" thành int 42, "true" thành bool True), validate đầu vào và trả HTTP 422 nếu không hợp lệ, đồng thời sinh OpenAPI docs tại /docs.


5. Request Body với Pydantic

# admin/src/models.py
from pydantic import BaseModel

class AudioMetadataUpdate(BaseModel):
    title: str | None = None
    category_id: int | None = None
    event_id: int | None = None
# router nhận body
@router.put("/{id}")
async def update_audio(
    id: int,
    body: AudioMetadataUpdate,   # FastAPI tự parse JSON body → Pydantic model
) -> dict[str, Any]:
    # body.title, body.category_id đã được validate type
    ...

Nguyên tắc quan trọng: Pydantic chỉ validate shape (kiểu dữ liệu, required/optional). Business rules ("title không được rỗng", "category_id phải tồn tại trong DB") → để ở Service layer.


6. Exception Handling

# admin/src/main.py — global handlers
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException) -> JSONResponse:
    return JSONResponse(status_code=exc.status_code, content={"error": exc.detail})

@app.exception_handler(ValueError)
async def value_error_handler(request: Request, exc: ValueError) -> JSONResponse:
    return JSONResponse(status_code=400, content={"error": str(exc)})

@app.exception_handler(RequestValidationError)
async def validation_error_handler(request: Request, exc: RequestValidationError) -> JSONResponse:
    return JSONResponse(status_code=422, content={"error": "Dữ liệu không hợp lệ"})
# Trong router/service — raise HTTPException bình thường
from fastapi import HTTPException

if not audio:
    raise HTTPException(404, "Audio không tồn tại")

if used + new_size > MAX_STORAGE_BYTES:
    raise HTTPException(507, "Vượt quá giới hạn lưu trữ")

Business code raise HTTPException hoặc ValueError. Global handler catch và format thành JSON response nhất quán {"error": "..."}, đảm bảo frontend luôn nhận cùng một shape dù lỗi xảy ra ở tầng nào.

Lưu ý thứ tự args: JSONResponse(content, status_code=200, ...) — positional đầu tiên là content. Luôn dùng keyword args (status_code=..., content=...) để tránh nhầm.

Đừng để stack trace "bay đi" — handler cho lỗi 500

3 handler ở trên chỉ bắt exception đã biết. Với lỗi unexpected (bug logic, KeyError, crash từ third-party library), FastAPI trả 500 mặc định — stack trace có thể bị nuốt trên production (Docker + logging pipeline không capture stderr đúng cách).

import logging
logger = logging.getLogger(__name__)

@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception) -> JSONResponse:
    # exc_info=True tự attach full traceback vào log record
    logger.exception("Unhandled exception on %s %s", request.method, request.url.path)
    return JSONResponse(
        status_code=500,
        content={"error": "Lỗi máy chủ — admin đã được thông báo"},
    )

2 điểm đáng nhớ:

  • logger.exception() = logger.error(..., exc_info=True) — luôn có full traceback trong log
  • Không expose raw error message ra response body (rò rỉ đường dẫn file, tên biến, version library → attacker exploit dễ hơn). Client chỉ cần "server lỗi" + correlation ID nếu có.

7. Response Format

# Convention trong project: hầu hết endpoint wrap data trong {"data": ...}
return {"data": await svc.list_all(session)}
return {"data": {"url": url, "expires": expires}}

# Thêm warnings khi cần
return {"data": result, "warnings": warnings}

# Ngoại lệ: DELETE trả 204 No Content (không body), hoặc endpoint trả raw file/stream

FastAPI tự serialize dict → JSON. Không cần jsonify() như Flask.

Tại sao convention wrap {data: ...} quan trọng

Nhất quán shape giúp client chỉ cần 1 hàm fetch duy nhất — không per-endpoint unwrap. Toàn bộ api.js của project:

// admin/src/ui/src/modules/api.js
export class ApiError extends Error {
  constructor(message, status) {
    super(message);
    this.status = status;
  }
}

export async function api(method, path, body = null) {
  const opts = {
    method,
    credentials: "include",                       // cookie auth tự gửi
    headers: { "Content-Type": "application/json" },
  };
  if (body !== null) opts.body = JSON.stringify(body);

  const res = await fetch(`/api${path}`, opts);

  // 401 → session hết hạn hoặc user bị disable → về login
  if (res.status === 401) {
    window.location.href = "/";
    throw new ApiError("Phiên đăng nhập hết hạn", 401);
  }

  // 204 No Content (logout, self-delete) — không có body
  if (res.status === 204) return null;

  const json = await res.json();
  if (!res.ok) throw new ApiError(json.error ?? "Lỗi không xác định", res.status);
  return json.data;   // ← lấy shortcut vì server luôn wrap
}

Điểm thiết kế đáng học:

  • Server convention ổn định → client viết 1 lần, dùng cho N endpoint (DRY đúng — bài 0 §2)
  • Error format thống nhất ({"error": "..."}) → client có 1 exception class ApiError duy nhất
  • 2 status code được handle đặc biệt (401 redirect, 204 null), còn lại theo res.ok

Đảo lại: mỗi endpoint trả shape khác nhau → client phải biết shape từng endpoint → code repeat N lần, bug N chỗ.


Nguyên lý tổng quát

Pattern trong bài Nguyên lý (bài 0) Chuyển giao
Async/await thay thread blocking Cooperative scheduling — task nhường điều khiển tại điểm await Áp dụng được với mọi async runtime (Node.js, Rust tokio, Go goroutine có khác chút)
Lifespan config.validate() Fail Fast — lỗi config nổ ra lúc startup Framework nào cũng nên có startup hook; Django AppConfig.ready(), Express app.listen callback
Router → Service (qua Depends) SoC giữa HTTP concern và business concern GraphQL resolver, gRPC handler cũng tách tương tự
Global exception handler SoC — business code raise, handler format Express middleware, Spring @ControllerAdvice
Response {"data": ...} wrap POLS (Principle of Least Surprise) — frontend luôn biết shape API design chung — REST, GraphQL, JSON-RPC

Chuyển giao quan trọng: "Async giúp I/O, không giúp CPU." Bài 10 §5 đào sâu hơn — nếu handler của bạn là CPU-bound, async không giúp gì, phải asyncio.to_thread() hoặc bỏ async.


Bài tập áp dụng

Thử tạo một API endpoint đơn giản:

# Tạo file my_router.py
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

router = APIRouter()

class ItemCreate(BaseModel):
    name: str
    price: float

# In-memory store (demo)
items = {}
next_id = 1

@router.get("")
async def list_items() -> dict:
    return {"data": list(items.values())}

@router.post("")
async def create_item(body: ItemCreate) -> dict:
    global next_id
    item = {"id": next_id, "name": body.name, "price": body.price}
    items[next_id] = item
    next_id += 1
    return {"data": item}

@router.get("/{id}")
async def get_item(id: int) -> dict:
    if id not in items:
        raise HTTPException(404, "Item không tồn tại")
    return {"data": items[id]}
# Mount vào app
app.include_router(my_router.router, prefix="/api/items", tags=["items"])

Câu hỏi tự kiểm tra: 1. Tại sao phải dùng async def thay def? 2. Sự khác biệt giữa path param và query param trong FastAPI? 3. Pydantic validate gì? Ai validate business rules?