Xây Dựng Kafka Consumer trong Laravel: Từ JSON, Avro đến Batch Processing và DLQ
Trong hệ thống phân tán, Apache Kafka đóng vai trò quan trọng trong việc xử lý luồng dữ liệu real-time. Bài viết này sẽ hướng dẫn bạn xây dựng Kafka Consumer trong Laravel sử dụng thư viện mateusjunges/laravel-kafka (namespace Junges\Kafka), từ đơn giản (JSON) đến nâng cao (Avro + Batch Processing), kèm cơ chế xử lý lỗi qua Dead Letter Queue (DLQ).
1. Consumer Đơn Giản: Xử Lý JSON
Interface SingleMessageHandler
<?php
namespace App\Console\Commands\KafkaConsumer\Base;
use Junges\Kafka\Message\ConsumedMessage;
/**
 * Contract for consumers that process Kafka messages one at a time.
 *
 * The consumer classes in this project forward every consumed message to
 * handleMessage(); an exception thrown from it propagates back into the
 * consumer's handler closure.
 */
interface SingleMessageHandler
{
    /**
     * Process a single consumed Kafka message.
     *
     * @param ConsumedMessage $kafkaMessage The consumed message; its body has already been
     *                                      decoded by the deserializer the consumer was built with.
     */
    public function handleMessage(ConsumedMessage $kafkaMessage): void;
}
Lớp trừu tượng AbstractConsumer
<?php
namespace App\Console\Commands\KafkaConsumer\Base;
use Illuminate\Console\Command;
/**
 * Base Artisan command shared by every Kafka consumer in this project.
 *
 * Concrete consumers describe where to read from (topic, consumer group,
 * offset-reset policy); the broker list comes from the `kafka.brokers`
 * configuration key.
 */
abstract class AbstractConsumer extends Command
{
    /**
     * Name of the Kafka topic this command subscribes to.
     */
    abstract protected function getTopic(): string;

    /**
     * Consumer group id used when subscribing.
     */
    abstract protected function getGroupId(): string;

    /**
     * Value used for the `auto.offset.reset` consumer option.
     */
    abstract public function getOffset(): string;

    /**
     * Broker list handed to the consumer builder.
     *
     * @return string Value of `config('kafka.brokers')`.
     */
    protected function getBrokers(): string
    {
        $brokerList = config('kafka.brokers');

        return $brokerList;
    }
}
Lớp trừu tượng AbstractJsonConsumer
<?php
namespace App\Console\Commands\KafkaConsumer\Base;
use Carbon\Exceptions\Exception;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\ConsumedMessage;
use Junges\Kafka\Message\Deserializers\JsonDeserializer;
/**
 * Base command for consumers whose message bodies are JSON.
 *
 * Builds a consumer subscribed to getTopic() that decodes bodies with
 * JsonDeserializer, has a dead letter queue enabled (withDlq()), and forwards
 * every message to handleMessage().
 *
 * Failures raised by handleMessage() are handled the same way as in the Avro
 * single-message consumer:
 * - passed to report();
 * - logged on the `kafka-error` channel with body, partition and offset;
 * - rethrown, so the consumer still sees the failure.
 */
abstract class AbstractJsonConsumer extends AbstractConsumer implements SingleMessageHandler
{
    /**
     * Subscribe to getTopic() and start consuming.
     *
     * @throws \Exception
     * @throws Exception
     * @throws \Junges\Kafka\Exceptions\KafkaConsumerException
     * @throws \RdKafka\Exception
     */
    public function handle(): void
    {
        $consumerBuilder = Kafka::createConsumer(
            topics: [$this->getTopic()],
            groupId: $this->getGroupId(),
            brokers: $this->getBrokers(),
        )->withOption('auto.offset.reset', $this->getOffset())
            ->usingDeserializer(new JsonDeserializer())
            ->withDlq();

        $handler = $this->handleMessage(...);

        $consumer = $consumerBuilder->withHandler(static function (ConsumedMessage $message) use ($handler): void {
            try {
                $handler($message);
            } catch (\Throwable $exception) {
                report($exception);
                \Illuminate\Support\Facades\Log::channel('kafka-error')->error(
                    'Error when consuming message: ' . $exception->getMessage(),
                    [
                        'message' => $message->getBody(),
                        'partition' => $message->getPartition(),
                        'offset' => $message->getOffset(),
                        'error' => $exception,
                    ],
                );

                // Rethrow so the failure still propagates to the consumer built with withDlq().
                throw $exception;
            }
        })->build();

        $consumer->consume();
    }
}
Triển Khai TestJsonConsumer
<?php
namespace App\Console\Commands\KafkaConsumer;
use App\Console\Commands\KafkaConsumer\Base\AbstractJsonConsumer;
use Junges\Kafka\Message\ConsumedMessage;
/**
 * Demo consumer for the JSON topic configured under `kafka.topics.json`.
 *
 * Run with: php artisan kafka-consumer:test-json
 */
class TestJsonConsumer extends AbstractJsonConsumer
{
    protected $signature = 'kafka-consumer:test-json';

    protected $description = 'Kafka consumer test json consumer';

    /**
     * Topic name, read from `kafka.topics.json`.
     */
    protected function getTopic(): string
    {
        $topicName = config('kafka.topics.json');

        return $topicName;
    }

    /**
     * Consumer group id, read from `kafka.consumer_group_id.json`.
     */
    protected function getGroupId(): string
    {
        $groupId = config('kafka.consumer_group_id.json');

        return $groupId;
    }

    /**
     * `auto.offset.reset` policy, read from `kafka.offset_reset.json`.
     */
    public function getOffset(): string
    {
        $offsetPolicy = config('kafka.offset_reset.json');

        return $offsetPolicy;
    }

    /**
     * Handle one JSON message.
     */
    public function handleMessage(ConsumedMessage $kafkaMessage): void
    {
        // Body as decoded by the JsonDeserializer configured in the base class.
        $payload = $kafkaMessage->getBody();

        // TODO: business logic for $payload goes here.
    }
}
Chạy consumer (bash):
php artisan kafka-consumer:test-json
2. Nâng Cấp Lên Avro: Schema Registry và Deserialization
Lớp trừu tượng AbstractAvroConsumer
<?php
namespace App\Console\Commands\KafkaConsumer\Base;
use AvroIOException;
use FlixTech\AvroSerializer\Objects\RecordSerializer;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use GuzzleHttp\Client;
use Junges\Kafka\Message\Deserializers\AvroDeserializer;
use Junges\Kafka\Message\KafkaAvroSchema;
use Junges\Kafka\Message\Registry\AvroSchemaRegistry;
/**
 * Base command for consumers whose message bodies are Avro-encoded.
 *
 * Provides an AvroDeserializer backed by a Schema Registry reached over HTTP
 * at `config('kafka.base_uri_serializer')`. Registry lookups go through a
 * CachedRegistry using an AvroObjectCacheAdapter.
 */
abstract class AbstractAvroConsumer extends AbstractConsumer
{
    /**
     * Name of the Avro schema used to decode message bodies of getTopic().
     */
    abstract protected function getSchemaName(): string;

    /**
     * Base URI of the Schema Registry HTTP API.
     *
     * @return mixed Value of `config('kafka.base_uri_serializer')` (expected to be a URL string).
     */
    final protected function getBaseUriSerializer()
    {
        $registryUri = config('kafka.base_uri_serializer');

        return $registryUri;
    }

    /**
     * Build a deserializer that decodes bodies of getTopic() with the schema named by getSchemaName().
     *
     * @throws AvroIOException
     */
    final protected function AVRODeserializer(): AvroDeserializer
    {
        // HTTP client -> async registry -> blocking facade -> cache layer.
        $httpClient = new Client(['base_uri' => $this->getBaseUriSerializer()]);
        $blockingRegistry = new BlockingRegistry(new PromisingRegistry($httpClient));
        $cachedRegistry = new CachedRegistry($blockingRegistry, new AvroObjectCacheAdapter());

        $schemaRegistry = new AvroSchemaRegistry($cachedRegistry);
        $recordSerializer = new RecordSerializer($cachedRegistry);

        // Only the message body is mapped to a schema for this topic.
        $bodySchema = new KafkaAvroSchema($this->getSchemaName());
        $schemaRegistry->addBodySchemaMappingForTopic($this->getTopic(), $bodySchema);

        return new AvroDeserializer($schemaRegistry, $recordSerializer);
    }
}
Lớp trừu tượng AbstractSingleAvroConsumer
<?php
namespace App\Console\Commands\KafkaConsumer\Base;
use AvroIOException;
use Carbon\Exceptions\Exception;
use Illuminate\Support\Facades\Log;
use Junges\Kafka\Exceptions\KafkaConsumerException;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\ConsumedMessage;
/**
 * Base command for Avro consumers that handle one message at a time.
 *
 * Any failure thrown by handleMessage() is:
 * - passed to report();
 * - logged on the `kafka-error` channel together with the message body, partition and offset;
 * - rethrown, so it still reaches the consumer (which is built with withDlq()).
 */
abstract class AbstractSingleAvroConsumer extends AbstractAvroConsumer implements SingleMessageHandler
{
    /**
     * Subscribe to getTopic() and start consuming.
     *
     * @throws AvroIOException
     * @throws Exception
     * @throws KafkaConsumerException
     * @throws \RdKafka\Exception
     */
    public function handle(): void
    {
        $topics = [$this->getTopic()];
        $avroDeserializer = $this->AVRODeserializer();

        $builder = Kafka::createConsumer(
            topics: $topics,
            groupId: $this->getGroupId(),
            brokers: $this->getBrokers(),
        );
        $builder = $builder
            ->withOption('auto.offset.reset', $this->getOffset())
            ->usingDeserializer($avroDeserializer)
            ->withDlq();

        $onMessage = $this->handleMessage(...);

        $builder
            ->withHandler(static function (ConsumedMessage $message) use ($onMessage) {
                try {
                    $onMessage($message);
                } catch (\Throwable $error) {
                    report($error);

                    $context = [
                        'message' => $message->getBody(),
                        'partition' => $message->getPartition(),
                        'offset' => $message->getOffset(),
                        'error' => $error,
                    ];
                    Log::channel('kafka-error')->error('Error when consuming message: ' . $error->getMessage(), $context);

                    throw $error;
                }
            })
            ->build()
            ->consume();
    }
}
Triển khai consumer xử lý từng message Avro với TestSingleAvroConsumer
<?php
namespace App\Console\Commands\KafkaConsumer;
use App\Console\Commands\KafkaConsumer\Base\AbstractSingleAvroConsumer;
use Junges\Kafka\Message\ConsumedMessage;
/**
 * Demo Avro consumer for the topic configured under `kafka.topics.test`.
 *
 * For each message it prints the `title` field of the `before` and `after`
 * sections of the body. It then deliberately throws, so that the error path of
 * AbstractSingleAvroConsumer (report, `kafka-error` log, rethrow to the DLQ-enabled
 * consumer) is exercised for every message.
 *
 * Run with: php artisan kafka-consumer:test-avro
 */
class TestSingleAvroConsumer extends AbstractSingleAvroConsumer
{
    protected $signature = 'kafka-consumer:test-avro';

    protected $description = 'Kafka consumer test';

    /**
     * Topic name, read from `kafka.topics.test`.
     */
    protected function getTopic(): string
    {
        return config('kafka.topics.test');
    }

    /**
     * Avro schema name, read from `kafka.schema_avro.test`.
     */
    protected function getSchemaName(): string
    {
        return config('kafka.schema_avro.test');
    }

    /**
     * Consumer group id, read from `kafka.consumer_group_id.test`.
     */
    protected function getGroupId(): string
    {
        return config('kafka.consumer_group_id.test');
    }

    /**
     * `auto.offset.reset` policy, read from `kafka.offset_reset.test`.
     */
    public function getOffset(): string
    {
        return config('kafka.offset_reset.test');
    }

    /**
     * Print the before/after titles of one message, then fail on purpose.
     *
     * @throws \Exception Always, to demonstrate the base-class error handling.
     */
    public function handleMessage(ConsumedMessage $kafkaMessage): void
    {
        // No try/catch needed here: the base class reports, logs and rethrows failures.
        $message = $kafkaMessage->getBody();

        // `before` / `after` may be null or absent (NOTE(review): presumably insert/delete
        // change events — confirm against the producer's schema). Indexing a null with
        // ['title'] raises a warning that Laravel's error handler typically turns into an
        // ErrorException, which would mask the intentional exception below; `??` avoids it.
        echo 'Title before: ' . ($message['before']['title'] ?? '') . PHP_EOL;
        echo 'Title after: ' . ($message['after']['title'] ?? '') . PHP_EOL . PHP_EOL;

        // Intentional failure for the demo.
        throw new \Exception('Co loi xay ra');
    }
}
3. Consumer xử lý batch messages
Interface BatchingMessagesHandler
<?php
namespace App\Console\Commands\KafkaConsumer\Base;
use Illuminate\Support\Collection;
/**
 * Contract for consumers that process Kafka messages in batches.
 */
interface BatchingMessagesHandler
{
    /**
     * Process one batch of consumed messages.
     *
     * @param Collection $messages The batch; items are expected to be
     *                             \Junges\Kafka\Message\ConsumedMessage instances.
     */
    public function handleMessages(Collection $messages): void;
}
Lớp trừu tượng AbstractBatchAvroConsumer
<?php
namespace App\Console\Commands\KafkaConsumer\Base;
use AvroIOException;
use Carbon\Exceptions\Exception;
use Illuminate\Support\Collection;
use Junges\Kafka\Exceptions\KafkaConsumerException;
use Junges\Kafka\Facades\Kafka;
/**
 * Base command for Avro consumers that receive messages in batches.
 *
 * Batching is enabled on the consumer and tuned through getBatchSizeLimit()
 * (passed to withBatchSizeLimit()) and getBatchReleaseInterval() (passed to
 * withBatchReleaseInterval()). Each released batch is forwarded to
 * handleMessages().
 */
abstract class AbstractBatchAvroConsumer extends AbstractAvroConsumer implements BatchingMessagesHandler
{
    /**
     * Value passed to withBatchSizeLimit().
     */
    abstract protected function getBatchSizeLimit(): int;

    /**
     * Value passed to withBatchReleaseInterval() (the demo subclass uses milliseconds).
     */
    abstract protected function getBatchReleaseInterval(): int;

    /**
     * Subscribe to getTopic() and start consuming in batches.
     *
     * @throws AvroIOException
     * @throws Exception
     * @throws KafkaConsumerException
     * @throws \RdKafka\Exception
     */
    public function handle(): void
    {
        $topics = [$this->getTopic()];
        $avroDeserializer = $this->AVRODeserializer();

        $builder = Kafka::createConsumer(
            topics: $topics,
            groupId: $this->getGroupId(),
            brokers: $this->getBrokers(),
        )
            ->withOption('auto.offset.reset', $this->getOffset())
            ->enableBatching()
            ->withBatchSizeLimit($this->getBatchSizeLimit())
            ->withBatchReleaseInterval($this->getBatchReleaseInterval())
            ->usingDeserializer($avroDeserializer)
            ->withDlq();

        $onBatch = $this->handleMessages(...);

        $builder
            ->withHandler(static function (Collection $messages) use ($onBatch) {
                $onBatch($messages);
            })
            ->build()
            ->consume();
    }
}
Triển khai consumer batch messages với TestBatchAvroConsumer
<?php
namespace App\Console\Commands\KafkaConsumer;
use App\Console\Commands\KafkaConsumer\Base\AbstractBatchAvroConsumer;
use Illuminate\Support\Collection;
use Junges\Kafka\Message\ConsumedMessage;
/**
 * Demo batch consumer for the Avro topic configured under `kafka.topics.test`.
 *
 * Run with: php artisan kafka-consumer:test-batching
 */
class TestBatchAvroConsumer extends AbstractBatchAvroConsumer
{
    /** Value handed to withBatchSizeLimit(). */
    private const BATCH_SIZE_LIMIT = 100;

    /** Value handed to withBatchReleaseInterval(), in milliseconds (1.5s). */
    private const BATCH_RELEASE_INTERVAL_MS = 1500;

    protected $signature = 'kafka-consumer:test-batching';

    protected $description = 'Kafka consumer test batching';

    /**
     * Topic name, read from `kafka.topics.test`.
     */
    protected function getTopic(): string
    {
        $topicName = config('kafka.topics.test');

        return $topicName;
    }

    /**
     * Avro schema name, read from `kafka.schema_avro.test`.
     */
    protected function getSchemaName(): string
    {
        $schemaName = config('kafka.schema_avro.test');

        return $schemaName;
    }

    /**
     * Consumer group id, read from `kafka.consumer_group_id.test`.
     */
    protected function getGroupId(): string
    {
        $groupId = config('kafka.consumer_group_id.test');

        return $groupId;
    }

    /**
     * `auto.offset.reset` policy, read from `kafka.offset_reset.test`.
     */
    public function getOffset(): string
    {
        $offsetPolicy = config('kafka.offset_reset.test');

        return $offsetPolicy;
    }

    protected function getBatchSizeLimit(): int
    {
        return self::BATCH_SIZE_LIMIT;
    }

    protected function getBatchReleaseInterval(): int
    {
        return self::BATCH_RELEASE_INTERVAL_MS;
    }

    /**
     * Handle one released batch.
     */
    public function handleMessages(Collection $messages): void
    {
        foreach ($messages as $consumedMessage) {
            /** @var ConsumedMessage $consumedMessage */
            // TODO: per-message logic goes here.
        }
    }
}
All rights reserved
 
  
 