+10

Kafka Connect là gì? Mình đã dùng Kafka Connect để đồng bộ dữ liệu như thế nào? (Phần 1)

Xin chào anh đã quay trở lại, nếu là lần đầu thì anh em có thể xem từ đầu Series của mình 👉️ tại đây

Link playlist Youtube về Kafka của mình tại đây 👉️👉️👉️👉️ Link

Tiếp tục hôm nay mình sẽ giới thiệu 1 thành phần trong Apache Kafka mà có thể ít ae biết tới và sử dụng nó là cái vẹo gì thì anh đọc tới hết bài nhé.

Let's go

Như tiêu đề của bài viết thì không gì khác chính là thằng Apache Kafka Connect

Đôi lời giới thiệu về bản thân, à nhầm Kafka Connect

  1. Là một công cụ để sử dụng cho việc đồng bộ dữ liệu giữa các Database như: MongoDB, MySQL, SQLServer... ngoài ra cũng có thể dùng cho HDFS và cũng có thể để "ghi" data xuống Elastichsearch .

  2. Kafka Connect nó cũng có các khả năng giống như Apache kafka

    • Dễ dàng mở rộng (High scalable)
    • Độ tin cậy cao (High reliable)
    • Dữ liệu truyền tải có độ tin cậy cao (High durable)
    • Thông lượng cao (High performance)
    • Chịu lỗi cao (High fault tolerance)

Tiếp theo ae cùng mình đi vào chi tiết kiến trúc và các thành phần

Tổng quan kiến trúc của Kafka Connect

image.png

Quay sang trái, à nhầm 😄 từ phải sang trái của bức hình đầu tiên thì mình cũng đã nhắc ở trên rồi Datasource 1 đầu để đọc lên và 1 đầu để ghi xuống gọi là SourceSink .

Để ae dễ hiểu thì mình sẽ giải thích lần lượt như sau;

Concept tất cả các thành phần của thằng Kafka connect nó bao gồm:

  1. Connector
  2. Convertor
  3. Task
  4. Workers
  5. Transforms
  6. Dead letter queue

1. Connector

Thành phần này 😂 (nghe có vẻ hơi nhạy cảm nhỉ nhưng mà thôi) có nhiệm vụ là Connect tới Datasource đọc dữ liệu từ Datasource như MySQL là binlog và MongoDB là Oplog ....

** Lưu ý: ** là Connector và Convert không nằm bên trong hay được hỗ trợ bởi thằng Kafka hay KafkaConnect mà nó được viết dưới dạng Plugin (có thể bằng ngôn ngữ Java) build dưới dạng jar file và khi start Kafka Connect ae cần cấu hình đúng Path nới chưa các file Jar lib.


Một số tham số cấu hình 1 Connector submit tới kafka Connect để làm những công việc nêu trên sẽ như sau :

{
  "name": "******",
  "config": {
    "tasks.max":"3",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://192.168.20.121:27017,192.168.20.122:27017,192.168.20.123:27017",
    "database": "name",
    "collection": "answer",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "publish.full.document.only":true,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topic.creation.enable":"true",
    "topic.creation.default.replication.factor":"3",
    "topic.creation.default.partitions":3,
    "copy.existing":true,
    "poll.max.batch.size":"1000",
    "poll.await.time.ms":"5000",
    "batch.size":0,
    "change.stream.full.document":"whenAvailable"
  }
}

2. Converter

  • Ông này là 1 thành phần quan trọng trong việc xử lý dữ liệu trong hệ thống Kafka Connect hỗ trợ convert dữ liệu (message event) ở định dạng cụ thể, khi ghi hoặc đọc dữ liệu .

  • Task (mình sẽ giải thích ở phía dưới ) sử dụng Convertor để chuyển đổi dữ liệu giữa các thành phần trong hệ thống

  • Các định dạng Convertor có thể được hỗ trợ giữa Kafka và Kafka Connect bao gồm:

    • StringConverter
    • JsonConverter
    • JsonSchemaConverter
    • ByteArrayConverter
    • ProtobufConverter

Để dễ hiểu thì ae nhìn hình minh họa bên dưới như sau:

image.png

👉️ Khi một Connector được tạo ra nó sẽ điều phối các Task xử lý dữ liệu

3. Task

ông này là nhân tố chính xử lý dữ liệu giữa Kafkacác Datasource Sink hình ảnh minh họa như sau:

image.png

Về chỗ Distributed mình sẽ giải thích ở bên dưới nhé ae 😀.

Rebalancing các Task

  • Khi lần đầu tiên connector được khởi tạo và kết nối tới Cluster thì các Worker (mình sẽ giải thích ngay bên dưới) sẽ Rebalancing 100% các Connect và Task của connect đến các Worker có cùng 1 số lượng Task xử lý.

(Các Task sẽ được phân phối đều nhau cho các Worker).

  • Khi các Worker Fail over (có 1 task nào đó xử lý convert data bị exception xảy ra .... ) thì các Task cũng rebalancing trên các Worker
  • Khi các Task fail over sẽ ko tự động start lại mà phải restart lại thủ công (việc này có thể thực thi thông qua call API tới Kafa Connect)

Ví dụ:

Lấy trạng thái và restart Task như sau

image.png

4. Worker

Tiếp theo là tới Worker

  • Connector và Task là các đơn vị xử lý thì thằng Worker chính là Scheduled thực thi

  • Có 2 mode Worker

    • Standalone chế độ cơ bản chỉ có 1 Kafka Connect Server duy nhất đảm nhiệm và thực thi tất cả các Connectors và Task tham số được truyền theo paramameter khi chạy,

      các bản ghi đọc từ Datasource sẽ được lưu trữ trên file system (Chính vì như vậy mà dữ liệu luôn có tính tin cậy cao) trước khi ghi xuống các Datasource khác .

    • Distributed mình đã nhắc ở phía trên là chế độ cung cấp khả năng có thể mở rộngchịu lỗi các Server Kafka Connect run trên cùng 1 cụm Kafka Server và có chung 1 Group id sẽ tạo lên 1 cụm cluster

      Giao tiếp hay tương tác với các Kafka connect thông qua API. Xử lý: khi đọc log dữ liệu từ các Datasource và được converter thì dữ liệu sẽ được đưa lên kafka Topic theo tên đã được định nghĩa trong tham số của connector.

5. Transform

  • Trong quá trình đồng bộ giữa 2 Datasource có trong TH ae muốn bỏ bớt 1 vài trường hoặc chuyển đổi kiểu dữ liệu thì có thể sử dụng Transform Transform được sử dụng khi Sink xuống Datasource (DB đầu ghi xuống dữ liệu)

    Ví dụ: với hàm Cast với trường IDScore

    khi tạo connector ae set thêm tham số như sau:

      "transforms": "Cast",
      "transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
      "transforms.Cast.spec": "ID:string,score:float64"
    

    với đầu vào là :

    {"ID": 46920,"score": 4761}

    thì đầu ra sẽ như sau:

    {"ID": "46290","score": 4761.0}


### 6. Dead Letter Queue.

Cuối cùng là Dead Letter Queue

  • Khi mà 1 record không hợp lệ xảy ra với nhiều lý do khác nhau

    Ví dụ: 1 record Source lên với định dạng JSON nhưng khi Sink lại cấu hình ở định dạng Khác JSON

  • Mặc dù có lỗi xảy ra nhưng Connector vẫn tiếp tục xử lý và gửi record lỗi tới Dead Letter Queue (TH mà trong config connector ae có define Dead Letter Queue )

  • Tiếp theo là khi 1 message tới DLT Queue sẽ có thông tin header tại đây sẽ có thông tin lý do lỗi tại sao (cũng oke phết nhỉ ).

    Ví dụ: thông số config :

      errors.tolerance: all 
      errors.deadletterqueue.topic.name: <topic-name>
    

Lời kết :

Cảm ơn ae đã đọc và theo dõi và đọc bài viết của mình, hy vọng nhận được đóng góp ý kiến và ủng hộ nhiều hơn để mình có động lực ra các bài viết tiếp theo nhé ae. 👋👋👋👋👋.

Mời ae cùng đón chờ phần tiếp theo của bài viết nhé!!


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí