Bỏ qua

Bài 4: SQLAlchemy Core + aiosqlite

Project reference: admin/src/db/engine.py, admin/src/repositories/audio_repo.py


1. Tại sao không dùng SQLAlchemy ORM?

SQLAlchemy có 2 chế độ:

ORM (Session + Models) Core (Expression Language)
Cách dùng session.query(Audio).filter(...) select(audio_table).where(...)
SQL control Ẩn, lazy loading, N+1 risk Explicit, gần SQL
Phù hợp App phức tạp, nhiều relationship App đơn giản, SQL quan trọng
Learning curve Cao (session lifecycle, relationship) Thấp hơn

Project này chọn Core vì SQLite đủ đơn giản để không cần một lớp ORM đứng giữa. Query SQL viết rõ ràng hơn, dễ debug hơn, và không có lazy loading ẩn nào có thể dẫn đến N+1 query không biết từ đâu ra.


2. Khai báo Table (thay thế ORM Model)

# admin/src/db/engine.py
from sqlalchemy import Column, ForeignKey, Integer, MetaData, Table, Text

metadata = MetaData()

# Thay vì:  class Audio(Base): id = Column(...)
# Dùng:
audio = Table(
    "audio",
    metadata,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("filename", Text, nullable=False, unique=True),
    Column("title", Text, nullable=False),
    Column("title_normalized", Text, nullable=False, server_default=""),
    Column("size_bytes", Integer, nullable=False),
    Column("duration_seconds", Integer, nullable=False, server_default="0"),
    # Foreign key với ON DELETE SET NULL
    Column("category_id", Integer, ForeignKey("categories.id", ondelete="SET NULL")),
    Column("event_id", Integer, ForeignKey("events.id", ondelete="SET NULL")),
    Column("uploaded_at", Text, nullable=False),
)

categories = Table(
    "categories",
    metadata,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("name", Text, nullable=False, unique=True),
)

Lưu ý: khai báo Table không tạo bảng trong database. Đây chỉ là metadata — mô tả schema để SQLAlchemy biết cấu trúc bảng trông như thế nào. Database thật được tạo bởi metadata.create_all() trong bước khởi tạo.


3. Async Engine + Session

# admin/src/db/engine.py
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

# sqlite+aiosqlite:// = driver async cho SQLite
DATABASE_URL = f"sqlite+aiosqlite:///{DB_PATH}"

engine = create_async_engine(
    DATABASE_URL,
    echo=False,                                      # True = log SQL ra console
    connect_args={"check_same_thread": False},       # bắt buộc cho SQLite multi-thread
)

# Session factory — tạo session từ engine
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # sau commit, objects vẫn accessible (không refresh)
)

Config SQLite mỗi connection — PRAGMA

SQLite có nhiều behavior tắt mặc định mỗi connection. Phải bật thủ công qua SQLAlchemy event:

# admin/src/db/engine.py
from sqlalchemy import event

@event.listens_for(engine.sync_engine, "connect")
def _configure_sqlite(dbapi_conn, _connection_record) -> None:
    cursor = dbapi_conn.cursor()
    cursor.execute("PRAGMA foreign_keys = ON")       # enforce FK constraints
    cursor.execute("PRAGMA journal_mode = WAL")      # reader không block writer
    cursor.execute("PRAGMA synchronous = NORMAL")    # balance durability vs speed
    cursor.close()

Đây là SQLAlchemy event — hook chạy mỗi khi engine mở connection mới.

PRAGMA foreign_keys = ON

Nếu quên, ON DELETE CASCADEON DELETE SET NULL không hoạt động (silent). Dev dễ bỏ sót vì không có error, chỉ thấy data không cascade như expected.

PRAGMA journal_mode = WAL — concurrency cho SQLite

Mặc định SQLite dùng rollback journal:

  • Writer block mọi reader trong lúc ghi
  • Reader block writer

Với project có bot scheduler polling bot_commands mỗi 2s + admin API ghi qua SQLAlchemy → contention cao, request có thể bị delay.

Write-Ahead Logging (WAL):

  • Writer ghi vào file -wal riêng, không đụng DB chính
  • Reader đọc snapshot từ DB + wal → không bị block bởi writer
  • Writer vẫn serial (1 writer cùng lúc), nhưng reader chạy parallel

Trade-off:

Ưu Nhược
Concurrent read + write tốt hơn nhiều Xuất hiện 3 file: db, db-wal, db-shm — backup phải copy cả 3
Performance tốt hơn (ít fsync) Cần checkpoint định kỳ (SQLite tự làm)
Crash-safe Ít durable hơn synchronous=FULL (chấp nhận được với NORMAL)

Khi nào WAL đặc biệt cần: Long-running async process đọc DB cùng lúc với write traffic — ví dụ bot scheduler của project này.

PRAGMA synchronous = NORMAL

  • FULL (mặc định): fsync sau mỗi commit — chậm nhưng durable 100% (commit xong = disk chắc chắn có data, kể cả power loss ngay sau đó)
  • NORMAL: fsync chỉ khi checkpoint — nhanh hơn, trade-off: power loss trong vài giây sau commit có thể mất data đang trong WAL. DB file không corrupt, chỉ mất data chưa checkpoint.
  • OFF: không fsync — không dùng cho production

Với app KCDS (mất vài phút data khi crash không nghiêm trọng) → NORMAL hợp lý.


4. SELECT — Query với Core

# admin/src/repositories/audio_repo.py
from sqlalchemy import delete, func, insert, or_, select, update

async def list_all(
    self, session: AsyncSession, *, q: str | None = None
) -> list[dict]:
    # Xây query từng bước (immutable — mỗi .where() trả về query mới)
    query = (
        select(
            audio,                                           # SELECT audio.*
            categories.c.name.label("category_name"),       # + categories.name AS category_name
            events.c.name.label("event_name"),
            events.c.date.label("event_date"),
        )
        .outerjoin(categories, audio.c.category_id == categories.c.id)  # LEFT JOIN
        .outerjoin(events, audio.c.event_id == events.c.id)
        .order_by(audio.c.uploaded_at.desc())
    )

    # Conditional filter — chỉ thêm WHERE khi có giá trị
    if q and (qn := q.strip()):
        pattern = f"%{unidecode(qn).lower()}%"
        query = query.where(
            or_(                                             # WHERE ... OR ...
                audio.c.title_normalized.like(pattern),
                func.lower(audio.c.filename).like(pattern), # SQLite function
            )
        )

    # Execute — trả về Result object
    result = await session.execute(query)

    # Convert rows → list of dict
    return [dict(row._mapping) for row in result.fetchall()]

So sánh với SQL thuần:

-- Equivalent SQL
SELECT audio.*, categories.name AS category_name, events.name AS event_name
FROM audio
LEFT JOIN categories ON audio.category_id = categories.id
LEFT JOIN events ON audio.event_id = events.id
WHERE LOWER(audio.title_normalized) LIKE '%thiền%'
   OR LOWER(audio.filename) LIKE '%thiền%'
ORDER BY audio.uploaded_at DESC


5. INSERT

async def create(
    self, session: AsyncSession,
    *, filename: str, title: str, size_bytes: int, ...
) -> dict:
    result = await session.execute(
        insert(audio).values(
            filename=filename,
            title=title,
            title_normalized=title_normalized,
            size_bytes=size_bytes,
            duration_seconds=duration_seconds,
            category_id=category_id,
            event_id=event_id,
            uploaded_at=datetime.now(TZ).isoformat(),
        )
    )
    # result.inserted_primary_key[0] = ID vừa insert
    return await self.get_by_id(session, result.inserted_primary_key[0])

6. UPDATE

async def update(self, session: AsyncSession, id: int, **kwargs) -> dict | None:
    # Lọc bỏ None values — chỉ update fields được truyền vào
    updates = {k: v for k, v in kwargs.items() if v is not None}
    if not updates:
        return await self.get_by_id(session, id)

    await session.execute(
        update(audio)
        .where(audio.c.id == id)
        .values(**updates)
    )
    return await self.get_by_id(session, id)

7. DELETE

async def delete(self, session: AsyncSession, id: int) -> bool:
    result = await session.execute(
        delete(audio).where(audio.c.id == id)
    )
    return result.rowcount > 0  # True nếu có row bị xóa

8. Aggregate Function

async def get_total_size(self, session: AsyncSession) -> int:
    """SUM(size_bytes) — dùng cho storage limit check."""
    result = await session.execute(
        select(func.sum(audio.c.size_bytes))
    )
    return result.scalar() or 0  # scalar() lấy single value, or 0 nếu NULL (bảng rỗng)
# Tương tự với COUNT, MAX, MIN
select(func.count(audio.c.id))
select(func.sum(audio_table.c.duration_seconds)).where(...)

9. Transaction — Commit trong Repository

Project này chọn pattern commit-per-repository-method, không bọc transaction wrapper ở session dependency:

# admin/src/db/engine.py — session dependency
async def get_session() -> AsyncIterator[AsyncSession]:
    async with AsyncSessionLocal() as session:
        yield session   # KHÔNG có session.begin() ở đây
# admin/src/repositories/audio_repo.py — repo tự commit sau mutation
async def create(self, session, *, filename, title, ...) -> dict:
    result = await session.execute(insert(audio).values(...))
    await session.commit()   # ← commit ngay trong repo
    return await self.get_by_id(session, result.inserted_primary_key[0])

Khi cần atomic nhiều statement (ví dụ swap admin, reorder playlist), repository gom nhiều execute() rồi commit 1 lần cuối — tận dụng auto-begun transaction của SQLAlchemy 2.0:

# admin/src/repositories/user_repo.py
async def swap_admin(self, session, *, old_admin, old_new_role, successor) -> None:
    # SQLAlchemy 2.0 tự begin TX khi execute() đầu tiên
    await session.execute(update(users).where(...).values(role=old_new_role))
    await session.execute(update(users).where(...).values(role="admin"))
    await session.commit()   # commit cả 2 statement cùng lúc
    # Nếu statement 2 raise → commit không chạy → SA rollback auto

Trade-off của pattern này

Ưu điểm Nhược điểm
Đơn giản — mỗi repo self-contained Service orchestrate ≥ 2 repo: commit từng bước
Không cần hiểu lifecycle TX phức tạp Lỗi giữa chừng → các repo trước đã persist
Reuse repo method dễ (1 call = 1 TX) Phải cẩn thận thứ tự (xem bài 5 §4)

Pattern thay thế phổ biến (không dùng trong project này nhưng newbie sẽ gặp): bọc async with session.begin(): ở dependency → cả request là 1 transaction, auto-rollback khi handler raise. Phù hợp hơn khi business logic phức tạp cần atomic toàn cục. Xem SQLAlchemy docs — Session Basics.


10. Khởi tạo DB với schema.sql

# admin/src/db/engine.py
import aiosqlite

async def init_db() -> None:
    schema_path = Path(__file__).parent / "schema.sql"
    schema_sql = schema_path.read_text()

    # Dùng aiosqlite trực tiếp cho executescript (SQLAlchemy không support)
    async with aiosqlite.connect(DB_PATH) as conn:
        await conn.executescript(schema_sql)
        await conn.commit()

    # Migration: thêm cột mới nếu chưa có (ALTER TABLE IF NOT EXISTS)
    await _ensure_column("audio", "duration_seconds", "INTEGER NOT NULL DEFAULT 0")

Pattern migration đơn giản:

async def _ensure_column(table: str, column: str, definition: str) -> None:
    """Thêm column nếu chưa tồn tại — idempotent migration."""
    async with aiosqlite.connect(DB_PATH) as conn:
        # Lấy schema hiện tại của bảng
        cur = await conn.execute(f"PRAGMA table_info({table})")
        columns = {row[1] for row in await cur.fetchall()}
        if column not in columns:
            await conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {definition}")
            await conn.commit()
            logger.info("Migration: added %s.%s", table, column)

Nguyên lý tổng quát

Pattern trong bài Nguyên lý (bài 0) Chuyển giao
Chọn Core thay ORM Explicit > Implicit — SQL nhìn thấy, debug dễ Đánh giá abstraction level mỗi khi chọn library
Không ORM vì chưa phức tạp YAGNI — không thêm complexity cho use case chưa có Nguyên tắc chung: chọn công cụ theo requirement, không chọn theo trend
@event.listens_for("connect") bật PRAGMA Fail Fast nếu quên — FK bị silent disable Mọi DB có quirks riêng (Postgres search_path, MySQL sql_mode) — đọc docs kỹ
Repo tự commit sau mutation SoC — transaction là concern của repo, không leak sang service Trade-off: atomic cross-repo khó; xem bài 10 §9
inserted_primary_key[0] + fetch lại POLS — trả về dict đầy đủ, không phải chỉ id API method luôn trả "useful thing", không leak detail của backend

Chuyển giao: abstraction level

SQLAlchemy Core ở giữa raw SQLORM. Mỗi level có trade-off:

Level Ưu Nhược
Raw SQL (aiosqlite.execute("SELECT...")) Full control, nhanh nhất SQL injection risk, không portable, khó compose
Core (select(audio).where(...)) Composable, portable, explicit Verbose hơn ORM
ORM (session.query(Audio).filter(...)) Productive, relationship tự Hidden query, N+1 risk, session lifecycle phức tạp

Khi nào lên/xuống level? Xem bài 10 §2 — "Khi nào Core không đủ".

Connection vs Session vs Transaction (khái niệm newbie hay nhầm)

  • Connection — kênh vật lý tới DB (TCP socket hoặc file handle với SQLite). Expensive để tạo → dùng pool.
  • Session — logical workspace của SQLAlchemy. 1 session "mượn" 1 connection từ pool khi cần.
  • Transaction — unit of atomicity trong DB. 1 session có thể chạy nhiều transaction tuần tự.

Hierarchy: pool → connection → session → transaction → statement.


Bài tập áp dụng

Tạo repository cho entity Product:

from sqlalchemy import Column, Integer, MetaData, Table, Text, Numeric, insert, select, delete

metadata = MetaData()
products = Table(
    "products", metadata,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("name", Text, nullable=False),
    Column("price", Numeric, nullable=False),
    Column("stock", Integer, nullable=False, server_default="0"),
)

class ProductRepository:
    async def list_all(self, session: AsyncSession) -> list[dict]:
        result = await session.execute(select(products).order_by(products.c.name))
        return [dict(row._mapping) for row in result.fetchall()]

    async def create(self, session: AsyncSession, *, name: str, price: float) -> dict:
        result = await session.execute(
            insert(products).values(name=name, price=price)
        )
        return await self.get_by_id(session, result.inserted_primary_key[0])

    async def get_by_id(self, session: AsyncSession, id: int) -> dict | None:
        result = await session.execute(select(products).where(products.c.id == id))
        row = result.fetchone()
        return dict(row._mapping) if row else None

Câu hỏi tự kiểm tra: 1. Tại sao phải bật PRAGMA foreign_keys = ON? Khi nào ON DELETE CASCADE hoạt động? 2. .scalar() vs .fetchone() vs .fetchall() — khác nhau gì? 3. Project này chọn commit ở repository method. Khi nào pattern này gặp vấn đề? Khi nào nên chuyển sang session.begin() ở dependency? 4. result._mapping là gì? Tại sao không dùng result[0]?