Bài 7: discord.py + Async Patterns trong Bot¶
Project reference:
bot/src/scheduler.py,bot/src/main.py,bot/src/notifier.py
1. discord.py là gì¶
discord.py là thư viện async dành cho Discord API. Thư viện này đóng gói cả WebSocket connection lẫn HTTP API của Discord thành các coroutines Python, cho phép bot tương tác với Discord mà không cần tự xử lý tầng mạng phía dưới.
# bot/src/main.py
import discord
# Intents = khai báo events bạn muốn nhận
intents = discord.Intents.default()
intents.voice_states = True # cần để biết user join/leave voice channel
client = discord.Client(intents=intents)
@client.event
async def on_ready():
"""Chạy khi bot connect thành công vào Discord."""
print(f"Bot đã online: {client.user}")
client.run("TOKEN")
2. Event-Driven Architecture¶
discord.py dùng event-driven model — bot lắng nghe events từ Discord:
# bot/src/main.py
@client.event
async def on_ready():
"""Bot vừa online."""
for guild in client.guilds:
await sync_guild_channels(guild) # sync channel cache vào SQLite
# Khởi động scheduler loop
client.scheduler = Scheduler(client, ...)
client.scheduler.start()
@client.event
async def on_guild_join(guild: discord.Guild):
"""Bot được thêm vào server mới."""
await sync_guild_channels(guild)
# Nếu cần slash commands (v2):
@client.event
async def on_interaction(interaction: discord.Interaction):
...
Events là async def — chạy trong event loop của discord.py.
3. discord.ext.tasks — Scheduled Loop¶
discord.ext.tasks cung cấp decorator @tasks.loop() để tạo background loop:
# bot/src/scheduler.py
from discord.ext import tasks
class Scheduler:
def __init__(self, client, ...):
# Tạo task loop từ method
self._task = tasks.loop(seconds=2)(self.tick)
self._task.before_loop(self._before_loop) # chạy 1 lần trước loop
def start(self) -> None:
if self._task.is_running():
return
self._task.start()
async def _before_loop(self) -> None:
"""Chạy 1 lần trước vòng lặp đầu tiên."""
await self.client.wait_until_ready() # chờ bot fully connected
await self.state_repo.clear() # reset stale DB state từ crash trước
async def tick(self) -> None:
"""Chạy mỗi 2 giây."""
try:
await self._drain_commands() # xử lý manual commands
await self._check_schedule() # trigger auto-play nếu đến giờ
except _TICK_RECOVERABLE as e:
logger.warning("Tick error (recoverable): %s", e)
# Không re-raise → loop tiếp tục
# discord.ext.tasks tự restart loop nếu exception thoát ra
Tại sao không asyncio.sleep() trong loop?
# ❌ Sai
async def my_loop():
while True:
await do_something()
await asyncio.sleep(2) # block event loop 2s — không handle events khác
# ✅ Đúng — discord.ext.tasks quản lý timing, không block
@tasks.loop(seconds=2)
async def my_loop():
await do_something()
discord.ext.tasks tích hợp với event loop — không block Discord events trong 2s nghỉ.
4. Async Mutex — Race Condition trong Bot¶
Bot có thể nhận lệnh manual (qua bot_commands queue) đồng thời với auto-trigger từ schedule. Cần đảm bảo chỉ 1 session phát nhạc tại một thời điểm.
Hãy xem tình huống race condition trông như thế nào:
# ❌ Race condition tiềm ẩn
async def check_and_trigger():
if not currently_playing(): # check ở đây
# ... 50ms sau, auto-trigger khác cũng check và cùng thấy "not playing"
await start_playback() # 2 playback cùng start!
Cách giải quyết: luôn ghi nhận mutex trước await đầu tiên
# bot/src/scheduler.py
class Scheduler:
_current_entry_id: int | None = None # in-memory mutex
async def _maybe_start(self, entry: dict) -> None:
"""Trigger auto-play nếu đến giờ và không đang phát gì."""
if self._current_entry_id is not None:
return # mutex locked — đang phát entry khác
# Claim mutex NGAY LẬP TỨC (synchronous) trước bất kỳ await nào
self._current_entry_id = entry["id"]
# Bây giờ mới tạo async task
self._spawn_playback(entry)
def _spawn_playback(self, entry: dict) -> None:
"""Tạo background task với strong reference."""
task = asyncio.create_task(self._start_playback(entry))
# QUAN TRỌNG: asyncio chỉ giữ weak reference đến task
# → GC có thể hủy task giữa chừng!
self._pending_tasks.add(task) # strong reference
task.add_done_callback(self._pending_tasks.discard) # tự cleanup khi xong
async def _start_playback(self, entry: dict) -> None:
try:
await self._execute_playback(entry)
finally:
self._current_entry_id = None # release mutex
Tại sao gán SYNC (không phải async)?
Python event loop là single-threaded cooperative multitasking. Code chạy đồng bộ (không có await) không bao giờ bị interrupt. Khi gán self._current_entry_id = entry["id"], không có context switch nào xảy ra → không có race condition.
Khi code gặp await, event loop có thể chuyển sang coroutine khác → đây là điểm có thể race.
5. asyncio.create_task() vs await¶
# await — chờ xong mới tiếp tục (tick bị block)
await self._start_playback(entry) # ❌ tick loop đứng chờ 60 phút!
# create_task — spawn background coroutine, tiếp tục ngay
asyncio.create_task(self._start_playback(entry)) # ✅ tick tiếp tục mỗi 2s
create_task schedule coroutine để chạy "concurrently" trong event loop — không block caller.
6. Voice Client — Phát Audio¶
async def _acquire_voice_client(self, channel: discord.VoiceChannel) -> discord.VoiceClient:
"""Join voice channel, disconnect nếu đang ở channel khác."""
existing_vc = channel.guild.voice_client
if existing_vc is not None:
if existing_vc.channel.id == channel.id:
return existing_vc # đã ở đúng channel rồi
await existing_vc.disconnect(force=True) # disconnect khỏi channel cũ
return await channel.connect() # join channel mới
async def _run_playlist(self, vc: discord.VoiceClient, files: list[Path], stop_pred) -> None:
"""Phát lần lượt các file audio."""
for filepath in files:
if stop_pred(): # check _stop_requested
break
# FFmpegPCMAudio: đọc file → FFmpeg decode → PCM → Discord voice
source = discord.FFmpegPCMAudio(str(filepath))
vc.play(source)
# Chờ file xong
while vc.is_playing() and not stop_pred():
await asyncio.sleep(0.5)
if stop_pred():
vc.stop()
break
# Gap giữa các track
await asyncio.sleep(TRACK_GAP_SECONDS)
7. Error Handling trong Async — Narrow Exception¶
# bot/src/scheduler.py
# Định nghĩa rõ exception nào là "recoverable" (log + tiếp tục)
_TICK_RECOVERABLE = (
aiosqlite.Error, # DB connection issue
OSError, # file not found, disk error
discord.HTTPException, # Discord API rate limit, server error
discord.ClientException, # voice connection issue
ValueError, # parse error (time format, etc.)
)
async def tick(self) -> None:
try:
await self._drain_commands()
await self._check_schedule()
except _TICK_RECOVERABLE as e:
logger.warning("Tick recoverable error: %s", e)
# Loop tiếp tục — next tick sẽ retry
# KeyError, TypeError, AttributeError KHÔNG bị catch
# → những lỗi này là bug logic → muốn nó crash để phát hiện sớm
Tại sao không catch Exception toàn bộ?
# ❌ Nuốt bug logic
try:
await self._drain_commands()
except Exception:
pass # AttributeError vì typo code cũng bị nuốt!
Narrow exception set → chỉ recover những gì mình hiểu và kiểm soát được. Bug thật (sai logic, typo) → crash loud → phát hiện sớm.
8. Graceful Shutdown¶
async def shutdown(self) -> None:
"""Tắt bot sạch sẽ — gọi từ on_close hoặc SIGTERM handler."""
self._stop_requested = True # signal tất cả loops dừng
# Dừng audio đang phát
if self._current_vc and self._current_vc.is_playing():
self._current_vc.stop()
# Chờ tất cả background tasks xong (timeout 8s)
if self._pending_tasks:
await asyncio.wait_for(
asyncio.gather(*self._pending_tasks, return_exceptions=True),
timeout=SHUTDOWN_TIMEOUT_SECONDS,
)
# Dừng tick loop
self._task.cancel()
# Cleanup DB state
await self.state_repo.clear()
# bot/src/main.py — handle SIGTERM (Docker stop)
import signal
def _setup_signal_handlers(client, loop):
def _handle_signal():
loop.create_task(client.close()) # trigger on_close → scheduler.shutdown()
try:
# Unix: SIGTERM từ Docker, SIGINT từ Ctrl+C
loop.add_signal_handler(signal.SIGTERM, _handle_signal)
loop.add_signal_handler(signal.SIGINT, _handle_signal)
except NotImplementedError:
# Windows không support add_signal_handler
signal.signal(signal.SIGTERM, lambda *_: loop.create_task(client.close()))
9. SQLite làm Message Queue — Polling Pattern¶
⚠ Concurrency gotcha: Pattern này có 2 process (admin API + bot) cùng đọc/ghi 1 file SQLite. Mặc định SQLite dùng
rollback journal→ writer block mọi reader, reader block writer → bot tick có thể delay không đoán được. Bắt buộc bậtPRAGMA journal_mode = WALcho cả 2 process — xem Bài 4 §3 "Config SQLite mỗi connection" để cấu hình qua SQLAlchemy event.
# bot/src/scheduler.py
async def _drain_commands(self) -> None:
"""Xử lý tất cả pending commands trong queue."""
async with self._db_session() as session:
commands = await self.commands_repo.list_pending(session)
for cmd in commands:
try:
await self._execute_command(cmd)
except _TICK_RECOVERABLE as e:
logger.warning("Command %s failed: %s", cmd["command"], e)
finally:
# Xóa command dù thành công hay lỗi — không retry
await self.commands_repo.delete(session, cmd["id"])
async def _execute_command(self, cmd: dict) -> None:
match cmd["command"]:
case "play":
entry = await self.repo.get_by_id(session, cmd["entry_id"])
await self._maybe_play_entry(entry)
case "pause":
if self._current_vc and self._current_vc.is_playing():
self._current_vc.pause()
case "resume":
if self._current_vc and self._current_vc.is_paused():
self._current_vc.resume()
case "stop":
self._stop_requested = True
if self._current_vc:
self._current_vc.stop()
10. Notifier — Fire and Forget¶
# bot/src/notifier.py
async def send_reminder(channel: discord.TextChannel, entry: dict) -> None:
"""Nhắc trước khi bắt đầu buổi tập."""
await _send(channel, f"🧘 **{entry['name']}** sẽ bắt đầu sau {entry['remind']} phút...")
async def send_start(channel: discord.TextChannel, entry: dict, vc: discord.VoiceChannel) -> None:
await _send(channel, f"🔔 **{entry['name']}** bắt đầu! Vào {vc.mention} để tham gia.")
async def send_end(channel: discord.TextChannel) -> None:
await _send(channel, "✨ Buổi tập đã kết thúc. Hẹn gặp lại!")
async def _send(channel: discord.TextChannel, content: str) -> None:
"""Internal: gửi message, log lỗi nhưng không crash playback."""
try:
await channel.send(content)
except discord.Forbidden:
# Bot không có permission gửi tin ở channel này
logger.warning("Không có quyền gửi tin vào %s", channel.name)
except discord.HTTPException as e:
logger.warning("Gửi tin thất bại: %s", e)
# Không re-raise → playback tiếp tục dù notification lỗi
Pattern: Notification là "best-effort" — không được làm crash logic chính (phát nhạc). Lỗi gửi tin chỉ log.
Nguyên lý tổng quát¶
| Pattern trong bài | Nguyên lý (bài 0) | Chuyển giao |
|---|---|---|
@client.event async def on_ready |
IoC qua event hook — framework gọi khi event xảy ra | Pattern chung cho event-driven: Node.js EventEmitter, Rust tokio::select! |
Gán _current_entry_id SYNC trước await |
Cooperative scheduling — code sync không bị interrupt | Áp dụng với mọi single-threaded event loop (JS, Python asyncio, Rust async) |
_TICK_RECOVERABLE narrow exception |
Fail Loud với bug, Fail Soft với I/O | Kỷ luật chung: catch cụ thể, không catch Exception |
_pending_tasks strong reference |
POLS — tránh silent failure từ GC | Mental model async: task không có reference = có thể bị GC hủy |
| SQLite làm command queue | YAGNI — không cần Redis khi 1 bot + polling 2s đủ | Nhưng biết khi nào pattern này gãy — xem bài 10 §8 |
notifier.py fire-and-forget |
SoC — notification không được block business logic | Pattern chung: separate critical path và best-effort |
Cooperative vs Preemptive Scheduling¶
Đây là mental model quan trọng nhất của bài này.
| Preemptive (OS thread) | Cooperative (async) | |
|---|---|---|
| Ai chuyển context? | OS kernel (bất kỳ lúc nào) | Chính task (tại await) |
| Race condition | Mọi lúc | Chỉ tại điểm await |
| Cần lock? | Hầu hết mutation shared state | Chỉ khi mutation qua await |
| Lợi | Không cần nghĩ về race | Predictable, debug dễ hơn |
| Hại | Phải lock khắp nơi | Phải hiểu await là interruption point |
"Gán SYNC trước await" (bài §4) = tận dụng cooperative nature. Trong preemptive model, pattern này không work — phải dùng mutex/atomic.
Chuyển giao: long-running async service¶
Pattern project này (tick loop + command queue + notifier) là template chung cho:
- Job worker (Celery alternative) — poll queue, process, ack
- IoT device controller — poll state, execute command
- Game server loop — tick game state, broadcast updates
- Trading bot — poll market, execute strategy
Cùng pattern: scheduled tick + command drain + critical section + graceful shutdown.
Khi nào async không phù hợp¶
Bài 10 §5 có chi tiết. Tóm tắt: CPU-bound task, team không quen, scale bằng process thay vì concurrency — 3 trường hợp sync code tốt hơn.
Bài tập áp dụng¶
Tạo bot đơn giản với scheduled greeting:
import discord
from discord.ext import tasks
client = discord.Client(intents=discord.Intents.default())
greeted_today = set()
@tasks.loop(minutes=1)
async def check_greeting():
"""Gửi greeting vào 9:00 sáng mỗi ngày."""
from datetime import datetime
now = datetime.now()
if now.hour == 9 and now.minute == 0:
today = now.date().isoformat()
if today not in greeted_today:
channel = client.get_channel(YOUR_CHANNEL_ID)
if channel:
try:
await channel.send("☀️ Chào buổi sáng!")
except discord.HTTPException as e:
print(f"Gửi thất bại: {e}")
greeted_today.add(today)
@client.event
async def on_ready():
check_greeting.start()
client.run("TOKEN")
Câu hỏi tự kiểm tra:
1. Tại sao gán _current_entry_id synchronously (trước await) mới tránh được race condition?
2. asyncio.create_task() vs await — khi nào dùng cái nào?
3. Tại sao không catch KeyError, AttributeError trong tick loop?
4. Tại sao cần self._pending_tasks.add(task) thay vì chỉ asyncio.create_task()?