Bỏ qua

Bài 3: Dependency Injection trong FastAPI

Project reference: admin/src/auth.py, admin/src/deps.py, admin/src/db/engine.py


1. Dependency Injection là gì?

Không dùng DI (cách naive):

@router.get("/audio")
async def list_audio():
    # ❌ Hardcode dependencies bên trong handler
    session = create_session()           # tạo session trực tiếp
    repo = SqliteAudioRepository()
    service = AudioService(repo)
    return await service.list_all(session)

Ba vấn đề nảy sinh ngay: không thể test handler mà không khởi động DB thật; nếu đổi repository từ SQLite sang PostgreSQL phải tìm và sửa toàn bộ handler; session không được quản lý nhất quán và dễ bị quên đóng.

Dùng DI (FastAPI Depends):

@router.get("/audio")
async def list_audio(
    session: AsyncSession = Depends(get_session),   # framework inject
    svc: AudioService = Depends(get_audio_service), # framework inject
):
    return await svc.list_all(session)

Framework tự resolve, inject, và cleanup dependencies.


2. Tạo Dependency đơn giản

Factory function

# admin/src/routers/audio.py
def get_audio_service() -> AudioService:
    """Factory — tạo service với concrete repositories."""
    return AudioService(
        SqliteAudioRepository(),
        SqliteCategoryRepository(),
        SqliteEventRepository(),
    )

@router.get("")
async def list_audio(
    svc: AudioService = Depends(get_audio_service),  # gọi factory mỗi request
):
    ...

Dependency trả về async generator (có cleanup)

# admin/src/db/engine.py
async def get_session() -> AsyncIterator[AsyncSession]:
    """FastAPI dependency cung cấp AsyncSession cho mỗi request."""
    async with AsyncSessionLocal() as session:
        yield session
        # ← request xử lý xong, `async with` tự close session
# Dùng trong router
@router.get("/audio")
async def list_audio(
    session: AsyncSession = Depends(get_session),  # session được inject
    # session tự động close khi handler trả về
):
    ...

Từ khoá yield biến dependency thành generator có vòng đời hai giai đoạn: framework chạy phần trước yield để khởi tạo, inject giá trị đó vào handler, rồi sau khi handler xử lý xong mới tiếp tục phần sau yield để cleanup. Về bản chất giống with statement, chỉ khác là FastAPI lo phần __exit__ thay bạn.

📌 Transaction ở đâu? Project này không bọc session.begin() ở dependency level. Thay vào đó, mỗi repository method tự gọi await session.commit() sau khi mutation xong. Pattern này đơn giản hơn nhưng đánh đổi: nếu service orchestrate nhiều mutation và 1 cái lỗi giữa chừng, các commit trước đó đã persist — service phải tự lo ordering/rollback (xem bài 5, §4 "Transaction order là business rule").

Ví dụ cụ thể — audio_repo.py commit ngay trong từng method:

# admin/src/repositories/audio_repo.py
class SqliteAudioRepository(AudioRepository):
    async def create(
        self,
        session: AsyncSession,
        *,
        filename: str,
        title: str,
        title_normalized: str,
        size_bytes: int,
        duration_seconds: int,
        category_id: int | None,
        event_id: int | None,
        uploaded_at: str,
    ) -> dict[str, Any]:
        result = await session.execute(
            insert(audio)
            .values(
                filename=filename,
                title=title,
                title_normalized=title_normalized,
                size_bytes=size_bytes,
                duration_seconds=duration_seconds,
                category_id=category_id,
                event_id=event_id,
                uploaded_at=uploaded_at,
            )
            .returning(audio)                  # lấy row vừa insert, không cần SELECT lại
        )
        row = result.fetchone()
        await session.commit()                 # ← commit ngay trong repo method
        return _row_to_dict(row)

    async def delete(self, session: AsyncSession, id: int) -> bool:
        result = await session.execute(delete(audio).where(audio.c.id == id))
        await session.commit()                 # ← 1 statement = 1 TX tự commit
        return (result.rowcount or 0) > 0

Mỗi method = 1 transaction độc lập. Service chỉ gọi repo.create(...) / repo.delete(...) — không cần biết commit/rollback. Trade-off cụ thể được đào sâu ở bài 4 §9bài 5 §8.


3. Dependency Chain (Chain of Depends)

Dependency có thể phụ thuộc vào dependency khác — FastAPI build dependency graph và resolve tự động:

# Tầng 1: DB session
async def get_session() -> AsyncSession: ...

# Tầng 2: User repo (phụ thuộc session)
def get_user_repo() -> UserRepository:
    return SqliteUserRepository()

# Tầng 3: get_current_user (phụ thuộc session + user_repo)
async def get_current_user(
    cookie_token: str | None = Cookie(default=None, alias="kcds_session"),
    session: AsyncSession = Depends(get_session),      # ← Depends tầng 1
    user_repo: UserRepository = Depends(get_user_repo), # ← Depends tầng 2
) -> dict:
    email = verify_session(cookie_token)  # decode JWT
    return await user_repo.get_by_email(session, email)

# Tầng 4: Role check (phụ thuộc get_current_user)
def require_editor(
    user: dict = Depends(get_current_user),  # ← Depends tầng 3
) -> dict:
    if user["role"] not in ("admin", "sub_admin"):
        raise HTTPException(403, "Không đủ quyền")
    return user

# Tầng 5: Router dùng require_editor
@router.post("/audio")
async def upload_audio(
    user: dict = Depends(require_editor),  # ← Depends tầng 4
    session: AsyncSession = Depends(get_session),
):
    # user đã được verify role
    ...

FastAPI tự động: 1. Resolve require_editor → cần get_current_user 2. Resolve get_current_user → cần get_session + get_user_repo 3. Resolve get_session → tạo DB session 4. Chạy ngược lại: inject vào handler 5. Sau khi handler xong → cleanup get_session (close session)

Nếu cùng dependency được dùng nhiều lần trong 1 request (ví dụ get_session ở cả handler và get_current_user), FastAPI cache lại — không tạo 2 session riêng.


4. dependencies=[] — Apply Auth cho cả Router

Thay vì thêm Depends vào từng endpoint:

# ❌ Lặp code
@router.get("/audio", dependencies=[Depends(require_user)])
async def list_audio(...): ...

@router.post("/audio", dependencies=[Depends(require_user)])
async def upload_audio(...): ...

Dùng dependencies ở router level:

# ✅ Apply cho cả router
router = APIRouter(dependencies=[Depends(require_user)])

@router.get("")      # tự động require_user
async def list_audio(...): ...

@router.post("")     # tự động require_user
async def upload(...): ...

Hoặc per-endpoint khi cần override:

# admin/src/routers/audio.py — mixed auth
@router.get("", dependencies=[Depends(require_user)])         # user thường được xem
async def list_audio(...): ...

@router.post("", dependencies=[Depends(require_editor)])      # chỉ editor được upload
async def upload_audio(...): ...

@router.delete("/{id}", dependencies=[Depends(require_editor)]) # chỉ editor được xóa
async def delete_audio(...): ...

5. Shared Factories trong deps.py

Khi một factory được dùng ở nhiều router, đặt vào deps.py:

# admin/src/deps.py
def get_schedule_service() -> ScheduleService:
    """Dùng bởi schedule router + playlists router (cascade)."""
    return ScheduleService(
        SqliteScheduleRepository(),
        SqlitePlaylistRepository(),
        SqliteBotStateRepository(),
        ChannelsService(SqliteDiscordChannelsRepository()),
    )

def get_user_service() -> UserService:
    """Dùng bởi auth router (me/logout) + users router (CRUD)."""
    return UserService(get_user_repo())
# Dùng ở cả 2 router
# routers/schedule.py
from src.deps import get_schedule_service

@router.post("")
async def save_schedule(svc=Depends(get_schedule_service)): ...

# routers/playlists.py — khi xóa track cần cascade update schedule
from src.deps import get_schedule_service

@router.delete("/{playlist_id}/tracks/{track_id}")
async def remove_track(svc=Depends(get_schedule_service)): ...

Nguyên tắc: Factory cụ thể cho 1 router → giữ trong file router đó. Factory dùng ở ≥2 router → đặt vào deps.py.


6. Tại sao DI giúp Test dễ hơn?

# Trong production code
@router.get("/audio")
async def list_audio(svc=Depends(get_audio_service)):
    return await svc.list_all(session)

# Trong test — override dependency
from unittest.mock import AsyncMock, Mock

def get_mock_service():
    svc = Mock(spec=AudioService)
    svc.list_all = AsyncMock(return_value=[{"id": 1, "title": "Test"}])
    return svc

app.dependency_overrides[get_audio_service] = get_mock_service

# Giờ test không chạm DB thật
async def test_list_audio():
    response = await client.get("/api/audio")
    assert response.json() == {"data": [{"id": 1, "title": "Test"}]}

Nguyên lý tổng quát

Pattern trong bài Nguyên lý (bài 0) Chuyển giao
Depends(get_session) IoC — framework quyết định khi nào gọi factory và resolve chain Angular DI, Spring @Autowired, NestJS @Inject — cơ chế khác nhưng nguyên lý giống
Service nhận repo qua constructor DIP — phụ thuộc abstraction (AudioRepository ABC) Pattern này là nền cho unit test (bài 9 §4)
deps.py cho factory ≥ 2 router dùng DRY có kỷ luật — chỉ abstract khi thực sự shared Không extract sớm — xem "Rule of Three" (bài 0 §2)
dependencies=[Depends(require_editor)] router-level SoC giữa auth logic và business handler Middleware trong Express/Django REST; guards trong NestJS

Chuyển giao: IoC ngoài FastAPI

IoC không phải độc quyền của FastAPI. Mọi framework nghiêm chỉnh đều có cơ chế IoC:

  • Django: class-based view với dispatch() — framework gọi get()/post() của bạn
  • Express: middleware chain — framework gọi next() để chuyển control
  • discord.py: @client.event decorator — framework gọi on_ready() (bài 7)

Khi học framework mới, hỏi: "IoC ở đây qua cơ chế gì?" — câu trả lời thường là: DI, decorator, event hook, hoặc template method.

Khi nào DI phản tác dụng

Bài 10 §6 có anti-pattern "DI cho mọi thứ" — không nên Depends(get_datetime_now) cho datetime.now. DI đáng khi: external service, business abstraction, config. Không đáng cho stdlib và pure function.


Bài tập áp dụng

Xây dựng dependency chain cho một API blog đơn giản:

# 1. DB session
async def get_db() -> AsyncIterator[Session]:
    async with SessionLocal() as session:
        yield session

# 2. Auth dep
async def get_current_user(
    token: str | None = Header(default=None, alias="Authorization"),
    db: Session = Depends(get_db),
) -> dict:
    if not token:
        raise HTTPException(401, "Chưa đăng nhập")
    # decode token...
    return user

# 3. Permission dep
def require_author(user: dict = Depends(get_current_user)) -> dict:
    if user["role"] != "author":
        raise HTTPException(403, "Chỉ tác giả mới được đăng bài")
    return user

# 4. Router dùng chain
@router.post("/posts", dependencies=[Depends(require_author)])
async def create_post(body: PostCreate, db=Depends(get_db)):
    ...

Câu hỏi tự kiểm tra: 1. Điều gì xảy ra với get_session sau khi handler trả về? Tại sao? 2. Nếu get_session được Depends ở cả handler và get_current_user, FastAPI tạo mấy session? 3. Tại sao nên dùng dependencies=[Depends(...)] thay vì thêm param vào handler?