Kafka Consumer Commits và Offsets
1. Mở đầu
Mỗi khi chúng ta gọi poll(), nó sẽ trả về các message đã được ghi vào Kafka mà các consumer trong nhóm của chúng ta chưa đọc. Điều này có nghĩa là chúng ta có cách theo dõi những message đã được đọc bởi một consumer trong nhóm. Một đặc điểm độc đáo của Kafka là nó không theo dõi việc xác nhận từ các consumer theo cách mà nhiều hệ thống hàng đợi JMS thực hiện. Thay vào đó, Kafka cho phép các consumer tự theo dõi vị trí của mình (offset) trong mỗi partition.
Chúng ta gọi hành động cập nhật vị trí hiện tại trong partition là commit offset. Không giống như các hệ thống hàng đợi message truyền thống, Kafka không commit từng message riêng lẻ. Thay vào đó, các consumer commit message cuối cùng mà họ đã xử lý thành công từ một partition và ngầm định rằng tất cả các message trước đó cũng đã được xử lý thành công.
Làm thế nào để consumer commit một offset? Khi consumer thực hiện commit, nó gửi một message đến Kafka, và Kafka sẽ cập nhật một topic đặc biệt tên là consumer_offsets với offset đã được commit cho mỗi partition. Khi tất cả các consumer trong nhóm hoạt động bình thường, điều này không ảnh hưởng nhiều. Tuy nhiên, nếu một consumer gặp sự cố hoặc một consumer mới tham gia vào nhóm, việc này sẽ kích hoạt quá trình rebalance. Sau khi rebalance, mỗi consumer có thể được gán một tập partition mới khác với những partition mà nó đã xử lý trước đó. Để biết nên tiếp tục từ đâu, consumer sẽ đọc offset đã commit gần nhất của từng partition và tiếp tục xử lý từ đó.
Nếu committed offset nhỏ hơn offset của message cuối cùng mà client đã xử lý, các message giữa offset cuối cùng đã xử lý và committed offset sẽ được xử lý hai lần. Xem Hình 1: Các message được xử lý lại
Nếu committed offset lớn hơn offset của message cuối cùng mà client thực tế đã xử lý, tất cả các message giữa offset cuối cùng đã xử lý và committed offset sẽ bị bỏ sót bởi consumer group. Xem Hình 2. Các message bị bỏ sót giữa các offset
Việc quản lý offset rõ ràng có ảnh hưởng lớn đến client application. KafkaConsumer API cung cấp nhiều cách khác nhau để commit offset.
2. Offset nào được commit?
Khi commit offset tự động hoặc không chỉ định offset cụ thể, hành vi mặc định của Kafka là commit offset ngay sau offset cuối cùng mà poll() trả về. Điều này rất quan trọng khi bạn muốn commit thủ công các offset cụ thể hoặc điều chỉnh để commit đúng offset. Tuy nhiên, việc lặp lại cụm từ “commit offset lớn hơn một đơn vị so với offset cuối cùng mà client nhận được từ poll()” có thể gây rắc rối, và trong 99% trường hợp, điều này không quá quan trọng. Vì vậy, chúng ta thống nhất sẽ sử dụng cụm từ “commit offset cuối cùng” để mô tả hành vi mặc định này. Nếu anh em cần điều chỉnh offset thủ công, hãy lưu ý đến chi tiết này.
3. Automatic Commit
Cách đơn giản nhất để commit offsets là để consumer thực hiện tự động. Khi chúng ta cấu hình enable.auto.commit=true, mỗi 5 giây, consumer sẽ commit offset mới nhất mà client nhận được từ poll(). Khoảng thời gian 5 giây là giá trị mặc định và có thể được điều chỉnh bằng cách thay đổi auto.commit.interval.ms. Các commit tự động này đều được điều khiển bởi vòng lặp poll(). Mỗi lần thực hiện poll, consumer sẽ kiểm tra xem có phải đã đến thời điểm commit chưa, và nếu đúng, nó sẽ commit các offset đã trả về trong lần poll gần nhất.
Trước khi sử dụng tùy chọn tự động này, chúng cần hiểu sâu các hệ quả của nó mang lại là gi?
Theo giá trị mặc định, việc commit tự động diễn ra mỗi năm giây một lần. Ví dụ, nếu consumer của chúng ta gặp sự cố ba giây sau lần commit gần nhất, sau khi rebalancing hoàn tất, các consumer còn lại sẽ bắt đầu tiêu thụ các partition trước đây thuộc về broker bị lỗi. Tuy nhiên, chúng sẽ bắt đầu từ offset đã commit gần nhất, nghĩa là offset đó đã cũ ba giây. Do đó, tất cả các sự kiện trong ba giây đó sẽ bị xử lý hai lần. Mặc dù bạn có thể cấu hình khoảng thời gian commit để thực hiện thường xuyên hơn và giảm thời gian các bản ghi bị trùng lặp, nhưng không thể hoàn toàn loại bỏ tình trạng này.
Khi bật tính năng tự động commit, khi đến thời điểm commit các offset, lần poll tiếp theo sẽ commit offset cuối cùng từ lần poll trước đó. Hệ thống không biết các sự kiện nào đã được xử lý, vì vậy việc xử lý toàn bộ các sự kiện được trả về từ poll() trước khi gọi poll() lần nữa là rất quan trọng. (Tương tự như poll(), phương thức close() cũng tự động commit các offset.) Điều này thường không gây ra vấn đề lớn, nhưng bạn cần chú ý khi xử lý ngoại lệ hoặc khi thoát khỏi vòng lặp poll() trước thời hạn.
Tự động commit rất tiện lợi, nhưng nó không cho phép các nhà phát triển kiểm soát đủ để tránh việc nhận các thông điệp bị trùng lặp.
4. Commit Current Offset
Hầu hết các nhà phát triển thường kiểm soát thời điểm commit offset để tránh mất message và giảm số lượng message bị trùng lặp khi rebalance. API của Consumer cho phép commit offset tại thời điểm mà nhà phát triển ứng dụng thấy hợp lý, thay vì dựa vào bộ đếm thời gian.
Khi đặt enable.auto.commit=false, các offset chỉ được commit khi ứng dụng chủ động thực hiện điều đó. API đơn giản và đáng tin cậy nhất để commit là commitSync(). API này sẽ commit offset mới nhất được trả về bởi poll() và trả về sau khi commit thành công, đồng thời sẽ ném ra ngoại lệ nếu commit không thành công vì lý do nào đó.
Quan trọng là chúng ta cần nhớ rằng commitSync() sẽ lưu lại offset mới nhất được trả về bởi poll(). Do đó, nếu chúng ta gọi commitSync() trước khi hoàn tất việc xử lý tất cả các bản ghi trong danh sách, chúng ta có nguy cơ bỏ lỡ những message đã được lưu lại nhưng chưa được xử lý, trong trường hợp ứng dụng gặp sự cố. Nếu ứng dụng gặp sự cố khi vẫn đang xử lý các bản ghi trong bộ sưu tập, tất cả các tin nhắn từ đầu batch gần nhất đến thời điểm rebalance sẽ được xử lý hai lần.
5. Asynchronous Commit
Một nhược điểm của việc mannual commit là ứng dụng sẽ bị block cho đến khi broker phản hồi yêu cầu commit. Điều này có thể làm giảm tốc độ xử lý của ứng dụng. Tốc độ xử lý có thể được cải thiện bằng cách xác nhận ít thường xuyên hơn, nhưng điều này cũng có thể làm tăng khả năng trùng lặp mà rebalance có thể tạo ra.
Một tùy chọn khác là API commit không đồng bộ. Thay vì chờ broker phản hồi yêu cầu commit chúng ta chỉ gửi yêu cầu và tiếp tục thực hiện công việc xử lí message.
Bất lợi là trong khi commitSync() sẽ thử lại việc commit cho đến khi thành công hoặc gặp lỗi không thể thử lại, commitAsync() sẽ không thử lại. Lý do nó không thử lại là vì khi commitAsync() nhận được phản hồi từ server, có thể đã có một commit sau đó đã thành công. Hãy tưởng tượng rằng chúng ta gửi một yêu cầu để commit offset 2000. Có một sự cố tạm thời trong liên lạc, nên broker không nhận được yêu cầu và do đó không phản hồi. Trong khi đó, chúng ta đã xử lý một batch khác và thành công trong việc commit offset 3000. Nếu bây giờ commitAsync() thử lại commit đã thất bại trước đó, nó có thể thành công trong việc commit offset 2000 sau khi offset 3000 đã được xử lý và commit. Trong trường hợp xảy ra khi rebalance, điều này sẽ gây ra nhiều trùng lặp.
Mình đề cập đến sự phức tạp này và tầm quan trọng của thứ tự commit chính xác vì commitAsync() cũng cho phép chúng ta truyền vào một callback sẽ được kích hoạt khi broker phản hồi. Việc sử dụng callback để ghi lại lỗi commit hoặc đếm chúng trong một metric là điều phổ biến, nhưng nếu chúng ta muốn sử dụng callback để thử lại commit, chúng ta cần lưu ý đến vấn đề liên quan đến thứ tự commit.
6. Retrying Async Commits
Một cách đơn giản để đảm bảo đúng thứ tự commit khi thử lại không đồng bộ là sử dụng số thứ tự tăng dần đều. Mỗi lần commit, tăng số thứ tự và thêm số này vào callback của commitAsync. Khi chuẩn bị thử lại, kiểm tra xem số thứ tự commit mà callback nhận được có bằng với biến tăng dần hay không; nếu bằng thì không có commit mới nào và có thể thử lại an toàn. Nếu số thứ tự biến tăng dần cao hơn, không thử lại vì đã có một commit mới được gửi.
7. Kết hợp Commit Sync và Async
Thông thường, việc thỉnh thoảng không commit thành công mà không thử lại không phải là một vấn đề lớn vì nếu sự cố chỉ tạm thời, commit tiếp theo sẽ thành công. Nhưng nếu chúng ta biết rằng đây là commit cuối cùng trước khi dừng consumer hoặc trước khi xảy ra rebalance, chúng ta muốn đảm bảo chắc chắn rằng commit này thành công.
Do đó, một pattern phổ biến là kết hợp commitAsync() với commitSync() ngay trước khi stop hệ thống. Đây là cách nó hoạt động:
-Khi mọi thứ hoạt động bình thường, chúng ta sử dụng commitAsync. Nó nhanh hơn, và nếu một lần commit thất bại, commit tiếp theo sẽ đóng vai trò như một lần thử lại.
-Nhưng nếu chúng ta đang đóng ứng dụng, sẽ không có "commit tiếp theo". Chúng ta gọi commitSync(), vì nó sẽ thử lại cho đến khi thành công hoặc gặp lỗi không thể khắc phục.
8. Thông tin kết nối
Nếu anh em muốn trao đổi thêm về bài viết, hãy kết nối với mình qua LinkedIn và Facebook:
- LinkedIn: https://www.linkedin.com/in/nguyentrungnam/
- Facebook: https://www.facebook.com/trungnam.nguyen.395/
Rất mong được kết nối và cùng thảo luận!
All Rights Reserved