Bài 5: Kiến trúc 3 Lớp — Router → Service → Repository¶
Project reference:
admin/src/repositories/base.py,admin/src/services/audio_service.py,admin/src/routers/audio.py
1. Vấn đề với code "flat" (không có layer)¶
# ❌ Anti-pattern: tất cả trong 1 handler
@router.post("/audio")
async def upload_audio(file: UploadFile, session: AsyncSession = Depends(get_session)):
# Business rule trong router???
if not file.filename.endswith(".mp3"):
raise HTTPException(400, "Chỉ chấp nhận MP3")
# Query DB trực tiếp trong router???
total_size = await session.scalar(select(func.sum(audio.c.size_bytes)))
if total_size + file.size > 10 * 1024**3:
raise HTTPException(507, "Hết dung lượng")
# Ghi file trong router???
dest = Path(AUDIO_DIR) / file.filename
dest.write_bytes(await file.read())
# Insert DB trong router???
await session.execute(insert(audio).values(...))
return {"data": "ok"}
Hậu quả tức thì: business logic, I/O file, và DB query lẫn lộn vào nhau trong một hàm. Muốn test riêng logic kiểm tra storage limit thì phải kéo theo cả DB thật và filesystem. Muốn thay đổi cách lưu file thì phải mò vào router. Không có gì reuse được vì mọi thứ đều bị bind chặt với HTTP layer.
2. Ba lớp và trách nhiệm¶
┌─────────────────────────────────────────────────────────┐
│ ROUTER (routers/audio.py) │
│ • Nhận HTTP request │
│ • Parse/validate shape (Pydantic) │
│ • Gọi service │
│ • Trả HTTP response │
│ • KHÔNG biết DB là gì, KHÔNG có business logic │
├─────────────────────────────────────────────────────────┤
│ SERVICE (services/audio_service.py) │
│ • Business logic (storage limit, validation rules) │
│ • Điều phối repository │
│ • Quyết định transaction order (file trước, DB sau) │
│ • KHÔNG biết HTTP, KHÔNG biết SQLite là gì │
├─────────────────────────────────────────────────────────┤
│ REPOSITORY (repositories/audio_repo.py) │
│ • SQLAlchemy query (SELECT/INSERT/UPDATE/DELETE) │
│ • File system I/O │
│ • KHÔNG có business logic │
│ • Implement abstract interface từ base.py │
└─────────────────────────────────────────────────────────┘
3. Lớp 1: Abstract Interface (base.py)¶
Service phụ thuộc vào interface, không phụ thuộc vào implementation. Đây là Dependency Inversion Principle (DIP).
# admin/src/repositories/base.py
from abc import ABC, abstractmethod
from sqlalchemy.ext.asyncio import AsyncSession
class AudioRepository(ABC):
"""Interface — service chỉ biết đến class này, không biết SQLite tồn tại."""
@abstractmethod
async def list_all(self, session: AsyncSession, *, q: str | None = None) -> list[dict]: ...
@abstractmethod
async def get_by_id(self, session: AsyncSession, id: int) -> dict | None: ...
@abstractmethod
async def create(self, session: AsyncSession, *, filename: str, title: str, ...) -> dict: ...
@abstractmethod
async def delete(self, session: AsyncSession, id: int) -> bool: ...
@abstractmethod
async def get_total_size(self, session: AsyncSession) -> int: ...
# ↑ Trả số bytes tổng — service dùng để check storage limit
# Service không cần biết đây là SUM(size_bytes) hay đếm files hay gì
Nhờ cách này, nếu một ngày chuyển từ SQLite sang PostgreSQL, chỉ cần viết PostgresAudioRepository(AudioRepository) mới — service không phải đổi một dòng nào.
4. Lớp 2: Service (business logic)¶
# admin/src/services/audio_service.py
class AudioService:
def __init__(
self,
repo: AudioRepository, # ← interface, không phải SqliteAudioRepo
category_repo: CategoryRepository,
event_repo: EventRepository,
) -> None:
self.repo = repo
self.category_repo = category_repo
self.event_repo = event_repo
async def upload(self, session, file, *, title, category_id, event_id) -> dict:
# 1. Business validation (không phải shape validation)
filename = self._validate_filename(file.filename) # chỉ .mp3/.m4a
title = _require_title(title) # không được rỗng
await self._validate_foreign_keys(session, category_id, event_id) # FK phải tồn tại
data = await file.read()
_check_file_size(len(data)) # max 50MB
# 2. Pre-check storage limit (fail fast trước khi tốn CPU convert)
await self._check_storage_limit(session, len(data))
# 3. Convert m4a → mp3 nếu cần (CPU intensive — ngoài critical section)
if filename.endswith(".m4a"):
data, filename = await self._convert_m4a_to_mp3(data, filename)
# 4. Critical section: re-check + write + insert (atomic về mặt logic)
async with _UPLOAD_CRITICAL: # asyncio.Lock — tránh race condition
await self._check_storage_limit(session, len(data)) # re-check sau convert
await self._check_filename_unique(session, filename)
# QUAN TRỌNG: Ghi file TRƯỚC, insert DB SAU
dest = Path(AUDIO_DIR) / filename
await asyncio.to_thread(dest.write_bytes, data) # blocking I/O → thread pool
try:
duration = await self._probe_duration(dest)
return await self.repo.create(
session, filename=filename, title=title,
size_bytes=len(data), duration_seconds=duration, ...
)
except Exception:
dest.unlink(missing_ok=True) # ROLLBACK: xóa file nếu DB lỗi
raise
async def delete(self, session, id) -> bool:
audio = await self.repo.get_by_id(session, id)
if not audio:
raise HTTPException(404, "Audio không tồn tại")
# QUAN TRỌNG: Xóa DB TRƯỚC, file SAU
# Lý do: nếu xóa file trước rồi DB lỗi → file mất nhưng record còn
# Nếu xóa DB trước rồi file lỗi → record mất nhưng file thành orphan (chấp nhận được, chỉ log)
deleted = await self.repo.delete(session, id)
if deleted:
try:
Path(AUDIO_DIR, audio["filename"]).unlink(missing_ok=True)
except OSError as e:
logger.warning("Không xóa được file %s: %s", audio["filename"], e)
# KHÔNG raise — DB đã xóa thành công, file lỗi chỉ log
return deleted
Thứ tự thao tác này là business rule: ghi file trước, insert DB sau. Quyết định đó thuộc về service vì đây là logic nghiệp vụ — router không biết thứ tự nào, repository không quan tâm context nào.
5. Lớp 3: Repository (implementation)¶
# admin/src/repositories/audio_repo.py
class SqliteAudioRepository(AudioRepository):
"""SQLite implementation — khi đổi sang PostgreSQL, chỉ tạo class này thôi."""
async def list_all(self, session, *, q=None, ...) -> list[dict]:
query = (
select(audio, categories.c.name.label("category_name"), ...)
.outerjoin(categories, ...)
.order_by(audio.c.uploaded_at.desc())
)
if q:
query = query.where(audio.c.title_normalized.like(f"%{q}%"))
result = await session.execute(query)
return [dict(row._mapping) for row in result.fetchall()]
async def get_total_size(self, session) -> int:
result = await session.execute(select(func.sum(audio.c.size_bytes)))
return result.scalar() or 0
# ... implement tất cả @abstractmethod từ AudioRepository
6. Lớp Router — Gọi Service¶
# admin/src/routers/audio.py
# Factory tạo service với concrete repo
def get_audio_service() -> AudioService:
return AudioService(
SqliteAudioRepository(), # inject implementation
SqliteCategoryRepository(),
SqliteEventRepository(),
)
@router.post("", dependencies=[Depends(require_editor)])
async def upload_audio(
file: UploadFile,
title: str = Form(...),
category_id: int | None = Form(None),
svc: AudioService = Depends(get_audio_service),
session: AsyncSession = Depends(get_session),
) -> dict:
# Router: validate shape (Form fields) + gọi service + trả response
# Router KHÔNG biết storage limit là bao nhiêu
# Router KHÔNG biết file được lưu ở đâu
# Router KHÔNG biết table nào trong SQLite
result = await svc.upload(session, file, title=title, category_id=category_id)
return {"data": result}
7. Flow đầy đủ — Ví dụ Upload¶
POST /api/audio (multipart/form-data)
│
├── Router: parse UploadFile + Form fields
├── Depends: get_session → tạo AsyncSession + BEGIN TRANSACTION
├── Depends: require_editor → check JWT cookie + role
│
├── svc.upload(session, file, title=...) [Service]
│ ├── _validate_filename() → chỉ .mp3/.m4a
│ ├── _require_title() → không được rỗng
│ ├── _validate_foreign_keys() → category/event phải tồn tại [gọi repo]
│ ├── _check_file_size() → max 50MB
│ ├── _check_storage_limit() [gọi repo.get_total_size()]
│ ├── _convert_m4a_to_mp3() → ffmpeg subprocess
│ ├── async with _UPLOAD_CRITICAL:
│ │ ├── re-check storage limit
│ │ ├── check filename unique [gọi repo.get_by_filename()]
│ │ ├── write file to /data/audio/
│ │ ├── repo.create() → INSERT INTO audio [Repository]
│ │ └── rollback: unlink file nếu INSERT lỗi
│ └── return dict
│
├── Router: return {"data": result}
└── Depends cleanup: session.commit() → COMMIT TRANSACTION
8. Trade-off thực tế¶
Ưu điểm rõ ràng¶
Testability:
# Test service mà không cần DB, không cần filesystem
from unittest.mock import AsyncMock, Mock
async def test_upload_rejects_oversized_file():
mock_repo = Mock(spec=AudioRepository)
mock_repo.get_total_size = AsyncMock(return_value=9.9 * 1024**3) # gần đầy
svc = AudioService(mock_repo, Mock(), Mock())
with pytest.raises(HTTPException) as exc:
await svc.upload(session=Mock(), file=mock_file(size=200 * 1024**2))
assert exc.value.status_code == 507 # storage full
mock_repo.create.assert_not_called() # file không được insert
Single source of truth: Logic rollback (file → DB) chỉ ở 1 nơi. Audit dễ, sửa dễ.
Trade-off phải biết¶
Boilerplate: 1 entity Audio → 4 files:
- base.py (interface)
- audio_repo.py (implementation)
- audio_service.py (business logic)
- routers/audio.py (HTTP handler)
Cross-service transaction không tự nhiên:
# Tình huống: xóa playlist phải cascade update schedule
# Nhưng playlist_service và schedule_service là 2 service khác nhau
# Không có global transaction bao 2 service
# Cách giải quyết trong project: router orchestrate
@router.delete("/playlists/{id}/tracks/{track_id}")
async def remove_track(
id: int, track_id: int,
playlist_svc: PlaylistService = Depends(...),
schedule_svc: ScheduleService = Depends(get_schedule_service),
session: AsyncSession = Depends(get_session),
):
await playlist_svc.remove_track(session, id, track_id)
# Cascade: bump duration cho schedule entries dùng playlist này
await schedule_svc.recompute_duration_for_playlist(session, id)
Router đảm nhận orchestration — đây là exception có chủ đích, không phải pattern thường dùng.
Evolution: Unit of Work pattern (khi scale lớn hơn)¶
Pattern "router orchestrate + repo tự commit" work tốt ở MVP. Nhưng khi cross-service transaction xuất hiện ≥ 3 chỗ, nó bộc lộ nhược điểm:
- Router biết quá nhiều về transaction boundary (vi phạm SoC)
- Service 2 fail sau khi service 1 đã commit → rollback tay phức tạp
- Không test được "atomic cross-service" vì mỗi service có session riêng
Unit of Work (UoW) là context manager gom session + ≥ 2 repo, commit/rollback 1 lần cuối:
class UnitOfWork:
def __init__(self, session_factory):
self._session_factory = session_factory
async def __aenter__(self):
self.session = self._session_factory()
self.playlists = SqlitePlaylistRepository()
self.schedules = SqliteScheduleRepository()
return self
async def __aexit__(self, exc_type, exc, tb):
if exc_type:
await self.session.rollback()
else:
await self.session.commit()
await self.session.close()
# Router dùng UoW:
@router.delete("/playlists/{id}/tracks/{track_id}")
async def remove_track(
id: int, track_id: int,
uow: UnitOfWork = Depends(get_uow),
):
async with uow:
await uow.playlists.remove_track(uow.session, id, track_id)
await uow.schedules.recompute_duration_for_playlist(uow.session, id)
# Exit không exception → commit cả 2; có exception → rollback cả 2
Điểm khác biệt so với "router orchestrate":
- Repo không tự commit (ngược với pattern hiện tại của project) — UoW quản lý
- Rollback tự động bao cả 2 repo nếu bất kỳ bước nào fail
- Service layer có thể nhận
UnitOfWorkthayAsyncSession→ service cũng dùng được
Khi nào chuyển sang UoW:
- ≥ 3 router có pattern "orchestrate 2+ service"
- Cần atomic invariant cross-service (ví dụ: "xóa playlist + update schedule phải cùng rollback")
- Muốn test "rollback khi service 2 fail" mà không mock phức tạp
Khi nào KHÔNG cần: Project MVP, 1-2 chỗ cross-service → router orchestrate đơn giản hơn, không cần refactor toàn bộ pattern commit.
📌 UoW là bước tiến từ "repo tự commit". Nếu bắt đầu project mới mà biết trước sẽ có nhiều cross-service transaction → thiết kế UoW từ đầu sẽ tiết kiệm chi phí migration.
9. Khi nào KHÔNG cần 3 layer?¶
- Script một lần (migration, seeding data) → viết thẳng, không cần layer
- Endpoint cực đơn giản chỉ proxy DB (GET /health, GET /categories không có logic) → service mỏng chỉ delegate là bình thường
- Project < 5 entities → monolith module đủ dùng
Nguyên tắc: Thêm layer khi có business logic phức tạp hoặc cần isolate để test. Không thêm layer vì "kiến trúc đẹp".
Nguyên lý tổng quát¶
Bài này là hiện thực cụ thể của 4 nguyên lý cốt lõi từ bài 0:
| Pattern trong bài | Nguyên lý | Mô tả |
|---|---|---|
| Router / Service / Repo | SoC | 3 concerns tách bạch: HTTP / business / persistence |
| Mỗi layer 1 trách nhiệm | SRP | Service thay đổi khi business rule đổi, Repo thay đổi khi schema đổi |
Service phụ thuộc AudioRepository (ABC) |
DIP | Không biết SQLite hay Postgres |
Depends(get_audio_service) |
IoC | Framework inject implementation |
Chuyển giao: nhiều cách "chia layer"¶
3-layer chỉ là 1 trong nhiều kiến trúc cùng nguyên lý SoC + DIP:
| Kiến trúc | Layer điển hình |
|---|---|
| 3-Layer (project này) | Router → Service → Repository |
| Clean Architecture (Uncle Bob) | Entity → Use Case → Interface Adapter → Framework |
| Hexagonal (Ports & Adapters) | Domain ở giữa, Port (interface) + Adapter (impl) ở rìa |
| Onion | Domain → Domain Service → App Service → Infrastructure |
Cùng nguyên lý, khác hình thù. Khi sang project khác, đừng tranh luận "pattern X hay Y đúng" — hỏi: "Code này có tách concern không? Có phụ thuộc abstraction không?"
Khi nào phá pattern này¶
Bài 10 §1 có 3 trường hợp cụ thể — script 1 lần, endpoint trivial, prototype < 3 entity. Nguyên tắc: phá pattern có chủ đích, không phá vì lười.
Cross-service transaction¶
Pattern "repo tự commit" (bài 4 §9) khiến cross-service transaction khó. Project giải bằng router orchestration. Khi scale lớn, thường chuyển sang:
- Unit of Work pattern — 1 object gom nhiều repo, commit 1 lần cuối
- Saga pattern — chuỗi action + compensating action (cho distributed system)
Bài tập áp dụng¶
Implement đầy đủ 3 layer cho entity Post của một blog:
# 1. Interface (base.py)
class PostRepository(ABC):
@abstractmethod
async def list_published(self, session) -> list[dict]: ...
@abstractmethod
async def create(self, session, *, title, content, author_id) -> dict: ...
@abstractmethod
async def publish(self, session, id) -> dict | None: ...
# 2. Service (post_service.py) — business rules:
# - title >= 10 ký tự
# - content >= 100 ký tự
# - author phải tồn tại và enabled
# - không được publish bài đã publish
# 3. Implementation (post_repo.py)
# 4. Router (routers/posts.py)
Câu hỏi tự kiểm tra:
1. Tại sao service nhận AudioRepository (interface) thay vì SqliteAudioRepository?
2. Khi nào dùng asyncio.to_thread()? Tại sao ghi file cần dùng nó?
3. Tại sao xóa DB trước, xóa file sau? Nếu đảo ngược thì sao?
4. asyncio.Lock() bảo vệ cái gì trong upload flow?