+2

[Kafka] - Spring Boot Kafka in depth

Tiếp tục chuỗi bài viết về Kafka. Bài viết này mình sẽ đi vào chi tiết một ứng dụng Spring Boot config sử dụng với Kafka.

Cũng giống như các message queue truyền thống ActiveMQ, RabbitMQ, ..., Kafka thường được sử dụng cho các công việc chạy nền hoặc giao tiếp giữa các dịch vụ, đặc biệt là các hệ thống theo kiến trúc Microservices ngày nay (MSA - Microservice Architecture). Chúng ta có thể sử dụng Kafka khi phải di chuyển lượng lớn dữ liệu và cần xử lý theo thời gian thực,...Nói chung là lý thuyết về kafka thì khá là nhiều thứ, mình sẽ viết ở một bài viết khác. Còn bài viết này tập trung vào việc sử dụng Spring Boot với Kafka.

Một ví dụ là khi chúng ta muốn xử lý hành vi của người dùng trên trang web để tạo ra các đề xuất về sản phẩm liên quan (ví dụ người dùng click tìm kiếm bột giặt OMO, thì trang web của chúng sẽ gửi lại thêm các đề xuất liên quan bột giặt ABA, ARIEL,...)

Cấu hình Kafka Client

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.5.2.RELEASE</version>
</dependency>

Sử dụng Java configuration

Để phân chia trách nhiệm, chúng ta sẽ tách việc cấu hình trong 2 class KafkaProductionerConfigKafkaConsumerConfig.

Cấu hình Producer:

@Configuration
class KafkaProducerConfig {

  @Value("${kafka.bootstrap-servers}")
  private String bootstrapServers;

  @Bean
  public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
      bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      StringSerializer.class);
    return props;
  }

  @Bean
  public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
  }

  @Bean
  public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }
}
  • ProducerFactory chịu trách nhiệm tạo các instance Kafka Producer để gửi các object message khác nhau (trong trường hợp trên object là String, chúng ta cũng có thể custom message dạng Java object do chúng ta định nghĩa, tham khảo tại đây).
  • KafkaTemplate chịu trách nhiệm gửi tin nhắn đến các topic tương ứng. Chúng ta sẽ tìm hiểu hơn về KafkaTemplate ở phần bên dưới.

Cấu hình Consumer:

@Configuration
class KafkaConsumerConfig {

  @Value("${kafka.bootstrap-servers}")
  private String bootstrapServers;

  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
      bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      StringDeserializer.class);
    return props;
  }

  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
  }
}
  • ConsumerFactory chịu trách nhiệm tạo các instance Kafka Croducer để nhận các object message khác nhau (trong trường hợp trên object là String, chúng ta cũng có thể custom message dạng Java object do chúng ta định nghĩa ở gần cuối bài viết này.
  • ConcurrentKafkaListenerContainerFactory tạo ra container cho các phương thức được gán annotaion @KafkaListener. KafkaListenerContainer sẽ nhận tất cả tin nhắn từ tất cả các topic hoặc partition trên một luồng duy nhất. Chúng ta sẽ xem tìm hiểu chi tiết hơn về vùng chứa trình nghe thông báo trong phần thông báo tiêu thụ.

Sử dụng Spring Boot Configuration

Spring Boot thực hiện hầu hết các cấu hình một cách tự động, vì vậy chúng ta có thể tập trung vào việc xây dựng các Producer gửi tin nhắn và các Consumer nhận tin nhắn. Spring Boot cũng cung cấp tùy chọn ghi đè cấu hình mặc định thông qua file application.yml hay application.properties. Cấu hình Kafka được định nghĩa bởi các thuộc tính cấu hình với tiền tố spring.kafka. *:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

Tạo Topic

Để Producer có thể gửi tin nhắn, topic cần phải tồn tại trước. Dưới đây là cách cấu hình tạo ra Kafka topics:

@Configuration
class KafkaTopicConfig {

  @Bean
  public NewTopic topic1() {
    return TopicBuilder.name("reflectoring-1").build();
  }

  @Bean
  public NewTopic topic2() {
    return TopicBuilder.name("reflectoring-2").build();
  }
  ...
}

Bean KafkaAdmin chịu trách nhiệm tạo các topic bên trong brocker. Với Spring Boot, một Bean KafkaAdmin được đăng ký tự động. Đối với ứng dụng không phải Spring Boot, chúng ta phải đăng ký Bean KafkaAdmin theo cách thủ công như sau:

@Bean
KafkaAdmin admin() {
 Map<String, Object> configs = new HashMap<>();
 configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
 return new KafkaAdmin(configs);
}

Để tạo một chủ đề (topic), chúng ta đăng ký một Bean NewTopic cho mỗi topic vào application context. Nếu topic đã tồn tại, Bean sẽ bị bỏ qua. Chúng ta có thể sử dụng TopicBuilder để tạo các Bean này. KafkaAdmin cũng tăng số lượng phân vùng (partitions) nếu nó nhận thấy rằng một topic hiện có có ít phân vùng hơn NewTopic.numPartitions.

Sending Messages

Sử dụng KafkaTemplate

KafkaTemplate cung cấp phương pháp thuận tiện để gửi tin nhắn đến các topic:

@Component
class KafkaSenderExample {

 private KafkaTemplate<String, String> kafkaTemplate;
 ...

 @Autowired
 KafkaSenderExample(KafkaTemplate<String, String> kafkaTemplate, ...) {
   this.kafkaTemplate = kafkaTemplate;
   ...
 }

 void sendMessage(String message, String topicName) {
   kafkaTemplate.send(topicName, message);
 }
 ...
}

Tất cả những gì chúng ta cần làm là gọi phương thức send() với tham số là tên topic và nội dung message. Spring Kafka cũng cho phép chúng ta định cấu hình một lời gọi không đồng bộ trả lại kết quả của việc thực thi gửi tin nhắn:

@Component
class KafkaSenderExample {
 ...
 void sendMessageWithCallback(String message) {
   ListenableFuture<SendResult<String, String>> future = 
     kafkaTemplate.send(topic1, message);
 
   future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
     @Override
     public void onSuccess(SendResult<String, String> result) {
       LOG.info("Message [{}] delivered with offset {}",
         message,
         result.getRecordMetadata().offset());
     }
 
     @Override
     public void onFailure(Throwable ex) {
       LOG.warn("Unable to deliver message [{}]. {}", 
         message,
         ex.getMessage());
     }
   });
 }
}

Phương thức send() của KafkaTemplate trả về ListenableFuture<SendResult>. Chúng ta có thể đăng ký ListenableFutureCallback với listener để nhận kết quả của việc gửi và thực hiện một số công việc khác. Nếu không muốn làm việc với Futures, chúng ta có thể đăng ký ProducerListener để thay thế:

@Configuration
class KafkaProducerConfig {
 @Bean
 KafkaTemplate<String, String> kafkaTemplate() {
 KafkaTemplate<String, String> kafkaTemplate = 
   new KafkaTemplate<>(producerFactory());
 ...
 kafkaTemplate.setProducerListener(new ProducerListener<String, String>() {
   @Override
   public void onSuccess(
     ProducerRecord<String, String> producerRecord, 
     RecordMetadata recordMetadata) {
     
     LOG.info("ACK from ProducerListener message: {} offset:  {}",
       producerRecord.value(),
       recordMetadata.offset());
   }
 });
 return kafkaTemplate;
 }
}

Việc chúng ta cấu hình KafkaTemplat bằng ProducerListener cũng cho phép chúng ta implements các phương thức onSuccess()onError().

Sử dụng RoutingKafkaTemplate

Chúng ta có thể sử dụng RoutingKafkaTemplate khi chúng ta có nhiều Producer với các cấu hình khác nhau và chúng ta muốn chọn Producer trong thời gian chạy dựa trên chủ đề (topic).

@Configuration
class KafkaProducerConfig {
 ...

 @Bean
 public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context) {
   // ProducerFactory with Bytes serializer
   Map<String, Object> props = new HashMap<>();
   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
     bootstrapServers);
   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
     StringSerializer.class);
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
     ByteArraySerializer.class);
   DefaultKafkaProducerFactory<Object, Object> bytesPF = 
     new DefaultKafkaProducerFactory<>(props);
   context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);

   // ProducerFactory with String serializer
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
     StringSerializer.class);
   DefaultKafkaProducerFactory<Object, Object> stringPF = 
     new DefaultKafkaProducerFactory<>(props);
   context.registerBean(DefaultKafkaProducerFactory.class, "stringPF", stringPF);

   Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
   map.put(Pattern.compile(".*-bytes"), bytesPF);
   map.put(Pattern.compile("reflectoring-.*"), stringPF);
   return new RoutingKafkaTemplate(map);
 }
 ...
}

RoutingKafkaTemplate sử dụng một Map của các phần tử java.util.regex.PatternProducerFactory để định tuyến các mesage đến ProducerFactory phù hợp đầu tiên với tên topic nhất định. Nếu chúng ta có hai pattern ref.*reflectoring-.*, Thì pattern reflectoring-.* phải đặt ở trước vì pattern ref.* sẽ ghi đè nó.

Trong ví dụ trên, chúng ta đã tạo ra hai pattern .*-bytereflectoring-.*. Tên topic kết thúc bằng -byte và bắt đầu bằng reflectoring sẽ sử dụng ByteArraySerializerStringSerializer tương ứng khi chúng ta sử dụng phiên bản RoutingKafkaTemplate.

Consuming Messages

Message Listener

KafkaMessageListenerContainer sẽ nhận tất cả tin nhắn (messages) từ tất cả các chủ đề (topics) trên một thread duy nhất.

ConcurrentMessageListenerContainer chỉ định các message này cho nhiều instance KafkaMessageListenerContainer để cung cấp khả năng đa luồng.

Sử dụng @KafkaListener tại level method

Annotaion @KafkaListener cho phép chúng ta tạo listener (mình gọi nó là trình lắng nghe), là nơi subscribe message đến từ topic:

@Component
class KafkaListenersExample {

  Logger LOG = LoggerFactory.getLogger(KafkaListenersExample.class);

  @KafkaListener(topics = "reflectoring-1", containerFactory = "kafkaListenerContainerFactory")
  void listener(String data) {
    LOG.info(data);
  }

  @KafkaListener(
    topics = "reflectoring-1, reflectoring-2", 
    groupId = "reflectoring-group-2")
  void commonListenerForMultipleTopics(String message) {
    LOG.info("MultipleTopicListener - {}", message);
  }
}

Để sử dụng annotation này, chúng ta nên thêm annotaion @EnableKafka vào một class bất kỳ được gán annotation @Configuration. Ngoài ra, nó cũng yêu cầu cấu hình các trình lắng nghe cho mỗi loại message (trong ví dụ này listener kafkaListenerContainerFactory được cấu hình trong class KafkaconsumerConfig ở phía trên cho loại message String, chúng ta có thể cấu hình gửi nhận message dạng Java object thay vì String trong bài viết này).

Chúng ta cũng có thể chỉ định nhiều topic cho trình lắng nghe bằng cách sử dụng thuộc tính topics như trên.

Sử dụng @KafkaListener tại level class

Nếu sử dụng @KafkaListener ở level class thì chúng ta cần chỉ định @KafkaHandler tại level method.

@Component
@KafkaListener(id = "class-level", topics = "reflectoring-3")
class KafkaClassListener {
  ...

  @KafkaHandler
  void listen(String message) {
    LOG.info("KafkaHandler[String] {}", message);
  }

  @KafkaHandler(isDefault = true)
  void listenDefault(Object object) {
    LOG.info("KafkaHandler[Default] {}", object);
  }
}

Khi trình lắng nghe nhận được message, nó sẽ chuyển đổi chúng thành các kiểu dữ liệu đích trong các phương thức để tìm ra phương thức nào cần gọi. Trong ví dụ, các message kiểu String sẽ được nhận bởi listening() và kiểu Object sẽ được nhận bởi listenerDefault(). Bất cứ khi nào không có kết quả phù hợp, trình xử lý mặc định (được định nghĩa bởi isDefault = true) sẽ được gọi.

Nhận messages từ partition cụ thể với Offset

Chúng ta có thể cấu hình trình lắng nghe để subscribe từ nhiều chủ đề (topic), phân vùng (partitions) và vị trí đọc message cụ thể. Ví dụ: nếu chúng ta muốn nhận tất cả các message được gửi đến một topic từ thời điểm tạo topic khi khởi động ứng dụng, chúng ta có thể đặt giá trị offset là 0:

@Component
class KafkaListenersExample {
  ...

  @KafkaListener(
    groupId = "reflectoring-group-3",
    topicPartitions = @TopicPartition(
      topic = "reflectoring-1",
      partitionOffsets = { @PartitionOffset(
        partition = "0", 
        initialOffset = "0") }))
  void listenToPartitionWithOffset(
    @Payload String message,
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
    @Header(KafkaHeaders.OFFSET) int offset) {
      LOG.info("Received message [{}] from partition-{} with offset-{}", 
        message, 
        partition, 
        offset);
  }
}

Vì chúng ta đã chỉ định initialOffset = "0", chúng ta sẽ nhận được tất cả các message bắt đầu từ offset 0 mỗi khi chúng ta khởi động lại ứng dụng. Chúng ta cũng có thể truy xuất metadata hữu ích của message bằng cách sử dụng annotation @Header().

Filter messages

Spring cung cấp một cách thức để lọc các message trước khi chúng đến trình lắng nghe:

class KafkaConsumerConfig {

  @Bean
  KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
  kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(record -> 
      record.value().contains("ignored"));
    return factory;
  }
}

Spring bọc trình lắng nghe bằng FilteringMessageListenerAdapter. Chúng ta sẽ gọi phương thức factory.setRecordFilterStrategy() để triển khia điều kiện lọc. Message phù hợp với bộ lọc sẽ bị loại bỏ trước khi đến trình lắng nghe (cụ thể là đến phương thức được gắn annotation @KafkaListener). Trong ví dụ trên, chúng ta đã thêm một bộ lọc để loại bỏ các message có từ "ignored".

Reply, Forwarding Listener Results với @SendTo

Spring cho phép Forwarding giá trị trả về bên trong phương thức được gán annotation @KafkaLisstener với chỉ định annotation @SendTo đến một topic:

@Component
class KafkaListenersExample {
  ...

  @KafkaListener(topics = "reflectoring-others")
  @SendTo("reflectoring-1")
  String listenAndReply(String message) {
    LOG.info("ListenAndReply [{}]", message);
    return "This is a reply sent after receiving message";
  }
}

Ví dụ về @SendTo mọi người có thể xem tại bài viết Spring Kafka – Forwarding Listener Results using @SendTo.

Spring boot sẽ đưa cho chúng ta một cấu hình mặc định Reply Template. Chúng ta có thể cấu hình override Reply Template này trong khi cấu hình Listener Container Factory bằng cách sử dụng phương thức setReplyTemplate() như dưới đây:

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> listenerEventSendStringMessage() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumingEventSendStringMessage());
        factory.setReplyTemplate(customKafkaTemplate); // customKafkaTemplate là một KafkaTemplate khác
        return factory;
    }

Custom Messages

Bây giờ chúng ta hãy xem cách gửi/nhận một đối tượng Java. Chúng tôi sẽ gửi và nhận các đối tượng User như trong ví dụ dưới đây:

class User {
  private String name;
  ...
}

Để gửi/nhận Java object, chúng ta phải định cấu hình Producer và Consumer sử dụng JSON serializerJSON deserializer:

@Configuration
class KafkaProducerConfig {
  ...

  @Bean
  public ProducerFactory<String, User> userProducerFactory() {
    ...
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
      JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
  }

  @Bean
  public KafkaTemplate<String, User> userKafkaTemplate() {
    return new KafkaTemplate<>(userProducerFactory());
  }
}
@Configuration
class KafkaConsumerConfig {
  ...
  public ConsumerFactory<String, User> userConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "reflectoring-user");

    return new DefaultKafkaConsumerFactory<>(
      props,
      new StringDeserializer(),
      new JsonDeserializer<>(User.class));
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, User> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(userConsumerFactory());
    return factory;
  }
  ...
}

Spring Kafka cung cấp các implementations JsonSerializerJsonDeserializer dựa trên Jackson JSON object mapper. Nó cho phép chúng ta chuyển đổi bất kỳ đối tượng Java nào thành byte []. Trong ví dụ trên, chúng ta tạo thêm một ConcurrentKafkaListenerContainerFactory để thực hiện JSON serialization.

Chúng ta cũng tạo một Listener Container (vùng chứa trình lắng nghe message) riêng biệt userKafkaListenerContainerFactory() để xử lý cho đối tượng User (Java object). Nếu chúng ta có nhiều kiểu đối tượng Java cần serialized/deserialized, chúng ta phải tạo một Listener Container (vùng chứa trình lắng nghe) cho mỗi kiểu đối tượng đó.

Sending Java Objects

@Component
class KafkaSenderExample {
  ...

  @Autowired
  private KafkaTemplate<String, User> userKafkaTemplate;

  void sendCustomMessage(User user, String topicName) {
    userKafkaTemplate.send(topicName, user);
  }
  ...
}

Receiving Java Objects

@Component
class KafkaListenersExample {

  @KafkaListener(
    topics = "reflectoring-user",
    groupId="reflectoring-user",
    containerFactory="userKafkaListenerContainerFactory")
  void listener(User user) {
    LOG.info("CustomUserListener [{}]", user);
  }
}

userKafkaListenerContainerFactory chính là tên Listener Container (vùng chứa trình lắng nghe cho message có kiểu dữ liệu User).

Nếu chúng ta không chỉ định thuộc tính containerFactory, nó sẽ mặc định là kafkaListenerContainerFactory sử dụng StringSerializerStringDeserializer trong trường hợp này của bài viết này.

Tổng kết

Trên đây là hướng dẫn để mọi người hiểu hơn về cách cấu hình Spring Boot Kafka. Hy vọng mọi người sẽ hiểu được ý tưởng tổng thể để có thể áp dụng nó vào dự án của mọi người một cách thoải mái nhất.

Mọi người có thể tìm hiểu các bài viết liên quan tại đây:


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí