+2

Xây Dựng Kafka Consumer trong Laravel: Từ JSON, Avro đến Batch Processing và DLQ

Trong hệ thống phân tán, Apache Kafka đóng vai trò quan trọng trong việc xử lý luồng dữ liệu real-time. Bài viết này sẽ hướng dẫn bạn xây dựng Kafka Consumer trong Laravel sử dụng thư viện mateusjunges/laravel-kafka (namespace `Junges\Kafka`), từ đơn giản (JSON) đến nâng cao (Avro + Batch Processing), kèm cơ chế xử lý lỗi qua Dead Letter Queue (DLQ).


1. Consumer Đơn Giản: Xử Lý JSON

Interface SingleMessageHandler

<?php

namespace App\Console\Commands\KafkaConsumer\Base;

use Junges\Kafka\Message\ConsumedMessage;

/**
 * Contract for consumers that process Kafka messages one at a time.
 *
 * In the base consumers of this article, the consumer is built with a
 * deserializer (JSON or Avro) and withDlq(), and each message is passed to
 * handleMessage(). An exception thrown from handleMessage() propagates back
 * to the Kafka consumer.
 */
interface SingleMessageHandler
{
    /**
     * Handle a single consumed Kafka message.
     *
     * @param ConsumedMessage $kafkaMessage The consumed message; its decoded payload is read via getBody().
     */
    public function handleMessage(ConsumedMessage $kafkaMessage): void;
}

Lớp trừu tượng AbstractConsumer

<?php

namespace App\Console\Commands\KafkaConsumer\Base;

use Illuminate\Console\Command;

/**
 * Base artisan command for every Kafka consumer in this module.
 *
 * Subclasses decide what to consume (topic, consumer group, offset reset
 * policy); the broker list is shared and comes from configuration.
 */
abstract class AbstractConsumer extends Command
{
    /**
     * Broker list read from the `kafka.brokers` config key.
     */
    protected function getBrokers(): string
    {
        $brokers = config('kafka.brokers');

        return $brokers;
    }

    /**
     * Name of the Kafka topic this command subscribes to.
     */
    abstract protected function getTopic(): string;

    /**
     * Consumer group id the command joins with.
     */
    abstract protected function getGroupId(): string;

    /**
     * Value passed to the `auto.offset.reset` consumer option by the subclasses.
     */
    abstract public function getOffset(): string;
}

Lớp trừu tượng AbstractJsonConsumer

<?php

namespace App\Console\Commands\KafkaConsumer\Base;

use Carbon\Exceptions\Exception;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\ConsumedMessage;
use Junges\Kafka\Message\Deserializers\JsonDeserializer;

/**
 * Base command for consumers whose messages are JSON-encoded.
 *
 * Messages are decoded with JsonDeserializer and passed one by one to
 * handleMessage(). A failing message is reported and logged to the
 * `kafka-error` channel, the same way AbstractSingleAvroConsumer does it.
 * The exception is then rethrown so the consumer's withDlq() handling still
 * applies.
 */
abstract class AbstractJsonConsumer extends AbstractConsumer implements SingleMessageHandler
{
    /**
     * Build the JSON consumer and block while consuming messages.
     *
     * @throws \Junges\Kafka\Exceptions\KafkaConsumerException
     * @throws \RdKafka\Exception
     */
    public function handle(): void
    {
        $arrTopic = [$this->getTopic()];

        $consumer = Kafka::createConsumer(
            topics: $arrTopic,
            groupId: $this->getGroupId(),
            brokers: $this->getBrokers()
        )->withOption('auto.offset.reset', $this->getOffset())
            ->usingDeserializer(new JsonDeserializer())
            ->withDlq();

        $handler = $this->handleMessage(...);
        $consumer = $consumer->withHandler(static function (ConsumedMessage $message) use ($handler): void {
            try {
                $handler($message);
            } catch (\Throwable $exception) {
                // Without this, a failing JSON message would only be visible in the DLQ.
                report($exception);
                \Illuminate\Support\Facades\Log::channel('kafka-error')->error(
                    'Error when consuming message: ' . $exception->getMessage(),
                    [
                        'message' => $message->getBody(),
                        'partition' => $message->getPartition(),
                        'offset' => $message->getOffset(),
                        'error' => $exception,
                    ]
                );

                // Rethrow so the consumer's DLQ handling still sees the failure.
                throw $exception;
            }
        })->build();

        $consumer->consume();
    }
}

Triển Khai TestJsonConsumer

<?php

namespace App\Console\Commands\KafkaConsumer;

use App\Console\Commands\KafkaConsumer\Base\AbstractJsonConsumer;
use Junges\Kafka\Message\ConsumedMessage;

/**
 * Example consumer for JSON messages on the `kafka.topics.json` topic.
 *
 * Run with: php artisan kafka-consumer:test-json
 */
class TestJsonConsumer extends AbstractJsonConsumer
{
    protected $signature = 'kafka-consumer:test-json';
    protected $description = 'Kafka consumer test json consumer';

    /**
     * Process one decoded JSON message.
     */
    public function handleMessage(ConsumedMessage $kafkaMessage): void
    {
        $payload = $kafkaMessage->getBody();
        // Business logic for $payload goes here.
    }

    /**
     * Offset reset policy from `kafka.offset_reset.json`.
     */
    public function getOffset(): string
    {
        $policy = config('kafka.offset_reset.json');

        return $policy;
    }

    /**
     * Topic name from `kafka.topics.json`.
     */
    protected function getTopic(): string
    {
        $topic = config('kafka.topics.json');

        return $topic;
    }

    /**
     * Consumer group id from `kafka.consumer_group_id.json`.
     */
    protected function getGroupId(): string
    {
        $groupId = config('kafka.consumer_group_id.json');

        return $groupId;
    }
}

Chạy Consumer:

```bash
php artisan kafka-consumer:test-json
```

2. Nâng Cấp Lên Avro: Schema Registry và Deserialization

Lớp trừu tượng AbstractAvroConsumer

<?php

namespace App\Console\Commands\KafkaConsumer\Base;

use AvroIOException;
use FlixTech\AvroSerializer\Objects\RecordSerializer;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use GuzzleHttp\Client;
use Junges\Kafka\Message\Deserializers\AvroDeserializer;
use Junges\Kafka\Message\KafkaAvroSchema;
use Junges\Kafka\Message\Registry\AvroSchemaRegistry;

/**
 * Base command for consumers whose message bodies are Avro-encoded.
 *
 * Schemas are resolved through a schema registry reached over HTTP at the
 * URI configured in `kafka.base_uri_serializer`.
 */
abstract class AbstractAvroConsumer extends AbstractConsumer
{
    /**
     * Schema name used (via KafkaAvroSchema) to decode message bodies of getTopic().
     */
    abstract protected function getSchemaName(): string;

    /**
     * Base URI of the schema registry (config key `kafka.base_uri_serializer`).
     */
    final protected function getBaseUriSerializer()
    {
        return config('kafka.base_uri_serializer');
    }

    /**
     * Create an Avro deserializer bound to this consumer's topic.
     *
     * Registry lookups go through CachedRegistry with an AvroObjectCacheAdapter.
     * The body schema for getTopic() is mapped to getSchemaName().
     *
     * @throws AvroIOException
     */
    final protected function AVRODeserializer(): AvroDeserializer
    {
        // Build the registry chain from the inside out: HTTP client -> promising -> blocking -> cached.
        $httpClient = new Client(['base_uri' => $this->getBaseUriSerializer()]);
        $blockingRegistry = new BlockingRegistry(new PromisingRegistry($httpClient));
        $cachedRegistry = new CachedRegistry($blockingRegistry, new AvroObjectCacheAdapter());

        $schemaRegistry = new AvroSchemaRegistry($cachedRegistry);
        $recordSerializer = new RecordSerializer($cachedRegistry);

        $topic = $this->getTopic();
        $bodySchema = new KafkaAvroSchema($this->getSchemaName());
        $schemaRegistry->addBodySchemaMappingForTopic($topic, $bodySchema);

        return new AvroDeserializer($schemaRegistry, $recordSerializer);
    }
}

Lớp trừu tượng AbstractSingleAvroConsumer

<?php

namespace App\Console\Commands\KafkaConsumer\Base;

use AvroIOException;
use Carbon\Exceptions\Exception;
use Illuminate\Support\Facades\Log;
use Junges\Kafka\Exceptions\KafkaConsumerException;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\ConsumedMessage;

/**
 * Base command for Avro consumers that handle one message at a time.
 *
 * If handleMessage() throws, the failure is reported and logged to the
 * `kafka-error` channel. The exception is then rethrown so the consumer's
 * withDlq() handling applies.
 */
abstract class AbstractSingleAvroConsumer extends AbstractAvroConsumer implements SingleMessageHandler
{
    /**
     * Build the Avro consumer and block while consuming messages.
     *
     * @throws AvroIOException
     * @throws KafkaConsumerException
     * @throws \RdKafka\Exception
     */
    public function handle(): void
    {
        $topics = [$this->getTopic()];
        $deserializer = $this->AVRODeserializer();

        $builder = Kafka::createConsumer(
            topics: $topics,
            groupId: $this->getGroupId(),
            brokers: $this->getBrokers()
        );
        $builder = $builder
            ->withOption('auto.offset.reset', $this->getOffset())
            ->usingDeserializer($deserializer)
            ->withDlq();

        $process = $this->handleMessage(...);
        $builder = $builder->withHandler(static function (ConsumedMessage $message) use ($process) {
            try {
                $process($message);
            } catch (\Throwable $exception) {
                self::reportConsumeFailure($message, $exception);

                throw $exception;
            }
        });

        $builder->build()->consume();
    }

    /**
     * Report a handler failure and log it, with the message's partition and
     * offset, to the `kafka-error` channel.
     */
    private static function reportConsumeFailure(ConsumedMessage $message, \Throwable $exception): void
    {
        report($exception);
        Log::channel('kafka-error')->error('Error when consuming message: ' . $exception->getMessage(), [
            'message' => $message->getBody(),
            'partition' => $message->getPartition(),
            'offset' => $message->getOffset(),
            'error' => $exception,
        ]);
    }
}

Triển khai consumer xử lý từng Avro message với TestSingleAvroConsumer

<?php

namespace App\Console\Commands\KafkaConsumer;

use App\Console\Commands\KafkaConsumer\Base\AbstractSingleAvroConsumer;
use Junges\Kafka\Message\ConsumedMessage;

/**
 * Demo consumer for Avro messages on the `kafka.topics.test` topic.
 *
 * handleMessage() prints the `title` field of the `before` and `after`
 * sections of each payload. It then deliberately throws, so every message
 * exercises the error-logging / DLQ path of AbstractSingleAvroConsumer.
 */
class TestSingleAvroConsumer extends AbstractSingleAvroConsumer
{
    protected $signature = 'kafka-consumer:test-avro';
    protected $description = 'Kafka consumer test';

    /**
     * Topic name from `kafka.topics.test`.
     */
    protected function getTopic(): string
    {
        return config('kafka.topics.test');
    }

    /**
     * Avro schema name from `kafka.schema_avro.test`.
     */
    protected function getSchemaName(): string
    {
        return config('kafka.schema_avro.test');
    }

    /**
     * Consumer group id from `kafka.consumer_group_id.test`.
     */
    protected function getGroupId(): string
    {
        return config('kafka.consumer_group_id.test');
    }

    /**
     * Offset reset policy from `kafka.offset_reset.test`.
     */
    public function getOffset(): string
    {
        return config('kafka.offset_reset.test');
    }

    /**
     * Print the before/after titles, then fail on purpose (demo).
     *
     * @throws \Exception always, to demonstrate the base class's failure handling
     */
    public function handleMessage(ConsumedMessage $kafkaMessage): void
    {
        // No try/catch needed here: the base class reports and logs the
        // exception, then rethrows it so the DLQ handling applies.
        $message = $kafkaMessage->getBody();

        // `before`/`after` may be null or missing in some payloads (NOTE(review):
        // e.g. create/delete change events, confirm against the schema). Plain
        // indexing would raise an "array offset on null" error in Laravel and
        // mask the intended failure below.
        $titleBefore = $message['before']['title'] ?? '(none)';
        $titleAfter = $message['after']['title'] ?? '(none)';

        echo 'Title before: ' . $titleBefore . PHP_EOL;
        echo 'Title after: ' . $titleAfter . PHP_EOL . PHP_EOL;

        // Deliberate failure for the demo.
        throw new \Exception('Co loi xay ra');
    }
}

3. Consumer Xử Lý Batch Messages

Interface BatchingMessagesHandler

<?php

namespace App\Console\Commands\KafkaConsumer\Base;

use Illuminate\Support\Collection;

/**
 * Contract for consumers that process Kafka messages in batches.
 */
interface BatchingMessagesHandler
{
    /**
     * Handle one batch of consumed messages.
     *
     * @param Collection $messages Batch of messages; in this article's consumers the items are
     *                             \Junges\Kafka\Message\ConsumedMessage instances.
     */
    public function handleMessages(Collection $messages): void;
}

Lớp trừu tượng AbstractBatchAvroConsumer

<?php

namespace App\Console\Commands\KafkaConsumer\Base;

use AvroIOException;
use Carbon\Exceptions\Exception;
use Illuminate\Support\Collection;
use Junges\Kafka\Exceptions\KafkaConsumerException;
use Junges\Kafka\Facades\Kafka;

/**
 * Base command for Avro consumers that process messages in batches.
 *
 * Messages are accumulated by the Kafka consumer and handed to
 * handleMessages() as a Collection. A batch is released once
 * getBatchSizeLimit() messages are collected or getBatchReleaseInterval()
 * elapses (presumably whichever comes first; confirm against the
 * laravel-kafka batching docs).
 *
 * A failing batch is reported and logged to the `kafka-error` channel, like
 * the single-message Avro consumer. The exception is then rethrown so
 * withDlq() handling still applies.
 */
abstract class AbstractBatchAvroConsumer extends AbstractAvroConsumer implements BatchingMessagesHandler
{
    /**
     * Build the batching consumer and block while consuming.
     *
     * @throws AvroIOException
     * @throws KafkaConsumerException
     * @throws \RdKafka\Exception
     */
    public function handle(): void
    {
        $arrTopic = [$this->getTopic()];
        $deserializer = $this->AVRODeserializer();

        $consumer = Kafka::createConsumer(
            topics: $arrTopic,
            groupId: $this->getGroupId(),
            brokers: $this->getBrokers()
        )->withOption('auto.offset.reset', $this->getOffset())
            ->enableBatching()
            ->withBatchSizeLimit($this->getBatchSizeLimit())
            ->withBatchReleaseInterval($this->getBatchReleaseInterval())
            ->usingDeserializer($deserializer)
            ->withDlq();

        $handler = $this->handleMessages(...);
        $consumer = $consumer->withHandler(static function (Collection $messages) use ($handler): void {
            try {
                $handler($messages);
            } catch (\Throwable $exception) {
                report($exception);
                // Log only each message's coordinates: a full batch of bodies can be large.
                \Illuminate\Support\Facades\Log::channel('kafka-error')->error(
                    'Error when consuming batch: ' . $exception->getMessage(),
                    [
                        'count' => $messages->count(),
                        'positions' => $messages->map(static fn ($message): array => [
                            'partition' => $message->getPartition(),
                            'offset' => $message->getOffset(),
                        ])->all(),
                        'error' => $exception,
                    ]
                );

                // Rethrow so the consumer's DLQ handling still sees the failure.
                throw $exception;
            }
        })->build();

        $consumer->consume();
    }

    /**
     * Maximum number of messages per batch passed to handleMessages().
     */
    abstract protected function getBatchSizeLimit(): int;

    /**
     * Maximum wait before a partial batch is released, in milliseconds (e.g. 1500 = 1.5s).
     */
    abstract protected function getBatchReleaseInterval(): int;
}

Triển khai consumer batch messages với TestBatchAvroConsumer

<?php

namespace App\Console\Commands\KafkaConsumer;

use App\Console\Commands\KafkaConsumer\Base\AbstractBatchAvroConsumer;
use Illuminate\Support\Collection;
use Junges\Kafka\Message\ConsumedMessage;

/**
 * Example batching consumer for Avro messages on `kafka.topics.test`.
 *
 * Batches hold up to BATCH_SIZE_LIMIT messages or are released after
 * BATCH_RELEASE_INTERVAL_MS milliseconds.
 */
class TestBatchAvroConsumer extends AbstractBatchAvroConsumer
{
    private const BATCH_SIZE_LIMIT = 100;
    private const BATCH_RELEASE_INTERVAL_MS = 1_500; // 1.5s

    protected $signature = 'kafka-consumer:test-batching';
    protected $description = 'Kafka consumer test batching';

    /**
     * Process one batch of decoded Avro messages.
     */
    public function handleMessages(Collection $messages): void
    {
        $messages->each(
            /** @param ConsumedMessage $message */
            static function ($message): void {
                // Per-message logic goes here.
            }
        );
    }

    /**
     * Offset reset policy from `kafka.offset_reset.test`.
     */
    public function getOffset(): string
    {
        $policy = config('kafka.offset_reset.test');

        return $policy;
    }

    /**
     * Topic name from `kafka.topics.test`.
     */
    protected function getTopic(): string
    {
        $topic = config('kafka.topics.test');

        return $topic;
    }

    /**
     * Avro schema name from `kafka.schema_avro.test`.
     */
    protected function getSchemaName(): string
    {
        $schema = config('kafka.schema_avro.test');

        return $schema;
    }

    /**
     * Consumer group id from `kafka.consumer_group_id.test`.
     */
    protected function getGroupId(): string
    {
        $groupId = config('kafka.consumer_group_id.test');

        return $groupId;
    }

    /**
     * Maximum messages per batch.
     */
    protected function getBatchSizeLimit(): int
    {
        return self::BATCH_SIZE_LIMIT;
    }

    /**
     * Batch release interval in milliseconds.
     */
    protected function getBatchReleaseInterval(): int
    {
        return self::BATCH_RELEASE_INTERVAL_MS;
    }
}

All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí