成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

RabbitMQ+PHP 教程二(Work Queues)

iKcamp / 2269人閱讀

摘要:平均每個(gè)消費(fèi)者將得到相同數(shù)量的消息。消息確認(rèn)完成任務(wù)可能需要幾秒鐘。為了確保消息不會(huì)丟失,支持消息確認(rèn)。沒(méi)有任何消息超時(shí)當(dāng)這個(gè)消費(fèi)者中止了,將會(huì)重新分配消息時(shí)。這是因?yàn)橹皇钦{(diào)度消息時(shí),消息進(jìn)入隊(duì)列。

介紹

在上一個(gè) Hello World 教程中,我們編寫(xiě)了從指定隊(duì)列發(fā)送和接收消息的程序。在這篇文章中,我們將創(chuàng)建一個(gè)工作隊(duì)列,用于在多個(gè)工人(消費(fèi)者)之間分配耗時(shí)的任務(wù)。

工作隊(duì)列(又名任務(wù)隊(duì)列)背后的主要思想是避免立即執(zhí)行資源密集型任務(wù),必須等待它完成。相反,我們計(jì)劃稍后完成任務(wù)。我們將任務(wù)封裝為消息并將其發(fā)送到隊(duì)列中。后臺(tái)運(yùn)行的一個(gè)工作進(jìn)程將彈出任務(wù)并最終執(zhí)行該任務(wù)。當(dāng)你運(yùn)行許多工人(消費(fèi)者)時(shí),任務(wù)將在他們之間分擔(dān)。

這個(gè)概念在Web應(yīng)用程序中尤其有用,因?yàn)樵诙蘃TTP請(qǐng)求中不可能處理復(fù)雜的任務(wù)。

先決條件

在本教程的前一部分,我們發(fā)送了一條包含“Hello World”的消息?,F(xiàn)在,我們將發(fā)送支持復(fù)雜任務(wù)的字符串。我們沒(méi)有一個(gè)真實(shí)環(huán)境的任務(wù),如圖像進(jìn)行調(diào)整或PDF文件的渲染,讓我們利用sleep()模擬真實(shí)環(huán)境的業(yè)務(wù)功能。我們將字符串中的點(diǎn)數(shù)作為其復(fù)雜度;每個(gè)點(diǎn)都將占“工作”的一秒鐘。例如,由Hello...描述的一個(gè)偽任務(wù)…需要三秒。

new_task.php

我們會(huì)稍微修改send.php代碼從我們先前的例子,允許任意的消息是從命令行發(fā)送。這一計(jì)劃將任務(wù)分配給我們的工作隊(duì)列,所以我們命名它 new_task.php

$data = implode(" ", array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, "", "hello");

echo " [x] Sent ", $data, "
";

我們的上一個(gè)版本的receive.php腳本也需要一些改變:它需要假第二工作在消息體中每一點(diǎn)。它會(huì)從隊(duì)列彈出消息和執(zhí)行任務(wù),所以讓我們把命名worker.php:

$callback = function($msg){
  echo " [x] Received ", $msg->body, "
"; //根據(jù)"."數(shù)量個(gè)數(shù)獲取延遲時(shí)間,單位秒
  sleep(substr_count($msg->body, "."));  //模擬業(yè)務(wù)執(zhí)行時(shí)間延遲
  echo " [x] Done", "
";
};
$channel->basic_consume("hello", "", false, true, false, false, $callback);
單worker簡(jiǎn)單運(yùn)行測(cè)試 消費(fèi)者
php worker.php
消息生產(chǎn)者
php new_task.php "A very hard task which takes two seconds.."
循環(huán)調(diào)度

一個(gè)使用任務(wù)隊(duì)列的優(yōu)點(diǎn)是容易并行工作的能力。如果我們積壓了大量的工作,我們可以增加更多的工人,這樣就可以輕松地規(guī)?;?/p>

首先,讓我們嘗試同時(shí)運(yùn)行兩worker.php腳本。他們都會(huì)從隊(duì)列中獲得消息,看看效果如何?讓我們看看。

你需要打開(kāi)三個(gè)console命令。兩將運(yùn)行worker.php腳本。這些控制臺(tái)將是我們的兩個(gè)消費(fèi)者C1和C2。

消費(fèi)者1
php worker.php
消費(fèi)者2
php worker.php
消息生產(chǎn)者
php new_task.php msg1...

默認(rèn)情況下,RabbitMQ將會(huì)發(fā)送的每一條消息給下一個(gè)消費(fèi)者,在序列。平均每個(gè)消費(fèi)者將得到相同數(shù)量的消息。這種分發(fā)消息的方式稱(chēng)為循環(huán)輪詢(xún)。試著用三個(gè)或更多的工人。

消息確認(rèn)

完成任務(wù)可能需要幾秒鐘。你可能遇到如果一個(gè)消費(fèi)者開(kāi)始一個(gè)長(zhǎng)期的任務(wù),并且只完成了部分任務(wù),那么會(huì)發(fā)生什么?。我們目前的代碼,一旦RabbitMQ發(fā)送一個(gè)消息給客戶(hù)立即標(biāo)記為刪除。在這種情況下,如果您中止一個(gè)消費(fèi)者,我們將丟失它正在處理的消息。我們還將丟失發(fā)送給該消費(fèi)者所有的尚未處理的消息。

如果我們不想失去任何任務(wù)。如果一個(gè)消費(fèi)者意外中止了,我們希望把任務(wù)交給另一個(gè)消費(fèi)者。

為了確保消息不會(huì)丟失,RabbitMQ支持消息確認(rèn)。ACK(nowledgement)消費(fèi)者返回的結(jié)果告訴RabbitMQ有一條消息收到,你可以自由可控的刪除他

如果一個(gè)消費(fèi)者中止了(其通道關(guān)閉,連接被關(guān)閉,或TCP連接丟失)不發(fā)送ACK,RabbitMQ將會(huì)理解這個(gè)消息并沒(méi)有完全處理,將它重新加入隊(duì)列。如果有其他用戶(hù)同時(shí)在線,它就會(huì)快速地傳遞到另一個(gè)消費(fèi)者。這樣,即使意外中止了,也可以確保沒(méi)有丟失信息。

沒(méi)有任何消息超時(shí);當(dāng)這個(gè)消費(fèi)者中止了,RabbitMQ將會(huì)重新分配消息時(shí)。即使處理消息花費(fèi)很長(zhǎng)很長(zhǎng)時(shí)間也很好。

消息確認(rèn)是默認(rèn)關(guān)閉。可通過(guò)設(shè)置的第四個(gè)參數(shù)basic_consume設(shè)置為false(true意味著沒(méi)有ACK)和從消費(fèi)者發(fā)送合適的確認(rèn),一旦我們完成一個(gè)任務(wù)。

$callback = function($msg){
  echo " [x] Received ", $msg->body, "
";
  sleep(substr_count($msg->body, "."));
  echo " [x] Done", "
";
  $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]);
};

$channel->basic_consume("task_queue", "", false, false, false, false, $callback);

使用此代碼,我們可以確信,即使在處理消息時(shí)使用Ctrl + C殺死一名消費(fèi)者,也不會(huì)丟失任何東西。消費(fèi)者中止都未確認(rèn)的消息后很快會(huì)被重新分配。

忘了確認(rèn)(Forgotten acknowledgment)

丟失ACK確認(rèn)是一個(gè)常見(jiàn)的錯(cuò)誤。這是一個(gè)容易犯的錯(cuò)誤,但后果很?chē)?yán)重。當(dāng)你的客戶(hù)退出,消息會(huì)被重新分配(這可能看起來(lái)像是隨機(jī)的分配),RabbitMQ將會(huì)消耗更多的內(nèi)存,它不會(huì)釋放任何延遲確認(rèn)消息。

為了調(diào)試這種錯(cuò)誤,你可以使用rabbitmqctl打印messages_unacknowledged字段:

rabbitmqctl list_queues name messages_ready messages_unacknowledged
消息持久化(Message durability)

我們已經(jīng)學(xué)會(huì)了如何確保即使消費(fèi)者死了,任務(wù)也不會(huì)丟失。但是如果RabbitMQ服務(wù)器停止了,我們的任務(wù)仍然有可能會(huì)丟失。

當(dāng)RabbitMQ退出或崩潰了,會(huì)丟失隊(duì)列和消息除非你不要。要確保消息不會(huì)丟失,需要兩件事:我們需要將隊(duì)列和消息都標(biāo)記為持久的。

首先,我們需要確保RabbitMQ永遠(yuǎn)不會(huì)丟失隊(duì)列。為了做到這一點(diǎn),我們需要聲明它是持久的。為此我們通過(guò)queue_declare作為第三參數(shù)為true:

$channel->queue_declare("hello", false, true, false, false);

雖然這個(gè)命令本身是正確的,但它在我們當(dāng)前的設(shè)置中不起作用。這是因?yàn)槲覀円呀?jīng)定義了一個(gè)名為hello的隊(duì)列,該隊(duì)列不持久。RabbitMQ不允許你重新定義現(xiàn)有隊(duì)列用不同的參數(shù),將返回一個(gè)錯(cuò)誤的任何程序,試圖這么做。但有一個(gè)快速的解決方法-讓我們聲明一個(gè)名稱(chēng)不同的隊(duì)列,例如task_queue:

$channel->queue_declare("task_queue", false, true, false, false);

需要應(yīng)用到生產(chǎn)者和消費(fèi)者代碼中設(shè)置為true。

在這一點(diǎn)上,我們可以確保即使RabbitMQ重啟了,task_queue隊(duì)列不會(huì)丟失?,F(xiàn)在我們要標(biāo)記我們的消息持續(xù)通過(guò)設(shè)置delivery_mode = 2消息屬性,amqpmessage作為屬性數(shù)組的一部分。

$msg = new AMQPMessage($data, array("delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT) );
關(guān)于消息持久性的說(shuō)明(Note on message persistence)

將消息標(biāo)記為持久性不能完全保證消息不會(huì)丟失。雖然它告訴RabbitMQ保存信息到磁盤(pán)上,還有一個(gè)短的時(shí)間窗口時(shí),RabbitMQ 已經(jīng)接受信息并沒(méi)有保存它。另外,RabbitMQ不做fsync(2)每一個(gè)消息--它可能只是保存到緩存并沒(méi)有真正寫(xiě)入到磁盤(pán)。持久性保證不強(qiáng),但對(duì)于我們的簡(jiǎn)單任務(wù)隊(duì)列來(lái)說(shuō)已經(jīng)足夠了。如果你需要更強(qiáng)的保證,那么你可以使用消費(fèi)者確認(rèn)。

公平調(diào)度

您可能已經(jīng)注意到,調(diào)度仍然不完全按照我們的要求工作。例如,在一個(gè)有兩個(gè)消費(fèi)者的情況下,當(dāng)所有的奇數(shù)信息都很重,甚至很輕的消息,一個(gè)消費(fèi)者會(huì)一直忙,而另一個(gè)消費(fèi)者幾乎不做任何工作。嗯,RabbitMQ不知道發(fā)生了什么事,仍將均勻消息發(fā)送。

這是因?yàn)镽abbitMQ只是調(diào)度消息時(shí),消息進(jìn)入隊(duì)列。當(dāng)存在未確認(rèn)的消息時(shí)。它只是盲目的分發(fā)n-th條消息給n-th個(gè)消費(fèi)者。

為了改變這個(gè)分配方式,我們可以調(diào)用basic_qos方法,設(shè)置參數(shù)prefetch_count = 1。這告訴RabbitMQ不要在一個(gè)時(shí)間給一個(gè)消費(fèi)者多個(gè)消息?;蛘撸瑩Q句話說(shuō),在處理和確認(rèn)以前的消息之前,不要向消費(fèi)者發(fā)送新消息。相反,它將發(fā)送給下一個(gè)仍然不忙的消費(fèi)者。

$channel->basic_qos(null, 1, null);
關(guān)于隊(duì)列大小的注釋?zhuān)∟ote about queue size)

如果所有的消費(fèi)者都很忙,你的隊(duì)列填滿(mǎn)了。你會(huì)想留意到這一點(diǎn),也許增加更多的工人,或者有其他的策略。

源碼 new_task.php
channel();

$channel->queue_declare("task_queue", false, true, false, false);

$data = implode(" ", array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
                        array("delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT)
                      );

$channel->basic_publish($msg, "", "task_queue");

echo " [x] Sent ", $data, "
";

$channel->close();
$connection->close();

?>
worker.php
channel();

$channel->queue_declare("task_queue", false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C", "
";

$callback = function($msg){
  echo " [x] Received ", $msg->body, "
";
  sleep(substr_count($msg->body, "."));
  echo " [x] Done", "
";
  $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume("task_queue", "", false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

?>

使用消息的確認(rèn)和預(yù)取,你可以設(shè)置一個(gè)工作隊(duì)列。耐久性的配置選項(xiàng)讓任務(wù)存在,即使RabbitMQ重啟。

學(xué)習(xí)如何向許多消費(fèi)者傳遞同樣的信息, 你可以閱讀下一章節(jié):RabbitMQ+PHP 教程三(Publish/Subscribe)。

翻譯來(lái)自 RabbitMQ - RabbitMQ tutorial - Work Queues

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/26023.html

相關(guān)文章

  • RabbitMQ+PHP 教程一(Hello World)

    摘要:在中間的框是一個(gè)隊(duì)列的消息緩沖區(qū),保持代表的消費(fèi)。本教程介紹,這是一個(gè)開(kāi)放的通用的協(xié)議消息。我們將在本教程中使用,解決依賴(lài)管理。發(fā)送者將連接到,發(fā)送一條消息,然后退出。注意,這與發(fā)送發(fā)布的隊(duì)列匹配。 介紹 RabbitMQ是一個(gè)消息代理器:它接受和轉(zhuǎn)發(fā)消息。你可以把它當(dāng)作一個(gè)郵局:當(dāng)你把郵件放在信箱里時(shí),你可以肯定郵差先生最終會(huì)把郵件送到你的收件人那里。在這個(gè)比喻中,RabbitMQ就...

    silencezwm 評(píng)論0 收藏0
  • 【譯】RabbitMQ系列()-Work模式

    摘要:每個(gè)消費(fèi)者會(huì)得到平均數(shù)量的。為了確保不會(huì)丟失,采用確認(rèn)機(jī)制。如果中斷退出了關(guān)閉了,關(guān)閉了,或是連接丟失了而沒(méi)有發(fā)送,會(huì)認(rèn)為該消息沒(méi)有完整的執(zhí)行,會(huì)將該消息重新入隊(duì)。該消息會(huì)被發(fā)送給其他的。當(dāng)消費(fèi)者中斷退出,會(huì)重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我們寫(xiě)了通過(guò)一個(gè)...

    lcodecorex 評(píng)論0 收藏0
  • RabbitMQ+PHP 消息隊(duì)列環(huán)境配置

    摘要:參考文檔依賴(lài)包安裝環(huán)境配置環(huán)境變量增加內(nèi)容保存退出,并刷新變量測(cè)試是否安裝成功安裝完成以后,執(zhí)行看是否能打開(kāi),用退出,注意后面的點(diǎn)號(hào),那是的結(jié)束符。 參考文檔:http://www.cnblogs.com/phpinfo/p/4104551...http://blog.csdn.net/historyasamirror/ar... 依賴(lài)包安裝 yum install ncurses-d...

    geekidentity 評(píng)論0 收藏0
  • RabbitMQ+PHP 教程三(Publish/Subscribe)

    摘要:在客戶(hù)端中,當(dāng)我們將隊(duì)列名稱(chēng)作為空字符串提供時(shí),我們創(chuàng)建一個(gè)帶有生成名稱(chēng)的非持久隊(duì)列方法返回時(shí),變量包含一個(gè)隨機(jī)生成的隊(duì)列名稱(chēng)。交換和隊(duì)列之間的關(guān)系稱(chēng)為綁定。 使用 php-amqplib 介紹 在前面的教程中,我們創(chuàng)建了一個(gè)工作隊(duì)列。工作隊(duì)列背后的假設(shè)是每個(gè)任務(wù)都交付給一個(gè)工作人員處理。在這一部分中,我們將做一些完全不同的事情——我們將向多個(gè)消費(fèi)者發(fā)送消息。此模式稱(chēng)為發(fā)布/訂閱。 ...

    Neilyo 評(píng)論0 收藏0
  • [譯] RabbitMQ tutorials (2) ---- 'work queue&#

    摘要:這樣的消息分發(fā)機(jī)制稱(chēng)作輪詢(xún)。在進(jìn)程掛了之后,所有的未被確認(rèn)的消息會(huì)被重新分發(fā)。忘記確認(rèn)這是一個(gè)普遍的錯(cuò)誤,丟失。為了使消息不會(huì)丟失,兩件事情需要確保,我們需要持久化隊(duì)列和消息。 工作隊(duì)列 showImg(https://segmentfault.com/img/remote/1460000008229494?w=332&h=111); 在第一篇中,我們寫(xiě)了一個(gè)程序從已經(jīng)聲明的隊(duì)列中收發(fā)...

    joyvw 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<