+3

Kafka consumer các cài đặt cấu hình quan trọng

1. Mở đầu

Tất cả các cấu hình của consumer đều được tài liệu hóa trong tài liệu của Apache Kafka. Hầu hết các tham số có giá trị mặc định hợp lý và không cần phải thay đổi, nhưng một số tham số có ảnh hưởng đến hiệu suất và khả năng sẵn sàng của các consumer. Hãy cùng xem xét một số thuộc tính quan trọng hơn.

2. Các cấu hình quan trọng ảnh hướng đến hiệu suất

fetch.min.bytes

Thuộc tính này cho phép một consumer chỉ định dung lượng dữ liệu tối thiểu mà nó muốn nhận từ broker khi lấy các bản ghi, giá trị mặc định là một byte. Nếu broker nhận được yêu cầu lấy bản ghi từ một consumer nhưng dung lượng dữ liệu mới ít hơn số byte mà fetch.min.bytes yêu cầu, broker sẽ đợi cho đến khi có thêm message trước khi gửi dữ liệu trở lại cho consumer. Điều này giúp giảm tải cho cả consumer và broker, vì họ phải xử lý ít message qua lại hơn trong các trường hợp topic không có nhiều hoạt động mới (hoặc trong các giờ ít hoạt động trong ngày). Bạn nên đặt tham số này cao hơn giá trị mặc định nếu consumer sử dụng quá nhiều CPU khi không có nhiều dữ liệu, hoặc giảm tải cho broker khi có số lượng lớn consumer—mặc dù cần lưu ý rằng việc tăng giá trị này có thể làm tăng độ trễ trong các trường hợp có throughput thấp.

fetch.max.wait.ms

Bằng cách thiết lập fetch.min.bytes, chúng ta yêu cầu Kafka đợi cho đến khi có đủ dữ liệu để gửi trước khi phản hồi cho consumer. fetch.max.wait.ms cho phép chúng ta kiểm soát thời gian chờ. Theo mặc định, Kafka sẽ đợi tối đa 500 ms. Điều này dẫn đến độ trễ thêm lên đến 500 ms trong trường hợp không có đủ dữ liệu được đẩy vào Kafka topic để đáp ứng lượng dữ liệu tối thiểu cần trả về. Nếu chúng ta muốn hạn chế độ trễ tiềm tàng (thường do SLA điều khiển độ trễ tối đa của ứng dụng), chúng ta có thể đặt fetch.max.wait.ms thành giá trị thấp hơn. Nếu bạn đặt fetch.max.wait.ms là 100 ms và fetch.min.bytes là 1 MB, Kafka sẽ nhận được yêu cầu fetch từ consumer và sẽ phản hồi với dữ liệu khi có 1 MB dữ liệu để trả về hoặc sau 100 ms, tùy thuộc vào điều gì xảy ra trước.

fetch.max.bytes

Thuộc tính này cho phép chúng ta chỉ định số byte tối đa mà Kafka sẽ trả về mỗi khi consumer thực hiện polling một broker (mặc định là 50 MB). Nó được sử dụng để giới hạn kích thước bộ nhớ mà consumer sẽ sử dụng để lưu trữ dữ liệu được trả về từ server, không phụ thuộc vào số lượng partition hoặc message được trả về. Lưu ý rằng các bản ghi được gửi đến client theo từng lô (batch), và nếu lô bản ghi đầu tiên mà broker phải gửi vượt quá kích thước này, lô dữ liệu đó sẽ được gửi và giới hạn sẽ bị bỏ qua. Điều này đảm bảo rằng consumer có thể tiếp tục tiến trình. Cũng đáng lưu ý rằng có một cấu hình tương ứng trên broker cho phép quản trị viên Kafka giới hạn kích thước fetch tối đa. Cấu hình broker này có thể hữu ích vì các yêu cầu cho lượng dữ liệu lớn có thể dẫn đến việc đọc lớn từ đĩa và gửi dữ liệu qua mạng lâu, gây ra sự cạnh tranh và tăng tải trên broker.

max.poll.records

Thuộc tính này kiểm soát số lượng bản ghi tối đa mà một lần gọi poll() sẽ trả về. Sử dụng thuộc tính này để kiểm soát số lượng dữ liệu (chứ không phải kích thước dữ liệu) mà ứng dụng của bạn cần xử lý trong một vòng lặp polling.

max.partition.fetch.bytes

Thuộc tính này kiểm soát số byte tối đa mà server sẽ trả về cho mỗi partition (mặc định là 1 MB). Khi KafkaConsumer.poll() trả về ConsumerRecords, đối tượng bản ghi sẽ sử dụng tối đa max.partition.fetch.bytes cho mỗi partition được gán cho consumer. Lưu ý rằng việc kiểm soát mức sử dụng bộ nhớ bằng cách sử dụng cấu hình này có thể khá phức tạp, vì bạn không có quyền kiểm soát số lượng phân vùng sẽ được bao gồm trong phản hồi của broker. Do đó, mình khuyến nghị sử dụng fetch.max.bytes thay thế, trừ khi anh em có lý do đặc biệt để cố gắng xử lý các khối lượng dữ liệu tương tự từ mỗi partition.

session.timeout.ms và heartbeat.interval.ms

Khoảng thời gian mà một consumer có thể không liên lạc với các broker trong khi vẫn được coi là còn sống mặc định là 10 giây. Nếu quá thời gian session.timeout.ms mà consumer không gửi heartbeat đến group coordinator, consumer sẽ bị coi là đã chết và group coordinator sẽ kích hoạt một lần rebalance consumer group để phân phối các partition từ consumer đã chết cho các consumer khác trong nhóm. Thuộc tính này có liên quan chặt chẽ đến heartbeat.interval.ms, điều khiển tần suất mà Kafka consumer gửi heartbeat đến group coordinator, trong khi session.timeout.ms kiểm soát khoảng thời gian mà một consumer có thể không gửi heartbeat. Do đó, hai thuộc tính này thường được điều chỉnh cùng nhau—heartbeat.interval.ms phải nhỏ hơn session.timeout.ms và thường được đặt bằng một phần ba giá trị của thời gian chờ. Ví dụ, nếu session.timeout.ms là 3 giây, heartbeat.interval.ms nên là 1 giây. Việc đặt session.timeout.ms thấp hơn giá trị mặc định sẽ giúp nhóm consumer phát hiện và phục hồi từ sự cố nhanh hơn nhưng cũng có thể gây ra các lần rebalance không mong muốn. Ngược lại, việc đặt session.timeout.ms cao hơn sẽ giảm nguy cơ rebalance không mong muốn nhưng cũng có nghĩa là sẽ mất nhiều thời gian hơn để phát hiện một sự cố thực sự.

max.poll.interval.ms

Thuộc tính này cho phép bạn thiết lập khoảng thời gian mà trong đó consumer có thể không thực hiện polling trước khi nó được coi là đã chết. Như đã đề cập trước đó, heartbeats và session timeouts là cơ chế chính mà Kafka sử dụng để phát hiện các consumer đã chết và lấy lại các phân vùng của chúng. Tuy nhiên, mình cũng đã đề cập rằng heartbeats được gửi bởi một background thread. Có khả năng rằng main thread đang tiêu thụ dữ liệu từ Kafka bị deadlock, nhưng background thread vẫn đang gửi heartbeats. Điều này có nghĩa là các bản ghi từ các partition thuộc về consumer này không được xử lý. Cách dễ nhất để biết liệu consumer vẫn đang xử lý các bản ghi hay không là kiểm tra xem nó có đang yêu cầu thêm bản ghi không. Tuy nhiên, các khoảng thời gian giữa các yêu cầu thêm bản ghi rất khó dự đoán và phụ thuộc vào lượng dữ liệu có sẵn, loại xử lý được thực hiện bởi consumer và đôi khi là độ trễ của các dịch vụ bổ sung. Trong các ứng dụng cần thực hiện xử lý tốn thời gian trên mỗi bản ghi được trả về, max.poll.records được sử dụng để giới hạn lượng dữ liệu trả về và do đó giới hạn thời gian trước khi ứng dụng có thể polling() lại. Ngay cả khi max.poll.records được định nghĩa, khoảng thời gian giữa các lần gọi poll() vẫn khó dự đoán, và max.poll.interval.ms được sử dụng như một biện pháp an toàn hoặc bảo vệ. Nó phải có một khoảng thời gian đủ lớn để rất hiếm khi được đạt đến bởi một consumer khỏe mạnh nhưng thấp đủ để tránh ảnh hưởng đáng kể từ một consumer bị treo. Giá trị mặc định là 5 phút. Khi hết thời gian, background thread sẽ gửi một yêu cầu “rời nhóm” để thông báo cho broker rằng consumer đã chết và nhóm phải thực hiện rebalance, sau đó ngừng gửi heartbeats.

default.api.timeout.ms

Đây là thời gian chờ sẽ áp dụng cho (hầu hết) tất cả các cuộc gọi API được thực hiện bởi consumer khi bạn không chỉ định một thời gian chờ rõ ràng khi gọi API. Mặc định là 1 phút, và vì nó cao hơn thời gian chờ yêu cầu mặc định, nó sẽ bao gồm việc thử lại khi cần. Ngoại lệ đáng chú ý đối với các API sử dụng mặc định này là phương thức poll(), yêu cầu một thời gian chờ rõ ràng.

request.timeout.ms

Đây là khoảng thời gian tối đa mà consumer sẽ chờ phản hồi từ broker. Nếu broker không phản hồi trong khoảng thời gian này, client sẽ giả định rằng broker sẽ không phản hồi, đóng kết nối và cố gắng kết nối lại. Cấu hình này mặc định là 30 giây, và không nên giảm thấp hơn. Quan trọng là phải để broker đủ thời gian để xử lý yêu cầu trước khi từ bỏ—việc gửi lại yêu cầu đến một broker đã bị quá tải không mang lại nhiều lợi ích, và hành động ngắt kết nối và kết nối lại còn rất tốn chi phí và có thể gây cao tải cho broker.

auto.offset.reset

Thuộc tính này điều khiển hành vi của consumer khi nó bắt đầu đọc một partition mà không có commited offset, hoặc nếu commited offset là không hợp lệ (thường là vì consumer đã ngừng hoạt động quá lâu đến nỗi bản ghi với offset đó đã bị loại khỏi broker). Mặc định là "latest", nghĩa là nếu không có offset hợp lệ, consumer sẽ bắt đầu đọc từ các bản ghi mới nhất (những bản ghi được viết sau khi consumer bắt đầu hoạt động). Một tùy chọn khác là "earliest", có nghĩa là nếu không có offset hợp lệ, consumer sẽ đọc tất cả dữ liệu trong partition, bắt đầu từ bản ghi đầu tiên. Đặt auto.offset.reset thành "none" sẽ gây ra một ngoại lệ khi cố gắng tiêu thụ từ một offset không hợp lệ.

enable.auto.commit

Tham số này điều khiển việc consumer có tự động commit các offset hay không, và mặc định là true. Đặt nó thành false nếu bạn muốn kiểm soát khi nào các offset được commit, điều này là cần thiết để giảm thiểu trùng lặp và tránh mất dữ liệu. Nếu chúng ta đặt enable.auto.commit thành true, chúng ta có thể muốn kiểm soát tần suất commit các offset bằng cách sử dụng auto.commit.interval.ms. Chúng ta sẽ thảo luận về các tùy chọn khác nhau để commit các offset chi tiết hơn ở bài viết kế tiếp.

partition.assignment.strategy

Chúng ta đã tìm hiểu rằng các partition được phân bổ cho các consumer trong một consumer group. PartitionAssignor là một lớp, dựa trên các consumer và các topic mà chúng đã đăng ký, quyết định phân bổ các partition cho từng consumer. Theo mặc định, Kafka có các chiến lược phân bổ sau:

Range

Gán cho mỗi consumer một tập hợp liên tiếp các partition từ mỗi topic mà consumer đó đăng ký. Ví dụ, nếu các consumer C1 và C2 đã đăng ký hai topic, T1 và T2, và mỗi topic có ba partition, thì C1 sẽ được gán các partition 0 và 1 từ các topic T1 và T2, trong khi C2 sẽ được gán partition 2 từ các topic đó. Vì mỗi topic có số lượng partition không đồng đều và việc phân bổ được thực hiện độc lập cho mỗi topic, consumer đầu tiên sẽ có nhiều partition hơn so với consumer thứ hai. Điều này xảy ra bất cứ khi nào chiến lược phân bổ Range được sử dụng và số lượng consumer không chia hết số lượng partition trong mỗi topic một cách đồng đều.

RoundRobin

Lấy tất cả các partition từ tất cả các topic đã đăng ký và phân bổ chúng cho các consumer theo thứ tự tuần hoàn, từng cái một. Nếu C1 và C2 sử dụng chiến lược phân bổ RoundRobin, thì C1 sẽ có các partition 0 và 2 từ topic T1, và partition 1 từ topic T2. C2 sẽ có partition 1 từ topic T1, và các partition 0 và 2 từ topic T2. Nói chung, nếu tất cả các consumer đều đăng ký cùng một topic (một kịch bản rất phổ biến), phân bổ RoundRobin sẽ dẫn đến tất cả các consumer có cùng số lượng partition (hoặc ít nhất là chênh lệch một partition).

Sticky

Sticky Assignor có hai mục tiêu: mục tiêu đầu tiên là có một phân bổ cân bằng nhất có thể, và mục tiêu thứ hai là trong trường hợp có rebalance, nó sẽ giữ lại càng nhiều phân bổ càng tốt, giảm thiểu chi phí liên quan đến việc di chuyển các phân bổ partition từ consumer này sang consumer khác. Trong trường hợp phổ biến khi tất cả các consumer đều đăng ký cùng một topic, phân bổ ban đầu từ Sticky Assignor sẽ cân bằng như của RoundRobin Assignor. Các phân bổ sau đó sẽ cũng cân bằng như vậy nhưng sẽ giảm số lượng di chuyển partition. Trong các trường hợp mà các consumer trong cùng một group đăng ký các topic khác nhau, phân bổ đạt được từ Sticky Assignor sẽ cân bằng hơn so với phân bổ của RoundRobin Assignor.

Cooperative Sticky

Chiến lược phân bổ này giống hệt như của Sticky Assignor nhưng hỗ trợ các cooperative rebalances trong đó các consumer có thể tiếp tục tiêu thụ từ các partition không được phân bổ lại. Xem Consumer Groups và Partition Rebalance để hiểu thêm về cooperative rebalances.

client.id

Đây có thể là bất kỳ chuỗi nào và sẽ được các broker sử dụng để nhận diện các yêu cầu gửi từ client, chẳng hạn như các yêu cầu fetch. Nó được sử dụng trong việc ghi log, theo dõi metrics hoặc quota.

client.rack

Theo mặc định, các consumer sẽ lấy message từ replica leader của mỗi partition. Tuy nhiên, khi cluster trải dài trên nhiều trung tâm dữ liệu hoặc nhiều khu vực sẵn có trên cloud, có những lợi ích về hiệu suất và chi phí khi lấy message từ một replica nằm trong cùng khu vực với consumer. Để cho phép lấy message từ replica gần nhất, chúng ta cần thiết lập cấu hình client.rack và xác định khu vực mà client đang nằm. Sau đó, bạn có thể cấu hình các broker để thay thế lớp replica.selector.class mặc định bằng org.apache.kafka.common.replica.RackAwareReplicaSelector.

Chúng ta cũng có thể triển khai lớp replica.selector.class của riêng chúng ta với logic tùy chỉnh để chọn replica tốt nhất để tiêu thụ, dựa trên metadata của client và metadata của partition.

group.instance.id

Đây có thể là bất kỳ chuỗi duy nhất nào và được sử dụng để cung cấp cho một consumer khả năng thành viên nhóm tĩnh.

receive.buffer.bytes và send.buffer.bytes

Đây là kích thước của các bộ đệm TCP gửi và nhận được sử dụng bởi các socket khi ghi và đọc dữ liệu. Nếu các giá trị này được đặt thành -1, hệ điều hành sẽ sử dụng các giá trị mặc định. Việc tăng kích thước các bộ đệm này có thể là một ý tưởng tốt khi các producer hoặc consumer giao tiếp với các broker ở trung tâm dữ liệu khác, vì các liên kết mạng đó thường có độ trễ cao hơn và băng thông thấp hơn.

offsets.retention.minutes

Đây là một cấu hình của broker, nhưng nó quan trọng vì ảnh hưởng của nó đến hành vi của consumer. Chừng nào một consumer group còn có các thành viên hoạt động (tức là các thành viên đang duy trì trạng thái thành viên trong nhóm bằng cách gửi heartbeat), Kafka sẽ giữ lại offset cuối cùng mà nhóm đã cam kết cho mỗi partition, để có thể lấy lại trong trường hợp partition được phân bổ lại hoặc khởi động lại. Tuy nhiên, khi một nhóm trở nên trống rỗng, Kafka chỉ giữ các offset đã cam kết trong khoảng thời gian được cấu hình bởi tham số này—mặc định là 7 ngày. Khi các offset bị xóa, nếu nhóm trở lại hoạt động, nó sẽ hoạt động như một nhóm consumer mới hoàn toàn mà không có ký ức về bất kỳ dữ liệu nào đã tiêu thụ trong quá khứ. Lưu ý rằng hành vi này đã thay đổi vài lần, vì vậy nếu chúng ta sử dụng các phiên bản cũ hơn 2.1.0, hãy kiểm tra tài liệu cho phiên bản của chúng ta để biết hành vi dự kiến.

3. Thông tin kết nối

Nếu anh em muốn trao đổi thêm về bài viết, hãy kết nối với mình qua LinkedIn và Facebook:

Rất mong được kết nối và cùng thảo luận!


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí