Thực chiến Kafka trong Laravel: Viết Artisan Command làm Producer "bắn" Message siêu tốc
Chào anh em!
Tiếp nối bài viết giải quyết vấn đề "Out-of-order" trên Kafka, hôm nay chúng ta sẽ bắt tay vào code thực tế. Để test luồng Kafka, cách nhanh nhất, "đỡ lằng nhằng" nhất không phải là tạo API hay Web route, mà là dựng ngay một Artisan Command.
Command này sẽ đóng vai trò như một Test Producer, giúp chúng ta giả lập các sự kiện (events) như tạo đơn hàng, hủy đơn hàng và bắn thẳng lên Kafka broker để xem Consumer bên kia nhận được gì.
Mở terminal lên và gõ cùng mình nhé
1. Chuẩn bị "Vũ khí"
Giả định anh em đã cài đặt Kafka Broker (thông qua Docker) và đã cài package junges/kafka vào project Laravel của mình. Nếu chưa, chỉ mất 1 dòng lệnh:
composer require mateusjunges/laravel-kafka
Đừng quên cấu hình các biến môi trường KAFKA_BROKERS trong file .env nhé.
2. Khởi tạo Artisan Command
Chạy lệnh make command quen thuộc của Laravel:
php artisan make:command TestKafkaProducer
Mở file app/Console/Commands/TestKafkaProducer.php vừa được tạo ra. Chúng ta sẽ khai báo signature để có thể truyền vào order_id và tên topic muốn test một cách linh hoạt.
3. Code logic cho Producer (Hạng nặng)
Ở bài trước, chúng ta đã nhấn mạnh tầm quan trọng của Message Key và Versioning để chống "Out-of-order". Trong code demo này, chúng ta sẽ áp dụng đúng chuẩn mực đó luôn.
Anh em thay thế toàn bộ nội dung class bằng đoạn code sau:
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;
use Throwable;
class TestKafkaProducer extends Command
{
/**
* Tên lệnh và các tham số (argument/options).
* Truyền {order_id} bắt buộc, option --topic mặc định là 'order-events'
*/
protected $signature = 'kafka:test-producer {order_id} {--topic=order-events} {--event=ORDER_CREATED}';
/**
* Mô tả command
*/
protected $description = 'Bắn test message giả lập event của Order lên Kafka Topic';
public function handle()
{
$orderId = $this->argument('order_id');
$topic = $this->option('topic');
$eventName = $this->option('event');
$this->info("🚀 Bắt đầu giả lập Event: [{$eventName}] cho Order ID: {$orderId}");
$this->info("📡 Target Topic: {$topic}");
// 1. Chuẩn bị Payload (Data body)
// Luôn có version và timestamp để Consumer phòng thủ (Idempotent Consumer)
$payload = [
'event_name' => $eventName,
'order_id' => (int) $orderId,
'status' => $eventName === 'ORDER_CREATED' ? 'pending' : 'updated',
'amount' => rand(100000, 500000), // Random số tiền
'version' => time(), // Dùng timestamp làm version đơn giản nhất
'created_at' => now()->toIso8601String()
];
try {
// 2. Build Message chuẩn chỉ
// ĐƯA order_id VÀO KEY ĐỂ ĐẢM BẢO CÁC EVENT CỦA CÙNG 1 ĐƠN HÀNG VÀO CÙNG 1 PARTITION!
$message = new Message(
headers: ['system' => 'laravel-producer-test', 'env' => app()->environment()],
body: $payload,
key: (string) $orderId
);
// 3. Bắn (Publish) lên Kafka
$this->withProgressBar(1, function () use ($topic, $message) {
Kafka::publishOn($topic)
->withMessage($message)
->send();
});
$this->newLine(2);
$this->info("✅ Bắn message thành công!");
// In payload ra màn hình để anh em dễ đối chiếu
$this->line(json_encode($payload, JSON_PRETTY_PRINT));
} catch (Throwable $e) {
// Log lại lỗi nếu không kết nối được Kafka Broker
$this->error("❌ Không thể bắn message. Lỗi: " . $e->getMessage());
return Command::FAILURE;
}
return Command::SUCCESS;
}
}
4. Chạy test thử và xem phép màu
Giờ thì mở Terminal lên, chúng ta sẽ test các kịch bản khác nhau cực kỳ nhanh gọn.
Kịch bản 1: Tạo một đơn hàng mới (ID: 100)
php artisan kafka:test-producer 100
Output:
🚀 Bắt đầu giả lập Event: [ORDER_CREATED] cho Order ID: 100
📡 Target Topic: order-events
1/1 [▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓] 100%
✅ Bắn message thành công!
{
"event_name": "ORDER_CREATED",
"order_id": 100,
"status": "pending",
"amount": 345000,
"version": 1715498800,
"created_at": "2024-05-12T07:26:40+00:00"
}
Kịch bản 2: Khách hàng đổi ý, giả lập event Update cho chính đơn hàng ID 100 đó
php artisan kafka:test-producer 100 --event=ORDER_UPDATED
Vì chúng ta đã set key: (string) $orderId trong code, anh em có thể hoàn toàn yên tâm rằng message thứ 2 này chắc chắn sẽ nối đuôi message thứ 1 rơi vào cùng một Partition. Consumer bên kia sẽ đọc được đúng thứ tự!
5. Kinh nghiệm xương máu khi viết Producer
- Test Connection trước khi Send: Trong môi trường Production, nếu Kafka Broker sập, hàm
send()có thể bị timeout rất lâu làm treo process của bạn. Hãy cấu hìnhrequest.timeout.mstrong file config của Kafka về mức hợp lý (VD: 3000ms - 5000ms). - Đừng log Array trực tiếp vào Body: Nhiều anh em có thói quen truyền cả một object Model (VD:
$user) vào body. Điều này rất rủi ro vì Model có thể chứa các relations ẩn hoặc dữ liệu nhạy cảm (password). Hãy luôn Map data ra mảng (Array) và chỉ gửi những fields thực sự cần thiết (DTO). - Luôn dùng try-catch: Message Queue là hệ thống network qua mạng, tỷ lệ fail luôn có. Hãy catch exception, và nếu fail, bạn nên có cơ chế ghi lại vào Database (bảng
failed_jobschẳng hạn) để có một cronjob khác retry bắn lại sau.
Lời kết
Việc tạo ra một Artisan Command để test Producer như thế này giúp ích cực nhiều trong quá trình phát triển (Development). Bạn không cần phải mở Postman, gọi API loằng ngoằng, chỉ gõ vài phím trên Terminal là có thể sinh ra hàng ngàn event để stress test hệ thống Consumer của mình.
Bài viết sau mình sẽ hướng dẫn anh em viết phần Consumer làm sao để hứng data này và xử lý lưu vào DB mà không lo chết RAM hay lock DB nhé!
Anh em thử chạy code xem có vướng mắc gì ở phần config kết nối không? Cứ quăng lỗi lên phần bình luận mình sẽ giải đáp nhé!
All rights reserved