摘要:作為消息隊列的一個典型實踐,完全實現(xiàn)了標(biāo)準(zhǔn),與的快快快不同,它追求的穩(wěn)定可靠。同一個隊列不僅可以綁定多個生產(chǎn)者,而且能夠發(fā)送消息到多個消費者。消費者接受并消費消息。幾乎于完全類似是一個繼承了接口的類,方便我們來存儲消息隊列來的消息。
推廣
https://segmentfault.com/l/15...
我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。可以參考源碼:https://github.com/vvsuperman…,項目支持網(wǎng)站: http://rabbitmq.org.cn,最新文章或?qū)崿F(xiàn)會更新在上面
前言消息隊列想必大家都有一定了解:用來解耦,上級模塊不用關(guān)心下級模塊是否執(zhí)行成功,最常見的比如說日志,核心系統(tǒng)并不關(guān)心日志是否成功,日志什么時候記錄。這種情形就可以用消息隊列來解耦。
RabbitMQ作為消息隊列的一個典型實踐,完全實現(xiàn)了AMQ標(biāo)準(zhǔn),與Kafka的快快快不同,它追求的穩(wěn)定、可靠。下面就來幾篇文章來詳細介紹下,均翻譯至RabbitMQ的官方文檔。
RabbitMQ是一個消息的中介(用來接受以及轉(zhuǎn)發(fā)消息),就像是一個非??煽康泥]局,當(dāng)信件放到郵局時,信件就確保能到達,所以,RabbitMQ可以看成是郵箱、郵局、以及郵遞員的合體
RabbitMQ的一些重要概念 produceing(生產(chǎn)者):生產(chǎn)數(shù)據(jù) queue(隊列):類似于郵箱,存在于RabbitMQ服務(wù)器的內(nèi)部,用來存儲消息,并且消息只能存儲在隊列里面。隊列的大小只受RabbitMQ主機內(nèi)存和硬盤的影響。同一個隊列不僅可以綁定多個生產(chǎn)者,而且能夠發(fā)送消息到多個消費者。
Consuming(消費者):接受并消費消息。 Hello World下面我們來寫我們的第一個“Hello World”,我們會使用Java的API來編寫一個生產(chǎn)者來生產(chǎn)消息,以及一個消費者來消費消息
P是我們的生產(chǎn)者,而C是我們的消費者。中間的box是我們的queue:作為消息緩沖,是RabbitMQ用來存儲轉(zhuǎn)發(fā)消息給消費者的。
Java客戶端庫RabbitMQ支持多重協(xié)議,這里我們會用AMQP 0-9-1來說明,它是一個消息隊列的通用協(xié)議。RabbitMQ同時也有多種語言的客戶端,我們在這里用Java來做說明。
首先請下載Java客戶端包以及它所依賴的SLF4J和SLF4J SIMPLE,將它們拷貝到自己的工作區(qū)。
引入RabbitMQ同樣也可以使用Maven來做依賴管理, groupId是com.rabbitmq 以及artifactId amqp-client
發(fā)送請求生產(chǎn)者會發(fā)送消息到MQ,然后退出
在Send.java中,首先我們import一些類
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;
設(shè)置我們的主類
public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException { ... } }
創(chuàng)建Connection
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
這里我們連接的是本地,你當(dāng)然也可以連接到另一個服務(wù)器上,只需要指明服務(wù)器的名稱和ip地址。
下面我們要創(chuàng)建一個Channel,大家可以想象一些,消息的產(chǎn)生和發(fā)送都是通過這個Channel完成的。
當(dāng)然,我們還需要頂一個一個Queue來接受消息
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent "" + message + """);
對于Queue的定義是冥等的,如果不存在才會創(chuàng)建,如果存在則不會再建新的。消息會被格式化成byte的數(shù)組,方便進行任意的轉(zhuǎn)換。
最后,我們關(guān)閉通道
channel.close(); connection.close();
完整的代碼可以看這個地方:send.java
接受請求消費者會從RabbitMQ接收到請求,消息是被推到消費者,而且消費者會一直監(jiān)聽著消息隊列,一旦有有新的消息就會打印出來。
Recv.java幾乎于Send完全類似
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer;
Defaultconsumer是一個繼承了Consumer接口的類,方便我們來存儲消息隊列來的消息。建立消費者與我們建立生產(chǎn)者非常類似:
public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); ... } }
你可以注意到我們在消費者定義了一個Queue,因此我們是需要在生產(chǎn)者之前啟動消費者的,我們要確保我們在消費消息之前這個隊列是已經(jīng)存在的。
然后我們需要告訴mq服務(wù)可以推送消息給我們。因為這個推送是異步的,因此我們可以提供一個回調(diào)方法,DefaultConsumer會暫時存儲這個消息,直到消費者以及準(zhǔn)備好來處理接受到的消息了(消息會存儲在消費者中直到消費者有能力來消費它,可以想象一下數(shù)據(jù)庫等高IO操作)
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + message + """); } }; channel.basicConsume(QUEUE_NAME, true, consumer);
完整的Recv.java地址
跑起來我們可以先用javac來編譯程序
javac -cp amqp-client-4.0.2.jar Send.java Recv.java
而后來運行它,這需要我們在路徑加上它的依賴包,我們首先啟動的是消費者
java -cp .:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Recv
而后啟動發(fā)送者
java -cp .:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Send
消費者會持續(xù)等待,并打印從生產(chǎn)者哪里來的消息,你可以用(Ctrl-C)來停止它。所以你要另外開啟一個命令行窗口來運行生產(chǎn)者。
查看隊列也許你想知道RabbitMQ中到底有多少個消息,你可以使用rabbitmqctl工具:
sudo rabbitmqctl list_queues
在Windows中:
rabbitmqctl.bat list_queues
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/70889.html
摘要:推廣專題講座開源項目我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。因此一旦有有消息,消息會廣播到所有的消費者。如此一來路由器就能夠把消息發(fā)送給相應(yīng)的隊列了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https...
摘要:推廣專題講座開源項目我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。主題交換機也可以當(dāng)成其它交換機來使用,假如隊列綁定到了那么它會接收所有的消息,就像廣播路由器一樣而如果未使用,那么就跟直達路由器一樣了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性...
摘要:任務(wù)隊列最主要的功能就是解耦高耗時的操作,否則程序會一直等在那里,浪費大量資源。我們將任務(wù)封裝成一個消息發(fā)送給隊列,后臺的任務(wù)進程會得到這個任務(wù)并執(zhí)行它,而且可以配置多個任務(wù)進程,進一步加大吞吐率。為了確保消息不丟失,支持消息確認(rèn)。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的...
摘要:可以參考源碼,項目支持網(wǎng)站,最新文章或?qū)崿F(xiàn)會更新在上面前言在訂閱發(fā)布中我們建立了一個簡單的日志系統(tǒng),從而將消息廣播給一些消費者。因此,發(fā)送到路由鍵的消息會發(fā)送給隊列,發(fā)送到路由鍵或者的消息會發(fā)送給,其它的消息將被丟棄。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決...
摘要:因為消費消息是在另外一個進程中,我們需要阻塞我們的進程直到結(jié)果返回,使用阻塞隊列是一種非常好的方式,這里我們使用了長度為的,的功能是檢查消息的的是不是我們之前所發(fā)送的,如果是,將返回值返回到。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖?..
閱讀 2812·2021-10-14 09:42
閱讀 3619·2021-10-11 10:59
閱讀 2952·2019-08-30 11:25
閱讀 3088·2019-08-29 16:25
閱讀 3234·2019-08-26 17:40
閱讀 1241·2019-08-26 13:30
閱讀 1155·2019-08-26 11:46
閱讀 1337·2019-08-23 15:22