Bài 4: SQLAlchemy Core + aiosqlite¶
Tham khảo trong project:
admin/src/db/engine.py,admin/src/repositories/audio_repo.py
1. Tại sao chúng ta lại nói "Không" với SQLAlchemy ORM?¶
Thế giới SQLAlchemy được chia làm 2 trường phái chính:
| ORM (Session + Models) | Core (Expression Language) | |
|---|---|---|
| Cách viết | session.query(Audio).filter(...) |
select(audio_table).where(...) |
| Kiểm soát SQL | Bị ẩn đi, tự động tải dữ liệu ngầm (lazy loading), dễ dính bẫy N+1. | Rõ ràng, tường minh, cú pháp bám sát SQL thuần. |
| Điểm rơi phù hợp | App phức tạp, các bảng có quan hệ (relationship) chằng chịt. | App đơn giản, vừa và nhỏ, cần tối ưu câu lệnh SQL. |
| Độ khó học | Cao (Phải hiểu vòng đời session, relationship). | Thấp hơn nhiều. |
Project này kiên quyết chọn Core vì: SQLite bản chất đã rất đơn giản, chúng ta không cần thêm một lớp "phép thuật" ORM đứng giữa làm gì. Việc viết SQL bằng Core giúp câu query hiện ra rõ ràng trước mắt, dễ debug hơn hẳn và triệt tiêu hoàn toàn rủi ro sinh ra hàng loạt câu query ngầm (bẫy N+1) mà bạn không kiểm soát được.
2. Khai báo Table (Bản vẽ thay thế ORM Model)¶
# admin/src/db/engine.py
from sqlalchemy import Column, ForeignKey, Integer, MetaData, Table, Text
metadata = MetaData()
# Thay vì định nghĩa class dài dòng: class Audio(Base): id = Column(...)
# Chúng ta dùng thẳng Table:
audio = Table(
"audio",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("filename", Text, nullable=False, unique=True),
Column("title", Text, nullable=False),
Column("title_normalized", Text, nullable=False, server_default=""),
Column("size_bytes", Integer, nullable=False),
Column("duration_seconds", Integer, nullable=False, server_default="0"),
# Khóa ngoại: Đặt luật nếu Category bị xóa thì set trường này về NULL
Column("category_id", Integer, ForeignKey("categories.id", ondelete="SET NULL")),
Column("event_id", Integer, ForeignKey("events.id", ondelete="SET NULL")),
Column("uploaded_at", Text, nullable=False),
)
categories = Table(
"categories",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("name", Text, nullable=False, unique=True),
)
Lưu ý nhỏ: Việc khai báo Table ở đây không hề tự động tạo bảng trong database. Bạn hãy coi nó như một bản vẽ (metadata) để SQLAlchemy hiểu cấu trúc bảng trông như thế nào. Việc xây nhà (tạo DB thật) sẽ do lệnh metadata.create_all() đảm nhận ở bước khởi tạo.
3. Cỗ máy Engine + Session bất đồng bộ (Async)¶
# admin/src/db/engine.py
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
# sqlite+aiosqlite:// = Cú pháp gọi driver bất đồng bộ cho SQLite
DATABASE_URL = f"sqlite+aiosqlite:///{DB_PATH}"
engine = create_async_engine(
DATABASE_URL,
echo=False, # Bật True nếu muốn in câu SQL ra console để debug
connect_args={"check_same_thread": False}, # Bắt buộc phải có để SQLite chạy được đa luồng
)
# Xưởng đẻ ra Session — tạo session từ engine
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False, # Cực kỳ quan trọng: Chốt đơn (commit) xong vẫn cho phép đọc lại data của object đó
)
Ép SQLite chạy theo ý mình — Các lệnh PRAGMA¶
SQLite có khá nhiều tính năng hay ho nhưng lại tắt mặc định mỗi khi mở kết nối mới. Chúng ta phải tự tay ép nó bật lên bằng "hook" của SQLAlchemy:
# admin/src/db/engine.py
from sqlalchemy import event
@event.listens_for(engine.sync_engine, "connect")
def _configure_sqlite(dbapi_conn, _connection_record) -> None:
cursor = dbapi_conn.cursor()
cursor.execute("PRAGMA foreign_keys = ON") # Ép tuân thủ khóa ngoại (FK)
cursor.execute("PRAGMA journal_mode = WAL") # Chế độ WAL: Thằng đọc không cản đường thằng ghi
cursor.execute("PRAGMA synchronous = NORMAL") # Cân bằng giữa tốc độ và độ an toàn
cursor.close()
PRAGMA foreign_keys = ON¶
Nếu bạn quên dòng này, mấy cái lệnh như ON DELETE CASCADE hay ON DELETE SET NULL sẽ bị vô hiệu hóa hoàn toàn trong im lặng. Developer sẽ cực kỳ dễ bỏ sót vì hệ thống không hề báo lỗi, chỉ thấy data rác không tự động bị dọn dẹp như kỳ vọng.
PRAGMA journal_mode = WAL — Chìa khóa vàng cho Concurrency (Đồng thời)¶
Mặc định, SQLite dùng chế độ rollback journal với luật lệ rất hà khắc:
- Thằng nào đang Ghi thì khóa mõm tất cả những thằng Đọc.
- Thằng nào đang Đọc thì trói tay thằng Ghi.
Trong project này, chúng ta có một con bot Discord cứ 2 giây lại quét (polling) bảng lệnh một lần, cùng lúc đó Admin API lại bận rộn ghi dữ liệu. Nếu cứ khóa nhau thế này thì API sẽ bị lag giật.
Chế độ Write-Ahead Logging (WAL) ra đời để giải cứu:
- Thằng Ghi sẽ ghi vào một file phụ
-wal, không đụng chạm đến file DB chính. - Thằng Đọc cứ thoải mái đọc dữ liệu từ DB chính + file wal → Tuyệt đối không ai block ai!
- Lượng Ghi vẫn phải xếp hàng (serial), nhưng lượng Đọc thì chạy song song thoải mái.
Sự đánh đổi (Trade-off):
Mở thư mục ra bạn sẽ thấy đẻ thêm 2 file là db-wal và db-shm. Nhớ copy cả 3 file nếu bạn muốn backup DB nhé.
PRAGMA synchronous = NORMAL¶
FULL(Mặc định): Chậm nhưng an toàn tuyệt đối. Ghi xong là chắc chắn nằm trên ổ cứng, cúp điện cũng không mất.NORMAL: Nhanh hơn đáng kể. Đánh đổi lại: Nếu chốt đơn xong mà server cúp điện cái rụp trong vài giây đầu, bạn có thể bị mất khúc data đang nằm tạm trong WAL. File DB không bị hỏng, chỉ là mất data chưa kịp lưu hẳn.OFF: Tắt luôn an toàn, không bao giờ dùng cho production.
Với app KCDS (mất vài phút data khi crash không phải là vấn đề quá sinh tử) → Chọn NORMAL là hợp lý nhất.
4. SELECT — Truy xuất dữ liệu với Core¶
# admin/src/repositories/audio_repo.py
from sqlalchemy import delete, func, insert, or_, select, update
async def list_all(
self, session: AsyncSession, *, q: str | None = None
) -> list[dict]:
# Lắp ráp câu query từng phần (Mỗi lệnh .where() sẽ sinh ra một câu query mới)
query = (
select(
audio, # Lấy hết cột bảng audio (SELECT audio.*)
categories.c.name.label("category_name"), # Thêm: categories.name AS category_name
events.c.name.label("event_name"),
events.c.date.label("event_date"),
)
.outerjoin(categories, audio.c.category_id == categories.c.id) # LEFT JOIN
.outerjoin(events, audio.c.event_id == events.c.id)
.order_by(audio.c.uploaded_at.desc())
)
# Bộ lọc có điều kiện — Chỉ nhét thêm WHERE nếu user có gõ từ khóa tìm kiếm
if q and (qn := q.strip()):
pattern = f"%{unidecode(qn).lower()}%"
query = query.where(
or_( # Dịch ra: WHERE ... OR ...
audio.c.title_normalized.like(pattern),
func.lower(audio.c.filename).like(pattern), # Gọi hàm lower() của SQLite
)
)
# Đạp ga thực thi — Trả về một Result object
result = await session.execute(query)
# Biến đám data thô thành một List chứa các Dictionary cho Service dễ xài
return [dict(row._mapping) for row in result.fetchall()]
Nhìn thử bản dịch SQL thuần xem Core xịn thế nào:
SELECT audio.*, categories.name AS category_name, events.name AS event_name
FROM audio
LEFT JOIN categories ON audio.category_id = categories.id
LEFT JOIN events ON audio.event_id = events.id
WHERE LOWER(audio.title_normalized) LIKE '%thiền%'
OR LOWER(audio.filename) LIKE '%thiền%'
ORDER BY audio.uploaded_at DESC
5. Thêm mới (INSERT)¶
async def create(
self, session: AsyncSession,
*, filename: str, title: str, size_bytes: int, ...
) -> dict:
result = await session.execute(
insert(audio).values(
filename=filename,
title=title,
title_normalized=title_normalized,
size_bytes=size_bytes,
duration_seconds=duration_seconds,
category_id=category_id,
event_id=event_id,
uploaded_at=datetime.now(TZ).isoformat(),
)
)
# result.inserted_primary_key[0] sẽ nhả ra cái ID vừa mới được tạo
return await self.get_by_id(session, result.inserted_primary_key[0])
6. Sửa đổi (UPDATE)¶
async def update(self, session: AsyncSession, id: int, **kwargs) -> dict | None:
# Tuyệt chiêu: Lọc bỏ mấy cái value None, chỉ update đúng những trường được truyền vào
updates = {k: v for k, v in kwargs.items() if v is not None}
if not updates:
return await self.get_by_id(session, id)
await session.execute(
update(audio)
.where(audio.c.id == id)
.values(**updates)
)
return await self.get_by_id(session, id)
7. Xóa (DELETE)¶
async def delete(self, session: AsyncSession, id: int) -> bool:
result = await session.execute(
delete(audio).where(audio.c.id == id)
)
return result.rowcount > 0 # Trả về True nếu thực sự có ít nhất 1 dòng bị xóa
8. Các hàm tính toán tổng hợp (Aggregate Function)¶
async def get_total_size(self, session: AsyncSession) -> int:
"""Hàm tính SUM(size_bytes) — Dùng để kiểm tra tổng dung lượng ổ cứng đã xài."""
result = await session.execute(
select(func.sum(audio.c.size_bytes))
)
# .scalar() lấy ra đúng 1 con số duy nhất. Nếu bảng trống (NULL) thì nhả về 0.
return result.scalar() or 0
# Tương tự nếu bạn muốn xài COUNT, MAX, MIN
select(func.count(audio.c.id))
select(func.sum(audio_table.c.duration_seconds)).where(...)
9. Transaction — Quản lý cam kết (Commit) tại Repository¶
Project này lựa chọn pattern tự chốt đơn (commit) ngay trong từng hàm của Repository, chứ không dùng cái màng bọc transaction ở tầng session:
# admin/src/db/engine.py — Lúc tạo dependency
async def get_session() -> AsyncIterator[AsyncSession]:
async with AsyncSessionLocal() as session:
yield session # TUYỆT ĐỐI KHÔNG có lệnh session.begin() ở đây
# admin/src/repositories/audio_repo.py — Repo tự lo thân mình
async def create(self, session, *, filename, title, ...) -> dict:
result = await session.execute(insert(audio).values(...))
await session.commit() # ← Tự commit luôn cho nóng
return await self.get_by_id(session, result.inserted_primary_key[0])
Nhưng nếu cần gom 2-3 lệnh thao tác thành 1 khối (Atomic) thì sao?
Rất đơn giản, tầng Repository sẽ gom các lệnh execute() lại và chỉ gọi commit 1 lần ở cuối. Từ bản 2.0, SQLAlchemy đủ thông minh để tự gom chúng vào một Transaction.
# admin/src/repositories/user_repo.py
async def swap_admin(self, session, *, old_admin, old_new_role, successor) -> None:
# SQLAlchemy 2.0 tự động bắt đầu Transaction (begin) ngay ở lệnh execute() đầu tiên
await session.execute(update(users).where(...).values(role=old_new_role))
await session.execute(update(users).where(...).values(role="admin"))
await session.commit() # Chốt đơn cho cả 2 thao tác cùng lúc
# Nếu thao tác 2 bị văng lỗi → hàm bị vỡ → lệnh commit không chạy tới → SQLAlchemy tự động Rollback!
Đánh giá công tâm về Pattern này¶
| Ưu điểm | Nhược điểm |
|---|---|
| Cực kỳ đơn giản, mỗi hàm repo tự lo liệu vòng đời của nó. | Khó quản lý nếu Tầng Service cần điều phối gọi nhiều Repo khác nhau. |
| Không cần vò đầu bứt tai hiểu lifecycle phức tạp của Transaction. | Lỗi giữa chừng → các hàm repo trước đó đã lỡ lưu data mất rồi. |
| Dễ dàng xài lại (Cứ gọi 1 hàm là 1 Transaction gọn gàng). | Tầng Service phải tính toán thứ tự chạy cực kỳ cẩn thận (Xem bài 5 §4). |
Pattern thay thế: Nếu bạn làm các app nặng logic nghiệp vụ, người ta hay bọc async with session.begin(): ở Dependency. Khi đó cả 1 request API sẽ gói trong đúng 1 Transaction, lỗi ở đâu là rollback toàn bộ. Tuy nhiên, pattern này rườm rà hơn và chúng ta không xài nó trong KCDS.
10. Mở bát DB bằng file schema.sql¶
# admin/src/db/engine.py
import aiosqlite
async def init_db() -> None:
schema_path = Path(__file__).parent / "schema.sql"
schema_sql = schema_path.read_text()
# Nhảy qua mặt SQLAlchemy, xài thẳng aiosqlite để chạy cục script SQL khổng lồ
async with aiosqlite.connect(DB_PATH) as conn:
await conn.executescript(schema_sql)
await conn.commit()
# Migration (Cập nhật DB): Mẹo thêm cột mới nếu DB cũ chưa có (ALTER TABLE IF NOT EXISTS)
await _ensure_column("audio", "duration_seconds", "INTEGER NOT NULL DEFAULT 0")
Mẹo viết Migration "chống cháy" cực hay:
async def _ensure_column(table: str, column: str, definition: str) -> None:
"""Hàm kiểm tra: Có cột chưa? Chưa có thì đắp thêm vào."""
async with aiosqlite.connect(DB_PATH) as conn:
# Lấy bản vẽ cấu trúc hiện tại của bảng
cur = await conn.execute(f"PRAGMA table_info({table})")
columns = {row[1] for row in await cur.fetchall()}
# Nếu cột vắng mặt, tiến hành phẫu thuật thêm cột
if column not in columns:
await conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {definition}")
await conn.commit()
logger.info("Migration thành công: đã thêm %s.%s", table, column)
Ánh xạ vào Nguyên lý tổng quát (Bài 0)¶
| Kỹ thuật trong bài | Thuộc Nguyên lý nào? | Cách mang đi áp dụng nơi khác |
|---|---|---|
| Chọn Core thay vì ORM | Explicit > Implicit (Rõ ràng hơn Ngầm định) | SQL hiển hiện rõ ràng, dễ debug. Tập thói quen đánh giá cấp độ trừu tượng mỗi khi chọn tool. |
| Không xài ORM vì chưa đáng | YAGNI (Đừng làm thừa) | Chọn công cụ theo độ lớn của bài toán, đừng ôm đồm thêm độ phức tạp (complexity) không cần thiết. |
Dùng @event bật PRAGMA |
Fail Fast (Thất bại sớm) | Mọi DB đều có "bệnh" riêng (Postgres thì search_path, MySQL thì sql_mode). Phải rào chắn ngay từ lúc mở kết nối. |
| Repo tự xử lý commit | SoC (Tách biệt mối quan tâm) | Việc lưu dữ liệu là việc của Repo, không để rò rỉ lên Service. (Đổi lại sẽ có trade-off, xem Bài 10 mục 9). |
Lấy inserted_primary_key rồi fetch lại |
POLS (Ít gây bất ngờ nhất) | Hàm API làm việc xong thì hãy ói ra món đồ (Dict) dùng được ngay, đừng bắt người khác phải lóc cóc query thêm lần nữa. |
Chuyển giao: Các cấp độ của Abstraction (Trừu tượng)¶
SQLAlchemy Core nằm lấp lửng ở giữa Raw SQL (Gõ chay) và ORM (Gói gọn trong Class). Mỗi level đều có giá của nó:
| Cấp độ | Ưu điểm | Nhược điểm |
|---|---|---|
Raw SQL (aiosqlite.execute("SELECT...")) |
Toàn quyền kiểm soát, tốc độ bàn thờ | Rủi ro bị SQL Injection, khó tái sử dụng, code rác. |
Core (select(audio).where(...)) |
Lắp ráp dễ dàng, tường minh, an toàn | Phải gõ code dài dòng hơn ORM một chút. |
ORM (session.query(Audio).filter(...)) |
Năng suất cực cao, tự nối relationship | Có nhiều query ngầm, dễ sinh bẫy N+1, vòng đời session rối rắm. |
Khi nào nên tăng/giảm cấp độ? Hãy xem Bài 10 mục 2 — "Khi nào Core không đủ xài".
Đừng nhầm lẫn: Connection, Session và Transaction¶
Rất nhiều bạn mới hay bị lú 3 chữ này:
- Connection (Đường ống mạng): Kênh kết nối vật lý tới DB (TCP socket). Tạo ra cái này cực kỳ tốn sức nên thường gom chung vào một cái Hồ (Pool).
- Session (Bàn làm việc): Không gian làm việc logic của SQLAlchemy. Khi cần làm việc, Session sẽ "mượn tạm" 1 Connection từ Pool ra xài.
- Transaction (Giao dịch): Một gói các lệnh thao tác (INSERT, UPDATE). 1 Session có thể thực hiện hàng đống Transaction lần lượt.
Chuỗi: Pool → Connection → Session → Transaction → Statement (câu lệnh SQL).
Bài tập áp dụng¶
Hãy tự tay tạo một Repository cho đối tượng Product:
from sqlalchemy import Column, Integer, MetaData, Table, Text, Numeric, insert, select, delete
metadata = MetaData()
products = Table(
"products", metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("name", Text, nullable=False),
Column("price", Numeric, nullable=False),
Column("stock", Integer, nullable=False, server_default="0"),
)
class ProductRepository:
async def list_all(self, session: AsyncSession) -> list[dict]:
result = await session.execute(select(products).order_by(products.c.name))
return [dict(row._mapping) for row in result.fetchall()]
async def create(self, session: AsyncSession, *, name: str, price: float) -> dict:
result = await session.execute(
insert(products).values(name=name, price=price)
)
return await self.get_by_id(session, result.inserted_primary_key[0])
async def get_by_id(self, session: AsyncSession, id: int) -> dict | None:
result = await session.execute(select(products).where(products.c.id == id))
row = result.fetchone()
return dict(row._mapping) if row else None
Thử thách tư duy:
- Tại sao phải cực khổ bật cờ
PRAGMA foreign_keys = ON? Khi nào thì lệnhON DELETE CASCADEmới chịu chạy? - Giữa 3 hàm
.scalar(),.fetchone(), và.fetchall()có gì khác biệt? - Mô hình "Tự commit trong repository" sẽ phá sản khi nào? Lúc đó thì cách bọc
session.begin()ở hàm Dependency sẽ phát huy tác dụng ra sao? - Cú pháp
result._mappingdùng để làm gì? Tại sao không xài thẳngresult[0]cho rồi?