Bỏ qua

Bài 5: Kiến trúc 3 Lớp — Router → Service → Repository

Project reference: admin/src/repositories/base.py, admin/src/services/audio_service.py, admin/src/routers/audio.py


1. Vấn đề với code "flat" (không có layer)

# ❌ Anti-pattern: tất cả trong 1 handler
@router.post("/audio")
async def upload_audio(file: UploadFile, session: AsyncSession = Depends(get_session)):
    # Business rule trong router???
    if not file.filename.endswith(".mp3"):
        raise HTTPException(400, "Chỉ chấp nhận MP3")

    # Query DB trực tiếp trong router???
    total_size = await session.scalar(select(func.sum(audio.c.size_bytes)))
    if total_size + file.size > 10 * 1024**3:
        raise HTTPException(507, "Hết dung lượng")

    # Ghi file trong router???
    dest = Path(AUDIO_DIR) / file.filename
    dest.write_bytes(await file.read())

    # Insert DB trong router???
    await session.execute(insert(audio).values(...))

    return {"data": "ok"}

Hậu quả tức thì: business logic, I/O file, và DB query lẫn lộn vào nhau trong một hàm. Muốn test riêng logic kiểm tra storage limit thì phải kéo theo cả DB thật và filesystem. Muốn thay đổi cách lưu file thì phải mò vào router. Không có gì reuse được vì mọi thứ đều bị bind chặt với HTTP layer.


2. Ba lớp và trách nhiệm

┌─────────────────────────────────────────────────────────┐
│  ROUTER (routers/audio.py)                              │
│  • Nhận HTTP request                                    │
│  • Parse/validate shape (Pydantic)                      │
│  • Gọi service                                          │
│  • Trả HTTP response                                    │
│  • KHÔNG biết DB là gì, KHÔNG có business logic        │
├─────────────────────────────────────────────────────────┤
│  SERVICE (services/audio_service.py)                    │
│  • Business logic (storage limit, validation rules)     │
│  • Điều phối repository                                 │
│  • Quyết định transaction order (file trước, DB sau)    │
│  • KHÔNG biết HTTP, KHÔNG biết SQLite là gì            │
├─────────────────────────────────────────────────────────┤
│  REPOSITORY (repositories/audio_repo.py)                │
│  • SQLAlchemy query (SELECT/INSERT/UPDATE/DELETE)        │
│  • File system I/O                                      │
│  • KHÔNG có business logic                              │
│  • Implement abstract interface từ base.py              │
└─────────────────────────────────────────────────────────┘

3. Lớp 1: Abstract Interface (base.py)

Service phụ thuộc vào interface, không phụ thuộc vào implementation. Đây là Dependency Inversion Principle (DIP).

# admin/src/repositories/base.py
from abc import ABC, abstractmethod
from sqlalchemy.ext.asyncio import AsyncSession

class AudioRepository(ABC):
    """Interface — service chỉ biết đến class này, không biết SQLite tồn tại."""

    @abstractmethod
    async def list_all(self, session: AsyncSession, *, q: str | None = None) -> list[dict]: ...

    @abstractmethod
    async def get_by_id(self, session: AsyncSession, id: int) -> dict | None: ...

    @abstractmethod
    async def create(self, session: AsyncSession, *, filename: str, title: str, ...) -> dict: ...

    @abstractmethod
    async def delete(self, session: AsyncSession, id: int) -> bool: ...

    @abstractmethod
    async def get_total_size(self, session: AsyncSession) -> int: ...
    # ↑ Trả số bytes tổng — service dùng để check storage limit
    # Service không cần biết đây là SUM(size_bytes) hay đếm files hay gì

Nhờ cách này, nếu một ngày chuyển từ SQLite sang PostgreSQL, chỉ cần viết PostgresAudioRepository(AudioRepository) mới — service không phải đổi một dòng nào.


4. Lớp 2: Service (business logic)

# admin/src/services/audio_service.py
class AudioService:
    def __init__(
        self,
        repo: AudioRepository,           # ← interface, không phải SqliteAudioRepo
        category_repo: CategoryRepository,
        event_repo: EventRepository,
    ) -> None:
        self.repo = repo
        self.category_repo = category_repo
        self.event_repo = event_repo

    async def upload(self, session, file, *, title, category_id, event_id) -> dict:
        # 1. Business validation (không phải shape validation)
        filename = self._validate_filename(file.filename)  # chỉ .mp3/.m4a
        title = _require_title(title)                      # không được rỗng
        await self._validate_foreign_keys(session, category_id, event_id)  # FK phải tồn tại

        data = await file.read()
        _check_file_size(len(data))  # max 50MB

        # 2. Pre-check storage limit (fail fast trước khi tốn CPU convert)
        await self._check_storage_limit(session, len(data))

        # 3. Convert m4a → mp3 nếu cần (CPU intensive — ngoài critical section)
        if filename.endswith(".m4a"):
            data, filename = await self._convert_m4a_to_mp3(data, filename)

        # 4. Critical section: re-check + write + insert (atomic về mặt logic)
        async with _UPLOAD_CRITICAL:               # asyncio.Lock — tránh race condition
            await self._check_storage_limit(session, len(data))  # re-check sau convert
            await self._check_filename_unique(session, filename)

            # QUAN TRỌNG: Ghi file TRƯỚC, insert DB SAU
            dest = Path(AUDIO_DIR) / filename
            await asyncio.to_thread(dest.write_bytes, data)  # blocking I/O → thread pool

            try:
                duration = await self._probe_duration(dest)
                return await self.repo.create(
                    session, filename=filename, title=title,
                    size_bytes=len(data), duration_seconds=duration, ...
                )
            except Exception:
                dest.unlink(missing_ok=True)  # ROLLBACK: xóa file nếu DB lỗi
                raise

    async def delete(self, session, id) -> bool:
        audio = await self.repo.get_by_id(session, id)
        if not audio:
            raise HTTPException(404, "Audio không tồn tại")

        # QUAN TRỌNG: Xóa DB TRƯỚC, file SAU
        # Lý do: nếu xóa file trước rồi DB lỗi → file mất nhưng record còn
        # Nếu xóa DB trước rồi file lỗi → record mất nhưng file thành orphan (chấp nhận được, chỉ log)
        deleted = await self.repo.delete(session, id)

        if deleted:
            try:
                Path(AUDIO_DIR, audio["filename"]).unlink(missing_ok=True)
            except OSError as e:
                logger.warning("Không xóa được file %s: %s", audio["filename"], e)
                # KHÔNG raise — DB đã xóa thành công, file lỗi chỉ log

        return deleted

Thứ tự thao tác này là business rule: ghi file trước, insert DB sau. Quyết định đó thuộc về service vì đây là logic nghiệp vụ — router không biết thứ tự nào, repository không quan tâm context nào.


5. Lớp 3: Repository (implementation)

# admin/src/repositories/audio_repo.py
class SqliteAudioRepository(AudioRepository):
    """SQLite implementation — khi đổi sang PostgreSQL, chỉ tạo class này thôi."""

    async def list_all(self, session, *, q=None, ...) -> list[dict]:
        query = (
            select(audio, categories.c.name.label("category_name"), ...)
            .outerjoin(categories, ...)
            .order_by(audio.c.uploaded_at.desc())
        )

        if q:
            query = query.where(audio.c.title_normalized.like(f"%{q}%"))

        result = await session.execute(query)
        return [dict(row._mapping) for row in result.fetchall()]

    async def get_total_size(self, session) -> int:
        result = await session.execute(select(func.sum(audio.c.size_bytes)))
        return result.scalar() or 0

    # ... implement tất cả @abstractmethod từ AudioRepository

6. Lớp Router — Gọi Service

# admin/src/routers/audio.py

# Factory tạo service với concrete repo
def get_audio_service() -> AudioService:
    return AudioService(
        SqliteAudioRepository(),     # inject implementation
        SqliteCategoryRepository(),
        SqliteEventRepository(),
    )

@router.post("", dependencies=[Depends(require_editor)])
async def upload_audio(
    file: UploadFile,
    title: str = Form(...),
    category_id: int | None = Form(None),
    svc: AudioService = Depends(get_audio_service),
    session: AsyncSession = Depends(get_session),
) -> dict:
    # Router: validate shape (Form fields) + gọi service + trả response
    # Router KHÔNG biết storage limit là bao nhiêu
    # Router KHÔNG biết file được lưu ở đâu
    # Router KHÔNG biết table nào trong SQLite
    result = await svc.upload(session, file, title=title, category_id=category_id)
    return {"data": result}

7. Flow đầy đủ — Ví dụ Upload

POST /api/audio (multipart/form-data)
├── Router: parse UploadFile + Form fields
├── Depends: get_session → tạo AsyncSession + BEGIN TRANSACTION
├── Depends: require_editor → check JWT cookie + role
├── svc.upload(session, file, title=...) [Service]
│   ├── _validate_filename() → chỉ .mp3/.m4a
│   ├── _require_title() → không được rỗng
│   ├── _validate_foreign_keys() → category/event phải tồn tại [gọi repo]
│   ├── _check_file_size() → max 50MB
│   ├── _check_storage_limit() [gọi repo.get_total_size()]
│   ├── _convert_m4a_to_mp3() → ffmpeg subprocess
│   ├── async with _UPLOAD_CRITICAL:
│   │   ├── re-check storage limit
│   │   ├── check filename unique [gọi repo.get_by_filename()]
│   │   ├── write file to /data/audio/
│   │   ├── repo.create() → INSERT INTO audio [Repository]
│   │   └── rollback: unlink file nếu INSERT lỗi
│   └── return dict
├── Router: return {"data": result}
└── Depends cleanup: session.commit() → COMMIT TRANSACTION

8. Trade-off thực tế

Ưu điểm rõ ràng

Testability:

# Test service mà không cần DB, không cần filesystem
from unittest.mock import AsyncMock, Mock

async def test_upload_rejects_oversized_file():
    mock_repo = Mock(spec=AudioRepository)
    mock_repo.get_total_size = AsyncMock(return_value=9.9 * 1024**3)  # gần đầy

    svc = AudioService(mock_repo, Mock(), Mock())

    with pytest.raises(HTTPException) as exc:
        await svc.upload(session=Mock(), file=mock_file(size=200 * 1024**2))

    assert exc.value.status_code == 507  # storage full
    mock_repo.create.assert_not_called()  # file không được insert

Single source of truth: Logic rollback (file → DB) chỉ ở 1 nơi. Audit dễ, sửa dễ.

Trade-off phải biết

Boilerplate: 1 entity Audio → 4 files: - base.py (interface) - audio_repo.py (implementation) - audio_service.py (business logic) - routers/audio.py (HTTP handler)

Cross-service transaction không tự nhiên:

# Tình huống: xóa playlist phải cascade update schedule
# Nhưng playlist_service và schedule_service là 2 service khác nhau
# Không có global transaction bao 2 service

# Cách giải quyết trong project: router orchestrate
@router.delete("/playlists/{id}/tracks/{track_id}")
async def remove_track(
    id: int, track_id: int,
    playlist_svc: PlaylistService = Depends(...),
    schedule_svc: ScheduleService = Depends(get_schedule_service),
    session: AsyncSession = Depends(get_session),
):
    await playlist_svc.remove_track(session, id, track_id)
    # Cascade: bump duration cho schedule entries dùng playlist này
    await schedule_svc.recompute_duration_for_playlist(session, id)

Router đảm nhận orchestration — đây là exception có chủ đích, không phải pattern thường dùng.

Evolution: Unit of Work pattern (khi scale lớn hơn)

Pattern "router orchestrate + repo tự commit" work tốt ở MVP. Nhưng khi cross-service transaction xuất hiện ≥ 3 chỗ, nó bộc lộ nhược điểm:

  • Router biết quá nhiều về transaction boundary (vi phạm SoC)
  • Service 2 fail sau khi service 1 đã commit → rollback tay phức tạp
  • Không test được "atomic cross-service" vì mỗi service có session riêng

Unit of Work (UoW) là context manager gom session + ≥ 2 repo, commit/rollback 1 lần cuối:

class UnitOfWork:
    def __init__(self, session_factory):
        self._session_factory = session_factory

    async def __aenter__(self):
        self.session = self._session_factory()
        self.playlists = SqlitePlaylistRepository()
        self.schedules = SqliteScheduleRepository()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        if exc_type:
            await self.session.rollback()
        else:
            await self.session.commit()
        await self.session.close()


# Router dùng UoW:
@router.delete("/playlists/{id}/tracks/{track_id}")
async def remove_track(
    id: int, track_id: int,
    uow: UnitOfWork = Depends(get_uow),
):
    async with uow:
        await uow.playlists.remove_track(uow.session, id, track_id)
        await uow.schedules.recompute_duration_for_playlist(uow.session, id)
        # Exit không exception → commit cả 2; có exception → rollback cả 2

Điểm khác biệt so với "router orchestrate":

  • Repo không tự commit (ngược với pattern hiện tại của project) — UoW quản lý
  • Rollback tự động bao cả 2 repo nếu bất kỳ bước nào fail
  • Service layer có thể nhận UnitOfWork thay AsyncSession → service cũng dùng được

Khi nào chuyển sang UoW:

  • ≥ 3 router có pattern "orchestrate 2+ service"
  • Cần atomic invariant cross-service (ví dụ: "xóa playlist + update schedule phải cùng rollback")
  • Muốn test "rollback khi service 2 fail" mà không mock phức tạp

Khi nào KHÔNG cần: Project MVP, 1-2 chỗ cross-service → router orchestrate đơn giản hơn, không cần refactor toàn bộ pattern commit.

📌 UoW là bước tiến từ "repo tự commit". Nếu bắt đầu project mới mà biết trước sẽ có nhiều cross-service transaction → thiết kế UoW từ đầu sẽ tiết kiệm chi phí migration.


9. Khi nào KHÔNG cần 3 layer?

  • Script một lần (migration, seeding data) → viết thẳng, không cần layer
  • Endpoint cực đơn giản chỉ proxy DB (GET /health, GET /categories không có logic) → service mỏng chỉ delegate là bình thường
  • Project < 5 entities → monolith module đủ dùng

Nguyên tắc: Thêm layer khi có business logic phức tạp hoặc cần isolate để test. Không thêm layer vì "kiến trúc đẹp".


Nguyên lý tổng quát

Bài này là hiện thực cụ thể của 4 nguyên lý cốt lõi từ bài 0:

Pattern trong bài Nguyên lý Mô tả
Router / Service / Repo SoC 3 concerns tách bạch: HTTP / business / persistence
Mỗi layer 1 trách nhiệm SRP Service thay đổi khi business rule đổi, Repo thay đổi khi schema đổi
Service phụ thuộc AudioRepository (ABC) DIP Không biết SQLite hay Postgres
Depends(get_audio_service) IoC Framework inject implementation

Chuyển giao: nhiều cách "chia layer"

3-layer chỉ là 1 trong nhiều kiến trúc cùng nguyên lý SoC + DIP:

Kiến trúc Layer điển hình
3-Layer (project này) Router → Service → Repository
Clean Architecture (Uncle Bob) Entity → Use Case → Interface Adapter → Framework
Hexagonal (Ports & Adapters) Domain ở giữa, Port (interface) + Adapter (impl) ở rìa
Onion Domain → Domain Service → App Service → Infrastructure

Cùng nguyên lý, khác hình thù. Khi sang project khác, đừng tranh luận "pattern X hay Y đúng" — hỏi: "Code này có tách concern không? Có phụ thuộc abstraction không?"

Khi nào phá pattern này

Bài 10 §1 có 3 trường hợp cụ thể — script 1 lần, endpoint trivial, prototype < 3 entity. Nguyên tắc: phá pattern có chủ đích, không phá vì lười.

Cross-service transaction

Pattern "repo tự commit" (bài 4 §9) khiến cross-service transaction khó. Project giải bằng router orchestration. Khi scale lớn, thường chuyển sang:

  • Unit of Work pattern — 1 object gom nhiều repo, commit 1 lần cuối
  • Saga pattern — chuỗi action + compensating action (cho distributed system)

Bài tập áp dụng

Implement đầy đủ 3 layer cho entity Post của một blog:

# 1. Interface (base.py)
class PostRepository(ABC):
    @abstractmethod
    async def list_published(self, session) -> list[dict]: ...
    @abstractmethod
    async def create(self, session, *, title, content, author_id) -> dict: ...
    @abstractmethod
    async def publish(self, session, id) -> dict | None: ...

# 2. Service (post_service.py) — business rules:
#    - title >= 10 ký tự
#    - content >= 100 ký tự
#    - author phải tồn tại và enabled
#    - không được publish bài đã publish

# 3. Implementation (post_repo.py)
# 4. Router (routers/posts.py)

Câu hỏi tự kiểm tra: 1. Tại sao service nhận AudioRepository (interface) thay vì SqliteAudioRepository? 2. Khi nào dùng asyncio.to_thread()? Tại sao ghi file cần dùng nó? 3. Tại sao xóa DB trước, xóa file sau? Nếu đảo ngược thì sao? 4. asyncio.Lock() bảo vệ cái gì trong upload flow?