成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

白話RabbitMQ(三):發(fā)布/訂閱

Ververica / 3439人閱讀

摘要:推廣專題講座開源項(xiàng)目我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請(qǐng)大家圍觀。因此一旦有有消息,消息會(huì)廣播到所有的消費(fèi)者。如此一來路由器就能夠把消息發(fā)送給相應(yīng)的隊(duì)列了。

推廣
RabbitMQ專題講座

https://segmentfault.com/l/15...

CoolMQ開源項(xiàng)目

我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請(qǐng)大家圍觀??梢詤⒖荚创a:https://github.com/vvsuperman…,項(xiàng)目支持網(wǎng)站: http://rabbitmq.org.cn,最新文章或?qū)崿F(xiàn)會(huì)更新在上面

前言

在第二章中我們描述了任務(wù)隊(duì)列,在任務(wù)隊(duì)列中一個(gè)消息只會(huì)發(fā)送給一個(gè)消費(fèi)者。而在這一章中我們將消息發(fā)送給許多個(gè)消費(fèi)者,我們稱之為“發(fā)布/訂閱”

為了更好的闡述這個(gè)模式,我們會(huì)建立一個(gè)新的簡(jiǎn)單的logging系統(tǒng),包含2個(gè)步驟-第一步發(fā)送log信息,第二步能夠接受并將信息打印出來,而且在第二步中所有的消費(fèi)者都會(huì)接受到同樣的消息,比如一個(gè)消費(fèi)者用來將log信息寫到磁盤,另外一個(gè)接受信息并顯示在屏幕上。因此一旦有有消息,消息會(huì)廣播到所有的消費(fèi)者。

交換機(jī)(Exchanges)

前面的章節(jié)中我們是直接通過queue來處理消息,現(xiàn)在我們來介紹一種更完善的模式

讓我們迅速瀏覽一遍前面的主題:

生產(chǎn)者是一個(gè)客戶端程序,用來發(fā)送消息

隊(duì)列是一個(gè)緩沖,用來存儲(chǔ)消息

消費(fèi)者是一個(gè)客戶端程序,用來接受消息

RabbitMQ的核心思想是生產(chǎn)者不會(huì)將消息直接發(fā)送給隊(duì)列,意味著生產(chǎn)者是完全看不到隊(duì)列的。反之,生產(chǎn)者只能將消息發(fā)送給路由器(Exchange),再由路由器來決定該如何來處理消息,是將消息發(fā)送給一個(gè)隊(duì)列呢,還是發(fā)送給許多個(gè)隊(duì)列,或者直接無視,具體的規(guī)則是根據(jù)路由器的類型而定的。

路由器的類型有這樣幾種:直連路由器(dirct), 主題路由器(topic),頭部路由器(headers),以及多廣播路由器(fanout)

channel.exchangeDeclare("logs", "fanout");

廣播路由器聽起來就很簡(jiǎn)單,它會(huì)將消息廣播到所有的它所知道的隊(duì)列,而這正是我們所需要的。

默認(rèn)路由器

在前面的章節(jié)中雖然沒有設(shè)置任何路由器,但依然能夠?qū)⑾l(fā)送到隊(duì)列,這是因?yàn)槲覀兊氖悄J(rèn)路由器:使用空字符串("")來做的定義:

channel.basicPublish("", "hello", null, message.getBytes());

第一個(gè)參數(shù)是exchange的名稱,在這里是空字符串,消息會(huì)通過路由健(routingKey)發(fā)送到該鍵所對(duì)應(yīng)的隊(duì)列。

然而現(xiàn)在,我們有了確認(rèn)的路由器

channel.basicPublish( "logs", "", null, message.getBytes());

臨時(shí)隊(duì)列
我們之前隊(duì)列都有名字(Hello隊(duì)列和task_queue隊(duì)列),給隊(duì)列起名字非常重要-需要將消費(fèi)者綁定到特定的queue上面,以及需要把消息從生產(chǎn)者發(fā)送給特定的消費(fèi)者。

但對(duì)于日志來說,消息會(huì)發(fā)送到所有的消費(fèi)者,而并非個(gè)別,We"re also interested only in currently flowing messages not in the old ones.為了滿足當(dāng)前需求我們可以做兩件事

一旦連接上RabbitMQ,需要一個(gè)新的空隊(duì)列來接受消息,我們可以隨機(jī)起個(gè)名字,甚至根本不起名,而讓RabbitMQ來命名它。

一旦消費(fèi)者斷開連接,這個(gè)隊(duì)列就能被刪除掉

我們可以這樣定義一個(gè)不需要持久化、獨(dú)立的、能夠被自動(dòng)刪除的隊(duì)列

String queueName = channel.queueDeclare().getQueue();

這個(gè)名稱是RabbitMQ隨機(jī)分配的,比如amq.gen-JzTY20BRgKO-HjmUJj0wLg.

綁定

我們已經(jīng)聲明了一個(gè)廣播路由器,現(xiàn)在需要告訴這個(gè)路由器需要把信息發(fā)送給哪些隊(duì)列,路由器和隊(duì)列間的這個(gè)關(guān)系就稱之為綁定。

channel.queueBind(queueName, "logs", "");

如此一來路由器就能夠把消息發(fā)送給相應(yīng)的隊(duì)列了。

整合

發(fā)送者與我們之前的代碼基本相同,最重大的區(qū)別我們現(xiàn)在是發(fā)送給帶名稱的路由器了,同時(shí)我們也需要一個(gè)路由鍵,但這里也不需要,因?yàn)閺V播路由器會(huì)忽略這個(gè)值,這是我們EmitLog.java的代碼

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent "" + message + """);

        channel.close();
        connection.close();
    }
    //...
}

可以看到,一旦我們建立的連接立即定義了一個(gè)路由器,這個(gè)步驟對(duì)我們非常重要,因?yàn)槭菄?yán)禁將消息發(fā)送給并不存在的路由的。

同時(shí),如果路由器沒有綁定隊(duì)列,消息也會(huì)丟失掉,但這對(duì)于我們來說是ok的:如果并沒有消費(fèi)者在監(jiān)聽,我們可以直接丟棄掉這個(gè)消息。

ReciveLogs.java代碼如下:

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogs {
  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received "" + message + """);
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}

編譯代碼

javac -cp $CP EmitLog.java ReceiveLogs.java

如果你希望將log存儲(chǔ)到本機(jī)上

java -cp $CP ReceiveLogs > logs_from_rabbit.log

如果你希望在屏幕上顯示log信息,打開一個(gè)新的終端:

java -cp $CP ReceiveLogs

發(fā)送消息

java -cp $CP EmitLog

如此一來,就能夠存儲(chǔ)消息的同時(shí)進(jìn)行打印了。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/68123.html

相關(guān)文章

  • 白話RabbitMQ(四): 建立路由

    摘要:可以參考源碼,項(xiàng)目支持網(wǎng)站,最新文章或?qū)崿F(xiàn)會(huì)更新在上面前言在訂閱發(fā)布中我們建立了一個(gè)簡(jiǎn)單的日志系統(tǒng),從而將消息廣播給一些消費(fèi)者。因此,發(fā)送到路由鍵的消息會(huì)發(fā)送給隊(duì)列,發(fā)送到路由鍵或者的消息會(huì)發(fā)送給,其它的消息將被丟棄。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決...

    CoderStudy 評(píng)論0 收藏0
  • 【譯】RabbitMQ系列() - 發(fā)布/訂閱模式

    摘要:發(fā)布訂閱模式在之前的文章里,創(chuàng)建了。我們稱之為發(fā)布訂閱模式。其實(shí)我們是用到了默認(rèn)的,用空字符串來標(biāo)識(shí)。空字符串代表了沒有名字的被路由到了由指定名字的。和這種關(guān)系的建立我們稱之為從現(xiàn)在開始這個(gè)就會(huì)將推向我們的隊(duì)列了。 發(fā)布訂閱模式 在之前的文章里,創(chuàng)建了work queue。work queue中,每一個(gè)task都會(huì)派發(fā)給一個(gè)worker。在本章中,我們會(huì)完成完全不一樣的事情 - 我們會(huì)...

    WrBug 評(píng)論0 收藏0
  • RabbitMQ發(fā)布訂閱實(shí)戰(zhàn)-實(shí)現(xiàn)延時(shí)重試隊(duì)列

    摘要:本文將會(huì)講解如何使用實(shí)現(xiàn)延時(shí)重試和失敗消息隊(duì)列,實(shí)現(xiàn)可靠的消息消費(fèi),消費(fèi)失敗后,自動(dòng)延時(shí)將消息重新投遞,當(dāng)達(dá)到一定的重試次數(shù)后,將消息投遞到失敗消息隊(duì)列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發(fā)的開源消息隊(duì)列。本文假設(shè)讀者對(duì)RabbitMQ是什么已經(jīng)有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網(wǎng)的 RabbitMQ Tutorials 入門...

    Heier 評(píng)論0 收藏0
  • RabbitMQ發(fā)布訂閱實(shí)戰(zhàn)-實(shí)現(xiàn)延時(shí)重試隊(duì)列

    摘要:本文將會(huì)講解如何使用實(shí)現(xiàn)延時(shí)重試和失敗消息隊(duì)列,實(shí)現(xiàn)可靠的消息消費(fèi),消費(fèi)失敗后,自動(dòng)延時(shí)將消息重新投遞,當(dāng)達(dá)到一定的重試次數(shù)后,將消息投遞到失敗消息隊(duì)列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發(fā)的開源消息隊(duì)列。本文假設(shè)讀者對(duì)RabbitMQ是什么已經(jīng)有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網(wǎng)的 RabbitMQ Tutorials 入門...

    vslam 評(píng)論0 收藏0
  • RabbitMq 最全的性能調(diào)優(yōu)筆記

    摘要:性能調(diào)優(yōu)筆記避免雷區(qū)要避免流控機(jī)制觸發(fā)服務(wù)端默認(rèn)配置是當(dāng)內(nèi)存使用達(dá)到,磁盤空閑空間小于,即啟動(dòng)內(nèi)存報(bào)警,磁盤報(bào)警報(bào)警后服務(wù)端觸發(fā)流控機(jī)制。最佳線程生產(chǎn)者使用多線程發(fā)送數(shù)據(jù)到三到五個(gè)線程性能發(fā)送最佳,超過它也不能提高生產(chǎn)的發(fā)送速率。 RabbitMq 性能調(diào)優(yōu)筆記 [TOC] 避免雷區(qū) 要避免流控機(jī)制觸發(fā) 服務(wù)端默認(rèn)配置是當(dāng)內(nèi)存使用達(dá)到40%,磁盤空閑空間小于50M,即啟動(dòng)內(nèi)存報(bào)警,磁...

    Tony 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<