成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

【譯】RabbitMQ系列(三) - 發(fā)布/訂閱模式

WrBug / 1583人閱讀

摘要:發(fā)布訂閱模式在之前的文章里,創(chuàng)建了。我們稱(chēng)之為發(fā)布訂閱模式。其實(shí)我們是用到了默認(rèn)的,用空字符串來(lái)標(biāo)識(shí)??兆址砹藳](méi)有名字的被路由到了由指定名字的。和這種關(guān)系的建立我們稱(chēng)之為從現(xiàn)在開(kāi)始這個(gè)就會(huì)將推向我們的隊(duì)列了。

發(fā)布訂閱模式

在之前的文章里,創(chuàng)建了work queue。work queue中,每一個(gè)task都會(huì)派發(fā)給一個(gè)worker。在本章中,我們會(huì)完成完全不一樣的事情 - 我們會(huì)派發(fā)一條message給多個(gè)消費(fèi)者。我們稱(chēng)之為發(fā)布訂閱模式。

為了更好來(lái)說(shuō)明,我們將要構(gòu)建一個(gè)簡(jiǎn)單的日志系統(tǒng)。會(huì)由兩部分代碼構(gòu)成,第一部分來(lái)發(fā)送日志message,第二部分會(huì)接受并打印日志。

在我們的日志系統(tǒng)中,每一個(gè)接收程序都會(huì)收到日志message。這種方式下,我們可以運(yùn)行一個(gè)接收程序?qū)⑷罩颈4娴酱疟P(pán),同時(shí)使用另外一個(gè)接收程序?qū)⑷罩敬蛴〉狡聊弧?/p>

本質(zhì)上來(lái)說(shuō),發(fā)布的日志message會(huì)廣播到所有運(yùn)行的接收者。

Exchanges

在之前的章節(jié)我們通過(guò)queue收發(fā)message。現(xiàn)在開(kāi)始介紹Rabbit中的full messaging model。

首先讓我們快速的回憶一下之前的章節(jié)

producer是一個(gè)發(fā)送message的用戶(hù)程序。

queue是保存message的緩沖區(qū)

consumer是接收message的用戶(hù)程序

RabbitMQ的messaging model的核心思想是producer不會(huì)直接向queue發(fā)送message。實(shí)際上,很多時(shí)候producer也不知道m(xù)essage會(huì)發(fā)送到哪些queue。

這里,producer將message發(fā)送到exchange。exchange是一個(gè)非常簡(jiǎn)單的東西。一方面它從producer側(cè)接收message,另一方面它把message推送到queue去。 exchange必須知道對(duì)接收到的message接著要去做什么。是轉(zhuǎn)發(fā)到特定的queue?還是轉(zhuǎn)發(fā)到多個(gè)queue?還是干脆丟棄掉。這個(gè)規(guī)則取決于定義時(shí)exchange的類(lèi)型。

exchange有四種可選的類(lèi)型:direct, topic, headers和fanout. 今天我們聚焦于最后一種-fanout。讓我們創(chuàng)建一個(gè)fanout類(lèi)型的exchange,命名為logs

channel.exchangeDeclare("logs","fanout");

fanout類(lèi)型的exchange是非常簡(jiǎn)單的??梢詮拿稚洗蟾挪鲁銎溆猛荆鼜V播所有的message到它所知道的queue去。這也正是日志應(yīng)用所期望的。

列出所有的exhange,可以使用rabbitmqctl命令 sudo rabbitmqctl list_exchanges,在列表總會(huì)出現(xiàn)一些amq.* 的exchange,和默認(rèn)的exchange。這些是默認(rèn)自動(dòng)創(chuàng)建的,我們不會(huì)使用到它們。

沒(méi)有名字的exchange。在之前的章節(jié)里我們沒(méi)有提到過(guò)exchanges,我們直接將message發(fā)送到queue。其實(shí)我們是用到了默認(rèn)的exchange,用空字符串”“來(lái)標(biāo)識(shí)?;叵胍幌拢覀兿裣旅孢@樣發(fā)布message:
channel.basicPublish("","hello",null,message.getBytes()); 第一個(gè)參數(shù)就是exchange的名字??兆址砹藳](méi)有名字的exchange:message被路由到了由routingKey指定名字的queue。

現(xiàn)在,我們可以向有名字的exchange發(fā)布message。

channel.basicPublish("logs","",null,message.getBytes());
Temporary Queue

之前我們使用queue時(shí)都會(huì)指定名字,如hello和task_queue。給一個(gè)queue命名是很重要的,因?yàn)槲覀円oworker指出相同的queue。當(dāng)需要在生產(chǎn)者和消費(fèi)者間共享一個(gè)queue時(shí),就必須給queue取好名字。

但是在我們?nèi)罩緫?yīng)用中,情況卻有所不同。我們需要接收到所有的log message。我們也關(guān)注當(dāng)前流動(dòng)的message。我們需要搞定2個(gè)事情。

首先,當(dāng)連接到Rabbit時(shí),我們需要一個(gè)全新的,空的queue。因此我們可以自己創(chuàng)建一個(gè)隨意名字的queue,或是由服務(wù)器選擇隨意的queue名字,這當(dāng)然是更好的選擇。

其次,當(dāng)我們斷開(kāi)接收者時(shí),該queue可以被自動(dòng)刪除。

在java客戶(hù)端中,當(dāng)我們使用無(wú)參的queueDeclare()時(shí),我們創(chuàng)建的是使用自動(dòng)生成名字的一個(gè)不持久的,自動(dòng)刪除queue:

String queueName = channel.queueDeclare().getQueue();

可以通過(guò)這里來(lái)學(xué)習(xí)到exclusive標(biāo)志和其他queue的相關(guān)屬性。

這時(shí)queue就具有一個(gè)隨機(jī)的名字,比如像amq.gen-JzTY20BRgKO-HjmUJj0wLg.

Bindings


我們已經(jīng)創(chuàng)建了一個(gè)fanout exchange和queue.現(xiàn)在我們要設(shè)置exchange,讓它把message發(fā)送到我們的queue。exchange和queue這種關(guān)系的建立我們稱(chēng)之為binding.

channel.queueBind(queueName,"logs","");

從現(xiàn)在開(kāi)始logs這個(gè)exchange就會(huì)將message推向我們的隊(duì)列了。

可以使用命令rabbitmqctl list_bindings 來(lái)列出當(dāng)前所有的binding。
開(kāi)始執(zhí)行


生產(chǎn)者程序,和之前章節(jié)的代碼變化不大,主要的變化是我們將message發(fā)送到exchange而不是一個(gè)queue。你發(fā)現(xiàn)我們?cè)诎l(fā)送的時(shí)候會(huì)填上一個(gè)routingKey,這個(gè)值在fanout類(lèi)型的exchange中是被忽略的。下面是生產(chǎn)者EmitLog.java的代碼

public class EmitLog {

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = argv.length < 1 ? "info: Hello World!" :
                            String.join(" ", argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent "" + message + """);
    }
  }
}

如你所見(jiàn),在建立connection之后我們聲明了exchange.這一步是必要的,發(fā)布Message到一個(gè)不存在的exchange是不允許的。

如果沒(méi)有queue綁定到exchange的時(shí)候,發(fā)布的message是會(huì)丟失的,但在現(xiàn)在這個(gè)場(chǎng)景是OK的。下面是ReceiveLogs.java的代碼:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogs {
  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received "" + message + """);
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

首先進(jìn)行編譯

javac -cp $CP EmitLog.java ReceiveLog.java

如果要把日志保存到文件,則

java -cp $CP ReceiveLogs > logs_from_rabbit.log

如果要在控制臺(tái)看日志,在另一個(gè)終端

java -cp $CP ReceiveLogs

最后來(lái)發(fā)送日志

java -cp $CP EmitLog

使用rabbitmqctl list_bindings,來(lái)確認(rèn)程序創(chuàng)建了我們?cè)诖a中指定的binding和queue. 運(yùn)行兩個(gè)ReceiveLogs程序,你會(huì)看到像下面的輸出

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/73895.html

相關(guān)文章

  • RabbitMQ系列(五) - 主題模式

    摘要:主題模式在上一章我們改進(jìn)了我們的日志系統(tǒng),如果使用我們只能簡(jiǎn)單進(jìn)行廣播,而使用則允許消費(fèi)者可以進(jìn)行一定程度的選擇。為的會(huì)同時(shí)發(fā)布到這兩個(gè)。當(dāng)為時(shí),會(huì)接收所有的。當(dāng)中沒(méi)有使用通配符和時(shí),的行為和一致。 主題模式 在上一章我們改進(jìn)了我們的日志系統(tǒng),如果使用fanout我們只能簡(jiǎn)單進(jìn)行廣播,而使用direct則允許消費(fèi)者可以進(jìn)行一定程度的選擇。但是direct還是有其局限性,其路由不支持多個(gè)...

    pingan8787 評(píng)論0 收藏0
  • RabbitMQ系列(四) - 路由模式

    摘要:路由模式在之前的文章中我們建立了一個(gè)簡(jiǎn)單的日志系統(tǒng)。更形象的表示,如對(duì)中的感興趣。為了進(jìn)行說(shuō)明,像下圖這么來(lái)設(shè)置如圖,可以看到有兩個(gè)綁到了類(lèi)型為的上。如圖的設(shè)置中,一個(gè)為的就會(huì)同時(shí)發(fā)送到和。接收程序可以選擇要接收日志的嚴(yán)重性級(jí)別。 路由模式 在之前的文章中我們建立了一個(gè)簡(jiǎn)單的日志系統(tǒng)。我們可以通過(guò)這個(gè)系統(tǒng)將日志message廣播給很多接收者。 在本篇文章中,我們?cè)谶@之上,添加一個(gè)新的功...

    liuchengxu 評(píng)論0 收藏0
  • [] RabbitMQ tutorials (3) ---- 'Pub/Sub'

    摘要:生產(chǎn)者只能把消息發(fā)到交換器。是否要追加到一個(gè)特殊的隊(duì)列是否要追加到許多的隊(duì)列或者丟掉這條消息這些規(guī)則被定義為交換類(lèi)型。有一點(diǎn)很關(guān)鍵,向不存在的交換器發(fā)布消息是被禁止的。如果仍然沒(méi)有隊(duì)列綁定交換器,消息會(huì)丟失。 發(fā)布與訂閱 (Publish/Subscribe) 在之前的章節(jié)中,我們創(chuàng)建了工作隊(duì)列,之前的工作隊(duì)列的假設(shè)是每個(gè)任務(wù)只被分發(fā)到一個(gè)worker。在這一節(jié)中,我們會(huì)做一些完全不一...

    zzir 評(píng)論0 收藏0
  • RabbitMQ系列(六)-RPC模式

    摘要:如果涉及返回值,就要用到本章提到的了。方法發(fā)送請(qǐng)求,并阻塞知道結(jié)果返回。當(dāng)有消息時(shí),進(jìn)行計(jì)算并通過(guò)指定的發(fā)送給客戶(hù)端。當(dāng)接收到,則檢查。如果和之前的匹配,則將消息返回給應(yīng)用進(jìn)行處理。 RPC模式 在第二章中我們學(xué)習(xí)了如何使用Work模式在多個(gè)worker之間派發(fā)時(shí)間敏感的任務(wù)。這種情況是不涉及到返回值的,worker執(zhí)行任務(wù)就好。如果涉及返回值,就要用到本章提到的RPC(Remote ...

    894974231 評(píng)論0 收藏0
  • RabbitMQ系列(二)-Work模式

    摘要:每個(gè)消費(fèi)者會(huì)得到平均數(shù)量的。為了確保不會(huì)丟失,采用確認(rèn)機(jī)制。如果中斷退出了關(guān)閉了,關(guān)閉了,或是連接丟失了而沒(méi)有發(fā)送,會(huì)認(rèn)為該消息沒(méi)有完整的執(zhí)行,會(huì)將該消息重新入隊(duì)。該消息會(huì)被發(fā)送給其他的。當(dāng)消費(fèi)者中斷退出,會(huì)重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我們寫(xiě)了通過(guò)一個(gè)...

    lcodecorex 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<