成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

白話rabbitmq(一): HelloWorld

garfileo / 426人閱讀

摘要:作為消息隊列的一個典型實踐,完全實現(xiàn)了標(biāo)準(zhǔn),與的快快快不同,它追求的穩(wěn)定可靠。同一個隊列不僅可以綁定多個生產(chǎn)者,而且能夠發(fā)送消息到多個消費者。消費者接受并消費消息。幾乎于完全類似是一個繼承了接口的類,方便我們來存儲消息隊列來的消息。

推廣
RabbitMQ專題講座

https://segmentfault.com/l/15...

CoolMQ開源項目

我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。可以參考源碼:https://github.com/vvsuperman…,項目支持網(wǎng)站: http://rabbitmq.org.cn,最新文章或?qū)崿F(xiàn)會更新在上面

前言

消息隊列想必大家都有一定了解:用來解耦,上級模塊不用關(guān)心下級模塊是否執(zhí)行成功,最常見的比如說日志,核心系統(tǒng)并不關(guān)心日志是否成功,日志什么時候記錄。這種情形就可以用消息隊列來解耦。

RabbitMQ作為消息隊列的一個典型實踐,完全實現(xiàn)了AMQ標(biāo)準(zhǔn),與Kafka的快快快不同,它追求的穩(wěn)定、可靠。下面就來幾篇文章來詳細介紹下,均翻譯至RabbitMQ的官方文檔。

RabbitMQ是一個消息的中介(用來接受以及轉(zhuǎn)發(fā)消息),就像是一個非??煽康泥]局,當(dāng)信件放到郵局時,信件就確保能到達,所以,RabbitMQ可以看成是郵箱、郵局、以及郵遞員的合體

RabbitMQ的一些重要概念 produceing(生產(chǎn)者):生產(chǎn)數(shù)據(jù)

queue(隊列):

類似于郵箱,存在于RabbitMQ服務(wù)器的內(nèi)部,用來存儲消息,并且消息只能存儲在隊列里面。隊列的大小只受RabbitMQ主機內(nèi)存和硬盤的影響。同一個隊列不僅可以綁定多個生產(chǎn)者,而且能夠發(fā)送消息到多個消費者。

Consuming(消費者):接受并消費消息。

Hello World

下面我們來寫我們的第一個“Hello World”,我們會使用Java的API來編寫一個生產(chǎn)者來生產(chǎn)消息,以及一個消費者來消費消息

P是我們的生產(chǎn)者,而C是我們的消費者。中間的box是我們的queue:作為消息緩沖,是RabbitMQ用來存儲轉(zhuǎn)發(fā)消息給消費者的。

Java客戶端庫

RabbitMQ支持多重協(xié)議,這里我們會用AMQP 0-9-1來說明,它是一個消息隊列的通用協(xié)議。RabbitMQ同時也有多種語言的客戶端,我們在這里用Java來做說明。

首先請下載Java客戶端包以及它所依賴的SLF4J和SLF4J SIMPLE,將它們拷貝到自己的工作區(qū)。

引入RabbitMQ同樣也可以使用Maven來做依賴管理, groupId是com.rabbitmq 以及artifactId amqp-client

發(fā)送請求

生產(chǎn)者會發(fā)送消息到MQ,然后退出
在Send.java中,首先我們import一些類

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

設(shè)置我們的主類

public class Send {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException {
      ...
  }
}   

創(chuàng)建Connection

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

這里我們連接的是本地,你當(dāng)然也可以連接到另一個服務(wù)器上,只需要指明服務(wù)器的名稱和ip地址。

下面我們要創(chuàng)建一個Channel,大家可以想象一些,消息的產(chǎn)生和發(fā)送都是通過這個Channel完成的。

當(dāng)然,我們還需要頂一個一個Queue來接受消息
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent "" + message + """);

對于Queue的定義是冥等的,如果不存在才會創(chuàng)建,如果存在則不會再建新的。消息會被格式化成byte的數(shù)組,方便進行任意的轉(zhuǎn)換。

最后,我們關(guān)閉通道

channel.close();
connection.close();

完整的代碼可以看這個地方:send.java

接受請求

消費者會從RabbitMQ接收到請求,消息是被到消費者,而且消費者會一直監(jiān)聽著消息隊列,一旦有有新的消息就會打印出來。

Recv.java幾乎于Send完全類似

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

Defaultconsumer是一個繼承了Consumer接口的類,方便我們來存儲消息隊列來的消息。建立消費者與我們建立生產(chǎn)者非常類似:

public class Recv {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException,
             java.lang.InterruptedException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    ...
    }
}

你可以注意到我們在消費者定義了一個Queue,因此我們是需要在生產(chǎn)者之前啟動消費者的,我們要確保我們在消費消息之前這個隊列是已經(jīng)存在的。

然后我們需要告訴mq服務(wù)可以推送消息給我們。因為這個推送是異步的,因此我們可以提供一個回調(diào)方法,DefaultConsumer會暫時存儲這個消息,直到消費者以及準(zhǔn)備好來處理接受到的消息了(消息會存儲在消費者中直到消費者有能力來消費它,可以想象一下數(shù)據(jù)庫等高IO操作)

Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope,
                             AMQP.BasicProperties properties, byte[] body)
      throws IOException {
    String message = new String(body, "UTF-8");
    System.out.println(" [x] Received "" + message + """);
  }
};
channel.basicConsume(QUEUE_NAME, true, consumer);

完整的Recv.java地址

跑起來

我們可以先用javac來編譯程序

javac -cp amqp-client-4.0.2.jar Send.java Recv.java

而后來運行它,這需要我們在路徑加上它的依賴包,我們首先啟動的是消費者

java -cp .:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Recv

而后啟動發(fā)送者

java -cp .:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Send

消費者會持續(xù)等待,并打印從生產(chǎn)者哪里來的消息,你可以用(Ctrl-C)來停止它。所以你要另外開啟一個命令行窗口來運行生產(chǎn)者。

查看隊列

也許你想知道RabbitMQ中到底有多少個消息,你可以使用rabbitmqctl工具:

sudo rabbitmqctl list_queues

在Windows中:

rabbitmqctl.bat list_queues




文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/70889.html

相關(guān)文章

  • 白話RabbitMQ(三):發(fā)布/訂閱

    摘要:推廣專題講座開源項目我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。因此一旦有有消息,消息會廣播到所有的消費者。如此一來路由器就能夠把消息發(fā)送給相應(yīng)的隊列了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https...

    Ververica 評論0 收藏0
  • 白話RabbitMQ(五): 主題路由器(Topic Exchange)

    摘要:推廣專題講座開源項目我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。主題交換機也可以當(dāng)成其它交換機來使用,假如隊列綁定到了那么它會接收所有的消息,就像廣播路由器一樣而如果未使用,那么就跟直達路由器一樣了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性...

    Gilbertat 評論0 收藏0
  • 白話RabbitMQ(二): 任務(wù)隊列

    摘要:任務(wù)隊列最主要的功能就是解耦高耗時的操作,否則程序會一直等在那里,浪費大量資源。我們將任務(wù)封裝成一個消息發(fā)送給隊列,后臺的任務(wù)進程會得到這個任務(wù)并執(zhí)行它,而且可以配置多個任務(wù)進程,進一步加大吞吐率。為了確保消息不丟失,支持消息確認(rèn)。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的...

    fnngj 評論0 收藏0
  • 白話RabbitMQ(四): 建立路由

    摘要:可以參考源碼,項目支持網(wǎng)站,最新文章或?qū)崿F(xiàn)會更新在上面前言在訂閱發(fā)布中我們建立了一個簡單的日志系統(tǒng),從而將消息廣播給一些消費者。因此,發(fā)送到路由鍵的消息會發(fā)送給隊列,發(fā)送到路由鍵或者的消息會發(fā)送給,其它的消息將被丟棄。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決...

    CoderStudy 評論0 收藏0
  • 白話RabbitMQ(六): RPC

    摘要:因為消費消息是在另外一個進程中,我們需要阻塞我們的進程直到結(jié)果返回,使用阻塞隊列是一種非常好的方式,這里我們使用了長度為的,的功能是檢查消息的的是不是我們之前所發(fā)送的,如果是,將返回值返回到。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖?..

    KevinYan 評論0 收藏0

發(fā)表評論

0條評論

garfileo

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<