Laravel Deep Dive: queue system phần 3

Đẩy job vào queue.

Có một vài cách để đưa jobs vào queue:

    Queue::push(new InvoiceEmail($order));

    Bus::dispatch(new InvoiceEmail($order));

    dispatch(new InvoiceEmail($order));

    (new InvoiceEmail($order))->dispatch();

Việc gọi phương thức bằng facade Queue như đã biết sẽ gọi trên queue driver mà ta chọn để sử dụng (cấu hình trong file config/queue).

Ngoài ra còn một vài phương thức hữu dụng khác là:

    // Đẩy job vào một queue cụ thể.
    Queue::pushOn('emails', new InvoiceEmail($order));

    // Đẩy job vào queue sau một khoảng thời gian(đơn vị là giây).
    Queue::later(60, new InvoiceEmail($order));

    // Đẩy job vào một queue cụ thể sau một khoảng thời gian(đơn vị là giây).
    Queue::laterOn('emails', 60, new InvoiceEmail($order));

    // Đẩy nhiều job vào queue
    Queue::bulk([
        new InvoiceEmail($order),
        new ThankYouEmail($order)
    ]);

    // Đẩy nhiều job vào một queue cụ thể.
    Queue::bulk([
        new InvoiceEmail($order),
        new ThankYouEmail($order)
    ], null, 'emails');

Sau khi bất kỳ phương thức nào ở trên được gọi thì queue driver sẽ lưu các thông tin cần thiết vào để thực hiện việc lấy và chạy các job trong queue.

Sử dụng các câu lệnh bus.

Việc gửi job vào queue bằng việc sử dụng câu lệnh bus cho chúng ta thêm nhiều các để điều khiển thao tác này hơn. Ta có thể thiết lập các connection, queue, delay time được chọn từ class job của chúng ta, quyết định nếu job nên được queue hay sẽ chạy ngay lập tức. Nói chung sử dung bus ta có thể xử lý toàn bộ quá trình queue bằng class job.

Bus facade đại diện cho alias của Contracts\Bus\Dispatcher container. Alias này là của một đối tượng của class Bus\Dispatcher bên trong class Bus\BusServiceProvider:

    $this->app->singleton(Dispatcher::class, function ($app) {
        return new Dispatcher($app, function ($connection = null) use ($app) {
            return $app[QueueFactoryContract::class]->connection($connection);
        });
    });

Như vậy gọi Bus::dispatch() là gọi phương thức dispatch() của class Bus\Dispatcher:

    public function dispatch($command)
    {
        if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
            return $this->dispatchToQueue($command);
        } else {
            return $this->dispatchNow($command);
        }
    }

Phương thức trên quyết định job có nên được gửi vào queue hoặc chạy ngay hay không. Phương thức commandShouldBeQueued() kiểm tra nếu class job có là một đối tượng của Contracts\Queue\ShouldQueue, như vậy job chỉ được queue khi ta cài đặt interface ShouldQueue cho job. Dưới đây là cách mà Bus gửi job vào queue:

    public function dispatchToQueue($command)
    {
        $connection = isset($command->connection) ? $command->connection : null;

        $queue = call_user_func($this->queueResolver, $connection);

        if (! $queue instanceof Queue) {
            throw new RuntimeException('Queue resolver did not return a Queue implementation.');
        }

        if (method_exists($command, 'queue')) {
            return $command->queue($queue, $command);
        } else {
            return $this->pushCommandToQueue($queue, $command);
        }
    }

Đầu tiên Laravel kiểm tra xem connection có được định nghĩa trong class job hay không bằng việc sử dụng thuộc tính này ta có thế thiết lập connection nào sẽ được laravel sử dụng để gửi job vào queue, nếu không Laravel sẽ sử dụng connection mặc định. Sử dụng giá trị của thuộc tính connection Laravel sử dụng closure queueResolver để tạo một đối tượng của queue driver cần được dùng. closure này được thiết lập bên trong Bus\BusServiceProvider trong khi đăng ký đối tượng Dispatcher:

    function ($connection = null) use ($app) {
        return $app[Contracts\Queue\Factory::class]->connection($connection);
    }

Contracts\Queue\Factory là alias cho Queue\Manager, vì vậy closure ở trên sẽ trả về một đối tượng của QueueManager và thiết lập connection cho manager để biết driver nào sử dụng.

Cuối cùng dispatchToQueue check nếu job class có phương thức queue hay không, như vậy nếu ta cài đặt phương thức queue này trong class job ta có toàn quyền điều khiển quá trình queue như: cách job được queue, thiết lập một số thông tin như delay, timeout ... Trong trường hợp không có phương thức này, phương thức push() sẽ được gọi trên queue driver được chọn.

    protected function pushCommandToQueue($queue, $command)
    {
        if (isset($command->queue, $command->delay)) {
            return $queue->laterOn($command->queue, $command->delay, $command);
        }

        if (isset($command->queue)) {
            return $queue->pushOn($command->queue, $command);
        }

        if (isset($command->delay)) {
            return $queue->later($command->delay, $command);
        }

        return $queue->push($command);
    }

Phương thức trên kiểm tra các thuộc tính queue và delay trong class job, nếu chúng được tạo trong job class chúng sẽ được sử dụng. Như vậy ta có thế thiết lập các thông số đó trong class job:

    class SendInvoiceEmail{
        public $connection = 'default';

        public $queue = 'emails';

        public $delay = 60;

        public $tries = 3;

        public $timeout = 20;
    }

Thiết lập cấu hình job trong khi đẩy job vào queue.

Sử dụng helper dispatch() cho phép chúng ta có thể đẩy job vào queue như sau:

    dispatch(new InvoiceEmail($order))
            ->onConnection('default')
            ->onQueue('emails')
            ->delay(60);

Khi tạo class job, ta có thể sử dụng trait Bus\Queueable, trait này có một vài phương thức ta có thể sử dụng để thiết lập các thuộc tính của job trước khi gửi nó vào queue:

    public function onQueue($queue)
    {
        $this->queue = $queue;

        return $this;
    }

Phương thức dispatch() được định nghĩa như sau trong Foundation/helpers.php:

    function dispatch($job)
    {
        return new PendingDispatch($job);
    }

Nó trả về môt đối tượng của class PendingDispatch, bên trong class này có định nghĩa phương thức onQueue() ở trên vì vậy cho phép ta gọi ngay bên trong phương thức dispatch().

Sử dụng dispatchable trait

Ta có thể sử dụng Bus\Dispatchable trait ở class job để có thể gửi job bằng chính đối tượng của job:

(new InvoiceEmail($order))->dispatch();

Phương thức dispatch() của Dispatchable trait:

public static function dispatch()
    {
        return new PendingDispatch(new static(...func_get_args()));
    }

Nó sử dụng đối tượng của PendingDispatch class nên ta có thể sử dụng nhiều phương thức khác như sau:

    (new InvoiceEmail($order))->dispatch()->onQueue('emails');

Tài Liệu Tham Khảo.

  1. https://divinglaravel.com/queue-system/pushing-jobs-to-queue

All Rights Reserved