+2

Vai trò của Controller trong hệ thống Kafka

1. Cluster Membership

Kafka sử dụng Apache ZooKeeper để quản lý danh sách các broker đang tham gia vào một cụm. Mỗi broker có một mã định danh duy nhất, được đặt trong tệp cấu hình của broker hoặc được tạo tự động. Mỗi khi một broker khởi động, nó sẽ tự động đăng ký với ID của mình trong ZooKeeper bằng cách tạo một ephemeral node. Các broker Kafka, controller, và một số công cụ khác trong hệ sinh thái Kafka sẽ đăng ký theo dõi đường dẫn /brokers/ids trong ZooKeeper, nơi các broker được đăng ký, để có thể nhận thông báo khi broker được thêm hoặc gỡ bỏ khỏi cụm.

Nếu chúng ta cố gắng khởi động một broker khác với cùng một ID, chúng ta sẽ gặp lỗi—broker mới sẽ cố gắng đăng ký nhưng sẽ thất bại vì đã có một node ZooKeeper cho ID broker đó rồi.

Khi một broker mất kết nối với ZooKeeper (thường là do broker dừng hoạt động, nhưng cũng có thể xảy ra do network partition hoặc garbage-collection pause), ephemeral node mà broker tạo ra khi khởi động sẽ tự động bị xóa khỏi ZooKeeper. Các thành phần Kafka đang theo dõi danh sách các broker sẽ nhận được thông báo rằng broker này đã ngừng hoạt động.

Mặc dù node đại diện cho broker sẽ bị xóa khi broker dừng hoạt động, nhưng ID của broker vẫn tồn tại trong các cấu trúc dữ liệu khác. Ví dụ, danh sách các bản sao của mỗi topic vẫn lưu trữ ID của các broker cho những bản sao đó. Do vậy, nếu một broker bị mất hoàn toàn và chúng ta khởi động lại một broker mới với cùng ID của broker cũ, broker này sẽ ngay lập tức gia nhập lại cụm và thay thế broker đã mất với các partition và topic được gán cho nó như trước.

Ephemeral node

Ephemeral node là một loại node tạm thời trong hệ thống ZooKeeper. Khi một client (chẳng hạn như broker trong Kafka) kết nối với ZooKeeper và tạo một ephemeral node, node này sẽ tồn tại cho đến khi kết nối giữa client và ZooKeeper bị mất hoặc client đóng kết nối. Khi kết nối bị ngắt (ví dụ do broker dừng hoạt động, gặp lỗi, hoặc mất kết nối mạng), ephemeral node sẽ tự động bị xóa bởi ZooKeeper. Điều này giúp ZooKeeper theo dõi các thành phần như broker còn đang hoạt động hay đã rời khỏi hệ thống.

2. Controller

Controller là một trong những broker của Kafka, ngoài việc thực hiện các chức năng thông thường của broker, nó còn chịu trách nhiệm bầu chọn leader cho các partition. Broker đầu tiên khởi động trong cluster sẽ trở thành controller bằng cách tạo một ephemeral node trong ZooKeeper được gọi là /controller. Khi các broker khác khởi động, chúng cũng cố gắng tạo node này nhưng nhận được thông báo lỗi “node already exists”, khiến chúng "nhận ra" rằng controller node đã tồn tại và cluster đã có một controller. Các broker sẽ tạo một "watch" trong ZooKeeper đối với node controller để nhận thông báo khi có thay đổi với node này. Bằng cách này, hệ thống đảm bảo rằng trong cluster chỉ có một controller tại một thời điểm.

Khi controller broker bị dừng hoặc mất kết nối với ZooKeeper, ephemeral node sẽ biến mất. Điều này bao gồm bất kỳ trường hợp nào mà client ZooKeeper được sử dụng bởi controller ngừng gửi heartbeats tới ZooKeeper trong khoảng thời gian dài hơn giá trị cấu hình của zookeeper.session.timeout.ms. Khi ephemeral node biến mất, các broker khác trong cluster sẽ nhận được thông báo thông qua cơ chế "watch" của ZooKeeper rằng controller đã biến mất và sẽ tự động cố gắng tạo node controller mới trong ZooKeeper. Broker đầu tiên tạo thành công node controller mới trong ZooKeeper sẽ trở thành controller tiếp theo, trong khi các broker khác sẽ nhận được lỗi "node already exists" và tiếp tục đặt "watch" trên node controller mới.

Mỗi khi một controller mới được bầu chọn, nó sẽ nhận được một số thứ tự epoch của controller cao hơn thông qua một thao tác tăng có điều kiện trong ZooKeeper. Các broker biết số thứ tự epoch hiện tại của controller, và nếu chúng nhận được message từ một controller có số thứ tự thấp hơn, chúng sẽ bỏ qua message đó. Điều này rất quan trọng vì controller broker có thể bị mất kết nối với ZooKeeper do một đợt thu gom rác (garbage collection) dài—trong thời gian đó, một controller mới sẽ được bầu chọn. Khi controller cũ quay lại hoạt động sau đợt thu gom rác, nó có thể tiếp tục gửi message tới các broker mà không biết rằng đã có một controller mới—trong trường hợp này, controller cũ được coi là "zombie". Số thứ tự epoch của controller trong message cho phép các broker bỏ qua message từ các controller cũ, đây là một dạng của "zombie fencing" (rào chắn chống zombie).

Khi controller lần đầu tiên khởi động, nó phải đọc bản đồ trạng thái replica mới nhất từ ZooKeeper trước khi có thể bắt đầu quản lý metadata của cluster và thực hiện bầu chọn leader. Quá trình tải dữ liệu sử dụng các API bất đồng bộ (asynchronous) và truyền các yêu cầu đọc tới ZooKeeper. Tuy nhiên, trong các cluster với số lượng phân vùng lớn, quá trình tải có thể mất vài giây hoặc hơn thế.

Khi controller nhận thấy một broker đã rời khỏi cluster (bằng cách theo dõi đường dẫn ZooKeeper liên quan hoặc nhận được ControlledShutdownRequest từ broker), nó biết rằng tất cả các partition có leader trên broker đó sẽ cần một leader mới. Controller sẽ xem xét tất cả các partition cần leader mới và xác định ai sẽ là leader mới (thông thường là replica tiếp theo trong danh sách replica của partition đó). Sau đó, nó lưu trạng thái mới vào ZooKeeper (sử dụng các yêu cầu async theo dạng pipeline để giảm độ trễ) và gửi yêu cầu LeaderAndISR đến tất cả các broker chứa replica cho các partition đó. Yêu cầu này chứa thông tin về leader mới và các follower cho các phân vùng. Các yêu cầu này được nhóm lại để tăng hiệu quả, vì vậy mỗi yêu cầu bao gồm thông tin về learder mới cho nhiều partition có replica trên cùng một broker. Mỗi leader mới biết rằng nó cần bắt đầu phục vụ các yêu cầu từ producer và consumer, trong khi các follower biết rằng chúng cần bắt đầu sao chép message từ leader mới. Vì mỗi broker trong cluster có một MetadataCache chứa bản đồ của tất cả các broker và các replica trong cluster, controller gửi thông tin về sự thay đổi leader đến tất cả các broker trong một yêu cầu UpdateMetadata để các broker có thể cập nhật cache của mình. Một quy trình tương tự lặp lại khi một broker khởi động lại—sự khác biệt chính là tất cả các replica trong broker bắt đầu như các follower và cần phải theo kịp leader trước khi chúng đủ điều kiện để trở thành leader.

Tóm lại, Kafka sử dụng tính năng ephemeral node của ZooKeeper để bầu chọn một controller và thông báo cho controller khi có node gia nhập hoặc rời khỏi cụm (cluster). Controller có nhiệm vụ bầu chọn các leader cho các partition và bản sao (replica) mỗi khi nó phát hiện có node gia nhập hoặc rời khỏi cụm. Controller sử dụng số epoch để ngăn chặn tình huống "split brain" trong đó hai nút cùng tin rằng mỗi node là controller hiện tại.

3. KRaft: Kafka’s New Raft-Based Controller

Bắt đầu từ năm 2019, cộng đồng Apache Kafka đã bắt tay vào một dự án đầy tham vọng: chuyển từ controller dựa trên ZooKeeper sang controller dựa trên quorum Raft. Phiên bản thử nghiệm của controller mới, mang tên KRaft, là một phần của bản phát hành Apache Kafka 2.8. Phiên bản sản xuất đầu tiên của KRaft sẽ được đưa vào bản phát hành Apache Kafka 3.0, dự kiến vào giữa năm 2021, và các cụm Kafka sẽ có thể chạy với controller truyền thống dựa trên ZooKeeper hoặc KRaft.

Tại sao cộng đồng Kafka quyết định thay thế controller? Controller hiện tại của Kafka đã trải qua nhiều lần viết lại, cải tiến, nhưng mặc dù đã cải thiện cách nó sử dụng ZooKeeper để lưu trữ thông tin về topic, partition và replica, rõ ràng là mô hình hiện tại không thể mở rộng để hỗ trợ số lượng partition mà chúng ta mong muốn. Một số mối quan tâm đã được biết đến đã thúc đẩy sự thay đổi này:

Cập nhật metadata được ghi vào ZooKeeper một cách đồng bộ nhưng được gửi đến các broker một cách bất đồng bộ. Thêm vào đó, việc nhận cập nhật từ ZooKeeper cũng là bất đồng bộ. Tất cả điều này dẫn đến các trường hợp ngoại lệ mà metadata không nhất quán giữa các broker, controller và ZooKeeper. Những trường hợp này rất khó phát hiện. Mỗi khi controller được khởi động lại, nó phải đọc toàn bộ metadata cho tất cả các broker và partitions từ ZooKeeper và sau đó gửi metadata này đến tất cả các broker. Mặc dù đã có nhiều nỗ lực trong nhiều năm, điều này vẫn là một nút thắt lớn—khi số lượng partitions và brokers tăng lên, việc khởi động lại controller trở nên chậm hơn. Kiến trúc nội bộ xung quanh việc sở hữu metadata không tốt—một số thao tác được thực hiện qua controller, một số khác qua bất kỳ broker nào, và một số trực tiếp trên ZooKeeper. ZooKeeper là một hệ thống phân tán riêng biệt, và, giống như Kafka, nó đòi hỏi một số chuyên môn để vận hành. Các nhà phát triển muốn sử dụng Kafka vì vậy cần phải học hai hệ thống phân tán, không chỉ một.

Với tất cả những mối quan ngại này, cộng đồng Apache Kafka đã chọn thay thế controller hiện tại dựa trên ZooKeeper.

Trong kiến trúc hiện tại, ZooKeeper có hai chức năng quan trọng: nó được sử dụng để bầu chọn controller và lưu trữ metadata của cluster—bao gồm các broker đã đăng ký, cấu hình, topics, partitions và replicas. Thêm vào đó, chính controller quản lý metadata—nó được sử dụng để bầu chọn leader, tạo và xóa topics, và phân bổ lại replicas. Tất cả những chức năng này sẽ cần được thay thế trong controller mới.

Ý tưởng cốt lõi của thiết kế controller mới là Kafka có một kiến trúc dựa trên log, nơi trạng thái được biểu diễn dưới dạng một dòng sự kiện. Những lợi ích của cách biểu diễn này đã được cộng đồng hiểu rõ—nhiều consumer có thể nhanh chóng cập nhật trạng thái mới nhất bằng cách phát lại các sự kiện. Log thiết lập thứ tự rõ ràng giữa các sự kiện và đảm bảo rằng các consumer luôn di chuyển theo một dòng thời gian duy nhất. Kiến trúc controller mới mang lại những lợi ích tương tự cho việc quản lý metadata của Kafka.

Trong kiến trúc mới, các nút controller tạo thành một quorum Raft để quản lý log các sự kiện metadata. Log này ghi lại mọi thay đổi đối với metadata của cluster. Tất cả thông tin hiện đang lưu trữ trong ZooKeeper, chẳng hạn như topics, partitions, ISRs, cấu hình, v.v., sẽ được chuyển sang lưu trữ trong log này.

Với thuật toán Raft, các node controller sẽ tự bầu chọn một leader trong số các node controller mà không cần dựa vào hệ thống bên ngoài. Leader của log metadata được gọi là controller hiện tại. Controller hiện tại xử lý tất cả các RPC từ các broker. Các controller follower sao chép dữ liệu từ controller hiện tại và đóng vai trò dự phòng nếu controller hiện tại gặp sự cố. Vì các controller sẽ đồng bộ hóa trạng thái mới nhất, nên việc chuyển giao giữa các controller sẽ không yêu cầu thời gian tải lại lâu, mà chỉ cần chuyển giao trạng thái mới cho controller mới.

Thay vì để controller gửi các cập nhật tới các broker khác, các broker sẽ lấy các cập nhật từ controller hiện tại thông qua API MetadataFetch mới. Tương tự như một yêu cầu fetch, các broker sẽ theo dõi offset của sự thay đổi metadata mới nhất mà họ đã lấy và chỉ yêu cầu các cập nhật mới hơn từ controller. Các broker sẽ lưu trữ metadata vào đĩa, điều này cho phép chúng khởi động nhanh chóng ngay cả khi có hàng triệu partitions.

Các broker sẽ đăng ký với quorum của controller và sẽ giữ trạng thái đăng ký cho đến khi bị gỡ bỏ bởi một quản trị viên, vì vậy khi một broker ngừng hoạt động, nó sẽ ngoại tuyến nhưng vẫn được đăng ký. Các broker đang hoạt động nhưng không cập nhật metadata mới nhất sẽ bị ngăn chặn và không thể phục vụ các yêu cầu của client. Trạng thái bị ngăn chặn mới này sẽ ngăn chặn các trường hợp mà client gửi các sự kiện tới một broker không còn là leader nhưng quá lỗi thời để nhận ra rằng nó không còn là leader.

Trong quá trình di chuyển sang quorum của controller, tất cả các hoạt động trước đây yêu cầu các client hoặc broker giao tiếp trực tiếp với ZooKeeper sẽ được chuyển qua controller. Điều này cho phép di chuyển mượt mà bằng cách thay thế controller mà không cần thay đổi bất kỳ broker nào.

4. Thông tin kết nối

Nếu anh em muốn trao đổi thêm về bài viết, hãy kết nối với mình qua LinkedIn và Facebook:

Rất mong được kết nối và cùng thảo luận!


All Rights Reserved

Viblo
Let's register a Viblo Account to get more interesting posts.