成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

剖析Laravel隊(duì)列系統(tǒng)--Worker

CollinPeng / 834人閱讀

摘要:一旦這一切完成,方法會(huì)運(yùn)行在類屬性在命令構(gòu)造后設(shè)置容器解析實(shí)例,在中我們設(shè)置了將使用的緩存驅(qū)動(dòng),我們也根據(jù)命令來決定我們調(diào)用什么方法。作業(yè)只在以上起效在上也無效處理作業(yè)方法調(diào)用觸發(fā)事件觸發(fā)事件。

譯文GitHub https://github.com/yuansir/diving-laravel-zh

原文鏈接https://divinglaravel.com/queue-system/workers

現(xiàn)在,我們知道了Laravel如何將作業(yè)推到不同的隊(duì)列中,讓我們來深入了解workers如何運(yùn)作你的作業(yè)。 首先,我將workers定義為一個(gè)在后臺(tái)運(yùn)行的簡單PHP進(jìn)程,目的是從存儲(chǔ)空間中提取作業(yè)并針對多個(gè)配置選項(xiàng)運(yùn)行它們。

php artisan queue:work

運(yùn)行此命令將指示Laravel創(chuàng)建應(yīng)用程序的一個(gè)實(shí)例并開始執(zhí)行作業(yè),這個(gè)實(shí)例將一直存活著,啟動(dòng)Laravel應(yīng)用程序的操作只在運(yùn)行命令時(shí)發(fā)生一次,同一個(gè)實(shí)例將被用于執(zhí)行你的作業(yè),這意味著:

避免在每個(gè)作業(yè)上啟動(dòng)整個(gè)應(yīng)用程序來節(jié)省服務(wù)器資源。

在應(yīng)用程序中所做的任何代碼更改后必須手動(dòng)重啟worker。

你也可以這樣運(yùn)行:

php artisan queue:work --once

這將啟動(dòng)應(yīng)用程序的一個(gè)實(shí)例,處理單個(gè)作業(yè),然后干掉腳本。

php artisan queue:listen

queue:listen 命令相當(dāng)于無限循環(huán)地運(yùn)行 queue:work --once 命令,這將導(dǎo)致以下問題:

每個(gè)循環(huán)都會(huì)啟動(dòng)一個(gè)應(yīng)用程序?qū)嵗?/p>

分配的worker將選擇一個(gè)工作并執(zhí)行。

worker進(jìn)程將被干掉。

使用 queue:listen 確保為每個(gè)作業(yè)創(chuàng)建一個(gè)新的應(yīng)用程序?qū)嵗?,這意味著代碼更改以后不必手動(dòng)重啟worker,同時(shí)也意味著將消耗更多的服務(wù)器資源。

queue:work 命令

我們來看看 QueueConsoleWorkCommand 類的 handle() 方法,這是當(dāng)你運(yùn)行 php artisan queue:work 時(shí)會(huì)執(zhí)行的方法:

public function handle()
{
    if ($this->downForMaintenance() && $this->option("once")) {
        return $this->worker->sleep($this->option("sleep"));
    }

    $this->listenForEvents();

    $connection = $this->argument("connection")
                    ?: $this->laravel["config"]["queue.default"];

    $queue = $this->getQueue($connection);

    $this->runWorker(
        $connection, $queue
    );
}

首先,我們檢查應(yīng)用程序是否處于維護(hù)模式,并使用 --once 選項(xiàng),在這種情況下,我們希望腳本正常運(yùn)行,因此我們不執(zhí)行任何作業(yè),我們只需要在完全殺死腳本前讓worker在一段時(shí)間內(nèi)休眠。

QueueWorkersleep() 方法看起來像這樣:

public function sleep($seconds)
{
    sleep($seconds);
}
為什么我們不能在 handle() 方法中返回null來終止腳本?

如前所述, queue:listen 命令在循環(huán)中運(yùn)行 WorkCommand

while (true) {
     // This process simply calls "php artisan queue:work --once"
    $this->runProcess($process, $options->memory);
}

如果應(yīng)用程序處于維護(hù)模式,并且 WorkCommand 立即終止,這將導(dǎo)致循環(huán)結(jié)束,下一個(gè)在很短的時(shí)間內(nèi)啟動(dòng),最好在這種情況下導(dǎo)致一些延遲,而不是通過創(chuàng)建我們不會(huì)真正使用的大量應(yīng)用程序?qū)嵗?/p> 監(jiān)聽事件

handle() 方法里面我們調(diào)用 listenForEvents() 方法:

protected function listenForEvents()
{
    $this->laravel["events"]->listen(JobProcessing::class, function ($event) {
        $this->writeOutput($event->job, "starting");
    });

    $this->laravel["events"]->listen(JobProcessed::class, function ($event) {
        $this->writeOutput($event->job, "success");
    });

    $this->laravel["events"]->listen(JobFailed::class, function ($event) {
        $this->writeOutput($event->job, "failed");

        $this->logFailedJob($event);
    });
}

在這個(gè)方法中我們會(huì)監(jiān)聽幾個(gè)事件,這樣我們可以在每次作業(yè)處理中,處理完或處理失敗時(shí)向用戶打印一些信息。

記錄失敗作業(yè)

一旦作業(yè)失敗 logFailedJob() 方法會(huì)被調(diào)用

$this->laravel["queue.failer"]->log(
    $event->connectionName, $event->job->getQueue(),
    $event->job->getRawBody(), $event->exception
);

queue.failer 容器別名在 QueueQueueServiceProvider::registerFailedJobServices() 中注冊:

protected function registerFailedJobServices()
{
    $this->app->singleton("queue.failer", function () {
        $config = $this->app["config"]["queue.failed"];

        return isset($config["table"])
                    ? $this->databaseFailedJobProvider($config)
                    : new NullFailedJobProvider;
    });
}

/**
 * Create a new database failed job provider.
 *
 * @param  array  $config
 * @return IlluminateQueueFailedDatabaseFailedJobProvider
 */
protected function databaseFailedJobProvider($config)
{
    return new DatabaseFailedJobProvider(
        $this->app["db"], $config["database"], $config["table"]
    );
}

如果配置了 queue.failed ,則將使用數(shù)據(jù)庫隊(duì)列失敗,并將有關(guān)失敗作業(yè)的信息簡單地存儲(chǔ)在數(shù)據(jù)庫表中的:

$this->getTable()->insertGetId(compact(
    "connection", "queue", "payload", "exception", "failed_at"
));
運(yùn)行worker

要運(yùn)行worker,我們需要收集兩條信息:

worker的連接信息從作業(yè)中提取

worker找到作業(yè)的隊(duì)列

如果沒有使用 queue.default 配置定義的默認(rèn)連接。您可以為 queue:work 命令提供 --connection=default 選項(xiàng)。

隊(duì)列也是一樣,您可以提供一個(gè) --queue=emails 選項(xiàng),或選擇連接配置中的 queue 選項(xiàng)。一旦這一切完成, WorkCommand::handle() 方法會(huì)運(yùn)行 runWorker()

protected function runWorker($connection, $queue)
{
    $this->worker->setCache($this->laravel["cache"]->driver());

    return $this->worker->{$this->option("once") ? "runNextJob" : "daemon"}(
        $connection, $queue, $this->gatherWorkerOptions()
    );
}

在worker類屬性在命令構(gòu)造后設(shè)置:

public function __construct(Worker $worker)
{
    parent::__construct();

    $this->worker = $worker;
}

容器解析 QueueWorker 實(shí)例,在runWorker()中我們設(shè)置了worker將使用的緩存驅(qū)動(dòng),我們也根據(jù)--once 命令來決定我們調(diào)用什么方法。

如果使用 --once 選項(xiàng),我們只需調(diào)用 runNextJob 來運(yùn)行下一個(gè)可用的作業(yè),然后腳本就會(huì)終止。 否則,我們將調(diào)用 daemon 方法來始終保持進(jìn)程處理作業(yè)。

在開始工作時(shí),我們使用 gatherWorkerOptions() 方法收集用戶給出的命令選項(xiàng),我們稍后會(huì)提供這些選項(xiàng),這個(gè)工具是 runNextJobdaemon 方法。

protected function gatherWorkerOptions()
{
    return new WorkerOptions(
        $this->option("delay"), $this->option("memory"),
        $this->option("timeout"), $this->option("sleep"),
        $this->option("tries"), $this->option("force")
    );
}
守護(hù)進(jìn)程

讓我看看 Worker::daemon() 方法,這個(gè)方法的第一行調(diào)用了 Worker::daemon() 方法

protected function listenForSignals()
{
    if ($this->supportsAsyncSignals()) {
        pcntl_async_signals(true);

        pcntl_signal(SIGTERM, function () {
            $this->shouldQuit = true;
        });

        pcntl_signal(SIGUSR2, function () {
            $this->paused = true;
        });

        pcntl_signal(SIGCONT, function () {
            $this->paused = false;
        });
    }
}

這種方法使用PHP7.1的信號(hào)處理, supportsAsyncSignals() 方法檢查我們是否在PHP7.1上,并加載 pcntl 擴(kuò)展名。

之后pcntl_async_signals() 被調(diào)用來啟用信號(hào)處理,然后我們?yōu)槎鄠€(gè)信號(hào)注冊處理程序:

當(dāng)腳本被指示關(guān)閉時(shí),會(huì)引發(fā)SIGTERM。

SIGUSR2是用戶定義的信號(hào),Laravel用來表示腳本應(yīng)該暫停。

當(dāng)暫停的腳本繼續(xù)進(jìn)行時(shí),會(huì)引發(fā)SIGCONT。

這些信號(hào)從Process Monitor(如 Supervisor )發(fā)送并與我們的腳本進(jìn)行通信。

Worker::daemon() 方法中的第二行讀取最后一個(gè)隊(duì)列重新啟動(dòng)的時(shí)間戳,當(dāng)我們調(diào)用queue:restart 時(shí)該值存儲(chǔ)在緩存中,稍后我們將檢查是否和上次重新啟動(dòng)的時(shí)間戳不符合,來指示worker在之后多次重啟。

最后,該方法啟動(dòng)一個(gè)循環(huán),在這個(gè)循環(huán)中,我們將完成其余獲取作業(yè)的worker,運(yùn)行它們,并對worker進(jìn)程執(zhí)行多個(gè)操作。

while (true) {
    if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
        $this->pauseWorker($options, $lastRestart);

        continue;
    }

    $job = $this->getNextJob(
        $this->manager->connection($connectionName), $queue
    );

    $this->registerTimeoutHandler($job, $options);

    if ($job) {
        $this->runJob($job, $connectionName, $options);
    } else {
        $this->sleep($options->sleep);
    }

    $this->stopIfNecessary($options, $lastRestart);
}
確定worker是否應(yīng)該處理作業(yè)

調(diào)用 daemonShouldRun() 檢查以下情況:

應(yīng)用程序不處于維護(hù)模式

Worker沒有暫停

沒有事件監(jiān)聽器阻止循環(huán)繼續(xù)

如果應(yīng)用程序在維護(hù)模式下,worker使用--force選項(xiàng)仍然可以處理作業(yè):

php artisan queue:work --force

確定worker是否應(yīng)該繼續(xù)的條件之一是:

$this->events->until(new EventsLooping($connectionName, $queue)) === false)

這行觸發(fā) QueueEventLooping 事件,并檢查是否有任何監(jiān)聽器在 handle() 方法中返回false,這種情況下你可以強(qiáng)制您的workers暫時(shí)停止處理作業(yè)。

如果worker應(yīng)該暫停,則調(diào)用 pauseWorker() 方法:

protected function pauseWorker(WorkerOptions $options, $lastRestart)
{
    $this->sleep($options->sleep > 0 ? $options->sleep : 1);

    $this->stopIfNecessary($options, $lastRestart);
}

sleep 方法并傳遞給控制臺(tái)命令的 --sleep 選項(xiàng),這個(gè)方法調(diào)用

public function sleep($seconds)
{
    sleep($seconds);
}

腳本休眠了一段時(shí)間后,我們檢查worker是否應(yīng)該在這種情況下退出并殺死腳本,稍后我們看一下stopIfNecessary 方法,以防腳本不能被殺死,我們只需調(diào)用 continue; 開始一個(gè)新的循環(huán):

if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
    $this->pauseWorker($options, $lastRestart);

    continue;
}
Retrieving 要運(yùn)行的作業(yè)
$job = $this->getNextJob(
    $this->manager->connection($connectionName), $queue
);

getNextJob() 方法接受一個(gè)隊(duì)列連接的實(shí)例,我們從隊(duì)列中獲取作業(yè)

protected function getNextJob($connection, $queue)
{
    try {
        foreach (explode(",", $queue) as $queue) {
            if (! is_null($job = $connection->pop($queue))) {
                return $job;
            }
        }
    } catch (Exception $e) {
        $this->exceptions->report($e);

        $this->stopWorkerIfLostConnection($e);
    }
}

我們簡單地循環(huán)給定的隊(duì)列,使用選擇的隊(duì)列連接從存儲(chǔ)空間(數(shù)據(jù)庫,redis,sqs,...)獲取作業(yè)并返回該作業(yè)。

要從存儲(chǔ)中retrieve作業(yè),我們查詢滿足以下條件的最舊作業(yè):

推送到 queue ,我們試圖從中找到作業(yè)

沒有被其他worker reserved

可以在給定的時(shí)間內(nèi)運(yùn)行,有些作業(yè)在將來被推遲運(yùn)行

我們也取到了很久以來被凍結(jié)的作業(yè)并重試

一旦我們找到符合這一標(biāo)準(zhǔn)的作業(yè),我們將這個(gè)作業(yè)標(biāo)記為reserved,以便其他workers獲取到,我們還會(huì)增加作業(yè)監(jiān)控次數(shù)。

監(jiān)控作業(yè)超時(shí)

下一個(gè)作業(yè)被retrieved之后,我們調(diào)用 registerTimeoutHandler() 方法:

protected function registerTimeoutHandler($job, WorkerOptions $options)
{
    if ($this->supportsAsyncSignals()) {
        pcntl_signal(SIGALRM, function () {
            $this->kill(1);
        });the

        $timeout = $this->timeoutForJob($job, $options);

        pcntl_alarm($timeout > 0 ? $timeout + $options->sleep : 0);
    }
}

再次,如果 pcntl 擴(kuò)展被加載,我們將注冊一個(gè)信號(hào)處理程序干掉worker進(jìn)程如果該作業(yè)超時(shí)的話,在配置了超時(shí)之后我們使用 pcntl_alarm() 來發(fā)送一個(gè) SIGALRM 信號(hào)。

如果作業(yè)所花費(fèi)的時(shí)間超過了超時(shí)值,處理程序?qū)?huì)終止該腳本,如果不是該作業(yè)將通過,并且下一個(gè)循環(huán)將設(shè)置一個(gè)新的報(bào)警覆蓋第一個(gè)報(bào)警,因?yàn)檫M(jìn)程中可能存在單個(gè)報(bào)警。

作業(yè)只在PHP7.1以上起效,在window上也無效 ˉ_(ツ)_/ˉ

處理作業(yè)

runJob() 方法調(diào)用 process():

public function process($connectionName, $job, WorkerOptions $options)
{
    try {
        $this->raiseBeforeJobEvent($connectionName, $job);

        $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
            $connectionName, $job, (int) $options->maxTries
        );

        $job->fire();

        $this->raiseAfterJobEvent($connectionName, $job);
    } catch (Exception $e) {
        $this->handleJobException($connectionName, $job, $options, $e);
    }
}

raiseBeforeJobEvent() 觸發(fā) QueueEventsJobProcessing 事件, raiseAfterJobEvent() 觸發(fā) QueueEventsJobProcessed 事件。 markJobAsFailedIfAlreadyExceedsMaxAttempts() 檢查進(jìn)程是否達(dá)到最大嘗試次數(shù),并將該作業(yè)標(biāo)記為失敗:

protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
{
    $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;

    if ($maxTries === 0 || $job->attempts() <= $maxTries) {
        return;
    }

    $this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException(
        "A queued job has been attempted too many times. The job may have previously timed out."
    ));

    throw $e;
}

否則我們在作業(yè)對象上調(diào)用 fire() 方法來運(yùn)行作業(yè)。

從哪里獲取作業(yè)對象

getNextJob() 方法返回一個(gè) ContractsQueueJob 的實(shí)例,這取決于我們使用相應(yīng)的Job實(shí)例的隊(duì)列驅(qū)動(dòng)程序,例如如果數(shù)據(jù)庫隊(duì)列驅(qū)動(dòng)則選擇 QueueJobsDatabaseJob 。

循環(huán)結(jié)束

在循環(huán)結(jié)束時(shí),我們調(diào)用 stopIfNecessary() 來檢查在下一個(gè)循環(huán)開始之前是否應(yīng)該停止進(jìn)程:

protected function stopIfNecessary(WorkerOptions $options, $lastRestart)
{
    if ($this->shouldQuit) {
        $this->kill();
    }

    if ($this->memoryExceeded($options->memory)) {
        $this->stop(12);
    } elseif ($this->queueShouldRestart($lastRestart)) {
        $this->stop();
    }
}

shouldQuit 屬性在兩種情況下設(shè)置,首先listenForSignals() 內(nèi)部的作為 SIGTERM 信號(hào)處理程序,其次在 stopWorkerIfLostConnection()

protected function stopWorkerIfLostConnection($e)
{
    if ($this->causedByLostConnection($e)) {
        $this->shouldQuit = true;
    }
}

在retrieving和處理作業(yè)時(shí),會(huì)在幾個(gè)try ... catch語句中調(diào)用此方法,以確保worker應(yīng)該處于被干掉的狀態(tài),以便我們的Process Control可能會(huì)啟動(dòng)一個(gè)新的數(shù)據(jù)庫連接。

causedByLostConnection() 方法可以在 DatabaseDetectsLostConnections trait中找到。
memoryExceeded() 檢查內(nèi)存使用情況是否超過當(dāng)前設(shè)置的內(nèi)存限制,您可以使用 --memory 選項(xiàng)設(shè)置限制。

轉(zhuǎn)載請注明:?轉(zhuǎn)載自Ryan是菜鳥 | LNMP技術(shù)棧筆記

如果覺得本篇文章對您十分有益,何不 打賞一下

本文鏈接地址:?剖析Laravel隊(duì)列系統(tǒng)--Worker

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/23262.html

相關(guān)文章

  • 剖析Laravel隊(duì)列系統(tǒng)--準(zhǔn)備隊(duì)列作業(yè)

    摘要:原文鏈接我們推送到隊(duì)列的每個(gè)作業(yè)都存儲(chǔ)在按執(zhí)行順序排序的某些存儲(chǔ)空間中,該存儲(chǔ)位置可以是數(shù)據(jù)庫,存儲(chǔ)或像這樣的第三方服務(wù)。這個(gè)數(shù)字從開始,在每次運(yùn)行作業(yè)時(shí)不斷增加。 原文鏈接https://divinglaravel.com/queue-system/preparing-jobs-for-queue Every job we push to queue is stored in som...

    marek 評論0 收藏0
  • 剖析Laravel隊(duì)列系統(tǒng)--推送作業(yè)到隊(duì)列

    摘要:有幾種有用的方法可以使用將作業(yè)推送到特定的隊(duì)列在給定的秒數(shù)之后推送作業(yè)延遲后將作業(yè)推送到特定的隊(duì)列推送多個(gè)作業(yè)推送特定隊(duì)列上的多個(gè)作業(yè)調(diào)用這些方法之后,所選擇的隊(duì)列驅(qū)動(dòng)會(huì)將給定的信息存儲(chǔ)在存儲(chǔ)空間中,供按需獲取。 原文鏈接https://divinglaravel.com/queue-system/pushing-jobs-to-queue There are several ways...

    maochunguang 評論0 收藏0
  • 剖析Laravel隊(duì)列系統(tǒng)--初探

    摘要:配有內(nèi)置的隊(duì)列系統(tǒng),可幫助您在后臺(tái)運(yùn)行任務(wù),并通過簡單的來配置系統(tǒng)在不同情況下起作用。您可以在中管理隊(duì)列配置,默認(rèn)情況下它有使用不同隊(duì)列驅(qū)動(dòng)的幾個(gè)連接,您可以看到項(xiàng)目中可以有多個(gè)隊(duì)列連接,也可以使用多個(gè)隊(duì)列驅(qū)動(dòng)程序。 原文鏈接https://divinglaravel.com/queue-system/before-the-dive Laravel receives a request...

    pubdreamcc 評論0 收藏0
  • Swoft 源碼剖析 - 連接池

    摘要:基于擴(kuò)展實(shí)現(xiàn)真正的數(shù)據(jù)庫連接池這種方案中,項(xiàng)目占用的連接數(shù)僅僅為。一種是連接暫時(shí)不再使用,其占用狀態(tài)解除,可以從使用者手中交回到空閑隊(duì)列中這種我們稱為連接的歸隊(duì)。源碼剖析系列目錄 作者:bromine鏈接:https://www.jianshu.com/p/1a7...來源:簡書著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對原文進(jìn)行了重新的排版。Swoft Github: https:...

    rozbo 評論0 收藏0
  • Laravel5.2隊(duì)列驅(qū)動(dòng)expire參數(shù)設(shè)置帶來的重復(fù)執(zhí)行問題 數(shù)據(jù)庫驅(qū)動(dòng)

    摘要:已經(jīng)取消了參數(shù),都用來執(zhí)行。取數(shù)據(jù)的過程事物處理已經(jīng)打開。取得符合條件的隊(duì)列后程序會(huì)更新該條數(shù)據(jù),并且更新完后即。 connections => [ .... database => [ driver => database, table => jobs, queue => defaul...

    ysl_unh 評論0 收藏0

發(fā)表評論

0條評論

CollinPeng

|高級(jí)講師

TA的文章

閱讀更多
最新活動(dòng)
閱讀需要支付1元查看
<