摘要:推廣專題講座開源項(xiàng)目我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請(qǐng)大家圍觀。因此一旦有有消息,消息會(huì)廣播到所有的消費(fèi)者。如此一來路由器就能夠把消息發(fā)送給相應(yīng)的隊(duì)列了。
推廣
https://segmentfault.com/l/15...
我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請(qǐng)大家圍觀??梢詤⒖荚创a:https://github.com/vvsuperman…,項(xiàng)目支持網(wǎng)站: http://rabbitmq.org.cn,最新文章或?qū)崿F(xiàn)會(huì)更新在上面
前言在第二章中我們描述了任務(wù)隊(duì)列,在任務(wù)隊(duì)列中一個(gè)消息只會(huì)發(fā)送給一個(gè)消費(fèi)者。而在這一章中我們將消息發(fā)送給許多個(gè)消費(fèi)者,我們稱之為“發(fā)布/訂閱”
為了更好的闡述這個(gè)模式,我們會(huì)建立一個(gè)新的簡(jiǎn)單的logging系統(tǒng),包含2個(gè)步驟-第一步發(fā)送log信息,第二步能夠接受并將信息打印出來,而且在第二步中所有的消費(fèi)者都會(huì)接受到同樣的消息,比如一個(gè)消費(fèi)者用來將log信息寫到磁盤,另外一個(gè)接受信息并顯示在屏幕上。因此一旦有有消息,消息會(huì)廣播到所有的消費(fèi)者。
交換機(jī)(Exchanges)前面的章節(jié)中我們是直接通過queue來處理消息,現(xiàn)在我們來介紹一種更完善的模式
讓我們迅速瀏覽一遍前面的主題:
生產(chǎn)者是一個(gè)客戶端程序,用來發(fā)送消息
隊(duì)列是一個(gè)緩沖,用來存儲(chǔ)消息
消費(fèi)者是一個(gè)客戶端程序,用來接受消息
RabbitMQ的核心思想是生產(chǎn)者不會(huì)將消息直接發(fā)送給隊(duì)列,意味著生產(chǎn)者是完全看不到隊(duì)列的。反之,生產(chǎn)者只能將消息發(fā)送給路由器(Exchange),再由路由器來決定該如何來處理消息,是將消息發(fā)送給一個(gè)隊(duì)列呢,還是發(fā)送給許多個(gè)隊(duì)列,或者直接無視,具體的規(guī)則是根據(jù)路由器的類型而定的。
路由器的類型有這樣幾種:直連路由器(dirct), 主題路由器(topic),頭部路由器(headers),以及多廣播路由器(fanout)
channel.exchangeDeclare("logs", "fanout");
廣播路由器聽起來就很簡(jiǎn)單,它會(huì)將消息廣播到所有的它所知道的隊(duì)列,而這正是我們所需要的。
默認(rèn)路由器在前面的章節(jié)中雖然沒有設(shè)置任何路由器,但依然能夠?qū)⑾l(fā)送到隊(duì)列,這是因?yàn)槲覀兊氖悄J(rèn)路由器:使用空字符串("")來做的定義:
channel.basicPublish("", "hello", null, message.getBytes());
第一個(gè)參數(shù)是exchange的名稱,在這里是空字符串,消息會(huì)通過路由健(routingKey)發(fā)送到該鍵所對(duì)應(yīng)的隊(duì)列。
然而現(xiàn)在,我們有了確認(rèn)的路由器
channel.basicPublish( "logs", "", null, message.getBytes());
臨時(shí)隊(duì)列
我們之前隊(duì)列都有名字(Hello隊(duì)列和task_queue隊(duì)列),給隊(duì)列起名字非常重要-需要將消費(fèi)者綁定到特定的queue上面,以及需要把消息從生產(chǎn)者發(fā)送給特定的消費(fèi)者。
但對(duì)于日志來說,消息會(huì)發(fā)送到所有的消費(fèi)者,而并非個(gè)別,We"re also interested only in currently flowing messages not in the old ones.為了滿足當(dāng)前需求我們可以做兩件事
一旦連接上RabbitMQ,需要一個(gè)新的空隊(duì)列來接受消息,我們可以隨機(jī)起個(gè)名字,甚至根本不起名,而讓RabbitMQ來命名它。
一旦消費(fèi)者斷開連接,這個(gè)隊(duì)列就能被刪除掉
我們可以這樣定義一個(gè)不需要持久化、獨(dú)立的、能夠被自動(dòng)刪除的隊(duì)列
String queueName = channel.queueDeclare().getQueue();
這個(gè)名稱是RabbitMQ隨機(jī)分配的,比如amq.gen-JzTY20BRgKO-HjmUJj0wLg.
綁定我們已經(jīng)聲明了一個(gè)廣播路由器,現(xiàn)在需要告訴這個(gè)路由器需要把信息發(fā)送給哪些隊(duì)列,路由器和隊(duì)列間的這個(gè)關(guān)系就稱之為綁定。
channel.queueBind(queueName, "logs", "");
如此一來路由器就能夠把消息發(fā)送給相應(yīng)的隊(duì)列了。
整合發(fā)送者與我們之前的代碼基本相同,最重大的區(qū)別我們現(xiàn)在是發(fā)送給帶名稱的路由器了,同時(shí)我們也需要一個(gè)路由鍵,但這里也不需要,因?yàn)閺V播路由器會(huì)忽略這個(gè)值,這是我們EmitLog.java的代碼
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent "" + message + """); channel.close(); connection.close(); } //... }
可以看到,一旦我們建立的連接立即定義了一個(gè)路由器,這個(gè)步驟對(duì)我們非常重要,因?yàn)槭菄?yán)禁將消息發(fā)送給并不存在的路由的。
同時(shí),如果路由器沒有綁定隊(duì)列,消息也會(huì)丟失掉,但這對(duì)于我們來說是ok的:如果并沒有消費(fèi)者在監(jiān)聽,我們可以直接丟棄掉這個(gè)消息。
ReciveLogs.java代碼如下:
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + message + """); } }; channel.basicConsume(queueName, true, consumer); } }
編譯代碼
javac -cp $CP EmitLog.java ReceiveLogs.java
如果你希望將log存儲(chǔ)到本機(jī)上
java -cp $CP ReceiveLogs > logs_from_rabbit.log
如果你希望在屏幕上顯示log信息,打開一個(gè)新的終端:
java -cp $CP ReceiveLogs
發(fā)送消息
java -cp $CP EmitLog
如此一來,就能夠存儲(chǔ)消息的同時(shí)進(jìn)行打印了。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/68123.html
摘要:可以參考源碼,項(xiàng)目支持網(wǎng)站,最新文章或?qū)崿F(xiàn)會(huì)更新在上面前言在訂閱發(fā)布中我們建立了一個(gè)簡(jiǎn)單的日志系統(tǒng),從而將消息廣播給一些消費(fèi)者。因此,發(fā)送到路由鍵的消息會(huì)發(fā)送給隊(duì)列,發(fā)送到路由鍵或者的消息會(huì)發(fā)送給,其它的消息將被丟棄。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決...
摘要:發(fā)布訂閱模式在之前的文章里,創(chuàng)建了。我們稱之為發(fā)布訂閱模式。其實(shí)我們是用到了默認(rèn)的,用空字符串來標(biāo)識(shí)。空字符串代表了沒有名字的被路由到了由指定名字的。和這種關(guān)系的建立我們稱之為從現(xiàn)在開始這個(gè)就會(huì)將推向我們的隊(duì)列了。 發(fā)布訂閱模式 在之前的文章里,創(chuàng)建了work queue。work queue中,每一個(gè)task都會(huì)派發(fā)給一個(gè)worker。在本章中,我們會(huì)完成完全不一樣的事情 - 我們會(huì)...
摘要:本文將會(huì)講解如何使用實(shí)現(xiàn)延時(shí)重試和失敗消息隊(duì)列,實(shí)現(xiàn)可靠的消息消費(fèi),消費(fèi)失敗后,自動(dòng)延時(shí)將消息重新投遞,當(dāng)達(dá)到一定的重試次數(shù)后,將消息投遞到失敗消息隊(duì)列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發(fā)的開源消息隊(duì)列。本文假設(shè)讀者對(duì)RabbitMQ是什么已經(jīng)有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網(wǎng)的 RabbitMQ Tutorials 入門...
摘要:本文將會(huì)講解如何使用實(shí)現(xiàn)延時(shí)重試和失敗消息隊(duì)列,實(shí)現(xiàn)可靠的消息消費(fèi),消費(fèi)失敗后,自動(dòng)延時(shí)將消息重新投遞,當(dāng)達(dá)到一定的重試次數(shù)后,將消息投遞到失敗消息隊(duì)列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發(fā)的開源消息隊(duì)列。本文假設(shè)讀者對(duì)RabbitMQ是什么已經(jīng)有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網(wǎng)的 RabbitMQ Tutorials 入門...
摘要:性能調(diào)優(yōu)筆記避免雷區(qū)要避免流控機(jī)制觸發(fā)服務(wù)端默認(rèn)配置是當(dāng)內(nèi)存使用達(dá)到,磁盤空閑空間小于,即啟動(dòng)內(nèi)存報(bào)警,磁盤報(bào)警報(bào)警后服務(wù)端觸發(fā)流控機(jī)制。最佳線程生產(chǎn)者使用多線程發(fā)送數(shù)據(jù)到三到五個(gè)線程性能發(fā)送最佳,超過它也不能提高生產(chǎn)的發(fā)送速率。 RabbitMq 性能調(diào)優(yōu)筆記 [TOC] 避免雷區(qū) 要避免流控機(jī)制觸發(fā) 服務(wù)端默認(rèn)配置是當(dāng)內(nèi)存使用達(dá)到40%,磁盤空閑空間小于50M,即啟動(dòng)內(nèi)存報(bào)警,磁...
閱讀 1616·2021-11-23 09:51
閱讀 1185·2019-08-30 13:57
閱讀 2268·2019-08-29 13:12
閱讀 2020·2019-08-26 13:57
閱讀 1205·2019-08-26 11:32
閱讀 983·2019-08-23 15:08
閱讀 710·2019-08-23 14:42
閱讀 3091·2019-08-23 11:41