+3

[Debezium Series] Cấu hình sử dụng Debezium + PostgreSQL + Kafka Connect

Giới thiệu

Tiếp nối series Debezium cơ bản hôm nay cùng mình tìm hiểu về Debezium + PostgreSQL + Kafka Connect nhé.

Debezium + Kafka Connect

như trong phần 1 + 2 mình đã giới thiệu qua về Debezium, khi kết hợp cùng kafka nó đóng vai trò như một connector để gửi records vào Kafka và các sink connectors để truyền records từ các topic Kafka đến các hệ thống khác. Kafka Connect đóng vai trò phát hiện ra các thay đổi và push event vào trong Kafka.

image.png

Setup Debezium + Kafka Connect

Trong bài viết này, mình sẽ dùng Postgresql để làm demo, nên cài đặt Postgresql là tất nhiên rồi. Ngoài ra chúng ta sẽ cần cài thêm những cái như sau

  1. Debezium UI: giao diện web giúp bạn cấu hình và quản lý các connector(thằng này sẽ không có authen đâu nha, nếu muốn thì bạn setup thêm 1 em NGINX ở ngoài)
  2. Debezium Connect: Nó giúp ae theo dõi và bắt được những thay đổi diễn ra ở phía database. Debezium Connect sẽ đọc transaction log từ source database và phát hiện những thay đổi và gửi đến những streaming service như Kafka, AWS Kinesis...
  3. Kafka: một nền tảng streaming phân tán, chủ yếu được áp dụng làm hệ thống phân tán, “vận chuyển” message và thu thập, xử lý, lưu trữ và phân tích dữ liệu.
  4. Zookeeper: một dịch vụ phối hợp nguồn mở, phân tán dành cho các ứng dụng phân tán, tất nhiên nó thường đi kèm với kafka.

Để nhanh nhất có thể nên mình sẽ cung cấp docker file để ae có thể đem về làm demo cho các síp nhanh nhất có thể luôn nha. (mình dùng bản 1.7.2, bản này cũng khá cũ rồi, ae nhớ nâng cấp lên nha, tui lấy luôn docker file từ hồi demo cho síp nên nó hơi cũ)

version: '3'
services:
  zookeeper:
    container_name: zookeeper
    image: quay.io/debezium/zookeeper:1.7.2.Final
    networks:
      - ui-network
    ports:
      - "2181:2181"
  kafka:
    container_name: kafka
    image: quay.io/debezium/kafka:1.7.2.Final
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
    networks:
      - ui-network
  db-pg:
    container_name: db-pg
    image: quay.io/debezium/example-postgres:1.7.2.Final
    ports:
      - "65432:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    networks:
      - ui-network
  connect:
    container_name: connect
    image: quay.io/debezium/connect:nightly
    ports:
      - "8083:8083"
    depends_on:
      - kafka
      - db-pg
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - ENABLE_DEBEZIUM_KC_REST_EXTENSION=true
      - ENABLE_DEBEZIUM_SCRIPTING=true
      - CONNECT_REST_EXTENSION_CLASSES=io.debezium.kcrestextension.DebeziumConnectRestExtension
    networks:
      - ui-network
  debezium-ui:
    container_name: debezium-ui
    image: quay.io/debezium/debezium-ui:1.7.2.Final
    ports:
      - "8080:8080"
    environment:
      - KAFKA_CONNECT_URIS=http://connect:8083
    depends_on:
      - connect
    networks:
      - ui-network
networks:
  ui-network:
    external: false

Và ae sẽ lên trình duyệt gõ localhost:8080 sẽ nhận được 1 trong 2 hình cái gì đó như này. image.png image.png

Nếu như bị giống hình 1 thì đừng lo lắng quá, vô restart con connect lại là được nha (vì khi start có thể nó start trước con kafka nên bị như vậy thôi)

Tiếp theo ae có thể nhấn vô "Create a connector" vào mò mẫn trong đó, vô vàn thứ hay ho chờ ae trong đó nha.

Mình sẽ cung cấp cho ae 1 connector luôn để tiết kiệm thời gian nha.

POST http://localhost:8080/api/connector/1/postgres
{
    "name": "test.31",
    "config": {
        "topic.prefix": "prefix",
        "database.hostname": "db-pg",
        "database.port":"5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "postgres",
        "schema.include.list": "inventory",
        "table.include.list": "inventory.orders ",
        "heartbeat.interval.ms": 30000,
        "heartbeat.action.query": "select * from inventory.orders where id = 10001",
        "query.fetch.size": 500,
        "topic.creation.groups": "debezium-etl",
        "topic.creation.debezium-etl.include": "",
        "topic.creation.debezium-etl.exclude": "",
        "topic.creation.default.partitions": -1,
        "topic.creation.default.replication.factor": -1,
        "plugin.name": "pgoutput",
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "skipped.operations": "r",
        "snapshot.mode": "never",
        "slot.name": "debezium_order",
        "publication.autocreate.mode": "filtered",
        "publication.name": "dbz_order_publication"
    }
}

Nếu bạn chưa hiểu những ý nghĩa config này, hãy đọc lại P2 tại đây

Ngoài ra config publication sẽ là những cái bạn cần đọc chi tiết hơn, nó khá quan trọng trong vận hành connector đó (xem thêm)

Debezium cung cấp 3 loại publication (config publication.autocreate.mode)

  1. all_tables - Nếu một publication tồn tại, connector sẽ sử dụng nó. Nếu không có publication nào tồn tại, connector sẽ tạo ra một publication cho tất cả các bảng trong cơ sở dữ liệu mà connector đang theo dõi sự thay đổi. Để connector tạo ra một publication, nó phải truy cập cơ sở dữ liệu thông qua một tài khoản người dùng cơ sở dữ liệu có quyền tạo publications và thực hiện các hoạt động sao chép. Bạn cấp quyền cần thiết bằng cách sử dụng lệnh SQL sau: CREATE PUBLICATION <publication_name> FOR ALL TABLES;.
  2. disabled - Connector không cố gắng tạo ra một publication. Một quản trị viên cơ sở dữ liệu hoặc người dùng được cấu hình để thực hiện các hoạt động sao chép phải đã tạo ra publication trước khi chạy connector. Nếu connector không thể tìm thấy publication, connector sẽ ném ra một ngoại lệ và dừng lại.
  3. filtered - Nếu một publication tồn tại, connector sẽ sử dụng nó. Nếu không có publication nào tồn tại, connector sẽ tạo ra một publication mới cho các bảng phù hợp với cấu hình bộ lọc hiện tại như được chỉ định bởi các thuộc tính cấu hình connector schema.include.list, schema.exclude.list, và table.include.list, và table.exclude.list. Ví dụ: CREATE PUBLICATION <publication_name> FOR TABLE <tbl1, tbl2, tbl3>. Nếu publication tồn tại, connector cập nhật publication cho các bảng phù hợp với cấu hình bộ lọc hiện tại. Ví dụ: ALTER PUBLICATION <publication_name> SET TABLE <tbl1, tbl2, tbl3>.

Thường thì bạn sẽ rất khó để dùng all_tables vì nó đòi quyền khá cao(gần như quyền root), mình vẫn khuyến kích bạn nên sài filtered nhé.

và kết quả nhận được là image.png

Bạn có thể thực hiện sau SQL sau

SELECT slot_name, plugin, slot_type
FROM pg_replication_slots;

bạn sẽ nhận thấy có 1 dòng slot_name là debezium_order, đó chính là slot của bạn tạo ra.

Bạn tiếp tục thực hiện câu SQL sau

SELECT * FROM pg_stat_replication;

Bạn sẽ thấy connector của bạn đang được kết nối từ client nào.

SELECT * FROM pg_publication;
SELECT *
FROM pg_publication_tables
where pubname = 'dbz_order_publication';

Câu SQL này cung cấp cho bạn publication được tạo ra, trạng thái của nó và danh sách table đang được kết nối tới connector.

Và đây là kết quả, message đã được bắn lên kafka. Các bạn có thể kết nối tới kafka và xử lý dữ liệu theo nhu cầu của mình(phần 2 có code xử lý mẫu rồi nhen)

image.png

Và thế là kết thúc series cơ bản về Debezium, Trong series này mình đã mô tả lại khá kỹ những kiến thức cơ bản và những lưu ý khi sử dụng, chúc các ae tiếp nạp thêm kiến thức mới thành công.

Nếu setup gặp lỗi khó quá thì có thể ib cho mình(thông tin trong phần contact), mình sẽ giúp đỡ nếu có thể. *khó quá không fix được mới ib nhaaaaa.

Series

Phần 1: Debezium là gì? ứng dụng thực tế.

Phần 2: Cấu hình sử dụng Debezium Engine + PostgreSQL

Phần 3: Cấu hình sử dụng Debezium + PostgreSQL + Kafka Connect


All Rights Reserved

Viblo
Let's register a Viblo Account to get more interesting posts.