Redis Delayed Queue: Giải thích một lần và mãi mãi
Hàng đợi trì hoãn (Delayed Queue) về cơ bản là một hàng đợi tin nhắn có cơ chế trì hoãn thực thi. Vậy nó hữu ích trong những trường hợp kinh doanh nào?
Các tình huống thực tế
Khi thanh toán đơn hàng thất bại, cần nhắc nhở người dùng định kỳ.
Trong trường hợp người dùng truy cập đồng thời, bạn có thể trì hoãn việc gửi email cho người dùng trong vòng 2 phút.
Sử dụng Redis để triển khai hàng đợi tin nhắn cơ bản
Như chúng ta đã biết, với các middleware hàng đợi tin nhắn chuyên nghiệp như Kafka hay RabbitMQ, người tiêu dùng cần thực hiện một loạt bước phức tạp trước khi có thể tiêu thụ tin nhắn.
Ví dụ, với RabbitMQ, bạn phải:
- Tạo Exchange
- Tạo Queue
- Gắn kết Queue với Exchange bằng các quy tắc định tuyến
- Gửi tin nhắn với routing key và thông tin header thích hợp
Trong phần lớn các trường hợp, kể cả khi chỉ có một consumer, ta vẫn phải làm các bước trên.
Nhưng với Redis, khi chỉ có một nhóm tiêu thụ, mọi thứ đơn giản hơn rất nhiều. Tuy nhiên, Redis không phải là hàng đợi tin nhắn chuyên dụng và thiếu các tính năng nâng cao – ví dụ, không có cơ chế xác nhận (ACK). Do đó, nếu bạn cần độ tin cậy cao, Redis không phù hợp.
Triển khai hàng đợi bất đồng bộ cơ bản
Redis cung cấp cấu trúc dữ liệu list, rất phù hợp cho hàng đợi bất đồng bộ.
- Thêm vào hàng đợi:
rpush
hoặclpush
- Lấy ra khỏi hàng đợi:
lpop
hoặcrpop
> rpush queue Leapcell_1 Leapcell_2 Leapcell_3
(integer) 3
> lpop queue
"Leapcell_1"
> llen queue
(integer) 2
Vấn đề 1: Hàng đợi trống thì sao?
Khi hàng đợi trống, client thực hiện liên tục các lệnh pop
nhưng không nhận được dữ liệu. Đây là vòng lặp lãng phí, gây tốn CPU và làm tăng QPS trên Redis.
Giải pháp: thêm thời gian ngủ (sleep
) – ví dụ, ngủ 1 giây giữa mỗi lần pop – giúp giảm CPU và QPS.
Vấn đề 2: Độ trễ hàng đợi
Sleep
giúp giảm tài nguyên tiêu tốn, nhưng lại tạo độ trễ (ví dụ: 1 giây). Để giảm độ trễ:
Giải pháp: dùng blpop
hoặc brpop
– các lệnh pop có chặn (blocking pop).
Khi không có dữ liệu, thread sẽ tự động "ngủ" và tỉnh ngay khi có dữ liệu, giúp độ trễ gần như bằng 0.
Vấn đề 3: Kết nối rỗi bị ngắt
Khi dùng blpop
, nếu thread chờ quá lâu, kết nối Redis có thể bị ngắt vì không hoạt động. Khi đó, lệnh blpop
sẽ ném ra ngoại lệ.
Giải pháp: xử lý ngoại lệ và viết lại logic retry.
Xử lý xung đột khoá phân tán
Khi không thể lấy được khoá phân tán, có ba cách xử lý:
- Ném ngoại lệ: Cho người dùng biết và tự retry sau.
- Ngủ rồi thử lại: Đợi vài giây rồi thử lại.
- Đưa vào hàng đợi trì hoãn: Xử lý lại sau, tránh gây tắc nghẽn tức thời.
Triển khai Delayed Queue
Dùng Redis zset
, trong đó:
- score là timestamp (thời gian thực thi)
- Thêm tin nhắn:
zadd score1 value1
- Lấy tin nhắn đến hạn:
zrangebyscore min max
- Lấy một tin nhắn:
zrangebyscore key min max withscores limit 0 1
private Jedis jedis;
public void redisDelayQueueTest() {
String key = "delay_queue";
// In real applications, it is recommended to use a business ID and a randomly generated unique ID as the value.
// The unique ID ensures message uniqueness, and the business ID avoids carrying too much data in the value.
String orderId1 = UUID.randomUUID().toString();
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, orderId1);
String orderId2 = UUID.randomUUID().toString();
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, orderId2);
new Thread() {
@Override
public void run() {
while (true) {
Set<String> resultList;
// Get only the first item (non-destructive read)
resultList = jedis.zrangebyscore(key, System.currentTimeMillis(), 0, 1);
if (resultList.size() == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
} else {
// Remove the fetched data
if (jedis.zrem(key, resultList.iterator().next()) > 0) {
String orderId = resultList.iterator().next();
log.info("orderId = {}", orderId);
this.handleMsg(orderId);
}
}
}
}
}.start();
}
public void handleMsg(T msg) {
System.out.println(msg);
}
Việc triển khai ở trên cũng hoạt động tốt trong một kịch bản đa luồng. Giả sử bạn có hai luồng T1 và T2, và có thể nhiều hơn. Logic tiến hành như thế này, đảm bảo chỉ có một luồng xử lý một thông báo:
- T1, T2 và các luồng khác gọi
zrangebyscore
và nhận tin nhắn A. - T1 bắt đầu xóa tin nhắn A. Vì đây là thao tác nguyên tử nên T2 và các luồng khác đợi T1 hoàn tất
zrem
trước khi tiếp tục. - T1 xóa thành công tin nhắn A và xử lý nó.
- T2 và những người khác cố gắng xóa tin nhắn A nhưng không thành công vì tin nhắn này đã bị xóa rồi — họ từ bỏ việc xử lý.
Ngoài ra, hãy chắc chắn thêm xử lý ngoại lệ vào handleMsg
, để một tác vụ lỗi duy nhất không khiến toàn bộ vòng lặp xử lý bị sập.
Tối ưu hóa hơn nữa
Trong thuật toán trên, cùng một tác vụ có thể được nhiều tiến trình tìm nạp và chỉ có một tiến trình thành công trong việc xóa tác vụ đó bằng cách sử dụng zrem
. Các tiến trình khác sẽ tìm nạp tác vụ một cách vô ích—điều này thật lãng phí. Để cải thiện điều này, bạn có thể sử dụng tập lệnh Lua để tối ưu hóa logic bằng cách kết hợp zrangebyscore
và zrem
thành một hoạt động nguyên tử duy nhất ở phía máy chủ.
Theo cách này, nhiều tiến trình cạnh tranh cho cùng một tác vụ sẽ không dẫn đến việc tìm nạp không cần thiết.
Sử dụng Lua Script để tối ưu hóa hơn nữa
Tập lệnh Lua sẽ kiểm tra các tin nhắn đã hết hạn, xóa chúng và trả về tin nhắn nếu xóa thành công. Nếu không, nó sẽ trả về một chuỗi rỗng:
String luaScript = "local resultArray = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'limit', 0, 1)\n" +
"if #resultArray > 0 then\n" +
" if redis.call('zrem', KEYS[1], resultArray[1]) > 0 then\n" +
" return resultArray[1]\n" +
" else\n" +
" return ''\n" +
" end\n" +
"else\n" +
" return ''\n" +
"end";
jedis.eval(luaScript, ScriptOutputType.VALUE, new String[]{key}, String.valueOf(System.currentTimeMillis()));
Ưu điểm của Redis Delayed Queue
- Sắp xếp theo
score
hiệu quả vớizset
- Chạy hoàn toàn trong bộ nhớ, rất nhanh
- Hỗ trợ clustering để nâng cao hiệu suất
- Có cơ chế lưu trữ (persistence) bằng AOF hoặc RDB
Nhược điểm
- Tính tin cậy chưa cao như các MQ chuyên dụng
- Không có retry – phải tự triển khai
- Không có ACK – nếu client lấy tin nhắn rồi chết giữa chừng, dữ liệu sẽ bị mất
Khuyến nghị: Nếu yêu cầu độ tin cậy cao, hãy dùng một MQ chuyên dụng như Kafka hoặc RabbitMQ.
Triển khai bằng Redisson
Redisson hỗ trợ RDelayedQueue
, xây dựng trên RQueue
:
RQueue<String> destinationQueue = ...
RDelayedQueue<String> delayedQueue = getDelayedQueue(destinationQueue);
// Send message to queue after 10 seconds
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// Send message to queue after 1 minute
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
Khi không còn dùng nữa, nên huỷ hàng đợi:
RDelayedQueue<String> delayedQueue = ...
delayedQueue.destroy();
Quá tiện lợi phải không nào?
All Rights Reserved