摘要:生產(chǎn)者只能把消息發(fā)到交換器。是否要追加到一個(gè)特殊的隊(duì)列是否要追加到許多的隊(duì)列或者丟掉這條消息這些規(guī)則被定義為交換類型。有一點(diǎn)很關(guān)鍵,向不存在的交換器發(fā)布消息是被禁止的。如果仍然沒(méi)有隊(duì)列綁定交換器,消息會(huì)丟失。
發(fā)布與訂閱 (Publish/Subscribe)
在之前的章節(jié)中,我們創(chuàng)建了工作隊(duì)列,之前的工作隊(duì)列的假設(shè)是每個(gè)任務(wù)只被分發(fā)到一個(gè)worker。在這一節(jié)中,我們會(huì)做一些完全不一樣的事--把一條消息發(fā)送給多個(gè)消費(fèi)者,這個(gè)模式叫做“發(fā)布/訂閱”(publish/subscribe)。
舉個(gè)例子,我們要構(gòu)建一個(gè)簡(jiǎn)易的日志系統(tǒng)。由兩個(gè)程序組成---一個(gè)來(lái)發(fā)出日志消息,另一個(gè)接收并把消息顯示出來(lái)。
在我們的日志系統(tǒng)當(dāng)中,每一個(gè)正在運(yùn)行的接收程序都會(huì)收到消息。這樣,我們可以運(yùn)行一個(gè)receiver并把log定向到磁盤,然后再跑一個(gè)receiver,看看它是否會(huì)在屏幕上顯示日志。
事實(shí)上,被發(fā)布的消息會(huì)被廣播到所有的receiver那里。
交換器(Exchanges)在之前的引導(dǎo)中,我們從一個(gè)隊(duì)列中做了收發(fā)的操作。是時(shí)候介紹在Rabbit中的全部的消息模型了。
讓我們先快速地回顧一下之前學(xué)習(xí)的,
producer 是一個(gè)發(fā)送消息的應(yīng)用
queue 是一個(gè)存儲(chǔ)消息的buffer
consumer 是一個(gè)接收消息的應(yīng)用
RabbitMQ中,消息模型的核心思想是生產(chǎn)者絕不會(huì)把消息直接發(fā)到隊(duì)列。實(shí)際上,生產(chǎn)者通常不知道一條消息是否已經(jīng)被發(fā)送到任意一個(gè)隊(duì)列中。
生產(chǎn)者只能把消息發(fā)到交換器。交換器是個(gè)簡(jiǎn)單的東西。一方面接收從生產(chǎn)者那邊來(lái)的消息,另一方面把他們push到隊(duì)列中。交換器一定要知道當(dāng)它們接收到消息之后要如何處理。是否要追加到一個(gè)特殊的隊(duì)列?是否要追加到許多的隊(duì)列?或者丟掉這條消息?這些規(guī)則被定義為交換類型。
以下是可以使用的交換類型:direct, topic, header, fanout。我們介紹一下最后一個(gè)--fanout。讓我們先創(chuàng)建一個(gè)fanout類型的交換器“l(fā)ogs”:
ch.assertExchange("logs", "fanout", {durable: false})
fanout類型的交換器非常簡(jiǎn)單,我們可以從單單從名字上猜測(cè),它就是把它接收到的消息廣播給所有已知的隊(duì)列。這也就是我們的logger所需要的。
列出所有的交換器(Listing exchanges)
你可以使用rabbitmqctl
$sudo rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic logs fanout ...done.
在列表中,一些amq.*的交換器和一些默認(rèn)的(未命名的),都是被默認(rèn)創(chuàng)建的,但是可能是你用不到的
未命名的交換器(Nameless exchange)
在之前的章節(jié)中我們未提過(guò)交換器,但是我們?nèi)匀荒軌虬严鞯疥?duì)列中,這就是我們使用了默認(rèn)的交換器,因?yàn)槲覀兪褂昧丝盏淖址?")。
之前我們是這樣發(fā)布一條消息的
ch.sendToQueue("hello", new Buffer("Hello World!"));
這里我們使用默認(rèn)的或者未命名的交換器,如果第一個(gè)參數(shù)存在的話,消息會(huì)被路由到這個(gè)參數(shù)名的隊(duì)列。
現(xiàn)在,我們可以使用我們定義好的交換器
ch.publish("logs", "", new Buffer("Hello World!"));
第二個(gè)參數(shù)為空的話代表我們不想把消息推到指定的隊(duì)列,只是想發(fā)布到logs的交換器中。
臨時(shí)隊(duì)列 (Temporary queues)你還記得我們之前用的聲明過(guò)的隊(duì)列(hello 和 task_queue)嗎?。能夠指明一個(gè)隊(duì)列的名字對(duì)我們來(lái)說(shuō)是重要的--我們需要把workers指到相同的隊(duì)列。
當(dāng)你想要分享給消費(fèi)者和生產(chǎn)者隊(duì)列的時(shí)候,給隊(duì)列起一個(gè)名字很重要。
但著不是我們logger這個(gè)程序需要的,我們想監(jiān)聽(tīng)所有的log消息,不是一部分log消息。同樣的,我們對(duì)正在流動(dòng)的消息也感興趣(not in the old ones).我們需要完成兩件事情:
第一,不管我們什么時(shí)候連接Rabbit,都需要一個(gè)新的,空的隊(duì)列。我們可以創(chuàng)建一個(gè)隨機(jī)的隊(duì)列名字,或者讓服務(wù)器為我們隨機(jī)選擇一個(gè)隊(duì)列名字。
第二,不管我們什么時(shí)候斷開(kāi)與消費(fèi)者的連接,隊(duì)列需要自動(dòng)銷毀。
在amqp.node的客戶端中,當(dāng)我們傳入字符串的時(shí)候,可以創(chuàng)建一個(gè)帶有名字的未持久化的隊(duì)列
ch.assertQueue("", {exclusive: true});
這個(gè)方法返回一個(gè)帶有隨機(jī)名字的隊(duì)列實(shí)例,比如amq.gen-JzTY20BRgKO-HjmUJj0wLg。
當(dāng)連接被斷開(kāi)的時(shí)候,這個(gè)隊(duì)列會(huì)被銷毀,因?yàn)槲覀冊(cè)诼暶鞯臅r(shí)候{exclusive:true}
我們已經(jīng)創(chuàng)建了一個(gè)fanout類型的交換器和一個(gè)隊(duì)列,現(xiàn)在我們需要告訴交換器把消息發(fā)送給隊(duì)列,隊(duì)列與交換器之間的關(guān)系我們稱之為綁定。
ch.bindQueue(queue_name, "logs", "");
現(xiàn)在開(kāi)始,logs的交換器為追加消息到我們的隊(duì)列
Listing bindings:
你可以列出已經(jīng)存在的綁定關(guān)系,你應(yīng)該猜到。rabbitmqctl list_bindings。
整合(Putting it all together)生產(chǎn)者的程序,用來(lái)發(fā)出log消息,和之前章節(jié)沒(méi)有太多的不同,最重要的改變就是現(xiàn)在我們是把消息發(fā)布到我們的logs的交換器中,而不是之前的在未聲明的情況下使用。發(fā)送的時(shí)候我們需要提供一個(gè)路由鍵,但是在fanout類型當(dāng)中,這個(gè)可以忽略。下面是emit_log.js的代碼
#!/usr/bin/env node var amqp = require("amqplib/callback_api"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var ex = "logs"; var msg = process.argv.slice(2).join(" ") || "Hello World!"; ch.assertExchange(ex, "fanout", {durable: false}); ch.publish(ex, "", new Buffer(msg)); console.log(" [x] Sent %s", msg); }); setTimeout(function() { conn.close(); process.exit(0) }, 500); });
(emit_log.js 源碼)
正如你所見(jiàn),在與交換器建立連接之后。有一點(diǎn)很關(guān)鍵,向不存在的交換器發(fā)布消息是被禁止的。
如果仍然沒(méi)有隊(duì)列綁定交換器,消息會(huì)丟失。但是對(duì)我們來(lái)說(shuō)還好,如果仍然沒(méi)有消費(fèi)者監(jiān)聽(tīng),我們可以安全地丟棄這些消息。
receive_logs.js的代碼
#!/usr/bin/env node var amqp = require("amqplib/callback_api"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var ex = "logs"; ch.assertExchange(ex, "fanout", {durable: false}); ch.assertQueue("", {exclusive: true}, function(err, q) { console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue); ch.bindQueue(q.queue, ex, ""); ch.consume(q.queue, function(msg) { console.log(" [x] %s", msg.content.toString()); }, {noAck: true}); }); }); });
(receive_logs,js源碼)
如果你想要保存log,你可以打開(kāi)控制臺(tái)輸入
$ ./receive_logs.js > logs_from_rabbit.log
如果你想在屏幕上看到log,再打開(kāi)一個(gè)控制臺(tái)
$ ./receive_logs.js
當(dāng)然,需要發(fā)出logs
$ ./emit_log.js
使用rabbitmqctl list_bindings,你可以確定剛才的代碼確實(shí)創(chuàng)建了交換器和隊(duì)列,有兩個(gè)receive_logs.js的程序在運(yùn)行。
$ sudo rabbitmqctl list_bindings Listing bindings ... logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] ...done.
這個(gè)結(jié)果的簡(jiǎn)要解釋:數(shù)據(jù)從logs交換器到兩個(gè)服務(wù)器分配的隊(duì)列。這也是我們想要的結(jié)果。
要如何監(jiān)聽(tīng)一部分的消息?讓我們移到下一章。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/88110.html
摘要:這樣的消息分發(fā)機(jī)制稱作輪詢。在進(jìn)程掛了之后,所有的未被確認(rèn)的消息會(huì)被重新分發(fā)。忘記確認(rèn)這是一個(gè)普遍的錯(cuò)誤,丟失。為了使消息不會(huì)丟失,兩件事情需要確保,我們需要持久化隊(duì)列和消息。 工作隊(duì)列 showImg(https://segmentfault.com/img/remote/1460000008229494?w=332&h=111); 在第一篇中,我們寫了一個(gè)程序從已經(jīng)聲明的隊(duì)列中收發(fā)...
摘要:一個(gè)表示編譯器檢測(cè)到一個(gè)無(wú)效的引用值。在實(shí)際情況中,往往是在獲取一個(gè)未被賦值的引用時(shí)被拋出。任何一個(gè)函數(shù)上下文都有一個(gè)被稱為活動(dòng)對(duì)象的變量對(duì)象。沒(méi)有找到的話,就會(huì)認(rèn)為引用名沒(méi)有基礎(chǔ)值并拋出的錯(cuò)誤。下沒(méi)有下的屬性僅存在于被啟動(dòng)的情況下。 和其他語(yǔ)言相比,javascript中的對(duì)于undefined的理解還是有點(diǎn)讓人困惑的。特別是試著理解ReferenceErrors錯(cuò)誤(x is no...
摘要:概述技術(shù)棧錯(cuò)誤詳情報(bào)警機(jī)器人經(jīng)常有如下警告過(guò)程確定報(bào)錯(cuò)位置有日志就很好辦首先看日志在哪里打的從三個(gè)地方入手我們自己的代碼沒(méi)有的代碼從上下來(lái)沒(méi)有的代碼在容器中執(zhí)行 bug概述 技術(shù)棧 nginx uwsgi bottle 錯(cuò)誤詳情 報(bào)警機(jī)器人經(jīng)常有如下警告: 1 2018-xx-xxT06:59:03.038Z 660ece0ebaad admin/admin 14 - - Sock...
摘要:允許接收和轉(zhuǎn)發(fā)消息。一個(gè)等待接收消息的程序是一個(gè)消費(fèi)者。發(fā)送者會(huì)先連接到發(fā)送一條消息,然后退出。注意這里的是要和之前的名稱一致。翻譯日期另因?yàn)橄肴腴T第一次想著翻譯,第一次然后希望多多提出不足。 gitBook https://joursion.gitbooks.io/... Title: RabbitMQ tutorials ---- Hello World (Javascript) ...
閱讀 2434·2021-11-18 10:02
閱讀 696·2021-10-08 10:04
閱讀 2271·2021-09-03 10:51
閱讀 3552·2019-08-30 15:44
閱讀 2807·2019-08-29 14:09
閱讀 2474·2019-08-29 12:21
閱讀 2071·2019-08-26 13:45
閱讀 1813·2019-08-26 13:25