+4

[Laravel] Cách vận hành job queue

Giới thiệu

Đã bao giờ bạn tự hỏi, queue laravel thật sự hoạt động như thế nào chưa 😄, vừa rồi có dịp tìm hiểu nên mình cũng muốn giới thiệu chi tiết tới mọi người luôn

Nhóm để chạy queue:work

WorkCommand

Là class sẽ đăng kí comman chạy cho laravel

Worker

1 đơn vị xử lí, nơi tạo vòng lặp xử lí, bóc tách job ra khỏi queue để xử lí (có thể hiểu giống như 1 process của CPU)

    /**
     * Listen to the given queue in a loop.
     *
     * @param  string  $connectionName
     * @param  string  $queue
     * @param  \Illuminate\Queue\WorkerOptions  $options
     * @return int
     */
    public function daemon($connectionName, $queue, WorkerOptions $options)
    {
        if ($this->supportsAsyncSignals()) {
            $this->listenForSignals();
        }

        $lastRestart = $this->getTimestampOfLastQueueRestart();

        [$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0];

        while (true) {
            // Before reserving any jobs, we will make sure this queue is not paused and
            // if it is we will just pause this worker for a given amount of time and
            // make sure we do not need to kill this worker process off completely.
            if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
                $status = $this->pauseWorker($options, $lastRestart);

                if (! is_null($status)) {
                    return $this->stop($status);
                }

                continue;
            }

            // First, we will attempt to get the next job off of the queue. We will also
            // register the timeout handler and reset the alarm for this job so it is
            // not stuck in a frozen state forever. Then, we can fire off this job.
            $job = $this->getNextJob(
                $this->manager->connection($connectionName), $queue
            );

            if ($this->supportsAsyncSignals()) {
                $this->registerTimeoutHandler($job, $options);
            }

            // If the daemon should run (not in maintenance mode, etc.), then we can run
            // fire off this job for processing. Otherwise, we will need to sleep the
            // worker so no more jobs are processed until they should be processed.
            if ($job) {
                $jobsProcessed++;

                $this->runJob($job, $connectionName, $options);

                if ($options->rest > 0) {
                    $this->sleep($options->rest);
                }
            } else {
                $this->sleep($options->sleep);
            }

            if ($this->supportsAsyncSignals()) {
                $this->resetTimeoutHandler();
            }

            // Finally, we will check to see if we have exceeded our memory limits or if
            // the queue should restart based on other indications. If so, we'll stop
            // this worker and let whatever is "monitoring" it restart the process.
            $status = $this->stopIfNecessary(
                $options, $lastRestart, $startTime, $jobsProcessed, $job
            );

            if (! is_null($status)) {
                return $this->stop($status);
            }
        }
    }
  • Trong woker tạo một vòng lặp liên tục
  • Nếu queue bị pause thì stop job lại
  • Lấy job tiếp theo từ database job (có thể là database, redis, ...)
  • Nếu có job sẽ tiến hành xử lí
  • Xử lí xong job, sleep 1 khoảng thời gian, rồi tiến hành chạy lại vòng lặp lấy job ra chạy tiếp

Queue

Nơi tạo load define job, payload, event, setting connection để lưu trữ queue

- createPayload
- createObjectPayload
- getDisplayName
- getJobBackoff
- getJobExpiration
- createStringPayload
- pushOn
- laterOn
- bulk

Job

Đơn vị nhỏ nhất chứa công việc cần được xử lí, sẽ được đẩy vào queue và xử lí dần dần

abstract class Job
public function uuid()
public function fire()
public function delete()
public function isDeleted()
public function release($delay = 0)
protected function failed($e)

Nhóm của queue:listen

Thường được dùng để chạy command cho việc phát triển ở local, giống queue:work phía trên nhưng command này giúp lắng nghe thay đổi của source code để reload lại queue để apply code mới của chúng ta mỗi lần chúng ta edit

ListenCommand

Nơi define command cho queue:listen

class Listener

Nơi run process cho command queue:listen, như ta thấy cũng giống trên kia, nó chạy bằng 1 vòng while (true)

    /**
     * Listen to the given queue connection.
     *
     * @param  string  $connection
     * @param  string  $queue
     * @param  \Illuminate\Queue\ListenerOptions  $options
     * @return void
     */
    public function listen($connection, $queue, ListenerOptions $options)
    {
        $process = $this->makeProcess($connection, $queue, $options);

        while (true) {
            $this->runProcess($process, $options->memory);
        }
    }

Cách supervisor mở các queue để chạy

Dùng supervisor, ta có thể mở số lượng woker tuỳ ý, phù hợp với yêu cầu business và cấu hình server, thông thường đối với 1 server tầm trung mình nghĩ mở tầm 8 => 10 woker là đủ

Mình đã mô tả sơ qua cách vận hành của job và worker trong laravel rồi nhé, hy vọng sẽ giúp ích được cho các bạn 😀


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí