Laravel Deep Dive: queue system phần 1

Lời Nói Đầu

Trong khoa học máy tính khi gặp phải các bài toán mà các công việc, tác vụ có mức độ ưu tiên như nhau, trong đó cần có sự sắp xếp và thực hiện một cách tuần tự theo thứ tự đến trước và đến sau của nó, người ta thường áp dụng hệ thống queue vào. Trong xứ lý với ưng dụng web cũng vậy, nhiều khi với một luồng xử lý cơ bản nhưng một tác vụ nào đó mất quá nhiều thời gian để thực hiện, mà output của nó lại không thực sự cần thiết cho bước tiếp theo ta có thể áp dụng hệ thống queue vào để làm tăng hiệu quả cho ứng dụng. Ở bài này mình sẽ giới thiệu về hệ thông queue của laravel, cách nó tạo các connection, kết nối driver và tạo queue cho các tác vụ.

Cách laravel quản lý và tạo các connection cho queue system

Nhận request từ clients, thực hiện các tác vụ liên quan tới request và trả về response cho user là luồng làm việc đồng bộ, bình thường ở các web server hiện nay. Nhưng thỉnh thoảng chúng ta cần làm một vài việc để không làm ảnh hưởng tới luồng làm việc này về thời gian lẫn hiệu năng. Ví dụ ta cần gửi e mail thông báo đăng ký thành công cho user khi họ đăng ký ở website, tuy nhiên ta không muốn user chờ đợi cho đến khi server nhận được request, tạo email theo thông tin người dùng và gửi đến email cho họ, thay vào đó ta muốn chuyển họ đến trang đăng nhập, còn việc gửi email kia sẽ được thực hiện ngầm. Lúc này hệ thống queue sẽ thể hiện lợi ích của nó.

Ở Laravel có mang đến cho ta sẵn hệ thống queue giúp ta chạy ngầm các task và cấu hình để xử lý các tình huống khác nhau.

Ta sẽ quản lý hệ thống queue của laravel thông qua file config/queue.php . Mặc định laravel mang đến cho ta một vài connection , mỗi connection sử dụng các queue driver khác nhau. Dưới đây là file config/queue.php

<?php

return [
    // Thiết lập default connection của queue system của Laravel
    'default' => env('QUEUE_DRIVER', 'sync'),
    // Đây là các connection mà laravel hỗ trợ
    'connections' => [

        'sync' => [
            'driver' => 'sync',
        ],

        'database' => [
            'driver' => 'database',
            'table' => 'jobs',
            'queue' => 'default',
            'retry_after' => 90,
        ],

        'beanstalkd' => [
            'driver' => 'beanstalkd',
            'host' => 'localhost',
            'queue' => 'default',
            'retry_after' => 90,
        ],

        'sqs' => [
            'driver' => 'sqs',
            'key' => 'your-public-key',
            'secret' => 'your-secret-key',
            'prefix' => 'https://sqs.us-east-1.amazonaws.com/your-account-id',
            'queue' => 'your-queue-name',
            'region' => 'us-east-1',
        ],

        'redis' => [
            'driver' => 'redis',
            'connection' => 'default',
            'queue' => 'default',
            'retry_after' => 90,
        ],

    ],
    //DB tracking jobs failed mặc định của laravel
    'failed' => [
        'database' => env('DB_CONNECTION', 'mysql'),
        'table' => 'failed_jobs',
    ],

];

Trong một project ta có thể sử dụng các connection cũng như các driver queue khác nhau. Dưới đây là một ví dụ về sử dụng queue:

Queue::push(new SendInvoice($order));

return redirect('thank-you');

Ta thực hiện việc đưa task gửi email cho user vào queue sau đó trả về response để redirect đến action thank-you. Ở trên Queue facade thay mặt cho queue container alias để thực hiện việc queue này. Alias queue được đăng ký trong file Queue\QueueServiceProvider

protected function registerManager()
{
    $this->app->singleton('queue', function ($app) {
        return tap(new QueueManager($app), function ($manager) {
            $this->registerConnectors($manager);
        });
    });
}

Như vậy Queue facade đại diện cho Queue\QueueManager class được đăng ký như một singleton trong container. Ta cũng đăng ký các connection cho các driver khác nhau thông qua hàm registerConnectors():

public function registerConnectors($manager)
{
    foreach (['Null', 'Sync', 'Database', 'Redis', 'Beanstalkd', 'Sqs'] as $connector) {
        $this->{"register{$connector}Connector"}($manager);
    }
}    

Phương thức này sẽ gọi phương thức register{DriverName}Connector. Ví dụ ta đăng ký Redis connector.

protected function registerRedisConnector($manager)
{
    $manager->addConnector('redis', function () {
        return new RedisConnector($this->app['redis']);
    });
}

Một connector là một class có chứa phương thức connect() để tạo một đối tượng của driver mong muốn. Driver của redis connector được tạo trong file Queue\Connectors\RedisConnector

public function connect(array $config)
{
    return new RedisQueue(
        $this->redis, $config['queue'],
        Arr::get($config, 'connection', $this->connection),
        Arr::get($config, 'retry_after', 60)
    );
}

Kể từ lúc này QueueManager được đăng ký vào container và nó biết cách làm thể nào để connect với các driver queue có sẵn trong file config. Việc thực hiện connect này được thực hiện bởi magic method của QueueManager

public function __call($method, $parameters)
{
    return $this->connection()->$method(...$parameters);
}

Vậy là khi ta gọi Queue::push(), queue manager sẽ chọn ra queue driver mà ta mong muốn sử dụng (đã được config) và connect với nó và gọi phương thức push. Cùng nhìn vào phương thức connection của class QueueManager:

public function connection($name = null)
{
    $name = $name ?: $this->getDefaultDriver();

    if (! isset($this->connections[$name])) {
        $this->connections[$name] = $this->resolve($name);

        $this->connections[$name]->setContainer($this->app);
    }

    return $this->connections[$name];
}

Khi không có tên connection nào được chỉ ra thì laravel sẽ lấy connection mặc định cửa connection trong file config/queue.php

public function getDefaultDriver()
{
    return $this->app['config']['queue.default'];
}

Khi queue driver được chỉ rõ queue manager sẽ kiểm tra driver này đã được load trước đó hay chưa, nếu chưa thì sẽ bắt đầu connect và load driver đó sử dụng method resolve():

protected function resolve($name)
{
    $config = $this->getConfig($name);

    return $this->getConnector($config['driver'])
                ->connect($config)
                ->setConnectionName($name);
}

Ban đầu nó load cấu hình của connection từ file config/queue.php sau đó xác định vị trí của connector để lấy được driver, gọi phương thứ connect() và thiết lập cho nó một cái tên để sử dụng.

Như vậy mỗi khi ta gọi phương thức Queue::push() là ta đang gọi phương thức push trên một queue driver được chỉ định để sử dụng. Tuy các driver này hoạt động khác nhau, tuy nhiên Laravel cung cấp một giao diện đồng nhất để ta làm việc với bất kỳ driver nào.

Nếu ta muốn sử dụng một driver khác mà không muốn sử dụng các driver có sẵn ta có thể sử dụng phương thức Queue::addConnector() với tên của driver mà ta mong muốn cùng với đó là một closure giải thích cách kết nối với driver đó. Và để sử dụng driver đó ta gọi

Queue::connection('my-connection')->push(...);

Tài liệu tham khảo

  1. https://divinglaravel.com/queue-system/before-the-dive

All Rights Reserved