0

[Series Hệ thống chịu tải cao] Bài 4: "Cầu nối" dữ liệu – Đưa Data từ Kafka sang Elasticsearch trong 1 nốt nhạc

Chào anh em! Ở các bài trước, chúng ta đã biết Kafka ghi cực nhanh, còn ES thì tìm cực giỏi. Câu hỏi đặt ra là: Làm sao để dữ liệu quẹt thẻ từ Kafka nhảy sang được ES để sếp xem báo cáo ngay lập tức?

Nhiều anh em sẽ nghĩ ngay đến việc viết một con Worker (bằng Go hay Node.js), code logic: Đọc từ Kafka -> Transform dữ liệu -> Ghi vào ES.

  • Cách này ổn không? Ổn, nhưng cực. Bạn phải tự lo việc quản lý lỗi, quản lý con trỏ (Offset), rồi lỡ Worker chết thì sao?

Hôm nay, mình giới thiệu với anh em một "trợ thủ" đắc lực: Kafka Connect (Elasticsearch Sink Connector).

1. Kafka Connect là cái gì?

Hãy tưởng tượng Kafka Connect giống như một cái ổ cắm điện đa năng.

  1. Một đầu cắm vào Kafka.
  2. Đầu kia bạn muốn cắm vào đâu cũng được (MySQL, MongoDB, S3, và tất nhiên là cả Elasticsearch).

Bạn không cần viết một dòng code logic nào cả. Tất cả những gì bạn làm là khai báo một file cấu hình (JSON).

2. Tại sao nên dùng Connector thay vì tự viết code?

Trong một hệ thống chịu tải cao như Metro (AFC), sự ổn định là trên hết:

  • Tự động phục hồi: Nếu kết nối tới ES bị gián đoạn, Connector sẽ tự thử lại (Retry) cho đến khi thành công.
  • Mở rộng (Scaling): Bạn có thể chạy nhiều "Worker" cho Connector để đẩy dữ liệu nhanh hơn nếu lượng quẹt thẻ tăng đột biến.
  • Quản lý Offset: Nó tự ghi nhớ đã đẩy đến message nào rồi. Nếu hệ thống sập và khởi động lại, nó sẽ tiếp tục ngay tại chỗ đó, không thiếu một bản ghi nào.

3. "Thực chiến": Cấu hình Sink Connector

Để đẩy dữ liệu từ Topic METRO_TAP_EVENTS sang ES, anh em chỉ cần một đoạn config kiểu như thế này gửi lên Kafka Connect:

{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "3",
    "topics": "METRO_TAP_EVENTS",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": "false",
    "schema.ignore": "true"
  }
}
  • tasks.max: 3 -> Chạy 3 luồng song song cho máu.
  • topics -> Lấy dữ liệu từ cái "hồ" này.
  • connection.url -> Đổ dữ liệu vào "thủ thư" này.

4. Lưu ý về Transformation (SMT)

Đôi khi dữ liệu trong Kafka là dữ liệu thô (Raw), nhưng bạn muốn khi sang ES nó phải đẹp đẽ hơn (ví dụ: đổi định dạng ngày tháng, ẩn thông tin nhạy cảm của khách hàng). Kafka Connect hỗ trợ Single Message Transforms (SMT) – giúp bạn chỉnh sửa dữ liệu ngay trên "đường đi" mà không cần phải viết thêm một service trung gian nào.

Kinh nghiệm thực tế: Đừng để "Nghẽn" ở đầu ra

Một lỗi kinh điển mình từng gặp: Kafka xử lý được 1 triệu sự kiện/giây, nhưng Elasticsearch chỉ chịu nhiệt được 100k sự kiện/giây.

Hậu quả: Dữ liệu bị tồn đọng (Lag) trong Kafka. Sếp nhìn Dashboard cứ thấy dữ liệu của... 10 phút trước.

Giải pháp: Lúc này, chúng ta phải tận dụng cơ chế Bulk API của ES (gộp nhiều bản ghi rồi đẩy một lần) thay vì đẩy từng cái một. May mắn là Kafka Connect đã hỗ trợ sẵn cấu hình batch.size cho việc này.

Tạm kết

Việc kết hợp Kafka và Elasticsearch thông qua Connector giúp hệ thống của bạn cực kỳ linh hoạt và bền bỉ. Dữ liệu "chảy" một cách tự nhiên và an toàn qua các thành phần của hệ thống.

Bài cuối cùng (Bài 5), chúng ta sẽ nói về một chủ đề mà kỹ sư nào cũng lo lắng: High Availability (HA). Làm sao để cả cụm Kafka và ES vẫn sống khỏe ngay cả khi có vài con server đột ngột... bốc khói?


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí