+5

Kafka multi-broker trên Docker

Giới thiệu

Trong những năm gần đây, với sự bùng nổ của AI thì domain Big data, vốn đã rất phổ biến, cũng theo đó được quan tâm ngày càng nhiều hơn. Và trong những luồng Data streaming, ETL, v.v. thì có một thành phần trung chuyển không thể thiếu là Message queue. Message queue thì hiện có nhiều loại phổ biến và được dùng nhiều, tuy vậy, khi đề cập tới Big data, Apache Kafka được tin tưởng và sử dụng bởi rất nhiều đơn vị, bao gồm cả các corp lớn như Netflix, Tesla, Meta, etc. Chi tiết hơn về Apache Kafka các bạn có thể tìm hiểu thêm trên mạng, vì đã có rất nhiều bài viết phân tích về con Kafka này rồi 😄, trong bài này mình sẽ chỉ đề cập tới việc cài đặt nó với multi broker trên Docker. image.png

Cài đặt

Các thành phần

Trước hết thì các bạn cần cài đặt Docker, các bước rất chi tiết và đơn giản đã được ghi rõ trên document của Docker, có thể xem tại đây. Trong bài viết này, mình sẽ cài đặt Kafka bao gồm 1 Zookeeper, 3 broker và 1 service Kafdrop cho mục đích visualization.

Docker compose

Bật Docker lên, tạo 1 file docker-compose.yml tại 1 directory bất kỳ và bắt đầu thôi. Mình để file cài đặt của mình dưới này, ae có thể copy paste về, và chạy command docker compose up -d để khởi tạo và chạy các container (nhớ là phải bật Docker trước đã nhé 😉).

version: "3"

services:
  zookeeper:
    image: zookeeper:3.4.9
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zookeeper:2888:3888
    volumes:
      - ./data/zookeeper/data:/data
      - ./data/zookeeper/datalog:/datalog
  kafka1:
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka1
    container_name: kafka-broker-1
    ports:
      - "9091:9091"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://kafka1:9091
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
    volumes:
      - ./data/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
  kafka2:
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka2
    container_name: kafka-broker-2
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:29092,LISTENER_DOCKER_EXTERNAL://kafka2:9092
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:29092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
    volumes:
      - ./data/kafka2/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
  kafka3:
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka3
    container_name: kafka-broker-3
    ports:
      - "9093:9093"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:29093,LISTENER_DOCKER_EXTERNAL://kafka3:9093
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:29093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
    volumes:
      - ./data/kafka3/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    ports:
      - "9123:9123"
    environment:
      SERVER_PORT: 9123
      MANAGEMENT_SERVER_PORT: 9123
      KAFKA_BROKERCONNECT: kafka-broker-1:19091,kafka-broker-2:29092,kafka-broker-3:29093

Zookeeper

Service đầu tiên mình cài đặt sẽ là Zookeeper, tiên quyết để các broker chạy vì thằng này sẽ có vai trò điều phối các broker. Hiện tại đã có cách cài đặt Kafka không cần Zookeeper, nhưng mình sẽ chưa đề cập đến trong bài này 😁. \

  • Trong service Zookeeper, field image để xác định image được dùng, hostname để đặt hostname cho container. Hostname được đặt cho container nằm xác định được container đó trong 1 network, giúp các container có thể giao tiếp với nhau.
  • Tại port, ta có thể thấy có 2 port được gắn vào. Port đầu tiên được gọi là HOST_PORT, và port còn lại là CONTAINER_PORT. Port này khá gây lú cho ae mới bắt đầu, có thể hiểu đơn giản là các container trong 1 network sẽ gọi nhau thông qua CONTAINER_PORT, còn các client bên ngoài gọi tới service sẽ thông qua CONTAINER_PORT
  • environment để xác định các cấu hình
  • volumes để mount directory từ máy tới container, giúp không bị mất data khi stop/restart container. Volumes sẽ được viết theo định dạng [SOURCE]:[TARGET]:[MODE], trong đó source là directory tại máy, và target là directory trong container, mode mình thường để luôn là mặc định là read-write.

Kafka broker

Các field như port, image, v.v. trong service broker tương tự như với Zookeeper nên mình sẽ không viết thêm ở đây 😉. Ngoài ra, depends_on để xác định thời điểm chạy container sau khi container mà nó depend đã được chạy. Trong này mình sẽ mô tả kỹ hơn các biến trong environment\

  • KAFKA_ZOOKEEPER_CONNECT: xác định Zookeeper
  • KAFKA_BROKER_ID: id của broker, các broker cần được đảm bảo có unique id
  • KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: replication factor của các topic, lưu ý value cho field này cần <= số broker
  • KAFKA_LISTENERS: xác định physical network interface mà Kafka listen
  • KAFKA_ADVERTISED_LISTENERS: xác định phương thức mà client có thể gọi tới, là metadata sẽ được gửi lại cho client.
  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: các cặp key-value để định nghĩa các phương thức cho các listener khác nhau
  • KAFKA_INTER_BROKER_LISTENER_NAME: xác định listener dành cho các lời gọi internal

Trong cài đặt, ví dụ với broker 1, client ở trong Docker network sẽ connect với listener LISTENER_DOCKER_INTERNAL với port 19091 và host kafka1. Ngược lại, client phía ngoài sẽ connect tới broker 1 thông qua listener LISTENER_DOCKER_EXTERNAL với port 9091 và host là localhost.

Kafdrop

Kafdrop là một service được dùng cho mục đích visualization, cung cấp UI để người dùng dễ dàng thao tác với Kafka cluster. Cấu hình của Kafdrop khá đơn giản, chỉ cần xác định các broker trong biến KAFKA_BROKERCONNECT. Ngoài ra field SERVER_PORTMANAGEMENT_SERVER_PORT để mình chỉnh port cho Kafdrop, vì port default của nó là 9000 nhưng mà mình đang chạy 1 service khác ở 9000 mất rồi 😄.
Trong con Kafdrop này thì ae có thể view các message trong các partition của các broker, check các topic và các thông số liên quan như replication factor, leader, tạo topic, etc. Nói chung là dùng khá tiện và nên dùng 🤌.

image.png

Test

Sau khi đã cài đặt và mở Docker desktop lên check các container chạy ngon nghẻ, ae viết vài dòng code để check xem cụm Kafka của mình có hoạt động đúng không. Ở đây mình dùng Python vì nó nhanh và nó tiện, còn các bạn có thể dùng Java, nhưng nó sẽ dài và viết lâu hơn. Để gửi message vào Kafka viết bằng Python, ae cần install 1 lib là kafka-python, sau đó vào vụt luôn thôi.
Ở đây mình tạo 1 file kafka_prod.py có nội dung như sau:

from kafka import KafkaProducer
import json
import datetime

producer = KafkaProducer(bootstrap_servers=['localhost:9091', 'localhost:9092', 'localhost:9093'], 
                        value_serializer=lambda x: json.dumps(x).encode('utf-8'))
topic = 'tuan'


msg = {
    "current_time": str(datetime.datetime.now())
}

producer.send(topic, value=msg)
producer.flush()
# print(msg)

Chạy file này bằng cmd python kafka_prod.py, và khi thấy nó print ra terminal dict như đã viết là OK, sau đó vào Kafdrop check thử, thấy message vừa gửi là đã đẩy message vào thành công rồi 👌.

image.png

Tham khảo

https://www.confluent.io/blog/kafka-listeners-explained/
https://medium.com/@fintechdevlondon/kafka-listeners-and-advertised-listeners-what-are-they-and-what-do-they-do-9b004e2eb93d
https://demanejar.github.io/posts/Kafka-In-Depth/


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí