Vượt Giới Hạn Payload 2MB Của Temporal Bằng In-Memory Batch Chunking
📋 Bối Cảnh Nghiệp Vụ Đây không phải là luồng "tạo khách hàng mới" — người dùng vẫn được đăng ký realtime qua app/web như bình thường. Vấn đề nằm ở chỗ sau khi chuyển đổi từ một nền tảng thứ ba (WebEngage), chúng tôi phát hiện hàng chục nghìn khách hàng hiện hữu bị thiếu sự kiện REGISTER trong event store mới, khiến hệ thống loyalty xử lý như thể họ chưa từng đăng ký. Đội ngũ Ops cần một giải pháp để backfill hàng loạt các event bị thiếu này từ một file CSV chứa các số điện thoại bị ảnh hưởng.
🔥 Vấn đề: Chúng tôi đã xây dựng pipeline backfill này để xử lý file CSV với hơn 50.000 số điện thoại. Ban đầu, Temporal Workflow phân tích file và cố gắng truyền toàn bộ danh sách vào một Activity worker duy nhất trong một lần gọi. Ngay lập tức, pipeline bị crash với lỗi vượt quá kích thước payload gRPC từ Temporal Server.
🔍 Nguyên nhân cốt lõi: Temporal áp dụng giới hạn mặc định 2MB payload cho tất cả các tham số input/output truyền qua gRPC. Giới hạn này được thiết kế có chủ ý: nó giúp tối giản lịch sử thực thi workflow, giảm thiểu chi phí mạng, và ngăn chặn worker bị tràn bộ nhớ (heap memory) trong quá trình replay trạng thái. Việc cố gắng truyền một mảng dữ liệu khổng lồ trong một cuộc gọi RPC duy nhất là một anti-pattern nghiêm trọng trong hệ thống phân tán.
✅ Cách khắc phục: Thay vì nhồi nhét toàn bộ mảng dữ liệu khổng lồ trực tiếp vào Activity, chúng tôi đã triển khai kỹ thuật in-memory batch chunking (chia lô trong bộ nhớ) ngay bên trong logic của Workflow. Chúng tôi chia nhỏ danh sách số điện thoại thành các batch nhỏ, nhẹ (100 item mỗi batch), và gọi tuần tự đến Activity worker.
Dưới đây là cách chúng tôi triển khai logic chunking một cách an toàn và tối ưu:
package com.example.temporal;
import com.google.common.collect.Lists;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
import java.time.Duration;
import java.util.List;
public class CsvImportWorkflowImpl implements CsvImportWorkflow {
// Sử dụng Logger an toàn của Temporal để tránh log trùng lặp khi replay
private static final org.slf4j.Logger log = Workflow.getLogger(CsvImportWorkflowImpl.class);
private final CsvImportActivities activities = Workflow.newActivityStub(
CsvImportActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(60))
.build());
@Override
public void importCsv(List<String> phoneNumbers) {
log.info("Bắt đầu workflow Import CSV. Tổng số bản ghi: {}", phoneNumbers.size());
// Chia collection lớn thành các lô nhỏ sử dụng thư viện Guava
List<List<String>> batches = Lists.partition(phoneNumbers, 100);
for (List<String> batch : batches) {
log.info("Đang import batch có kích thước: {}", batch.size());
activities.importBatch(batch); // Lệnh gọi gRPC an toàn (< 10 KB)
}
activities.notifyTelegram("Quá trình import dữ liệu hoàn tất thành công.");
}
}
💡 Bài Học Rút Ra (Takeaway):
Không bao giờ coi các tham số Activity như một bộ đệm nhớ cục bộ (local memory buffer). Luôn giữ các payload gRPC ở mức nhẹ nhất (lý tưởng là dưới 100KB) để đảm bảo thực thi hiệu suất cao. Tận dụng chunking trong bộ nhớ bên trong Workflow để chia nhỏ các giao dịch số lượng lớn. Điều này giúp giữ payload mạng nhỏ gọn và thiết lập các ranh giới rollback rõ ràng khi có lỗi xảy ra. ⚠️ Các Giới Hạn Đã Biết (Known Limitations) Cách tiếp cận chunking trong bộ nhớ này hoạt động tốt ở quy mô hiện tại của chúng tôi (~50K số điện thoại ≈ 600KB), nhưng nó có hai giới hạn kiến trúc mà bạn cần lưu ý trước khi áp dụng vào hệ thống của mình:
1.Input của Workflow cũng bị giới hạn bởi payload limit
Giới hạn 2MB của gRPC áp dụng cho cả input của Workflow, không chỉ riêng Activity. Trong triển khai của chúng tôi, toàn bộ danh sách số điện thoại được truyền làm tham số Workflow qua WorkflowClient.start(workflow::importCsv, phoneNumbers). Việc chia nhỏ (chunking) bên trong Workflow chỉ giảm kích thước của mỗi lần gọi Activity — nhưng payload ban đầu kích hoạt Workflow vẫn là toàn bộ collection. Khi quy mô vượt ~150K+ bản ghi, nó sẽ vượt quá giới hạn và Workflow sẽ crash trước khi bất kỳ logic chunking nào kịp thực thi.
Cách khắc phục chuẩn Production:
Upload file CSV lên object storage (S3, GCS, hoặc MinIO), sau đó chỉ truyền tham chiếu file (ví dụ: S3 URI) cho Workflow. Activity đầu tiên sẽ tải xuống và phân tích file, rồi trả về danh sách ID để Workflow thực hiện phân lô.
❌ Hiện tại: Truyền toàn bộ danh sách làm input của Workflow WorkflowClient.start(workflow::importCsv, phoneNumbers);
✅ Tối ưu: Chỉ truyền đường dẫn tham chiếu, để Activity tự lấy dữ liệu WorkflowClient.start(workflow::importCsv, "s3://bucket/imports/job-123.csv");
2.Chưa có ContinueAsNew cho history lớn
Với 50K item và batch size là 100, Workflow tạo ra ~500 lần gọi Activity. Mỗi Activity sinh ra ít nhất 3 event (ActivityTaskScheduled, ActivityTaskStarted, ActivityTaskCompleted), tổng cộng ~1.500 event trong lịch sử thực thi Workflow. Temporal khuyến nghị giữ history dưới 50K event (và đưa ra cảnh báo ở mức 10K). Với dataset lớn hơn đáng kể, bạn nên gọi Workflow.continueAsNew() sau mỗi N batch để reset lịch sử và tránh giảm hiệu năng khi Worker replay.
List<List<String>> batches = Lists.partition(phoneNumbers, 100);
int batchCount = 0;
for (int i = 0; i < batches.size(); i++) {
activities.importBatch(batches.get(i));
batchCount++;
// Reset history sau mỗi 200 batch để tránh phình to lịch sử
if (batchCount % 200 == 0 && i < batches.size() - 1) {
// Truyền các phần tử còn lại cho lần chạy workflow tiếp theo
List<String> remaining = phoneNumbers.subList((i + 1) * 100, phoneNumbers.size());
Workflow.continueAsNew(remaining);
}
}
Bạn đã bao giờ đụng phải giới hạn payload của Temporal trong môi trường production chưa? Cách bạn xử lý việc xử lý hàng loạt (batch processing) ở quy mô lớn trong các durable workflow là gì — dùng object storage, Child Workflows, hay một phương pháp nào khác? Hãy cùng thảo luận dưới phần bình luận nhé!
All rights reserved