Chương 4. Producers: Sourcing Data - Kafka In Action Series
Chương 4: Producers – Sourcing Data 🚀
Trong chương này, chúng ta sẽ tập trung vào Producers – thành phần chịu trách nhiệm gửi dữ liệu vào Kafka. Bạn sẽ được tìm hiểu cách tạo producer đơn giản, cấu hình các tùy chọn khác nhau và làm thế nào để tối ưu tốc độ hoặc đảm bảo tính an toàn khi gửi dữ liệu.
4.1 Một ví dụ về Producer 📝
Producer là gì?
Producer là thành phần trong Kafka chịu trách nhiệm gửi dữ liệu (messages) đến các topic.
- Dữ liệu được gửi vào Kafka dưới dạng key-value pairs.
- Producers có thể chỉ định topic và partition để lưu trữ tin nhắn.
Ví dụ cơ bản về một Kafka Producer
Hãy bắt đầu với đoạn mã Kotlin đơn giản để tạo một Kafka producer gửi tin nhắn vào một topic:
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.util.Properties
fun main() {
// 1. Cấu hình Producer
val props = Properties().apply {
put("bootstrap.servers", "localhost:9092")
put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
}
// 2. Tạo Kafka Producer
val producer = KafkaProducer<String, String>(props)
// 3. Gửi dữ liệu
for (i in 1..10) {
val record = ProducerRecord("example-topic", "key-$i", "value-$i")
producer.send(record) { metadata, exception ->
if (exception == null) {
println("Gửi thành công: Topic=${metadata.topic()}, Partition=${metadata.partition()}, Offset=${metadata.offset()}")
} else {
println("Lỗi khi gửi tin nhắn: ${exception.message}")
}
}
}
// 4. Đóng Producer
producer.close()
}
- Cấu hình bootstrap.servers: Danh sách các broker Kafka mà producer kết nối.
- Serializer: Producer cần chuyển đổi key và value thành byte arrays trước khi gửi đến Kafka.
- ProducerRecord: Đại diện cho một tin nhắn Kafka, bao gồm topic, key và value.
- Callback: Xử lý kết quả gửi tin nhắn thành công hoặc thất bại.
Metadata Cluster fetching
Producer's job: Cluster metadata fetching
Metadata for producer know where to write (cause producer can only write to the replica leader of the partiion)
Lần 1 (lần đầu tiên gửi): Producer gửi dữ liệu, nhận metadata từ Kafka (về broker, partition, leader).
Lần 2 (lần gửi tiếp theo): Producer không cần gọi lại Kafka để lấy metadata mà chỉ sử dụng metadata đã lưu để gửi tiếp dữ liệu đến đúng broker (leader của partition)
Producer sẽ kiểm tra lại metadata trong hai trường hợp chính:
- Định kỳ: Một số client libraries của Kafka có thể tự động thực hiện kiểm tra metadata sau một khoảng thời gian nhất định (ví dụ: vài phút) để đảm bảo rằng producer có thông tin mới nhất về các broker và partition.
- Khi có thay đổi: Nếu producer gặp phải lỗi khi gửi dữ liệu (ví dụ: "LeaderNotAvailable" hoặc "NotLeaderForPartition"), nó sẽ yêu cầu lại metadata từ Kafka để tìm đúng broker mới (hoặc leader mới của partition)
How can the client find out which topics exist, what partitions they have, and which brokers currently host those partitions so that it can direct its requests to the right hosts? This information is dynamic, so you can't just configure each client with some static mapping file. Instead all Kafka brokers can answer a metadata request that describes the current state of the cluster: what topics there are, which partitions those topics have, which broker is the leader for those partitions, and the host and port information for these brokers.
In other words, the client needs to somehow find one broker and that broker will tell the client about all the other brokers that exist and what partitions they host. This first broker may itself go down so the best practice for a client implementation is to take a list of two or three URLs to bootstrap from. The user can then choose to use a load balancer or just statically configure two or three of their Kafka hosts in the clients.
The client does not need to keep polling to see if the cluster has changed; it can fetch metadata once when it is instantiated cache that metadata until it receives an error indicating that the metadata is out of date. This error can come in two forms: (1) a socket error indicating the client cannot communicate with a particular broker, (2) an error code in the response to a request indicating that this broker no longer hosts the partition for which data was requested. Cycle through a list of "bootstrap" Kafka URLs until we find one we can connect to. Fetch cluster metadata. Process fetch or produce requests, directing them to the appropriate broker based on the topic/partitions they send to or fetch from. If we get an appropriate error, refresh the metadata and try again. https://kafka.apache.org/protocol
Vì hệ thống phân tán này được thiết kế để xử lý các lỗi tạm thời như sự cố mạng, nên logic xử lý lại (retries) đã được tích hợp sẵn. Tuy nhiên, nếu thứ tự của các tin nhắn là quan trọng, như tin nhắn kiểm tra (audit), ngoài việc thiết lập số lần retry = 3, chúng ta cũng cần thiết lập giá trị
max.in.flight.requests.per.connection
là 1 và acks (số lượng broker gửi phản hồi xác nhận) làall
. Theo quan điểm của chúng tôi, đây là một trong những phương pháp an toàn nhất để đảm bảo rằng các tin nhắn của producer sẽ đến theo đúng thứ tự mà bạn mong muốn. (Trích từ )
Yêu cầu được gửi đi và đang chờ phản hồi, nó được coi là "in flight".
Another option: Using an idempotent producer with enable.idempotence=true
"One thing we do not have to worry about is one producer getting in the way of another producer's data. Thread safety is not an issue because data will not be overwritten but handled by the broker itself and appended to the broker's log."
-
Mỗi producer chỉ gửi dữ liệu tới một hoặc nhiều partition, và dữ liệu từ các producer khác nhau sẽ không làm gián đoạn nhau. Broker xử lý việc ghi dữ liệu vào log và đảm bảo tính nhất quán của dữ liệu.
-
Các producer khác nhau có thể gửi dữ liệu đến cùng một partition, nhưng Kafka sẽ đảm bảo rằng các tin nhắn của mỗi producer được thêm vào log theo đúng thứ tự chúng được gửi (vì mỗi tin nhắn có offset riêng biệt).
Producer Notes 📋
Tính chất quan trọng của producer:
- At least once: Producer đảm bảo tin nhắn được gửi ít nhất một lần, có thể bị trùng lặp.
- At most once: Tin nhắn chỉ được gửi một lần, nhưng có nguy cơ mất mát nếu lỗi xảy ra.
- Exactly once: Producer đảm bảo tin nhắn gửi duy nhất một lần – đây là chế độ an toàn nhất nhưng đòi hỏi cấu hình phức tạp hơn.
4.2 Các tùy chọn của Producer ⚙️
Cấu hình danh sách broker
- bootstrap.servers: Đây là danh sách các địa chỉ broker Kafka mà producer sẽ kết nối.
- Producer chỉ cần kết nối với một broker; Kafka sẽ tự tìm các broker còn lại trong cluster. Theo mình thì có bao nhiêu broker cứ add vô hết do producer sẽ chọn cái đầu tiên trong danh sách. Lỡ như cái đầu có down thì những cái sau sẽ có đất dùng -> an toàn hơn
Ví dụ:
bootstrap.servers = broker1:9092, broker2:9092
Cách để tối ưu tốc độ hoặc đảm bảo an toàn 🏎️🔒
Kafka producer cho phép cân bằng giữa tốc độ và tính an toàn thông qua các cấu hình sau:
- acks (Acknowledgments):
acks | Mô tả | Tốc độ | Độ tin cậy | Kịch bản | Ví dụ |
---|---|---|---|---|---|
0 | Không chờ xác nhận từ broker. | Nhanh nhất | Thấp | Message không quan trọng, ưu tiên tốc độ. | Website click |
1 | Leader xác nhận đã nhận message. | Trung bình | Trung bình | Dữ liệu tái tạo được, cân đối tốc độ - độ tin cậy. | Logging, events |
all/-1 | Tất cả replica xác nhận. | Chậm nhất | Cao nhất | Giao dịch tài chính, dữ liệu quan trọng. | Finance, ngân hàng |
- batch.size và linger.ms:
- batch.size: Kích thước tối đa của batch tin nhắn trước khi gửi.
- linger.ms: Khoảng thời gian tối đa producer chờ đợi trước khi gửi batch.
- Tăng giá trị này giúp giảm số lần gửi tin nhắn, cải thiện hiệu suất.
- retries và retry.backoff.ms:
- retries: Số lần thử lại khi gửi tin nhắn thất bại.
- retry.backoff.ms: Thời gian chờ giữa các lần thử lại.
Timestamps trong Kafka ⏰
Kafka producer có thể đính kèm timestamps vào mỗi tin nhắn. Timestamps giúp xác định:
- Thời gian tạo ra dữ liệu (event time).
- Thời gian tin nhắn được gửi đến Kafka (processing time). Ví dụ:
val record = ProducerRecord("topic", null, System.currentTimeMillis(), "key", "value")
producer.send(record)
4.3 Tạo mã phù hợp với yêu cầu 💻
Các phiên bản client và broker
- Kafka client và Kafka broker cần tương thích phiên bản để tránh lỗi khi gửi/nhận dữ liệu.
- Backward compatibility: Client mới có thể kết nối với broker cũ.
- Forward compatibility: Client cũ có thể kết nối với broker mới, nhưng sẽ mất một số tính năng mới.
Sinh mã tự động cho producer
Listing 4.1 Configuring the audit producer
public class AuditProducer {
...
private static final Logger log = LoggerFactory.getLogger
(AuditProducer.class);Properties kaProperties = new Properties(); //❶
kaProperties.put( "bootstrap.servers",
"localhost:9092,localhost:9093,localhost:9094");
kaProperties.put("acks", "all"); //❷
kaProperties.put("retries", "3"); //❸
kaProperties.put("max.in.flight.requests.per.connection", "1");
...
//❶ Creates properties as before for our configuration
//❷ Sets acks to all to get the strongest guarantee
// ❸ Lets the client retry in case of failure so we don’t have to implement our own failure logic
Notice that we did not have to touch anything except the configuration we send to the producer to address the concern of message loss. The acks configuration change is a small but powerful feature that has a significant impact
Listing 4.2 Waiting for a result
RecordMetadata result =
producer.send(producerRecord).get(); //❶
log.info("kinaction_info offset = {}, topic = {}, timestamp = {}",
result.offset(), result.topic(), result.timestamp());
producer.close();
//❶ Waits on the response from the send callt.
Listing 4.3 Alert class
public class Alert implements Serializable {
private final int alertId;
private String stageId;
private final String alertLevel;
private final String alertMessage;
public Alert(int alertId,
String stageId,
String alertLevel,
String alertMessage) { //❶
this.alertId = alertId;
this.stageId = stageId;
this.alertLevel = alertLevel;
this.alertMessage = alertMessage;
}
public int getAlertId() {
return alertId;
}
public String getStageId() {
return stageId;
}
public void setStageId(String stageId) {
this.stageId = stageId;
}
public String getAlertLevel() {
return alertLevel;
}
public String getAlertMessage() {
return alertMessage;
}
}
//❶ Holds the alert’s ID,level, and messages
Listing 4.4 Our Alert Serializer
public class AlertKeySerde implements Serializer<Alert>,
Deserializer<Alert> {
public byte[] serialize(String topic, Alert key) { //❶
if (key == null) {
return null;
}
return key.getStageId()
.getBytes(StandardCharsets.UTF_8); //❷
}
public Alert deserialize
(String topic, byte[] value) { //❸
//could return Alert in future if needed
return null;
}
//...
}
//❶ Sends the topic and the Alert object to our method //❷ Converts objects to bytes (our end goal) //❸ The rest of the interface methods do not need any logic at this point
Listing 4.5 Alert trending producer
public class AlertTrendingProducer {
private static final Logger log =
LoggerFactory.getLogger(AlertTrendingProducer.class);
public static void main(String[] args)
throws InterruptedException, ExecutionException {
Properties kaProperties = new Properties();
kaProperties.put("bootstrap.servers",
"localhost:9092,localhost:9093,localhost:9094");
kaProperties.put("key.serializer",
AlertKeySerde.class.getName()); //❶
kaProperties.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
try (Producer<Alert, String> producer =
new KafkaProducer<>(kaProperties)) {
Alert alert = new Alert(0, "Stage 0", "CRITICAL", "Stage 0 stopped");
ProducerRecord<Alert, String> producerRecord =
new ProducerRecord<>("kinaction_alerttrend",
alert, alert.getAlertMessage()); //❷
RecordMetadata result = producer.send(producerRecord).get();
log.info("kinaction_info offset = {}, topic = {}, timestamp = {}",
result.offset(), result.topic(), result.timestamp());
}
}
}
//❶ Tells our producer client how to serialize our custom Alert object into a key //❷ Instead of null for the second parameter, uses the actual object we want to populate the key
Lời kết 🎯
Trong chương này, chúng ta đã tìm hiểu về:
- Kafka Producer là gì và cách nó hoạt động.
- Cấu hình quan trọng để cân bằng giữa tốc độ và an toàn.
- Tạo mã tự động và đảm bảo tương thích giữa client và broker. Producer là bước đầu tiên trong việc đưa dữ liệu vào Kafka. Hiểu rõ cách cấu hình và tối ưu producer sẽ giúp hệ thống của bạn hiệu suất cao và đáng tin cậy hơn.
All Rights Reserved