Chương 4. Producers: Sourcing Data - Kafka In Action Series
Chương 4: Producers – Sourcing Data 🚀
Trong chương này, chúng ta sẽ tập trung vào Producers – thành phần chịu trách nhiệm gửi dữ liệu vào Kafka. Bạn sẽ được tìm hiểu cách tạo producer đơn giản, cấu hình các tùy chọn khác nhau và làm thế nào để tối ưu tốc độ hoặc đảm bảo tính an toàn khi gửi dữ liệu.
4.1 Một ví dụ về Producer 📝
Producer là gì?
Producer là thành phần trong Kafka chịu trách nhiệm gửi dữ liệu (messages) đến các topic.
- Dữ liệu được gửi vào Kafka dưới dạng key-value pairs.
- Producers có thể chỉ định topic và partition để lưu trữ tin nhắn.
Ví dụ cơ bản về một Kafka Producer
Hãy bắt đầu với đoạn mã Kotlin đơn giản để tạo một Kafka producer gửi tin nhắn vào một topic:
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.util.Properties
fun main() {
// 1. Cấu hình Producer
val props = Properties().apply {
put("bootstrap.servers", "localhost:9092")
put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
}
// 2. Tạo Kafka Producer
val producer = KafkaProducer<String, String>(props)
// 3. Gửi dữ liệu
for (i in 1..10) {
val record = ProducerRecord("example-topic", "key-$i", "value-$i")
producer.send(record) { metadata, exception ->
if (exception == null) {
println("Gửi thành công: Topic=${metadata.topic()}, Partition=${metadata.partition()}, Offset=${metadata.offset()}")
} else {
println("Lỗi khi gửi tin nhắn: ${exception.message}")
}
}
}
// 4. Đóng Producer
producer.close()
}
- Cấu hình bootstrap.servers: Danh sách các broker Kafka mà producer kết nối.
- Serializer: Producer cần chuyển đổi key và value thành byte arrays trước khi gửi đến Kafka.
- ProducerRecord: Đại diện cho một tin nhắn Kafka, bao gồm topic, key và value.
- Callback: Xử lý kết quả gửi tin nhắn thành công hoặc thất bại.
Producer Notes 📋
Tính chất quan trọng của producer:
- At least once: Producer đảm bảo tin nhắn được gửi ít nhất một lần, có thể bị trùng lặp.
- At most once: Tin nhắn chỉ được gửi một lần, nhưng có nguy cơ mất mát nếu lỗi xảy ra.
- Exactly once: Producer đảm bảo tin nhắn gửi duy nhất một lần – đây là chế độ an toàn nhất nhưng đòi hỏi cấu hình phức tạp hơn.
4.2 Các tùy chọn của Producer ⚙️
Cấu hình danh sách broker
- bootstrap.servers: Đây là danh sách các địa chỉ broker Kafka mà producer sẽ kết nối.
- Producer chỉ cần kết nối với một hoặc vài broker; Kafka sẽ tự tìm các broker còn lại trong cluster. Ví dụ:
bootstrap.servers = broker1:9092, broker2:9092
Cách để tối ưu tốc độ hoặc đảm bảo an toàn 🏎️🔒
Kafka producer cho phép cân bằng giữa tốc độ và tính an toàn thông qua các cấu hình sau:
- acks (Acknowledgments):
- acks=0: Producer không chờ phản hồi từ broker. Tốc độ nhanh nhưng không đảm bảo tin nhắn đã được lưu trữ.
- acks=1: Producer chờ phản hồi từ leader partition. Tốc độ khá nhanh, đảm bảo tin nhắn đã được lưu trên leader.
- acks=all: Producer chờ tất cả replicas xác nhận đã lưu tin nhắn. Chậm hơn nhưng an toàn nhất.
- batch.size và linger.ms:
- batch.size: Kích thước tối đa của batch tin nhắn trước khi gửi.
- linger.ms: Khoảng thời gian tối đa producer chờ đợi trước khi gửi batch.
- Tăng giá trị này giúp giảm số lần gửi tin nhắn, cải thiện hiệu suất.
- retries và retry.backoff.ms:
- retries: Số lần thử lại khi gửi tin nhắn thất bại.
- retry.backoff.ms: Thời gian chờ giữa các lần thử lại.
Timestamps trong Kafka ⏰
Kafka producer có thể đính kèm timestamps vào mỗi tin nhắn. Timestamps giúp xác định:
- Thời gian tạo ra dữ liệu (event time).
- Thời gian tin nhắn được gửi đến Kafka (processing time). Ví dụ:
val record = ProducerRecord("topic", null, System.currentTimeMillis(), "key", "value")
producer.send(record)
4.3 Tạo mã phù hợp với yêu cầu 💻
Các phiên bản client và broker
- Kafka client và Kafka broker cần tương thích phiên bản để tránh lỗi khi gửi/nhận dữ liệu.
- Backward compatibility: Client mới có thể kết nối với broker cũ.
- Forward compatibility: Client cũ có thể kết nối với broker mới, nhưng sẽ mất một số tính năng mới.
Sinh mã tự động cho producer
Bạn có thể sử dụng các công cụ như Avro, Protobuf, hoặc JSON Schema để:
- Tự động tạo mã producer dựa trên schema dữ liệu.
- Đảm bảo tính nhất quán giữa producer và consumer. Ví dụ với Avro:
- Tạo schema:
{
"type": "record",
"name": "Invoice",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double"}
]
}
- Sinh mã Kotlin: Sử dụng Avro plugin để tạo class tự động từ schema.
- Gửi tin nhắn:
val invoice = Invoice("INV-123", 100.50)
val record = ProducerRecord("invoices", invoice.toString())
producer.send(record)
Lời kết 🎯
Trong chương này, chúng ta đã tìm hiểu về:
- Kafka Producer là gì và cách nó hoạt động.
- Cấu hình quan trọng để cân bằng giữa tốc độ và an toàn.
- Tạo mã tự động và đảm bảo tương thích giữa client và broker. Producer là bước đầu tiên trong việc đưa dữ liệu vào Kafka. Hiểu rõ cách cấu hình và tối ưu producer sẽ giúp hệ thống của bạn hiệu suất cao và đáng tin cậy hơn.
All rights reserved