Bài 5: Kiến trúc 3 Lớp — Router → Service → Repository¶
Tham khảo trong project:
admin/src/repositories/base.py,admin/src/services/audio_service.py,admin/src/routers/audio.py
1. Nỗi đau của code "phẳng" (Nhồi nhét tất cả vào 1 chỗ)¶
Thử tưởng tượng bạn code không chia lớp và ném tất cả mọi thứ vào một cái hàm (handler) duy nhất, trông nó sẽ thảm họa thế này:
# ❌ Anti-pattern: Tất tần tật vào chung 1 rổ
@router.post("/audio")
async def upload_audio(file: UploadFile, session: AsyncSession = Depends(get_session)):
# 1. Luật nghiệp vụ lại nằm ở Router???
if not file.filename.endswith(".mp3"):
raise HTTPException(400, "Chỉ chấp nhận MP3")
# 2. Query DB chọc thẳng từ Router???
total_size = await session.scalar(select(func.sum(audio.c.size_bytes)))
if total_size + file.size > 10 * 1024**3:
raise HTTPException(507, "Hết dung lượng")
# 3. Ghi file ổ cứng cũng ở Router???
dest = Path(AUDIO_DIR) / file.filename
dest.write_bytes(await file.read())
# 4. Insert DB cũng ở đây nốt???
await session.execute(insert(audio).values(...))
return {"data": "ok"}
Hậu quả nhãn tiền: Luật nghiệp vụ (check dung lượng), ghi file ổ cứng (I/O), và câu lệnh SQL truy vấn DB bị xoắn lấy nhau như một mớ bòng bong.
- Muốn viết test cho đoạn kiểm tra dung lượng? Bạn buộc phải dựng cả DB thật và ổ cứng giả lập lên.
- Muốn đổi cách lưu file sang S3? Phải chui vào tận file Router để sửa.
- Đoạn code này hoàn toàn bị "chết dính" với HTTP request, không thể tái sử dụng ở chỗ khác được.
2. Phân chia 3 lớp và Trách nhiệm rõ ràng¶
Hãy chia hệ thống như một công ty với 3 phòng ban chuyên biệt:
┌──────────────────────────────────────────────────────────────┐
│ ROUTER (Tiếp tân - routers/audio.py) │
│ • Nhận request HTTP từ khách hàng. │
│ • Kiểm tra hình dáng dữ liệu xem có móp méo (Pydantic). │
│ • Đẩy việc khó xuống cho Service. │
│ • Trả HTTP response (200 OK, 400 Bad Request...). │
│ • TUYỆT ĐỐI: Không biết DB là gì, không phán xét logic. │
├──────────────────────────────────────────────────────────────┤
│ SERVICE (Chuyên viên nghiệp vụ - services/audio_service.py) │
│ • Xử lý luật kinh doanh (check dung lượng, validate...). │
│ • Sai bảo các Repository làm việc. │
│ • Quyết định sinh tử: Ghi file trước hay lưu DB trước? │
│ • TUYỆT ĐỐI: Không biết HTTP là gì, không biết SQL. │
├──────────────────────────────────────────────────────────────┤
│ REPOSITORY (Thủ kho - repositories/audio_repo.py) │
│ • Viết câu lệnh SQLAlchemy (SELECT/INSERT/UPDATE). │
│ • Thao tác đọc/ghi file hệ thống. │
│ • TUYỆT ĐỐI: Không chứa business logic. │
│ • Bám sát bản hợp đồng (interface) từ base.py. │
└──────────────────────────────────────────────────────────────┘
3. Lớp 1: Abstract Interface (Bản hợp đồng cốt lõi)¶
Tầng Service sẽ chỉ làm việc với một "Bản hợp đồng" (interface), chứ không thèm quan tâm đến Kẻ thi công thực tế (implementation) là ai. Đây chính là sức mạnh của Dependency Inversion Principle (DIP).
# admin/src/repositories/base.py
from abc import ABC, abstractmethod
from sqlalchemy.ext.asyncio import AsyncSession
class AudioRepository(ABC):
"""Bản hợp đồng — Service chỉ nhìn vào đây để gọi hàm, không biết bên dưới chạy SQLite hay gì."""
@abstractmethod
async def list_all(self, session: AsyncSession, *, q: str | None = None) -> list[dict]: ...
@abstractmethod
async def get_by_id(self, session: AsyncSession, id: int) -> dict | None: ...
@abstractmethod
async def create(self, session: AsyncSession, *, filename: str, title: str, ...) -> dict: ...
@abstractmethod
async def delete(self, session: AsyncSession, id: int) -> bool: ...
@abstractmethod
async def get_total_size(self, session: AsyncSession) -> int: ...
# ↑ Trả về tổng dung lượng đã dùng — Service gọi hàm này để check giới hạn lưu trữ.
# Service không cần biết Repository đang đếm file trên ổ cứng hay dùng SUM() trong DB.
Cái hay ở đây: Giả sử mai sếp bảo "Đổi qua dùng PostgreSQL!", bạn chỉ việc code một class PostgresAudioRepository mới. Tầng Service vẫn chạy ngon ơ mà không cần sửa lấy một chữ.
4. Lớp 2: Service (Trái tim nghiệp vụ)¶
# admin/src/services/audio_service.py
class AudioService:
def __init__(
self,
repo: AudioRepository, # ← Truyền cái vỏ (interface), không truyền ruột
category_repo: CategoryRepository,
event_repo: EventRepository,
) -> None:
self.repo = repo
self.category_repo = category_repo
self.event_repo = event_repo
async def upload(self, session, file, *, title, category_id, event_id) -> dict:
# 1. Kiểm tra nghiệp vụ khắt khe (Business validation)
filename = self._validate_filename(file.filename) # Chỉ nhận .mp3/.m4a
title = _require_title(title) # Tên bài không được rỗng
await self._validate_foreign_keys(session, category_id, event_id) # Phải chọn đúng Category/Event
data = await file.read()
_check_file_size(len(data)) # Khống chế file tối đa 50MB
# 2. Check nhanh dung lượng ổ cứng (Fail-fast trước khi làm trò nặng nhọc)
await self._check_storage_limit(session, len(data))
# 3. Chuyển đuôi m4a → mp3 nếu cần (Tác vụ cực tốn CPU)
if filename.endswith(".m4a"):
data, filename = await self._convert_m4a_to_mp3(data, filename)
# 4. Vùng nguy hiểm (Critical section): Ghi file và lưu DB
async with _UPLOAD_CRITICAL: # Dùng khóa (Lock) để chống người khác chen ngang
await self._check_storage_limit(session, len(data)) # Phải check lại phát nữa cho chắc
await self._check_filename_unique(session, filename)
# QUY TẮC SINH TỬ: Ghi file ra ổ cứng TRƯỚC, chèn data vào DB SAU
dest = Path(AUDIO_DIR) / filename
await asyncio.to_thread(dest.write_bytes, data) # Tác vụ ghi ổ cứng dễ gây kẹt → Ném ra Thread phụ
try:
duration = await self._probe_duration(dest)
return await self.repo.create(
session, filename=filename, title=title,
size_bytes=len(data), duration_seconds=duration, ...
)
except Exception:
dest.unlink(missing_ok=True) # ROLLBACK: Nếu ghi DB xịt, phải tức tốc xóa cái file vừa lưu đi
raise
async def delete(self, session, id) -> bool:
audio = await self.repo.get_by_id(session, id)
if not audio:
raise HTTPException(404, "Audio không tồn tại")
# QUY TẮC SINH TỬ ĐẢO NGƯỢC: Khi xóa thì Xóa DB TRƯỚC, xóa file SAU
# Giải thích: Xóa file trước mà DB lỗi → Mất file nhưng DB vẫn còn record ảo.
# Xóa DB trước mà file lỗi không xóa được → DB sạch sẽ, ổ cứng dư một file rác (Chấp nhận được, chỉ cần log lại).
deleted = await self.repo.delete(session, id)
if deleted:
try:
Path(AUDIO_DIR, audio["filename"]).unlink(missing_ok=True)
except OSError as e:
logger.warning("Không xóa được file %s: %s", audio["filename"], e)
# KHÔNG BAO GIỜ RAISE LỖI Ở ĐÂY — DB đã xóa thành công rồi, file rác cứ kệ nó.
return deleted
Hãy nhớ kỹ: Trật tự thực hiện (cái nào làm trước, cái nào làm sau) chính là luật nghiệp vụ. Và nó phải nằm ở Service, không phải ở Router hay Repo.
5. Lớp 3: Repository (Thủ kho)¶
# admin/src/repositories/audio_repo.py
class SqliteAudioRepository(AudioRepository):
"""Người thi công thực tế cho SQLite — Chỉ lo truy vấn SQL."""
async def list_all(self, session, *, q=None, ...) -> list[dict]:
query = (
select(audio, categories.c.name.label("category_name"), ...)
.outerjoin(categories, ...)
.order_by(audio.c.uploaded_at.desc())
)
if q:
query = query.where(audio.c.title_normalized.like(f"%{q}%"))
result = await session.execute(query)
return [dict(row._mapping) for row in result.fetchall()]
async def get_total_size(self, session) -> int:
result = await session.execute(select(func.sum(audio.c.size_bytes)))
return result.scalar() or 0
# ... Và code tiếp để thỏa mãn tất cả các hàm trong bản hợp đồng AudioRepository
6. Lớp Router — Lễ tân tiếp khách¶
# admin/src/routers/audio.py
# Factory để "nhào nặn" ra một em Service với đầy đủ đồ nghề (Repo)
def get_audio_service() -> AudioService:
return AudioService(
SqliteAudioRepository(), # Nhét ruột thật vào vỏ
SqliteCategoryRepository(),
SqliteEventRepository(),
)
@router.post("", dependencies=[Depends(require_editor)])
async def upload_audio(
file: UploadFile,
title: str = Form(...),
category_id: int | None = Form(None),
svc: AudioService = Depends(get_audio_service),
session: AsyncSession = Depends(get_session),
) -> dict:
# Router làm đúng nhiệm vụ: Chắn cổng dữ liệu (Form) → Báo Service làm việc → Gói kết quả trả về
# Router hoàn toàn KHÔNG BIẾT vụ check dung lượng ổ cứng.
# Router KHÔNG BIẾT file được cất vào thư mục nào.
# Router KHÔNG BIẾT mặt mũi cấu trúc bảng SQLite ra sao.
result = await svc.upload(session, file, title=title, category_id=category_id)
return {"data": result}
7. Flow đầy đủ — Đường đi của một Request Upload¶
Nhìn luồng chạy này, bạn sẽ thấy mọi thứ khớp với nhau như một cỗ máy Thụy Sĩ:
POST /api/audio (multipart/form-data)
│
├── Lễ tân (Router): Hứng UploadFile + Các trường Text
├── Depends: get_session → Tạo AsyncSession + BẮT ĐẦU TRANSACTION
├── Depends: require_editor → Check quyền (Có JWT cookie không? Có role Editor không?)
│
├── Mời Chuyên viên (Service.upload) vào làm việc:
│ ├── _validate_filename() → Chửi ngay nếu đuôi file sai
│ ├── _require_title() → Chửi ngay nếu không nhập tên
│ ├── _validate_foreign_keys() → [GỌI REPO] Check Category tồn tại không
│ ├── _check_file_size() → Giới hạn 50MB
│ ├── _check_storage_limit() → [GỌI REPO] Check xem server còn chỗ không
│ ├── _convert_m4a_to_mp3() → Bật tool xịn FFmpeg convert (nếu cần)
│ ├── Bật khiên bảo vệ (async with _UPLOAD_CRITICAL):
│ │ ├── Check lại dung lượng (Lỡ nãy giờ có thằng khác nhét file vô)
│ │ ├── Check trùng tên file [GỌI REPO]
│ │ ├── GHI FILE RA Ổ CỨNG /data/audio/
│ │ ├── Chèn DB [GỌI REPO] → INSERT INTO audio...
│ │ └── CẤP CỨU (Rollback): Lỗi DB thì phi ngay ra ổ cứng xóa file đi
│ └── Xong xuôi, gói data trả về
│
├── Lễ tân (Router): Cầm data nhét vào response {"data": result}
└── Dọn dẹp (Depends cleanup): session.commit() → CHỐT TRANSACTION DB
8. Sự đánh đổi trong thực chiến (Trade-offs)¶
Cái giá "quá ngon" bạn nhận được¶
Test sướng như tiên:
# Bạn có thể test Service cái rụp mà không cần bật DB thật hay đụng chạm ổ cứng
from unittest.mock import AsyncMock, Mock
async def test_upload_rejects_oversized_file():
mock_repo = Mock(spec=AudioRepository)
mock_repo.get_total_size = AsyncMock(return_value=9.9 * 1024**3) # Giả lập server sắp đầy
svc = AudioService(mock_repo, Mock(), Mock())
with pytest.raises(HTTPException) as exc:
await svc.upload(session=Mock(), file=mock_file(size=200 * 1024**2))
assert exc.value.status_code == 507 # Chặn ngay lỗi đầy ổ cứng
mock_repo.create.assert_not_called() # Chắc chắn DB không thèm insert
Single source of truth: Có bug xảy ra ở luồng xóa file? Chỉ cần mò vào đúng file Service, không chạy lung tung tìm ở chỗ khác.
Nhưng cũng đi kèm "Nỗi đau"¶
Cồng kềnh (Boilerplate): Để ra mắt đúng 1 tính năng Audio, bạn phải đẻ ra 4 file:
base.py(Bản hợp đồng)audio_repo.py(Thợ xây)audio_service.py(Kỹ sư)routers/audio.py(Tiếp tân)
Cross-service transaction (Transaction xuyên bộ phận) cực kỳ rắc rối:
# Ví dụ đau đầu: Khi bạn xóa 1 Playlist, bạn cũng phải chui vào Schedule để tính lại độ dài lịch phát.
# Nhưng PlaylistService và ScheduleService là 2 phòng ban khác nhau!
# Cách "chữa cháy" trong project MVP này: Đẩy quyền điều phối lên Router
@router.delete("/playlists/{id}/tracks/{track_id}")
async def remove_track(
id: int, track_id: int,
playlist_svc: PlaylistService = Depends(...),
schedule_svc: ScheduleService = Depends(get_schedule_service),
session: AsyncSession = Depends(get_session),
):
await playlist_svc.remove_track(session, id, track_id)
# Hiệu ứng dây chuyền: Gọi service khác để update
await schedule_svc.recompute_duration_for_playlist(session, id)
Việc bắt thằng Router phải đứng ra làm đạo diễn thế này là một ngoại lệ (exception) có chủ đích để giữ code đơn giản, không phải pattern chuẩn.
Tiến hóa: Mẫu thiết kế Unit of Work (Khi dự án phình to)¶
Pattern "Repo tự commit" (Bài 4) chạy rất ngon cho MVP. Nhưng nếu hệ thống của bạn có tới 3-4 tính năng cần "orchestrate 2-3 service cùng lúc", nó sẽ toang:
- Thằng Router tự nhiên phải ôm việc đạo diễn quá nhiều (Vi phạm SoC).
- Nếu Service 1 chạy ngon, Service 2 lỗi → Bạn phải tự viết code đi dọn rác của Service 1.
Lúc này, giang hồ thường tiến hóa lên Unit of Work (UoW). Nó giống như một cái giỏ gom tất cả Repo lại, xong việc mới bóp cò commit 1 lần:
class UnitOfWork:
def __init__(self, session_factory):
self._session_factory = session_factory
async def __aenter__(self):
self.session = self._session_factory()
self.playlists = SqlitePlaylistRepository()
self.schedules = SqliteScheduleRepository()
return self
async def __aexit__(self, exc_type, exc, tb):
if exc_type:
await self.session.rollback() # Có biến → Hủy kèo toàn bộ
else:
await self.session.commit() # Trót lọt → Chốt đơn
await self.session.close()
# Chỗ Router lúc này nhìn sẽ pro hơn hẳn:
@router.delete("/playlists/{id}/tracks/{track_id}")
async def remove_track(id: int, track_id: int, uow: UnitOfWork = Depends(get_uow)):
async with uow:
await uow.playlists.remove_track(uow.session, id, track_id)
await uow.schedules.recompute_duration_for_playlist(uow.session, id)
# Thoát ra êm đẹp → UoW tự commit cả 2. Nổ lỗi giữa chừng → UoW tự Rollback tất cả!
Nhưng ở dự án này thì... KHÔNG CẦN: Vì nó chỉ có lác đác 1-2 chỗ bị cross-service, dùng UoW sẽ làm dự án MVP thành "over-engineering". Hãy biết điểm dừng (YAGNI).
9. Khi nào thì dẹp luôn 3 lớp?¶
Đừng mù quáng tôn thờ nó. Hãy bỏ chia lớp khi:
- Bạn viết 1 file Script chạy 1 lần (dọn data rác, migration) → Viết tuột từ trên xuống dưới cho lẹ.
- API chỉ đơn thuần moi data ra trả về (ví dụ
GET /healthhoặcGET /categorieschả có rule gì) → Chọc thẳng vào Repo cũng chả chết ai. - Project làm demo cuối tuần (MVP có < 5 cái bảng DB) → Monolith module xài tạm đi, bao giờ có khách hàng thật rồi chia sau.
Chốt lại: Chỉ đẻ thêm layer khi nghiệp vụ bắt đầu lằng nhằng hoặc bạn cần tách riêng ra để viết test.
10. Ánh xạ vào Nguyên lý tổng quát¶
Bộ 3 lớp này chính là màn trình diễn xuất sắc nhất của bài 0:
| Kỹ năng trong bài | Nguyên lý (Bài 0) | Mô tả |
|---|---|---|
| Router / Service / Repo | SoC | 3 công việc không đội trời chung: HTTP / Nghiệp vụ / Lưu trữ. |
| Mỗi layer làm đúng việc của mình | SRP | Luật làm ăn thay đổi → Sửa Service. Cấu trúc bảng thay đổi → Sửa Repo. |
Service gọi AudioRepository (Bản hợp đồng) |
DIP | Tao không cần biết mày xài SQLite hay Postgres, tao chỉ gọi đúng cái hàm tạo đã thỏa thuận. |
Depends(get_audio_service) |
IoC | FastAPI đóng vai quản gia tự động tiêm Service vào Router. |
Đừng dính bẫy tên gọi¶
Kiến trúc 3 lớp chỉ là 1 đại diện. Bạn sang công ty khác nghe họ chém gió về "Clean Architecture", "Hexagonal" hay "Onion Architecture"... thì bản chất cốt lõi bên trong vẫn chỉ là SoC + DIP mà thôi.
Đừng cãi nhau kiến trúc nào xịn hơn. Hãy tự hỏi: "Code này đã tách biệt rõ các ranh giới chưa? Đã phụ thuộc vào một bản hợp đồng chung (Abstraction) chưa?".
Bài tập áp dụng¶
Thử thách: Tự tay code một cấu trúc 3 lớp cho một chức năng quản lý Bài Viết (Post) của một hệ thống Blog:
# 1. Interface (base.py)
class PostRepository(ABC):
@abstractmethod
async def list_published(self, session) -> list[dict]: ...
@abstractmethod
async def create(self, session, *, title, content, author_id) -> dict: ...
@abstractmethod
async def publish(self, session, id) -> dict | None: ...
# 2. Service (post_service.py) — Bạn phải giải quyết các rules này:
# - Tiêu đề (title) phải từ 10 ký tự trở lên.
# - Nội dung (content) phải từ 100 ký tự trở lên.
# - User đăng bài (author_id) phải là người có thật và chưa bị khóa tài khoản.
# - Cấm nhấn nút publish 2 lần cho 1 bài viết đã xuất bản.
# 3. Implementation (post_repo.py) — Viết câu lệnh SQL thật ở đây.
# 4. Router (routers/posts.py) — Khai báo API endpoint nhận JSON body.
Hãy tự nhẩm trong đầu:
- Tại sao Service phải nhận biến
repo: PostRepository(kiểu dữ liệu là cái Interface) thay vì truyền thẳng cáiSqlitePostRepositoryvào? - Khi nào thì dùng
asyncio.to_thread()? Việc ghi file ổ cứng cớ sao lại dùng hàm đó? - Tại sao nguyên tắc khi xóa là: Xóa dữ liệu dưới DB xong xuôi hết rồi mới được phép xóa file trên ổ cứng? Nếu đảo ngược lại thì chuyện tồi tệ gì sẽ xảy ra?
- Cái ổ khóa
asyncio.Lock()xuất hiện ở hàm upload sinh ra để đề phòng thảm họa gì?