Viblo Code
-2

Laravel deep dive: queue system phần 5

Phương thức deamon

Như ở part trước ta biết khi khởi động tiến trình chạy queue bằng câu lệnh queue:work nếu tùy chọn --once không được cung cấp ta sẽ gọi phương thức deamon khi đó tiến trình sẽ được giữ tồn tại mãi mãi.

Phương thức deamon của class Worker:

    /**
     * Listen to the given queue in a loop.
     *
     * @param  string  $connectionName
     * @param  string  $queue
     * @param  \Illuminate\Queue\WorkerOptions  $options
     * @return void
     */
    public function daemon($connectionName, $queue, WorkerOptions $options)
    {
        $this->listenForSignals();

        $lastRestart = $this->getTimestampOfLastQueueRestart();

        while (true) {
            // Before reserving any jobs, we will make sure this queue is not paused and
            // if it is we will just pause this worker for a given amount of time and
            // make sure we do not need to kill this worker process off completely.
            if (! $this->daemonShouldRun($options)) {
                $this->pauseWorker($options, $lastRestart);

                continue;
            }

            // First, we will attempt to get the next job off of the queue. We will also
            // register the timeout handler and reset the alarm for this job so it is
            // not stuck in a frozen state forever. Then, we can fire off this job.
            $job = $this->getNextJob(
                $this->manager->connection($connectionName), $queue
            );

            $this->registerTimeoutHandler($job, $options);

            // If the daemon should run (not in maintenance mode, etc.), then we can run
            // fire off this job for processing. Otherwise, we will need to sleep the
            // worker so no more jobs are processed until they should be processed.
            if ($job) {
                $this->runJob($job, $connectionName, $options);
            } else {
                $this->sleep($options->sleep);
            }

            // Finally, we will check to see if we have exceeded our memory limits or if
            // the queue should restart based on other indications. If so, we'll stop
            // this worker and let whatever is "monitoring" it restart the process.
            $this->stopIfNecessary($options, $lastRestart);
        }
    }

ở dòng đầu tiên của phương thức deamon có gọi phương thức listenForSignals():

    protected function listenForSignals()
    {
        if ($this->supportsAsyncSignals()) {
            pcntl_async_signals(true);

            pcntl_signal(SIGTERM, function () {
                $this->shouldQuit = true;
            });

            pcntl_signal(SIGUSR2, function () {
                $this->paused = true;
            });

            pcntl_signal(SIGCONT, function () {
                $this->paused = false;
            });
        }
    }

Phương thức này sử dụng PHP7.1 signal handler là phương thức supportsAsyncSignals() để kiểm tra phiên bản PHP đang được sử dụng và load extension pcntl nếu phiên bản đang dùng là PHP 7.1. Sau đó phương thức pcntl_async_signals() được gọi để cho phép xử lý các signal, và sau đó ta đăng ký các signal cho handler:

  • SIGTERM: được phát ra khi câu lệnh được chỉ thi tắt.
  • SIGUSR2: được người dùng định nghĩa và được sử dụng đẻ chỉ thị khi câu lệnh tạm dừng.
  • SIGCONT: được phát ra khi câu lệnh tạm dừng được tiếp tục thực hiện.

Ở dòng thứ hai trong phương thức deamon() sẽ tìm timestamp của lần khởi động lại cuối cùng của queue, giá trị này được lưu vào cache khi ta gọi queue:restart, sau đó ta sẽ kiểm tra nếu timestamp của lần khởi động cuối cùng không trùng thì sẽ chỉ thị cho tiến trình nên khởi động lại. Cuối cùng phương thức sẽ bắt đầu vòng lặp bao gồm: tìm job, chạy chúng và làm một vài hành động đối với tiến trình

    while (true) {
        if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
            $this->pauseWorker($options, $lastRestart);

            continue;
        }

        $job = $this->getNextJob(
            $this->manager->connection($connectionName), $queue
        );

        $this->registerTimeoutHandler($job, $options);

        if ($job) {
            $this->runJob($job, $connectionName, $options);
        } else {
            $this->sleep($options->sleep);
        }

        $this->stopIfNecessary($options, $lastRestart);
    }

Xác định tiến trình có nên xử lý job không.

Gọi phương thức deamonShouldRun() ta sẽ kiểm tra các trường hợp sau:

  • Ứng dụng có đang bảo trì hay không.
  • Tiến trình không bị tạm dừng.
  • Không có event listener nào chặn vòng lặp tiếp diễn.

Nếu ứng dụng ở trạng thái bảo trị ta ta chỉ có thể thực thi job nếu ta chạy tiến trình với tùy chọn --force

    php artisan queue:work --force

Một điều kiện nữa xác định tiến trình có nên tiếp tục là:

    $this->events->until(new Events\Looping($connectionName, $queue)) === false)

Dòng trên sẽ bắn ra event Queue\Event\Looping và kiểm tra nếu bất kỳ listener nào trả về false trong phương thức handle của nó, sử dụng kết quả trên ta có thể bắt tiến trình dừng việc xử lý job tạm thời.

Trong trường hợp tiến trình nên bị dừng lại thì phương thức pauseWorker() được gọi:

    protected function pauseWorker(WorkerOptions $options, $lastRestart)
    {
        $this->sleep($options->sleep > 0 ? $options->sleep : 1);

        $this->stopIfNecessary($options, $lastRestart);
    }

phương thức trên sẽ gọi phương thức sleep với tùy chọn --sleep. Sau khi tiến trình sleep trong một khoảng thời gian, ta kiểm tra nếu tiến trình nên thoát và hủy tiến thì phương thức stopIfNecessary() sẽ được gọi, nếu ngược lại ta sẽ gọi continue và bắt đầu vòng lặp mới.

	if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
	    $this->pauseWorker($options, $lastRestart);

	    continue;
	}

Lấy job để chạy.

	$job = $this->getNextJob(
	    $this->manager->connection($connectionName), $queue
	);

Phương thức getNextJob() nhận vào một đối tượng của queue connection và queue mà ta sẽ tìm job từ đó để chạy:

	protected function getNextJob($connection, $queue)
	{
	    try {
	        foreach (explode(',', $queue) as $queue) {
	            if (! is_null($job = $connection->pop($queue))) {
	                return $job;
	            }
	        }
	    } catch (Exception $e) {
	        $this->exceptions->report($e);

	        $this->stopWorkerIfLostConnection($e);
	    }
	}

Ta đơn giản sẽ lặp trong queue và sử dụng queue connection để lấy job từ không gian lưu trữ (database, redis, sqs...) và trả về job.

Để lấy job từ nơi lưu trữ ta sẽ truy vấn job từ các job cũ nhất thỏa mãn điều kiện:

  • Đã được push vào queue mà ta đang sử dụng.
  • Chưa được lấy bởi tiến trình khác.
  • Có thể chạy vào đúng thời điểm đã được xác định.
  • Các job bị hoãn thực hiện và ta phải thực hiện lại chúng.

Khi đã tìm được job thỏa mãn điều kiện ta sẽ đánh dấu để tiến trình khác không lấy nó nữa.

Theo dõi job timeout.

Sau khi job được lấy ra ta gọi phương thức registerTimeoutHandler():

	protected function registerTimeoutHandler($job, WorkerOptions $options)
	{
	    if ($this->supportsAsyncSignals()) {
	        pcntl_signal(SIGALRM, function () {
	            $this->kill(1);
	        });the

	        $timeout = $this->timeoutForJob($job, $options);

	        pcntl_alarm($timeout > 0 ? $timeout : 0);
	    }
	}

Một lần nữa pcntl lại được load, ta sẽ đăng ký signal handler sẽ hủy tiến trình nếu job timeout, ta sử dụng pctnl_alarm() để gửi tín hiệu SIGALRM sau khi thời gian timeout được cấu hình bị vượt quá. Nếu job thực hiện lâu quá giá trị timeout, handler sẽ hủy tiến trình, nếu không thì job sẽ được thực hiện và vòng lặp mới sẽ thiết lập lại tín hiệu cảnh báo này.

Xử lý job.

Phương thức runJob() sẽ gọi phương thức process():

	public function process($connectionName, $job, WorkerOptions $options)
	{
	    try {
	        $this->raiseBeforeJobEvent($connectionName, $job);

	        $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
	            $connectionName, $job, (int) $options->maxTries
	        );

	        $job->fire();

	        $this->raiseAfterJobEvent($connectionName, $job);
	    } catch (Exception $e) {
	        $this->handleJobException($connectionName, $job, $options, $e);
	    }
	}

Ở đây phương thức raiseBeforeJobEvent() sẽ bắn ra Queue\Events\JobProcessing event, và raiseAfterJobEvent() sẽ bắn ra event Queue\Events\JobProcessed.

Phương thức markJobAsFailedIfAlreadyExceedsMaxAttempts() sẽ kiểm tra nếu job này đã vượt qua sẽ lần thực hiện lại thì sẽ được đánh dấu lại.

	protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
	{
	    $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;

	    if ($maxTries === 0 || $job->attempts() <= $maxTries) {
	        return;
	    }

	    $this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException(
	        'A queued job has been attempted too many times. The job may have previously timed out.'
	    ));

	    throw $e;
	}

Ngược lại ta gọi phương thức fire() trên đối tượng của job để chạy nó.

Kết thúc vòng lặp.

Ở cuối vòng lặp ta gọi phương thức stopIfNecessary() để kiểm tra ta có nên hủy tiến trình trước khi bắt đầu vòng lặp mới hay không trước khi bắt đầu một vòng lặp mới:

    protected function stopIfNecessary(WorkerOptions $options, $lastRestart)
    {
        if ($this->shouldQuit) {
            $this->kill();
        }

        if ($this->memoryExceeded($options->memory)) {
            $this->stop(12);
        } elseif ($this->queueShouldRestart($lastRestart)) {
            $this->stop();
        }
    }

Thuộc tính shouldQuit được thiết lập trong 2 trường hợp:

  • Một là nếu signal handler cho signal SIGTERM được thiết lập trong listenForSignals().
  • Hai là trong stopWorkerIfLostConnection().
    protected function stopWorkerIfLostConnection($e)
    {
        if ($this->causedByLostConnection($e)) {
            $this->shouldQuit = true;
        }
    }

Phương thức memoryExceeded() kiểm tra nếu bộ nhớ bị sử dụng quá giới hạn cho phép hay không. Cuối cùng phương thức queueShouldRestart() so sánh timestamp hiện tại của signal khởi động, nếu nó không trùng với thời gian được lưu lại trong khi khởi động tiến trình. Điều này có nghĩa signal khởi động mới được gửi trong quá trình lặp, trong trường hợp ta hủy tiến trình thì nó có thể được khởi động lại.

Tài liệu tham khảo.

1.) https://divinglaravel.com/queue-system/workers


All Rights Reserved