Bỏ qua

Bài 7: discord.py + Async Patterns trong Bot

Project reference: bot/src/scheduler.py, bot/src/main.py, bot/src/notifier.py


1. discord.py là gì

discord.py là thư viện async dành cho Discord API. Thư viện này đóng gói cả WebSocket connection lẫn HTTP API của Discord thành các coroutines Python, cho phép bot tương tác với Discord mà không cần tự xử lý tầng mạng phía dưới.

# bot/src/main.py
import discord

# Intents = khai báo events bạn muốn nhận
intents = discord.Intents.default()
intents.voice_states = True   # cần để biết user join/leave voice channel

client = discord.Client(intents=intents)

@client.event
async def on_ready():
    """Chạy khi bot connect thành công vào Discord."""
    print(f"Bot đã online: {client.user}")

client.run("TOKEN")

2. Event-Driven Architecture

discord.py dùng event-driven model — bot lắng nghe events từ Discord:

# bot/src/main.py
@client.event
async def on_ready():
    """Bot vừa online."""
    for guild in client.guilds:
        await sync_guild_channels(guild)  # sync channel cache vào SQLite

    # Khởi động scheduler loop
    client.scheduler = Scheduler(client, ...)
    client.scheduler.start()

@client.event
async def on_guild_join(guild: discord.Guild):
    """Bot được thêm vào server mới."""
    await sync_guild_channels(guild)

# Nếu cần slash commands (v2):
@client.event
async def on_interaction(interaction: discord.Interaction):
    ...

Events là async def — chạy trong event loop của discord.py.


3. discord.ext.tasks — Scheduled Loop

discord.ext.tasks cung cấp decorator @tasks.loop() để tạo background loop:

# bot/src/scheduler.py
from discord.ext import tasks

class Scheduler:
    def __init__(self, client, ...):
        # Tạo task loop từ method
        self._task = tasks.loop(seconds=2)(self.tick)
        self._task.before_loop(self._before_loop)  # chạy 1 lần trước loop

    def start(self) -> None:
        if self._task.is_running():
            return
        self._task.start()

    async def _before_loop(self) -> None:
        """Chạy 1 lần trước vòng lặp đầu tiên."""
        await self.client.wait_until_ready()  # chờ bot fully connected
        await self.state_repo.clear()          # reset stale DB state từ crash trước

    async def tick(self) -> None:
        """Chạy mỗi 2 giây."""
        try:
            await self._drain_commands()   # xử lý manual commands
            await self._check_schedule()   # trigger auto-play nếu đến giờ
        except _TICK_RECOVERABLE as e:
            logger.warning("Tick error (recoverable): %s", e)
            # Không re-raise → loop tiếp tục
            # discord.ext.tasks tự restart loop nếu exception thoát ra

Tại sao không asyncio.sleep() trong loop?

# ❌ Sai
async def my_loop():
    while True:
        await do_something()
        await asyncio.sleep(2)  # block event loop 2s — không handle events khác
# ✅ Đúng — discord.ext.tasks quản lý timing, không block
@tasks.loop(seconds=2)
async def my_loop():
    await do_something()

discord.ext.tasks tích hợp với event loop — không block Discord events trong 2s nghỉ.


4. Async Mutex — Race Condition trong Bot

Bot có thể nhận lệnh manual (qua bot_commands queue) đồng thời với auto-trigger từ schedule. Cần đảm bảo chỉ 1 session phát nhạc tại một thời điểm.

Hãy xem tình huống race condition trông như thế nào:

# ❌ Race condition tiềm ẩn
async def check_and_trigger():
    if not currently_playing():  # check ở đây
        # ... 50ms sau, auto-trigger khác cũng check và cùng thấy "not playing"
        await start_playback()   # 2 playback cùng start!

Cách giải quyết: luôn ghi nhận mutex trước await đầu tiên

# bot/src/scheduler.py
class Scheduler:
    _current_entry_id: int | None = None   # in-memory mutex

    async def _maybe_start(self, entry: dict) -> None:
        """Trigger auto-play nếu đến giờ và không đang phát gì."""
        if self._current_entry_id is not None:
            return   # mutex locked — đang phát entry khác

        # Claim mutex NGAY LẬP TỨC (synchronous) trước bất kỳ await nào
        self._current_entry_id = entry["id"]

        # Bây giờ mới tạo async task
        self._spawn_playback(entry)

    def _spawn_playback(self, entry: dict) -> None:
        """Tạo background task với strong reference."""
        task = asyncio.create_task(self._start_playback(entry))

        # QUAN TRỌNG: asyncio chỉ giữ weak reference đến task
        # → GC có thể hủy task giữa chừng!
        self._pending_tasks.add(task)          # strong reference
        task.add_done_callback(self._pending_tasks.discard)  # tự cleanup khi xong

    async def _start_playback(self, entry: dict) -> None:
        try:
            await self._execute_playback(entry)
        finally:
            self._current_entry_id = None   # release mutex

Tại sao gán SYNC (không phải async)?

Python event loop là single-threaded cooperative multitasking. Code chạy đồng bộ (không có await) không bao giờ bị interrupt. Khi gán self._current_entry_id = entry["id"], không có context switch nào xảy ra → không có race condition.

Khi code gặp await, event loop có thể chuyển sang coroutine khác → đây là điểm có thể race.


5. asyncio.create_task() vs await

# await — chờ xong mới tiếp tục (tick bị block)
await self._start_playback(entry)   # ❌ tick loop đứng chờ 60 phút!

# create_task — spawn background coroutine, tiếp tục ngay
asyncio.create_task(self._start_playback(entry))  # ✅ tick tiếp tục mỗi 2s

create_task schedule coroutine để chạy "concurrently" trong event loop — không block caller.


6. Voice Client — Phát Audio

async def _acquire_voice_client(self, channel: discord.VoiceChannel) -> discord.VoiceClient:
    """Join voice channel, disconnect nếu đang ở channel khác."""
    existing_vc = channel.guild.voice_client

    if existing_vc is not None:
        if existing_vc.channel.id == channel.id:
            return existing_vc  # đã ở đúng channel rồi
        await existing_vc.disconnect(force=True)  # disconnect khỏi channel cũ

    return await channel.connect()  # join channel mới

async def _run_playlist(self, vc: discord.VoiceClient, files: list[Path], stop_pred) -> None:
    """Phát lần lượt các file audio."""
    for filepath in files:
        if stop_pred():   # check _stop_requested
            break

        # FFmpegPCMAudio: đọc file → FFmpeg decode → PCM → Discord voice
        source = discord.FFmpegPCMAudio(str(filepath))
        vc.play(source)

        # Chờ file xong
        while vc.is_playing() and not stop_pred():
            await asyncio.sleep(0.5)

        if stop_pred():
            vc.stop()
            break

        # Gap giữa các track
        await asyncio.sleep(TRACK_GAP_SECONDS)

7. Error Handling trong Async — Narrow Exception

# bot/src/scheduler.py

# Định nghĩa rõ exception nào là "recoverable" (log + tiếp tục)
_TICK_RECOVERABLE = (
    aiosqlite.Error,          # DB connection issue
    OSError,                  # file not found, disk error
    discord.HTTPException,    # Discord API rate limit, server error
    discord.ClientException,  # voice connection issue
    ValueError,               # parse error (time format, etc.)
)

async def tick(self) -> None:
    try:
        await self._drain_commands()
        await self._check_schedule()
    except _TICK_RECOVERABLE as e:
        logger.warning("Tick recoverable error: %s", e)
        # Loop tiếp tục — next tick sẽ retry
    # KeyError, TypeError, AttributeError KHÔNG bị catch
    # → những lỗi này là bug logic → muốn nó crash để phát hiện sớm

Tại sao không catch Exception toàn bộ?

# ❌ Nuốt bug logic
try:
    await self._drain_commands()
except Exception:
    pass  # AttributeError vì typo code cũng bị nuốt!

Narrow exception set → chỉ recover những gì mình hiểu và kiểm soát được. Bug thật (sai logic, typo) → crash loud → phát hiện sớm.


8. Graceful Shutdown

async def shutdown(self) -> None:
    """Tắt bot sạch sẽ — gọi từ on_close hoặc SIGTERM handler."""
    self._stop_requested = True    # signal tất cả loops dừng

    # Dừng audio đang phát
    if self._current_vc and self._current_vc.is_playing():
        self._current_vc.stop()

    # Chờ tất cả background tasks xong (timeout 8s)
    if self._pending_tasks:
        await asyncio.wait_for(
            asyncio.gather(*self._pending_tasks, return_exceptions=True),
            timeout=SHUTDOWN_TIMEOUT_SECONDS,
        )

    # Dừng tick loop
    self._task.cancel()

    # Cleanup DB state
    await self.state_repo.clear()
# bot/src/main.py — handle SIGTERM (Docker stop)
import signal

def _setup_signal_handlers(client, loop):
    def _handle_signal():
        loop.create_task(client.close())  # trigger on_close → scheduler.shutdown()

    try:
        # Unix: SIGTERM từ Docker, SIGINT từ Ctrl+C
        loop.add_signal_handler(signal.SIGTERM, _handle_signal)
        loop.add_signal_handler(signal.SIGINT, _handle_signal)
    except NotImplementedError:
        # Windows không support add_signal_handler
        signal.signal(signal.SIGTERM, lambda *_: loop.create_task(client.close()))

9. SQLite làm Message Queue — Polling Pattern

Admin API → INSERT bot_commands → SQLite
Bot tick (2s) → SELECT * → xử lý → DELETE

Concurrency gotcha: Pattern này có 2 process (admin API + bot) cùng đọc/ghi 1 file SQLite. Mặc định SQLite dùng rollback journal → writer block mọi reader, reader block writer → bot tick có thể delay không đoán được. Bắt buộc bật PRAGMA journal_mode = WAL cho cả 2 process — xem Bài 4 §3 "Config SQLite mỗi connection" để cấu hình qua SQLAlchemy event.

# bot/src/scheduler.py
async def _drain_commands(self) -> None:
    """Xử lý tất cả pending commands trong queue."""
    async with self._db_session() as session:
        commands = await self.commands_repo.list_pending(session)

    for cmd in commands:
        try:
            await self._execute_command(cmd)
        except _TICK_RECOVERABLE as e:
            logger.warning("Command %s failed: %s", cmd["command"], e)
        finally:
            # Xóa command dù thành công hay lỗi — không retry
            await self.commands_repo.delete(session, cmd["id"])

async def _execute_command(self, cmd: dict) -> None:
    match cmd["command"]:
        case "play":
            entry = await self.repo.get_by_id(session, cmd["entry_id"])
            await self._maybe_play_entry(entry)
        case "pause":
            if self._current_vc and self._current_vc.is_playing():
                self._current_vc.pause()
        case "resume":
            if self._current_vc and self._current_vc.is_paused():
                self._current_vc.resume()
        case "stop":
            self._stop_requested = True
            if self._current_vc:
                self._current_vc.stop()

10. Notifier — Fire and Forget

# bot/src/notifier.py

async def send_reminder(channel: discord.TextChannel, entry: dict) -> None:
    """Nhắc trước khi bắt đầu buổi tập."""
    await _send(channel, f"🧘 **{entry['name']}** sẽ bắt đầu sau {entry['remind']} phút...")

async def send_start(channel: discord.TextChannel, entry: dict, vc: discord.VoiceChannel) -> None:
    await _send(channel, f"🔔 **{entry['name']}** bắt đầu! Vào {vc.mention} để tham gia.")

async def send_end(channel: discord.TextChannel) -> None:
    await _send(channel, "✨ Buổi tập đã kết thúc. Hẹn gặp lại!")

async def _send(channel: discord.TextChannel, content: str) -> None:
    """Internal: gửi message, log lỗi nhưng không crash playback."""
    try:
        await channel.send(content)
    except discord.Forbidden:
        # Bot không có permission gửi tin ở channel này
        logger.warning("Không có quyền gửi tin vào %s", channel.name)
    except discord.HTTPException as e:
        logger.warning("Gửi tin thất bại: %s", e)
    # Không re-raise → playback tiếp tục dù notification lỗi

Pattern: Notification là "best-effort" — không được làm crash logic chính (phát nhạc). Lỗi gửi tin chỉ log.


Nguyên lý tổng quát

Pattern trong bài Nguyên lý (bài 0) Chuyển giao
@client.event async def on_ready IoC qua event hook — framework gọi khi event xảy ra Pattern chung cho event-driven: Node.js EventEmitter, Rust tokio::select!
Gán _current_entry_id SYNC trước await Cooperative scheduling — code sync không bị interrupt Áp dụng với mọi single-threaded event loop (JS, Python asyncio, Rust async)
_TICK_RECOVERABLE narrow exception Fail Loud với bug, Fail Soft với I/O Kỷ luật chung: catch cụ thể, không catch Exception
_pending_tasks strong reference POLS — tránh silent failure từ GC Mental model async: task không có reference = có thể bị GC hủy
SQLite làm command queue YAGNI — không cần Redis khi 1 bot + polling 2s đủ Nhưng biết khi nào pattern này gãy — xem bài 10 §8
notifier.py fire-and-forget SoC — notification không được block business logic Pattern chung: separate critical path và best-effort

Cooperative vs Preemptive Scheduling

Đây là mental model quan trọng nhất của bài này.

Preemptive (OS thread) Cooperative (async)
Ai chuyển context? OS kernel (bất kỳ lúc nào) Chính task (tại await)
Race condition Mọi lúc Chỉ tại điểm await
Cần lock? Hầu hết mutation shared state Chỉ khi mutation qua await
Lợi Không cần nghĩ về race Predictable, debug dễ hơn
Hại Phải lock khắp nơi Phải hiểu await là interruption point

"Gán SYNC trước await" (bài §4) = tận dụng cooperative nature. Trong preemptive model, pattern này không work — phải dùng mutex/atomic.

Chuyển giao: long-running async service

Pattern project này (tick loop + command queue + notifier) là template chung cho:

  • Job worker (Celery alternative) — poll queue, process, ack
  • IoT device controller — poll state, execute command
  • Game server loop — tick game state, broadcast updates
  • Trading bot — poll market, execute strategy

Cùng pattern: scheduled tick + command drain + critical section + graceful shutdown.

Khi nào async không phù hợp

Bài 10 §5 có chi tiết. Tóm tắt: CPU-bound task, team không quen, scale bằng process thay vì concurrency — 3 trường hợp sync code tốt hơn.


Bài tập áp dụng

Tạo bot đơn giản với scheduled greeting:

import discord
from discord.ext import tasks

client = discord.Client(intents=discord.Intents.default())
greeted_today = set()

@tasks.loop(minutes=1)
async def check_greeting():
    """Gửi greeting vào 9:00 sáng mỗi ngày."""
    from datetime import datetime
    now = datetime.now()

    if now.hour == 9 and now.minute == 0:
        today = now.date().isoformat()
        if today not in greeted_today:
            channel = client.get_channel(YOUR_CHANNEL_ID)
            if channel:
                try:
                    await channel.send("☀️ Chào buổi sáng!")
                except discord.HTTPException as e:
                    print(f"Gửi thất bại: {e}")
            greeted_today.add(today)

@client.event
async def on_ready():
    check_greeting.start()

client.run("TOKEN")

Câu hỏi tự kiểm tra: 1. Tại sao gán _current_entry_id synchronously (trước await) mới tránh được race condition? 2. asyncio.create_task() vs await — khi nào dùng cái nào? 3. Tại sao không catch KeyError, AttributeError trong tick loop? 4. Tại sao cần self._pending_tasks.add(task) thay vì chỉ asyncio.create_task()?