Laravel deep dive: queue system phần 4

Cách Laravel chạy các job được đẩy vào queue.

Để thực hiện việc chạy job từ queue Laravel có một tiến trình PHP chạy ngầm có vai trò lấy các jobs được lưu trữ và chạy chúng với các thông tin đã được cấu hình trước đó.

    php artisan queue:work

Với việc chạy câu lệnh này sẽ hướng dẫn Laravel tạo một đối tượng của ứng dụng (cụ thể ở đây là website) và bắt đầu thực hiện chạy các jobs. Đối tượng này sẽ tồn tại vô hạn có nghĩa là việc tạo đối tượng ứng dụng của Laravel chỉ diễn ra một lần và khi câu lệnh trên được chạy thì đối tượng đã được tạo trước đó sẽ được sử dụng để chạy các jobs. Cụ thể là:

  • Ta sẽ tiết kiệm được các tài nguyên của server bằng việc tránh khởi động lại toàn bộ ứng dụng mỗi khi chạy jobs.
  • Mỗi lần thay đổi xử lý của code trong ứng dụng, ta cần khởi động lại tiến trình trên để cập nhật lại code.

Ngoài ra ta có thể chạy câu lệnh

    php artisan queue:work --once

Câu lệnh này sẽ tạo một đối tượng của ứng dụng, thực hiện chạy một job rồi sẽ tự hủy.

Câu lệnh queue:listen sẽ chạy queue:work --once trong một vòng lặp vô hạn, vì vậy:

  • Một đối tượng của ứng dụng sẽ được khởi động trong mỗi vòng lặp.
  • Tiến trình sẽ lấy một job và thực hiện nó.
  • Sau đó tiến trình sẽ bị hủy.

Sử dụng queue:listen đảm bảo rằng một đối tượng mới của ứng sẽ được tạo cho mỗi job, điều này đồng nghĩa đối tượng ứng dụng luôn được cập nhật code mới nhất do đó ta không cần khởi động bằng tay tiến trình chạy ngầm khi code của ứng dụng thay đổi, tuy nhiên mặt trái của nó là sẽ ngốn nhiền tài nguyên hơn.

Đi sâu vào từng câu lệnh

queue:work

Khi chạy câu lệnh này phương thức handle() của class Queue\Console\WorkCommand sẽ được gọi:

    public function handle()
    {
        if ($this->downForMaintenance() && $this->option('once')) {
            return $this->worker->sleep($this->option('sleep'));
        }

        $this->listenForEvents();

        $connection = $this->argument('connection')
                        ?: $this->laravel['config']['queue.default'];

        $queue = $this->getQueue($connection);

        $this->runWorker(
            $connection, $queue
        );
    }

Đầu tiên nó sẽ kiểm tra rằng ứng dụng có ở chế độ bảo trì và có sử dụng tùy chọn --once không, nếu có ta sẽ không thực hiện việc chạy job và sẽ yêu cầu tiến trình sleep trong một khoảng thời gian trước khi hủy. Phương thức sleep của Queue\Worker:

    public function sleep($seconds)
    {
        sleep($seconds);
    }

Ở đây ta sẽ cho tiến trình sleep trong một khoảng thời gian thay vì trả về null là do với câu lệnh queue:listen thì việc khi vòng lặp kết thúc và việc một vòng lặp mới bắt đầu sẽ diến ra trong thời gian quá ngắn (khi ứng dụng đang bảo trì) dẫn đến việc ngôn tài nguyên của server.

Trong phương thức handle() có gọi phương thức listenForEvent():

    protected function listenForEvents()
    {
        $this->laravel['events']->listen(JobProcessing::class, function ($event) {
            $this->writeOutput($event->job, 'starting');
        });

        $this->laravel['events']->listen(JobProcessed::class, function ($event) {
            $this->writeOutput($event->job, 'success');
        });

        $this->laravel['events']->listen(JobFailed::class, function ($event) {
            $this->writeOutput($event->job, 'failed');

            $this->logFailedJob($event);
        });
    }

Ở phương thức này ta sẽ bắt được các event mà tiến trình tạo ra khi chạy jobs, do đó cho phép ta in ra các thông tin mỗi khi jobs được xử lý, hoàn thành hoặc thất bại.

Trong trường hợp jobs thất bại phương thức logFailedJob sẽ được gọi:

    $this->laravel['queue.failer']->log(
        $event->connectionName, $event->job->getQueue(),
        $event->job->getRawBody(), $event->exception
    );

queue.failer container alias được đăng ký ở Queue\QueueServiceProvider::registerFailedJobServices():

    protected function registerFailedJobServices()
    {
        $this->app->singleton('queue.failer', function () {
            $config = $this->app['config']['queue.failed'];

            return isset($config['table'])
                        ? $this->databaseFailedJobProvider($config)
                        : new NullFailedJobProvider;
        });
    }

    /**
     * Create a new database failed job provider.
     *
     * @param  array  $config
     * @return \Illuminate\Queue\Failed\DatabaseFailedJobProvider
     */
    protected function databaseFailedJobProvider($config)
    {
        return new DatabaseFailedJobProvider(
            $this->app['db'], $config['database'], $config['table']
        );
    }

Trong trường hợp queue.failed được cấu hình các giá trị, thì một database sẽ được dùng để lưu các thông tin đơn giản của việc thực hiện jobs thất bại.

    $this->getTable()->insertGetId(compact(
        'connection', 'queue', 'payload', 'exception', 'failed_at'
    ));

Khởi động tiến trình chạy job.

Để chạy tiến trình này ta cần hai thông tin:

  • Connection mà tiến trình dùng để lấy job ra.
  • Queue mà tiến trình sẽ sử dụng để tìm job.

Ta có thể cung cấp tùy chọn --connection=default cho câu lệnh queue:work, tuy nhiên nếu ta không định nghĩa default connection thì giá trị mặc định được cấu hình trong queue.default sẽ được sử dụng. Về queue cũng vậy ta có thế cung cấp tùy chọn --queue=emails hoặc giá trị mặc định của cno sẽ được sử dụng.

Khi đó phương thức WorkCommand:handle() sẽ chạy phương thức runWorker():

    protected function runWorker($connection, $queue)
    {
        $this->worker->setCache($this->laravel['cache']->driver());

        return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
            $connection, $queue, $this->gatherWorkerOptions()
        );
    }

Thuộc tính worker của class được thiết lập trong khi câu lệnh được tạo:

    public function __construct(Worker $worker)
    {
        parent::__construct();

        $this->worker = $worker;
    }

Bên trong phương thức runWorker() ta thiết lập cache driver cho tiến trình sẽ sử dụng, và quyết định phương thức nào sẽ được sử dụng dựa vào tùy chọn --once. Trong trường hợp có tùy chọn --once ta sẽ gọi phương thức runNextJob() để chạy job tiếp theo và sau đó hủy tiến trình. Ngược lại thì phương thức deamon() được gọi để giữ có tiến trình xử lý jobs tiếp tục tồn tại.

Trong khi bắt đầu tiến trình sẽ lấy các thông tin của câu lệnh cung cấp sử dụng phương thức gatherWorkerOptions(), sau đó cung cấp các thông tin đó cho phương thức runNextJob() hoặc deamon().

    protected function gatherWorkerOptions()
    {
        return new WorkerOptions(
            $this->option('delay'), $this->option('memory'),
            $this->option('timeout'), $this->option('sleep'),
            $this->option('tries'), $this->option('force')
        );
    }

Kết Luận.

Như vậy để tiến hành chay queue ta cần một tiến trình chạy ngầm để lắng nghe, lấy các jobs và thông tin của nó và chạy với những cấu hình đã được thiết lập sẵn. Câu lệnh chủ yếu để chạy queue là queue:word, với các tùy chọn tham số khác nhau thì cách làm việc của nó cũng khác nhau. Khi chạy job các event của việc chạy job sẽ được bắn ra và ta có thể ghi nhận được để xử lý nếu có lỗi hoặc đem ra thông báo.

Tài Liệu Tham Khảo.

1.) https://divinglaravel.com/queue-system/workers


All Rights Reserved