0

Nghe DB Thay Vì Hỏi: Tại Sao CDC Là "Chân Ái" Hơn Event Sourcing Trong Hệ Thống Thực Tế?

Trong các hệ thống lớn, việc giữ cho dữ liệu giữa Database, Search Index (ElasticSearch) và Cache (Redis) luôn đồng bộ là một bài toán cực kỳ đau đầu.

Ngày xưa, chúng ta hay dùng cách "Polling": Cứ 5 giây lại chạy một câu query SELECT để xem có gì mới không. Cách này vừa chậm, vừa gây tải cho DB, lại dễ sót dữ liệu. Rồi người ta nhắc đến Event Sourcing (ES) như một liều thuốc tiên. Nhưng sau vài tháng "vã" với ES, nhiều anh em nhận ra: "Nó phức tạp quá mức cần thiết!".

Hôm nay, mình sẽ cùng anh em mổ xẻ tại sao Change Data Capture (CDC) mới là thứ chúng ta thực sự cần trong 80% trường hợp.

1. Event Sourcing: "Chơi với dao hai lưỡi"

Về lý thuyết, Event Sourcing rất đẹp: Bạn không lưu trạng thái hiện tại, bạn lưu tất cả các sự kiện đã xảy ra.

Ưu điểm: Có thể rebuild lại trạng thái tại bất kỳ thời điểm nào trong quá khứ. Audit trail cực chuẩn.

Nhược điểm: Bạn phải thay đổi hoàn toàn cách viết code. Logic nghiệp vụ bị phân mảnh. Việc xử lý Versioning cho Event (Schema evolution) là một cơn ác mộng.

2. CDC: "Lắng nghe hơi thở từ Transaction Log"

CDC không bắt bạn thay đổi code ứng dụng. Nó "nằm vùng" ngay tại Database (thường là qua Binlog của MySQL hoặc WAL của Postgres). Mỗi khi có một lệnh INSERT, UPDATE, DELETE thành công, CDC sẽ "nghe" thấy và đẩy vào một Message Queue (như Kafka).

Tại sao CDC (như Debezium) lại ít đau đầu hơn?

Non-invasive: Code Backend của bạn cứ viết như bình thường, lưu DB như bình thường.

Guaranteed Delivery: Vì nó đọc từ Transaction Log, nếu DB commit thành công, CDC chắc chắn sẽ bắt được.

Rebuild State dễ dàng: Bạn muốn sync lại 1 triệu bản ghi cũ sang ElasticSearch? Chỉ cần bảo CDC "đọc lại từ đầu log" là xong.

3. Case Study: Pipeline đồng bộ Search Index

Hãy tưởng tượng bạn có một file CSV chứa thông tin sản phẩm. Bạn cần:

Import vào SQL Database.

Ngay lập tức đồng bộ sang ElasticSearch để User tìm kiếm.

Nếu đồng bộ lỗi, phải có cơ chế retry hoặc rollback index.

Dưới đây là sơ đồ trình tự (Sequence Diagram) của luồng hoạt động này

4. So sánh nhanh

Đặc điểm Event Sourcing Change Data Capture (CDC)
Độ phức tạp code Rất cao (CQRS, Aggregates) Thấp (Giữ nguyên CRUD)
Tính trễ (Latency) Cực thấp (Real-time) Thấp (Gần như Real-time)
Audit Trail Tự nhiên, chi tiết hành vi Theo vết thay đổi dữ liệu
Khả năng áp dụng Hệ thống mới (Greenfield) Cả cũ lẫn mới (Legacy ok)

5. Demo Pipeline (Conceptual Code)

Mình sẽ minh họa một script mô phỏng luồng dữ liệu từ CSV -> DB -> CDC Event -> Search Index.

import time
import json
import uuid

# Giả lập Database (Source of Truth)
class MockDatabase:
    def __init__(self):
        self.data = {}
        self.transaction_log = [] # Đây chính là Binlog/WAL

    def commit_transaction(self, action, record_id, payload):
        # Lưu vào data chính
        if action == "INSERT" or action == "UPDATE":
            self.data[record_id] = payload
        elif action == "DELETE":
            self.data.pop(record_id, None)
        
        # Ghi vào Log (CDC sẽ đọc từ đây)
        event = {
            "lsn": len(self.transaction_log) + 1, # Log Sequence Number
            "action": action,
            "id": record_id,
            "data": payload,
            "ts": time.time()
        }
        self.transaction_log.append(event)
        return event

# Giả lập ElasticSearch (Search Index)
class MockElasticSearch:
    def __init__(self):
        self.index = {}

    def sync(self, event):
        print(f"[ES Sync] Đang xử lý event LSN: {event['lsn']}...")
        if event['action'] in ["INSERT", "UPDATE"]:
            self.index[event['id']] = event['data']
        elif event['action'] == "DELETE":
            self.index.pop(event['id'], None)
        print(f"[ES Sync] Thành công. Trạng thái Index hiện tại: {len(self.index)} items.")

# Giả lập Debezium/Kafka (CDC Connector)
class MockCDCConnector:
    def __init__(self, db, target):
        self.db = db
        self.target = target
        self.last_processed_lsn = 0

    def poll_and_dispatch(self):
        # Quét log chưa xử lý
        new_events = [e for e in self.db.transaction_log if e['lsn'] > self.last_processed_lsn]
        for event in new_events:
            self.target.sync(event)
            self.last_processed_lsn = event['lsn']

# --- LUỒNG CHẠY THỬ ---

db = MockDatabase()
es = MockElasticSearch()
cdc = MockCDCConnector(db, es)

print("--- BƯỚC 1: IMPORT CSV VÀO DB ---")
csv_rows = [
    {"id": "p1", "name": "iPhone 15", "price": 1000},
    {"id": "p2", "name": "Samsung S24", "price": 900}
]

for row in csv_rows:
    db.commit_transaction("INSERT", row['id'], row)

print("\n--- BƯỚC 2: CDC PHÁT HIỆN THAY ĐỔI VÀ SYNC SANG ES ---")
cdc.poll_and_dispatch()

print("\n--- BƯỚC 3: CẬP NHẬT GIÁ TRONG DB ---")
db.commit_transaction("UPDATE", "p1", {"id": "p1", "name": "iPhone 15", "price": 950})

print("\n--- BƯỚC 4: CDC TIẾP TỤC SYNC LẦN NỮA ---")
cdc.poll_and_dispatch()

print("\n--- KẾT QUẢ CUỐI CÙNG TRÊN ES ---")
print(json.dumps(es.index, indent=2))

Kết luận

Đừng cố gắng trở thành "purist" với Event Sourcing nếu đội ngũ của bạn chưa sẵn sàng cho độ phức tạp của nó. CDC với những công cụ như Debezium và Kafka mang lại sự cân bằng hoàn hảo:

Bạn vẫn có một Audit Trail xịn sò từ DB.

Bạn có khả năng rebuild state bất cứ lúc nào.

Và quan trọng nhất: Bạn không phải đập đi xây lại toàn bộ code CRUD quen thuộc.

Anh em đã từng "vã" với cái nào trong hai cái này chưa? Cùng thảo luận ở phần comment nhé!


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí