成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

[譯] RabbitMQ tutorials (3) ---- 'Pub/Sub'

zzir / 812人閱讀

摘要:生產(chǎn)者只能把消息發(fā)到交換器。是否要追加到一個(gè)特殊的隊(duì)列是否要追加到許多的隊(duì)列或者丟掉這條消息這些規(guī)則被定義為交換類型。有一點(diǎn)很關(guān)鍵,向不存在的交換器發(fā)布消息是被禁止的。如果仍然沒(méi)有隊(duì)列綁定交換器,消息會(huì)丟失。

發(fā)布與訂閱 (Publish/Subscribe)

在之前的章節(jié)中,我們創(chuàng)建了工作隊(duì)列,之前的工作隊(duì)列的假設(shè)是每個(gè)任務(wù)只被分發(fā)到一個(gè)worker。在這一節(jié)中,我們會(huì)做一些完全不一樣的事--把一條消息發(fā)送給多個(gè)消費(fèi)者,這個(gè)模式叫做“發(fā)布/訂閱”(publish/subscribe)。

舉個(gè)例子,我們要構(gòu)建一個(gè)簡(jiǎn)易的日志系統(tǒng)。由兩個(gè)程序組成---一個(gè)來(lái)發(fā)出日志消息,另一個(gè)接收并把消息顯示出來(lái)。

在我們的日志系統(tǒng)當(dāng)中,每一個(gè)正在運(yùn)行的接收程序都會(huì)收到消息。這樣,我們可以運(yùn)行一個(gè)receiver并把log定向到磁盤,然后再跑一個(gè)receiver,看看它是否會(huì)在屏幕上顯示日志。

事實(shí)上,被發(fā)布的消息會(huì)被廣播到所有的receiver那里。

交換器(Exchanges)

在之前的引導(dǎo)中,我們從一個(gè)隊(duì)列中做了收發(fā)的操作。是時(shí)候介紹在Rabbit中的全部的消息模型了。

讓我們先快速地回顧一下之前學(xué)習(xí)的,

producer 是一個(gè)發(fā)送消息的應(yīng)用

queue 是一個(gè)存儲(chǔ)消息的buffer

consumer 是一個(gè)接收消息的應(yīng)用

RabbitMQ中,消息模型的核心思想是生產(chǎn)者絕不會(huì)把消息直接發(fā)到隊(duì)列。實(shí)際上,生產(chǎn)者通常不知道一條消息是否已經(jīng)被發(fā)送到任意一個(gè)隊(duì)列中。

生產(chǎn)者只能把消息發(fā)到交換器。交換器是個(gè)簡(jiǎn)單的東西。一方面接收從生產(chǎn)者那邊來(lái)的消息,另一方面把他們push到隊(duì)列中。交換器一定要知道當(dāng)它們接收到消息之后要如何處理。是否要追加到一個(gè)特殊的隊(duì)列?是否要追加到許多的隊(duì)列?或者丟掉這條消息?這些規(guī)則被定義為交換類型。

以下是可以使用的交換類型:direct, topic, header, fanout。我們介紹一下最后一個(gè)--fanout。讓我們先創(chuàng)建一個(gè)fanout類型的交換器“l(fā)ogs”:

ch.assertExchange("logs", "fanout", {durable: false})

fanout類型的交換器非常簡(jiǎn)單,我們可以從單單從名字上猜測(cè),它就是把它接收到的消息廣播給所有已知的隊(duì)列。這也就是我們的logger所需要的。

列出所有的交換器(Listing exchanges)
你可以使用rabbitmqctl

$sudo rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.

在列表中,一些amq.*的交換器和一些默認(rèn)的(未命名的),都是被默認(rèn)創(chuàng)建的,但是可能是你用不到的

未命名的交換器(Nameless exchange)
在之前的章節(jié)中我們未提過(guò)交換器,但是我們?nèi)匀荒軌虬严鞯疥?duì)列中,這就是我們使用了默認(rèn)的交換器,因?yàn)槲覀兪褂昧丝盏淖址?")。
之前我們是這樣發(fā)布一條消息的
ch.sendToQueue("hello", new Buffer("Hello World!"));
這里我們使用默認(rèn)的或者未命名的交換器,如果第一個(gè)參數(shù)存在的話,消息會(huì)被路由到這個(gè)參數(shù)名的隊(duì)列。

現(xiàn)在,我們可以使用我們定義好的交換器

ch.publish("logs", "", new Buffer("Hello World!"));

第二個(gè)參數(shù)為空的話代表我們不想把消息推到指定的隊(duì)列,只是想發(fā)布到logs的交換器中。

臨時(shí)隊(duì)列 (Temporary queues)

你還記得我們之前用的聲明過(guò)的隊(duì)列(hello 和 task_queue)嗎?。能夠指明一個(gè)隊(duì)列的名字對(duì)我們來(lái)說(shuō)是重要的--我們需要把workers指到相同的隊(duì)列。
當(dāng)你想要分享給消費(fèi)者和生產(chǎn)者隊(duì)列的時(shí)候,給隊(duì)列起一個(gè)名字很重要。

但著不是我們logger這個(gè)程序需要的,我們想監(jiān)聽(tīng)所有的log消息,不是一部分log消息。同樣的,我們對(duì)正在流動(dòng)的消息也感興趣(not in the old ones).我們需要完成兩件事情:
第一,不管我們什么時(shí)候連接Rabbit,都需要一個(gè)新的,空的隊(duì)列。我們可以創(chuàng)建一個(gè)隨機(jī)的隊(duì)列名字,或者讓服務(wù)器為我們隨機(jī)選擇一個(gè)隊(duì)列名字。
第二,不管我們什么時(shí)候斷開(kāi)與消費(fèi)者的連接,隊(duì)列需要自動(dòng)銷毀。

amqp.node的客戶端中,當(dāng)我們傳入字符串的時(shí)候,可以創(chuàng)建一個(gè)帶有名字的未持久化的隊(duì)列

ch.assertQueue("", {exclusive: true});

這個(gè)方法返回一個(gè)帶有隨機(jī)名字的隊(duì)列實(shí)例,比如amq.gen-JzTY20BRgKO-HjmUJj0wLg
當(dāng)連接被斷開(kāi)的時(shí)候,這個(gè)隊(duì)列會(huì)被銷毀,因?yàn)槲覀冊(cè)诼暶鞯臅r(shí)候{exclusive:true}

綁定 (Bindings)

我們已經(jīng)創(chuàng)建了一個(gè)fanout類型的交換器和一個(gè)隊(duì)列,現(xiàn)在我們需要告訴交換器把消息發(fā)送給隊(duì)列,隊(duì)列與交換器之間的關(guān)系我們稱之為綁定。
ch.bindQueue(queue_name, "logs", "");
現(xiàn)在開(kāi)始,logs的交換器為追加消息到我們的隊(duì)列

Listing bindings:

你可以列出已經(jīng)存在的綁定關(guān)系,你應(yīng)該猜到。rabbitmqctl list_bindings。

整合(Putting it all together)

生產(chǎn)者的程序,用來(lái)發(fā)出log消息,和之前章節(jié)沒(méi)有太多的不同,最重要的改變就是現(xiàn)在我們是把消息發(fā)布到我們的logs的交換器中,而不是之前的在未聲明的情況下使用。發(fā)送的時(shí)候我們需要提供一個(gè)路由鍵,但是在fanout類型當(dāng)中,這個(gè)可以忽略。下面是emit_log.js的代碼

#!/usr/bin/env node

var amqp = require("amqplib/callback_api");

amqp.connect("amqp://localhost", function(err, conn) {
  conn.createChannel(function(err, ch) {
    var ex = "logs";
    var msg = process.argv.slice(2).join(" ") || "Hello World!";

    ch.assertExchange(ex, "fanout", {durable: false});
    ch.publish(ex, "", new Buffer(msg));
    console.log(" [x] Sent %s", msg);
  });

  setTimeout(function() { conn.close(); process.exit(0) }, 500);
});

(emit_log.js 源碼)

正如你所見(jiàn),在與交換器建立連接之后。有一點(diǎn)很關(guān)鍵,向不存在的交換器發(fā)布消息是被禁止的。
如果仍然沒(méi)有隊(duì)列綁定交換器,消息會(huì)丟失。但是對(duì)我們來(lái)說(shuō)還好,如果仍然沒(méi)有消費(fèi)者監(jiān)聽(tīng),我們可以安全地丟棄這些消息。

receive_logs.js的代碼

#!/usr/bin/env node

var amqp = require("amqplib/callback_api");

amqp.connect("amqp://localhost", function(err, conn) {
  conn.createChannel(function(err, ch) {
    var ex = "logs";

    ch.assertExchange(ex, "fanout", {durable: false});

    ch.assertQueue("", {exclusive: true}, function(err, q) {
      console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
      ch.bindQueue(q.queue, ex, "");

      ch.consume(q.queue, function(msg) {
        console.log(" [x] %s", msg.content.toString());
      }, {noAck: true});
    });
  });
});

(receive_logs,js源碼)

如果你想要保存log,你可以打開(kāi)控制臺(tái)輸入

$ ./receive_logs.js > logs_from_rabbit.log

如果你想在屏幕上看到log,再打開(kāi)一個(gè)控制臺(tái)

$ ./receive_logs.js

當(dāng)然,需要發(fā)出logs

$ ./emit_log.js

使用rabbitmqctl list_bindings,你可以確定剛才的代碼確實(shí)創(chuàng)建了交換器和隊(duì)列,有兩個(gè)receive_logs.js的程序在運(yùn)行。

$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

這個(gè)結(jié)果的簡(jiǎn)要解釋:數(shù)據(jù)從logs交換器到兩個(gè)服務(wù)器分配的隊(duì)列。這也是我們想要的結(jié)果。

要如何監(jiān)聽(tīng)一部分的消息?讓我們移到下一章。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/88110.html

相關(guān)文章

  • [] RabbitMQ tutorials (2) ---- �39;work queue&#

    摘要:這樣的消息分發(fā)機(jī)制稱作輪詢。在進(jìn)程掛了之后,所有的未被確認(rèn)的消息會(huì)被重新分發(fā)。忘記確認(rèn)這是一個(gè)普遍的錯(cuò)誤,丟失。為了使消息不會(huì)丟失,兩件事情需要確保,我們需要持久化隊(duì)列和消息。 工作隊(duì)列 showImg(https://segmentfault.com/img/remote/1460000008229494?w=332&h=111); 在第一篇中,我們寫了一個(gè)程序從已經(jīng)聲明的隊(duì)列中收發(fā)...

    joyvw 評(píng)論0 收藏0
  • []Understanding javascript�39;s �39;undefined

    摘要:一個(gè)表示編譯器檢測(cè)到一個(gè)無(wú)效的引用值。在實(shí)際情況中,往往是在獲取一個(gè)未被賦值的引用時(shí)被拋出。任何一個(gè)函數(shù)上下文都有一個(gè)被稱為活動(dòng)對(duì)象的變量對(duì)象。沒(méi)有找到的話,就會(huì)認(rèn)為引用名沒(méi)有基礎(chǔ)值并拋出的錯(cuò)誤。下沒(méi)有下的屬性僅存在于被啟動(dòng)的情況下。 和其他語(yǔ)言相比,javascript中的對(duì)于undefined的理解還是有點(diǎn)讓人困惑的。特別是試著理解ReferenceErrors錯(cuò)誤(x is no...

    galaxy_robot 評(píng)論0 收藏0
  • Socket Error 104 bug

    摘要:概述技術(shù)棧錯(cuò)誤詳情報(bào)警機(jī)器人經(jīng)常有如下警告過(guò)程確定報(bào)錯(cuò)位置有日志就很好辦首先看日志在哪里打的從三個(gè)地方入手我們自己的代碼沒(méi)有的代碼從上下來(lái)沒(méi)有的代碼在容器中執(zhí)行 bug概述 技術(shù)棧 nginx uwsgi bottle 錯(cuò)誤詳情 報(bào)警機(jī)器人經(jīng)常有如下警告: 1 2018-xx-xxT06:59:03.038Z 660ece0ebaad admin/admin 14 - - Sock...

    keithyau 評(píng)論0 收藏0
  • []RabbitMQ tutorials (一)[JavaScript]

    摘要:允許接收和轉(zhuǎn)發(fā)消息。一個(gè)等待接收消息的程序是一個(gè)消費(fèi)者。發(fā)送者會(huì)先連接到發(fā)送一條消息,然后退出。注意這里的是要和之前的名稱一致。翻譯日期另因?yàn)橄肴腴T第一次想著翻譯,第一次然后希望多多提出不足。 gitBook https://joursion.gitbooks.io/... Title: RabbitMQ tutorials ---- Hello World (Javascript) ...

    BlackHole1 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看
<