成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

RabbitMQ+PHP 教程三(Publish/Subscribe)

Neilyo / 1691人閱讀

摘要:在客戶端中,當(dāng)我們將隊(duì)列名稱作為空字符串提供時(shí),我們創(chuàng)建一個(gè)帶有生成名稱的非持久隊(duì)列方法返回時(shí),變量包含一個(gè)隨機(jī)生成的隊(duì)列名稱。交換和隊(duì)列之間的關(guān)系稱為綁定。

使用 php-amqplib 介紹

在前面的教程中,我們創(chuàng)建了一個(gè)工作隊(duì)列。工作隊(duì)列背后的假設(shè)是每個(gè)任務(wù)都交付給一個(gè)工作人員處理。在這一部分中,我們將做一些完全不同的事情——我們將向多個(gè)消費(fèi)者發(fā)送消息。此模式稱為“發(fā)布/訂閱”。

為了說(shuō)明這個(gè)模式,我們將構(gòu)建一個(gè)簡(jiǎn)單的日志系統(tǒng)。它將由兩個(gè)程序組成,第一個(gè)程序?qū)l(fā)出日志消息,第二個(gè)程序?qū)⒔邮詹⒋蛴∷鼈儭?/p>

在我們的日志系統(tǒng)中,接收程序的每個(gè)運(yùn)行副本都會(huì)收到消息。這樣我們就可以運(yùn)行一個(gè)接收器,并將日志引導(dǎo)到磁盤(pán);同時(shí),我們還可以運(yùn)行另一個(gè)接收器,并在屏幕上看到日志。

本質(zhì)上,已發(fā)布的日志消息將被廣播到所有接收器。

交換機(jī)(Exchanges)

在本教程的前幾部分中,我們從隊(duì)列中發(fā)送和接收消息?,F(xiàn)在是在Rabbit中引入完整消息傳遞模型的時(shí)候了。

讓我們快速瀏覽一下前面教程中介紹的內(nèi)容:

生產(chǎn)者是發(fā)送消息的用戶應(yīng)用程序。

隊(duì)列是存儲(chǔ)消息的緩沖區(qū)。

消費(fèi)者是接收消息的用戶應(yīng)用程序。

RabbitMQ消息傳遞模型的核心思想是,生產(chǎn)者不發(fā)送任何信息直接到隊(duì)列。事實(shí)上,生產(chǎn)者甚至不知道消息是否會(huì)發(fā)送到任何隊(duì)列。

相反,生產(chǎn)商只能向交換機(jī)(Exchange)發(fā)送消息。交換機(jī)做的事情很簡(jiǎn)單。一方面,它接收來(lái)自生產(chǎn)者的信息,另一邊則推他們排隊(duì)。Exchange必須知道如何處理接收到的消息。應(yīng)該附加到特定隊(duì)列嗎?它應(yīng)該被添加到多個(gè)隊(duì)列?還是應(yīng)該被拋棄?。這個(gè)規(guī)則是由交換類型定義的。

有幾種交換類型可用:direct, topic, headers 和 fanout。我們將集中討論最后一個(gè)——fanout。讓我們創(chuàng)建這種類型的交換,并稱之為日志:

$channel->exchange_declare("logs", "fanout", false, false, false);

fanout交換非常簡(jiǎn)單。正如你可能從這個(gè)名字猜到的,它只廣播它收到的所有消息給它所知道的所有隊(duì)列。這正是我們需要的記錄器。

Listing exchanges

列出服務(wù)器上的交換機(jī),你可以運(yùn)行rabbitmqctl:

sudo rabbitmqctl list_exchanges

在這個(gè)列表中會(huì)有一些amq. *交流和默認(rèn)(未命名)交換。默認(rèn)情況下創(chuàng)建這些>,但目前不太可能使用它們。

默認(rèn)的交換機(jī)

在本教程的前幾部分中,我們對(duì)交換機(jī)一無(wú)所知,但仍然能夠?qū)⑾l(fā)送到隊(duì)列中。這是可能的,因?yàn)槲覀兪褂玫氖悄J(rèn)的交換,我們通過(guò)空字符串(“”)來(lái)標(biāo)識(shí)它們。

回想一下我們之前如何發(fā)布消息:

$channel->basic_publish($msg, "", "hello");

我們?cè)谶@里使用默認(rèn)的或無(wú)名的交換:消息路由到指定的routing_key名稱的隊(duì)列,如果它存在的話。路由鍵是第三個(gè)參數(shù):basic_publish

現(xiàn)在,我們可以將其發(fā)布到我們命名的Exchange中:

$channel->exchange_declare("logs", "fanout", false, false, false);
$channel->basic_publish($msg, "logs");
臨時(shí)隊(duì)列(Temporary queues)

也許你還記得以前我們使用的隊(duì)列所指定的名稱(記得hellotask_queue?). 能夠說(shuō)出一個(gè)隊(duì)列對(duì)我們來(lái)說(shuō)至關(guān)重要 -- 我們需要把工人指向同一個(gè)隊(duì)列。當(dāng)你想在生產(chǎn)者和消費(fèi)者之間共享一個(gè)隊(duì)列時(shí),給隊(duì)列一個(gè)名字是很重要的。

但我們的記錄器不是這樣的。我們想了解所有日志消息,而不僅僅是其中的一個(gè)子集。我們也只對(duì)當(dāng)前流動(dòng)的消息感興趣,而不是舊消息。為了解決這個(gè)問(wèn)題,我們需要兩件事。

首先,每當(dāng)我們與Rabbit連接時(shí),我們需要一個(gè)新的空隊(duì)列。為此,我們可以創(chuàng)建一個(gè)帶有隨機(jī)名稱的隊(duì)列,或者更好 - 讓服務(wù)器為我們選擇一個(gè)隨機(jī)隊(duì)列名。

第二,一旦斷開(kāi)消費(fèi)者,隊(duì)列應(yīng)該自動(dòng)刪除。

在php客戶端中,當(dāng)我們將隊(duì)列名稱作為空字符串提供時(shí),我們創(chuàng)建一個(gè)帶有生成名稱的非持久隊(duì)列:

list($queue_name, ,) = $channel->queue_declare("");

方法返回時(shí),queue_name變量包含一個(gè)隨機(jī)生成的RabbitMQ隊(duì)列名稱。例如,它可能看起來(lái)像amq.gen-jzty20brgko-hjmujj0wlg

當(dāng)聲明它關(guān)閉的連接時(shí),隊(duì)列將被刪除,因?yàn)樗宦暶鳛楠?dú)占。

綁定(Bindings)

我們已經(jīng)創(chuàng)建了fanout交換機(jī)和隊(duì)列。現(xiàn)在我們需要告訴Exchange發(fā)送消息到我們的隊(duì)列中。交換和隊(duì)列之間的關(guān)系稱為綁定。

$channel->queue_bind($queue_name, "logs");

從現(xiàn)在開(kāi)始,日志交換將向隊(duì)列添加消息。

列出綁定列表(Listing bindings)

您可以使用現(xiàn)有的綁定列表,使用下面命令:

rabbitmqctl list_bindings
讓我們把所有整理在一起(Putting it all together)

生成日志消息的生成程序與前面的教程沒(méi)有多大區(qū)別。最重要的變化是,我們現(xiàn)在希望把消息發(fā)布到我們的日志交換,而不是無(wú)名的。這里給出emit_log.php代碼:

channel();

$channel->exchange_declare("logs", "fanout", false, false, false);

$data = implode(" ", array_slice($argv, 1));
if(empty($data)) $data = "info: Hello World!";
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, "logs");

echo " [x] Sent ", $data, "
";

$channel->close();
$connection->close();

?>

emit_log.php源碼

如您所見(jiàn),在建立連接之后,我們聲明交換。這一步是必要的,因?yàn)榘l(fā)布到一個(gè)不存在的交換機(jī)是禁止的。

如果沒(méi)有隊(duì)列綁定到Exchange,消息將丟失,但這對(duì)我們來(lái)說(shuō)是好的;如果沒(méi)有用戶正在監(jiān)聽(tīng),我們可以安全地丟棄消息。

receive_logs.php代碼:

channel();

$channel->exchange_declare("logs", "fanout", false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$channel->queue_bind($queue_name, "logs");

echo " [*] Waiting for logs. To exit press CTRL+C", "
";

$callback = function($msg){
  echo " [x] ", $msg->body, "
";
};

$channel->basic_consume($queue_name, "", false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

?>

receive_logs.php

如果要將日志保存到文件中,只需打開(kāi)控制臺(tái)并鍵入:

php receive_logs.php > logs_from_rabbit.log

如果您希望看到屏幕上的日志,生成一個(gè)新的終端并運(yùn)行:

php receive_logs.php

當(dāng)然,然后觸發(fā)日志類型:

php emit_log.php

使用rabbitmqctl list_bindings可以驗(yàn)證代碼實(shí)際上是創(chuàng)建綁定和隊(duì)列是我們想要的。兩receive_logs.php程序運(yùn)行你應(yīng)該看到:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

對(duì)結(jié)果的解釋很簡(jiǎn)單:來(lái)自Exchange日志的數(shù)據(jù)使用服務(wù)器分配的名稱到兩個(gè)隊(duì)列中。這正是我們想要的。

要了解如何偵聽(tīng)一個(gè)消息的子集,讓我們轉(zhuǎn)到RabbitMQ+PHP 教程四(Routing)。

翻譯來(lái)自 RabbitMQ - RabbitMQ tutorial - Publish/Subscribe

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/28262.html

相關(guān)文章

  • RabbitMQ+PHP 教程二(Work Queues)

    摘要:平均每個(gè)消費(fèi)者將得到相同數(shù)量的消息。消息確認(rèn)完成任務(wù)可能需要幾秒鐘。為了確保消息不會(huì)丟失,支持消息確認(rèn)。沒(méi)有任何消息超時(shí)當(dāng)這個(gè)消費(fèi)者中止了,將會(huì)重新分配消息時(shí)。這是因?yàn)橹皇钦{(diào)度消息時(shí),消息進(jìn)入隊(duì)列。 showImg(https://segmentfault.com/img/bVXNuN?w=332&h=111); 介紹 在上一個(gè) Hello World 教程中,我們編寫(xiě)了從指定隊(duì)列發(fā)送...

    iKcamp 評(píng)論0 收藏0
  • MQ對(duì)比之RabbitMQ & Redis

    摘要:消息隊(duì)列選擇是一個(gè)由開(kāi)發(fā)的的開(kāi)源實(shí)現(xiàn)的產(chǎn)品,是一個(gè)消息代理,從生產(chǎn)者接收消息并傳遞消息至消費(fèi)者,期間可根據(jù)規(guī)則路由緩存持久化消息。綁定隊(duì)列和交換機(jī)之間的關(guān)系。根據(jù)消息的屬性和的屬性來(lái)轉(zhuǎn)發(fā)消息。 消息隊(duì)列選擇:RabbitMQ & Redis RabbitMQ RabbitMQ是一個(gè)由erlang開(kāi)發(fā)的AMQP(Advanced Message Queue )的開(kāi)源實(shí)現(xiàn)的產(chǎn)品,Rabbi...

    notebin 評(píng)論0 收藏0
  • [譯] RabbitMQ tutorials (3) ---- 'Pub/Sub'

    摘要:生產(chǎn)者只能把消息發(fā)到交換器。是否要追加到一個(gè)特殊的隊(duì)列是否要追加到許多的隊(duì)列或者丟掉這條消息這些規(guī)則被定義為交換類型。有一點(diǎn)很關(guān)鍵,向不存在的交換器發(fā)布消息是被禁止的。如果仍然沒(méi)有隊(duì)列綁定交換器,消息會(huì)丟失。 發(fā)布與訂閱 (Publish/Subscribe) 在之前的章節(jié)中,我們創(chuàng)建了工作隊(duì)列,之前的工作隊(duì)列的假設(shè)是每個(gè)任務(wù)只被分發(fā)到一個(gè)worker。在這一節(jié)中,我們會(huì)做一些完全不一...

    zzir 評(píng)論0 收藏0
  • 簡(jiǎn)述消息隊(duì)列在電商系統(tǒng)使用場(chǎng)景以及工作模式

    摘要:概述概述消息隊(duì)列,是分布式系統(tǒng)中重要的組件,是一種進(jìn)程間通信或者是同一進(jìn)程的不同線程的通信方式。消息隊(duì)列的使用場(chǎng)景消息隊(duì)列的使用場(chǎng)景異步處理流量控制應(yīng)用解耦應(yīng)用解耦應(yīng)用解耦消息隊(duì)列的一個(gè)作用就是實(shí)現(xiàn)系統(tǒng)應(yīng)用之間的解耦。概述消息隊(duì)列(Message Queue),是分布式系統(tǒng)中重要的組件,是一種進(jìn)程間通信或者是同一進(jìn)程的不同線程的通信方式。和 http 同步協(xié)議不同的是,消息隊(duì)列是一種異步的通...

    Honwhy 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<