Xây Dựng Kafka Consumer trong Laravel: Từ JSON, Avro đến Batch Processing và DLQ
Trong hệ thống phân tán, Apache Kafka đóng vai trò quan trọng trong việc xử lý luồng dữ liệu real-time. Bài viết này sẽ hướng dẫn bạn xây dựng Kafka Consumer trong Laravel sử dụng thư viện mateusjunges/laravel-kafka (namespace Junges\Kafka), từ đơn giản (JSON) đến nâng cao (Avro + Batch Processing), kèm cơ chế xử lý lỗi qua Dead Letter Queue (DLQ).
1. Consumer Đơn Giản: Xử Lý JSON
Interface SingleMessageHandler
<?php
namespace App\Console\Commands\KafkaConsumer\Base;
use Junges\Kafka\Message\ConsumedMessage;
/**
 * Contract for Kafka consumers that process records one at a time.
 *
 * The consumer base classes that implement this interface attach a
 * deserializer (JSON or Avro) before dispatching each record here.
 */
interface SingleMessageHandler
{
    /**
     * Process a single consumed record.
     *
     * @param ConsumedMessage $kafkaMessage The record delivered by the consumer loop.
     */
    public function handleMessage(ConsumedMessage $kafkaMessage): void;
}
Lớp trừu tượng AbstractConsumer
<?php
namespace App\Console\Commands\KafkaConsumer\Base;
use Illuminate\Console\Command;
/**
 * Base Artisan command for Kafka consumers.
 *
 * Subclasses say what to consume (topic, consumer group, offset-reset policy).
 * The broker list is shared and comes from the `kafka.brokers` config entry.
 */
abstract class AbstractConsumer extends Command
{
    /**
     * Broker list handed to the Kafka client, read from `kafka.brokers`.
     */
    protected function getBrokers(): string
    {
        $brokers = config('kafka.brokers');

        return $brokers;
    }

    /** Topic this command subscribes to. */
    abstract protected function getTopic(): string;

    /** Kafka consumer group id for this command. */
    abstract protected function getGroupId(): string;

    /**
     * Value used for the client's `auto.offset.reset` option.
     */
    abstract public function getOffset(): string;
}
Lớp trừu tượng AbstractJsonConsumer
<?php
namespace App\Console\Commands\KafkaConsumer\Base;
use Carbon\Exceptions\Exception;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\ConsumedMessage;
use Junges\Kafka\Message\Deserializers\JsonDeserializer;
/**
 * Artisan-command base for consuming one JSON-encoded topic.
 *
 * Each record is decoded with JsonDeserializer and handed to handleMessage().
 * The library's dead-letter-queue support is switched on via withDlq().
 */
abstract class AbstractJsonConsumer extends AbstractConsumer implements SingleMessageHandler
{
    /**
     * Assemble the consumer from the subclass configuration and run the consume loop.
     *
     * @throws \Exception
     * @throws Exception
     */
    public function handle(): void
    {
        $topics = [$this->getTopic()];

        $builder = Kafka::createConsumer(
            topics: $topics,
            groupId: $this->getGroupId(),
            brokers: $this->getBrokers(),
        );

        $builder = $builder
            ->withOption('auto.offset.reset', $this->getOffset())
            ->usingDeserializer(new JsonDeserializer())
            ->withDlq();

        $onMessage = $this->handleMessage(...);

        $builder
            ->withHandler(static fn (ConsumedMessage $message) => $onMessage($message))
            ->build()
            ->consume();
    }
}
Triển Khai TestJsonConsumer
<?php
namespace App\Console\Commands\KafkaConsumer;
use App\Console\Commands\KafkaConsumer\Base\AbstractJsonConsumer;
use Junges\Kafka\Message\ConsumedMessage;
/**
 * Example consumer for the JSON test topic.
 *
 * Run with: php artisan kafka-consumer:test-json
 */
class TestJsonConsumer extends AbstractJsonConsumer
{
    protected $signature = 'kafka-consumer:test-json';

    protected $description = 'Kafka consumer test json consumer';

    /** Topic name from `kafka.topics.json`. */
    protected function getTopic(): string
    {
        return config('kafka.topics.json');
    }

    /** Consumer group id from `kafka.consumer_group_id.json`. */
    protected function getGroupId(): string
    {
        return config('kafka.consumer_group_id.json');
    }

    /** Offset-reset policy from `kafka.offset_reset.json`. */
    public function getOffset(): string
    {
        return config('kafka.offset_reset.json');
    }

    /**
     * Handle one decoded JSON record.
     */
    public function handleMessage(ConsumedMessage $kafkaMessage): void
    {
        $payload = $kafkaMessage->getBody();

        // TODO: put the business logic for $payload here.
    }
}
Chạy Consumer:
bash
php artisan kafka-consumer:test-json
2. Nâng Cấp Lên Avro: Schema Registry và Deserialization
Lớp trừu tượng AbstractAvroConsumer
<?php
namespace App\Console\Commands\KafkaConsumer\Base;
use AvroIOException;
use FlixTech\AvroSerializer\Objects\RecordSerializer;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use GuzzleHttp\Client;
use Junges\Kafka\Message\Deserializers\AvroDeserializer;
use Junges\Kafka\Message\KafkaAvroSchema;
use Junges\Kafka\Message\Registry\AvroSchemaRegistry;
/**
 * Base for consumers whose topic payloads are Avro-encoded.
 *
 * Schemas are resolved through a schema registry client (FlixTech
 * schema-registry API) reached at `kafka.base_uri_serializer`.
 */
abstract class AbstractAvroConsumer extends AbstractConsumer
{
    /**
     * Name of the Avro schema used to decode message bodies of getTopic().
     */
    abstract protected function getSchemaName(): string;

    /**
     * Schema registry base URI from `kafka.base_uri_serializer`.
     *
     * @throws \RuntimeException When the config value is missing or not a non-empty string.
     */
    final protected function getBaseUriSerializer(): string
    {
        $baseUri = config('kafka.base_uri_serializer');

        // Fail fast with an explicit message. Passed to Guzzle as a null/empty
        // base_uri, a misconfiguration would likely only surface later as an
        // obscure HTTP error on the first registry lookup.
        if (!is_string($baseUri) || $baseUri === '') {
            throw new \RuntimeException(
                'Config "kafka.base_uri_serializer" must be set to the schema registry URL.'
            );
        }

        return $baseUri;
    }

    /**
     * Build an Avro deserializer bound to this consumer's topic and schema.
     *
     * Registry access goes through CachedRegistry with an AvroObjectCacheAdapter.
     * The same cached registry backs both the schema mapping and the RecordSerializer.
     *
     * @throws AvroIOException
     */
    final protected function AVRODeserializer(): AvroDeserializer
    {
        $cachedRegistry = new CachedRegistry(
            new BlockingRegistry(
                new PromisingRegistry(
                    new Client(['base_uri' => $this->getBaseUriSerializer()])
                )
            ),
            new AvroObjectCacheAdapter()
        );

        $registry = new AvroSchemaRegistry($cachedRegistry);
        $recordSerializer = new RecordSerializer($cachedRegistry);

        // Only the message body is Avro-decoded; keys are left untouched.
        $registry->addBodySchemaMappingForTopic(
            $this->getTopic(),
            new KafkaAvroSchema($this->getSchemaName())
        );

        return new AvroDeserializer($registry, $recordSerializer);
    }
}
Lớp trừu tượng AbstractSingleAvroConsumer
<?php
namespace App\Console\Commands\KafkaConsumer\Base;
use AvroIOException;
use Carbon\Exceptions\Exception;
use Illuminate\Support\Facades\Log;
use Junges\Kafka\Exceptions\KafkaConsumerException;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\ConsumedMessage;
/**
 * Consumes a single Avro-encoded topic one record at a time.
 *
 * Each record is passed to handleMessage(). If the handler throws, the error is:
 * - reported via report();
 * - written to the `kafka-error` log channel with the record body, partition and offset;
 * - re-thrown, so the consumer's own error/DLQ handling (enabled with withDlq()) still sees it.
 */
abstract class AbstractSingleAvroConsumer extends AbstractAvroConsumer implements SingleMessageHandler
{
    /**
     * Assemble the consumer and run the consume loop.
     *
     * @throws AvroIOException
     * @throws Exception
     * @throws KafkaConsumerException
     * @throws \RdKafka\Exception
     */
    public function handle(): void
    {
        $arrTopic = [$this->getTopic()];
        // Built before the consumer so registry misconfiguration fails before connecting.
        $deserializer = $this->AVRODeserializer();

        $consumer = Kafka::createConsumer(
            topics: $arrTopic,
            groupId: $this->getGroupId(),
            brokers: $this->getBrokers()
        )->withOption('auto.offset.reset', $this->getOffset())
            ->usingDeserializer($deserializer)
            ->withDlq();

        $handler = $this->handleMessage(...);

        $consumer = $consumer->withHandler(static function (ConsumedMessage $message) use ($handler): void {
            try {
                $handler($message);
            } catch (\Throwable $exception) {
                report($exception);
                Log::channel('kafka-error')->error('Error when consuming message: ' . $exception->getMessage(), [
                    'message' => $message->getBody(),
                    'partition' => $message->getPartition(),
                    'offset' => $message->getOffset(),
                    // PSR-3: an exception passed in context MUST use the "exception" key,
                    // which is what Monolog/Laravel handlers look for.
                    'exception' => $exception,
                ]);

                // Re-throw so the library's failure handling (DLQ) still runs.
                throw $exception;
            }
        })->build();

        $consumer->consume();
    }
}
Triển khai consumer Avro xử lý từng message (single message) với TestSingleAvroConsumer
<?php
namespace App\Console\Commands\KafkaConsumer;
use App\Console\Commands\KafkaConsumer\Base\AbstractSingleAvroConsumer;
use Junges\Kafka\Message\ConsumedMessage;
/**
 * Demo consumer for the Avro test topic.
 *
 * Run with: php artisan kafka-consumer:test-avro
 */
class TestSingleAvroConsumer extends AbstractSingleAvroConsumer
{
    protected $signature = 'kafka-consumer:test-avro';

    protected $description = 'Kafka consumer test';

    /** Topic name from `kafka.topics.test`. */
    protected function getTopic(): string
    {
        return config('kafka.topics.test');
    }

    /** Avro schema name from `kafka.schema_avro.test`. */
    protected function getSchemaName(): string
    {
        return config('kafka.schema_avro.test');
    }

    /** Consumer group id from `kafka.consumer_group_id.test`. */
    protected function getGroupId(): string
    {
        return config('kafka.consumer_group_id.test');
    }

    /** Offset-reset policy from `kafka.offset_reset.test`. */
    public function getOffset(): string
    {
        return config('kafka.offset_reset.test');
    }

    /**
     * Print the record's before/after titles, then fail on purpose.
     *
     * No try/catch is needed here: AbstractSingleAvroConsumer wraps this call,
     * reports/logs any exception and re-throws it.
     */
    public function handleMessage(ConsumedMessage $kafkaMessage): void
    {
        $message = $kafkaMessage->getBody();

        // `before`/`after` look like a change-data-capture envelope where either
        // side may be null (e.g. `before` on insert) — TODO confirm against the schema.
        // Read the titles null-safely instead of triggering an
        // "array offset on value of type null" warning.
        echo 'Title before: ' . ($message['before']['title'] ?? '') . PHP_EOL;
        echo 'Title after: ' . ($message['after']['title'] ?? '') . PHP_EOL . PHP_EOL;

        // Deliberate failure for the demo: exercises the error-logging + DLQ path.
        // Remove this line in a real consumer.
        throw new \Exception('Co loi xay ra');
    }
}
3. Consumer Xử Lý Batch Messages
Interface BatchingMessagesHandler
<?php
namespace App\Console\Commands\KafkaConsumer\Base;
use Illuminate\Support\Collection;
/**
 * Contract for Kafka consumers that process records in batches.
 */
interface BatchingMessagesHandler
{
    /**
     * Process one released batch of records.
     *
     * @param Collection $messages The batch. Implementations in this codebase iterate
     *                             it as Junges\Kafka\Message\ConsumedMessage items.
     */
    public function handleMessages(Collection $messages): void;
}
Lớp trừu tượng AbstractBatchAvroConsumer
<?php
namespace App\Console\Commands\KafkaConsumer\Base;
use AvroIOException;
use Carbon\Exceptions\Exception;
use Illuminate\Support\Collection;
use Junges\Kafka\Exceptions\KafkaConsumerException;
use Junges\Kafka\Facades\Kafka;
/**
 * Consumes a single Avro-encoded topic in batches.
 *
 * Batching is configured from getBatchSizeLimit() and getBatchReleaseInterval().
 * Each released batch is passed to handleMessages().
 *
 * If the batch handler throws, the error is reported, logged to the `kafka-error`
 * channel and re-thrown. This mirrors the single-message Avro consumer, so batch
 * failures leave the same trail.
 */
abstract class AbstractBatchAvroConsumer extends AbstractAvroConsumer implements BatchingMessagesHandler
{
    /**
     * Assemble the batching consumer and run the consume loop.
     *
     * @throws AvroIOException
     * @throws Exception
     * @throws KafkaConsumerException
     * @throws \RdKafka\Exception
     */
    public function handle(): void
    {
        $arrTopic = [$this->getTopic()];
        $deserializer = $this->AVRODeserializer();

        $consumer = Kafka::createConsumer(
            topics: $arrTopic,
            groupId: $this->getGroupId(),
            brokers: $this->getBrokers()
        )->withOption('auto.offset.reset', $this->getOffset())
            ->enableBatching()
            ->withBatchSizeLimit($this->getBatchSizeLimit())
            ->withBatchReleaseInterval($this->getBatchReleaseInterval())
            ->usingDeserializer($deserializer)
            ->withDlq();

        $handler = $this->handleMessages(...);

        $consumer = $consumer->withHandler(static function (Collection $messages) use ($handler): void {
            try {
                $handler($messages);
            } catch (\Throwable $exception) {
                report($exception);
                // Fully qualified because the Log facade is not imported in this file.
                \Illuminate\Support\Facades\Log::channel('kafka-error')->error(
                    'Error when consuming batch: ' . $exception->getMessage(),
                    [
                        'batch_size' => $messages->count(),
                        // PSR-3: exceptions in context go under the "exception" key.
                        'exception' => $exception,
                    ]
                );

                // Re-throw so the library's own failure handling still runs.
                throw $exception;
            }
        })->build();

        $consumer->consume();
    }

    /** Maximum number of records passed to withBatchSizeLimit(). */
    abstract protected function getBatchSizeLimit(): int;

    /** Release interval in milliseconds, passed to withBatchReleaseInterval(). */
    abstract protected function getBatchReleaseInterval(): int;
}
Triển khai consumer batch messages với TestBatchAvroConsumer
<?php
namespace App\Console\Commands\KafkaConsumer;
use App\Console\Commands\KafkaConsumer\Base\AbstractBatchAvroConsumer;
use Illuminate\Support\Collection;
use Junges\Kafka\Message\ConsumedMessage;
/**
 * Example batching consumer for the Avro test topic.
 *
 * Run with: php artisan kafka-consumer:test-batching
 */
class TestBatchAvroConsumer extends AbstractBatchAvroConsumer
{
    protected $signature = 'kafka-consumer:test-batching';

    protected $description = 'Kafka consumer test batching';

    /** Topic name from `kafka.topics.test`. */
    protected function getTopic(): string
    {
        return config('kafka.topics.test');
    }

    /** Avro schema name from `kafka.schema_avro.test`. */
    protected function getSchemaName(): string
    {
        return config('kafka.schema_avro.test');
    }

    /** Consumer group id from `kafka.consumer_group_id.test`. */
    protected function getGroupId(): string
    {
        return config('kafka.consumer_group_id.test');
    }

    /** Offset-reset policy from `kafka.offset_reset.test`. */
    public function getOffset(): string
    {
        return config('kafka.offset_reset.test');
    }

    /** Release a batch once it holds 100 records. */
    protected function getBatchSizeLimit(): int
    {
        return 100;
    }

    /** Release interval in milliseconds (1.5 s). */
    protected function getBatchReleaseInterval(): int
    {
        return 1_500;
    }

    /**
     * Walk every record of a released batch.
     */
    public function handleMessages(Collection $messages): void
    {
        $messages->each(
            /** @param ConsumedMessage $message */
            static function ($message): void {
                // Per-record logic goes here.
            }
        );
    }
}
All rights reserved