摘要:消息確認(rèn)為,會(huì)等待的顯式確認(rèn)。在消息發(fā)送到之后會(huì)立刻路由到中,因此未持久化的在重啟后會(huì)丟失元數(shù)據(jù)以及綁定,對(duì)和消息的持久化無(wú)影響。指定如果一個(gè)或者有多個(gè)的情況下,只有最大的那個(gè)才會(huì)生效。要求集群中至少要有一個(gè)磁盤節(jié)點(diǎn),儲(chǔ)存了所有的元數(shù)據(jù)。
Connection & Channel
Connection 代表一個(gè) TCP 連接,Channel 是建立在 Connection 上的虛擬連接。RabbitMQ 每條指令都是通過(guò) Channel 完成的。
對(duì)于 OS 而言,創(chuàng)建和銷毀 TCP 連接的代價(jià)非常高,在高峰期很容易遇到瓶頸。程序中一般會(huì)有多個(gè)線程需要與 RabbitMQ建立通信,消費(fèi)或生產(chǎn)消息,通過(guò) TCP 連接復(fù)用來(lái)減少性能開(kāi)銷。
Connection 可以創(chuàng)建多個(gè) Channel ,但是 Channel 不是線程安全的所以不能在線程間共享。
Connection 在創(chuàng)建時(shí)可以傳入一個(gè) ExecutorService ,這個(gè)線程池時(shí)給該 Connection 上的 Consumer 用的。
Channel.isOpen 以及 Connection.isOpen 方法是同步的,因此如果在發(fā)送消息時(shí)頻繁調(diào)用會(huì)產(chǎn)生競(jìng)爭(zhēng)。我們可以認(rèn)為在 createChannel 方法后 Channel 以及處于開(kāi)啟狀態(tài)。若在使用過(guò)程中 Channel 關(guān)閉了,那么只要捕獲拋出的 ShutDownSignalException 就可以了,同時(shí)建議捕獲 IOException 以及 SocketException 防止連接意外關(guān)閉。
Exchange & Queue消費(fèi)者和生產(chǎn)者都可以聲明一個(gè)已經(jīng)存在的 Exchange 或者 Queue ,前提是參數(shù)完全匹配現(xiàn)有的 Exchange 或者 Queue,否則會(huì)拋出異常。
QueueDeclare 參數(shù):
exclusive: 排他隊(duì)列,只有同一個(gè) Connection 的 Channel 可以訪問(wèn),且在 Connection 關(guān)閉或者客戶端退出后自動(dòng)刪除,即使 durable 為 true 。
queuePurge(String queue):清空隊(duì)列
Exchange 可以綁定另一個(gè) Exchange:exchangeBind(String destination, String source, String routeKey), 從 source 到 destination
若業(yè)務(wù)允許,則最好預(yù)先創(chuàng)建好 Exchange 以及 Queue 并進(jìn)行綁定(rabbitmqadmin),防止 Exchange 沒(méi)有綁定 Queue 或 綁定錯(cuò)誤的 Queue 而導(dǎo)致消息丟失(關(guān)鍵信息應(yīng)當(dāng)使用 mandatory 參數(shù))。
Alternate Exchange: 在 Channel.exchangeDeclare 時(shí)添加 alternate-exchange 參數(shù)或在 Policy 中聲明。mandatory 為 true 時(shí),未被路由的消息會(huì)被發(fā)送到 Alternate Exchange 。建議 Exchange Type 設(shè)置為 fanout ,否則當(dāng) RoutingKey 依然不匹配就會(huì)被返回 Producer。
P.S. 有些書上講備份交換器和 mandatory 參數(shù)一起使用 mandatory 參數(shù)失效是錯(cuò)的,當(dāng) RoutingKey 不匹配 Alternate Exchange 依然會(huì)被返回 Producer 。
(rabbitmq v3.7 測(cè)試)
MapPublish & Consume Publish Confirmarg = new HashMap () {{ put("alternate-exchange", "alt"); }}; channel.exchangeDeclare("normalExchange", "direct", true, false, arg); channel.exchangeDeclare("alt", "fanout", true, false, null); channel.queueDeclare("normalQueue", true, false, false, null); channel.queueDeclare("notSend", true, false, false, null); channel.queueBind("normalQueue", "normalExchange", "key"); channel.queueBind("notSend", "alt", "");
消息發(fā)送到服務(wù)器后可能還沒(méi)來(lái)的及刷到磁盤中,服務(wù)器就掛掉,從而造成消息丟失。 Publish Confirm 能夠在消息確實(shí)到達(dá)服務(wù)器(開(kāi)啟持久化的消息會(huì)在刷入磁盤之后)之后返回一個(gè)確認(rèn)給 Publisher。
通過(guò) channel.confirmSelected 把 Channel 設(shè)置為 Confirm 模式,并為 Channel 添加一個(gè) ConfirmLister 來(lái)監(jiān)聽(tīng)返回的確認(rèn)。
SortedSetunconfirmedSet = new TreeSet<>(); channel.confirmSelect(); channel.addConfirmListener((deliveryTag, multiple) -> { System.out.println("handleAck: " + deliveryTag + " " + multiple); if (multiple) { unconfirmedSet.headSet(deliveryTag - 1).clear(); } else { unconfirmedSet.remove(deliveryTag); } }, (deliveryTag, multiple) -> { System.out.println("handleNack: " + deliveryTag + " " + multiple); if (multiple) { unconfirmedSet.headSet(deliveryTag - 1).clear(); } else { unconfirmedSet.remove(deliveryTag); } }); while (true) { long seq = channel.getNextPublishSeqNo(); channel.basicPublish("normalExchange", "key", true, null, message.getBytes(StandardCharsets.UTF_8)); unconfirmedSet.add(seq); Thread.sleep(1000); }
除了異步處理的方式之外還有批量確認(rèn)以及事務(wù)的方法。批量確認(rèn)的速度在大量連續(xù)發(fā)送的情況下和異步的方法差不多。不管怎樣這兩種消息確認(rèn)的方法都要比事務(wù)的方式快7倍左右。
Consumer一般應(yīng)當(dāng)實(shí)現(xiàn) Consumer 接口或者繼承 DefaultConsumer ,Consumer 通過(guò) consumerTag 來(lái)進(jìn)行區(qū)分。
消費(fèi)消息有兩種方式,一種是 Push ,一種是 Get。
Push 是由 RabbitMQ 以輪詢的方式將消息推送到 Consumer ,方法為 basicConsume 。一般一個(gè) Channel 對(duì)應(yīng)一個(gè) Consumer 。
Get 由客戶端主動(dòng)從 RabbitMQ 拉取一條消息,方法為 basicGet 。__不能循環(huán)執(zhí)行 basicGet 來(lái)代替 basicConsume ,不然會(huì)嚴(yán)重影響性能。__
消息確認(rèn):autoAck 為 false ,RabbitMQ 會(huì)等待 basicAck 的顯式確認(rèn)。除非 Consumer 連接斷開(kāi)否則一直等待確認(rèn)。當(dāng) Consumer 顯式調(diào)用 basicReject 或者 basicNack 并將 requeue 設(shè)為 true 后會(huì)將消息重新入隊(duì)投遞。一般我們?cè)跇I(yè)務(wù)處理完之后再 ack .
mandatory : 當(dāng) Exchange 無(wú)法匹配 Queue 或 Exchange 時(shí),mandatory 為 true 的消息會(huì)被返回給 Producer,否則會(huì)被丟棄。 通過(guò) Channel.addReturnListener 來(lái)添加 ReturnListener 監(jiān)視器。
queueDeclare 時(shí)添加 x-message-ttl 參數(shù),單位毫秒。
Maparg = new HashMap () {{ put("x-message-ttl", "1000000"); }}; channel.exchangeDeclare("normalExchange", "direct", true, false, arg);
使用 AMQP.BasicProperties.Builder 創(chuàng)建 AMQP.BasicProperties 并設(shè)置 expiration 參數(shù)。
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("100000"); channel.basicPublish("normalExchange", "key", true, builder.build(), message.getBytes(StandardCharsets.UTF_8));Dead Letter Exchange (DLX)
Dead Letter(死信):
Basic.Reject / Basic.Nack 并且 requeue 為 true
消息 TTL 過(guò)期
隊(duì)列達(dá)到最大長(zhǎng)度
當(dāng)消息成為 Dead Letter 之后, RabbitMQ 會(huì)自動(dòng)把這個(gè)消息發(fā)到 DLX 上。
// 當(dāng)發(fā)送到 normalQueue 中的消息成為 Dead Letter 之后會(huì)自動(dòng)以 // dead-letter 為 routingKey 發(fā)送到 dlxQueue Exchange Maparg = new HashMap () {{ put("x-dead-letter-exchange", "dlx"); put("x-dead-letter-routing-key", "dead-letter"); }}; channel.queueDeclare("normalQueue", true, false, false, arg); channel.exchangeDeclare("dlx", "direct", true, false, false, null); channel.queueDeclare("dlxQueue", true, false, false, null);
DLX 其他用法:延遲隊(duì)列,消息 發(fā)送到一個(gè)暫存的、沒(méi)有 Consumer 的 Queue 并設(shè)置 TTL,Consumer 消費(fèi) DLX 綁定的 Queue 的消息,建議給暫存的 Queue 設(shè)置一個(gè)最大的 TTL,防止消息沒(méi)有設(shè)置 TTL 而一直堆積在 Queue 中。
Priority消息的消費(fèi)可以有優(yōu)先級(jí),Queue 的最大優(yōu)先級(jí)可以通過(guò) x-max-priority 進(jìn)行設(shè)置。
MapDurabilityarg = new HashMap () {{ put("x-max-priority", 5); }}; channel.queueDeclare("normalQueue", true, false, false, arg); channel.exchangeDeclare("normalExchange", "direct", true, false, null); AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.priority(2); channel.basicPublish("normalExchange", "key", true, builder.build(), messsage.getBytes(StandardCharsets.UTF_8));
Exchange , Queue , 消息都可以進(jìn)行持久化。在消息發(fā)送到 Exchange 之后會(huì)立刻路由到 Queue 中,因此未持久化的 Exchange 在重啟后會(huì)丟失 Exchange 元數(shù)據(jù)以及綁定,對(duì) Queue 和消息的持久化無(wú)影響。
未持久化的 Queue 在重啟后會(huì)丟失,包括 Queue 中的消息,不管消息是否設(shè)置了持久化。
未持久化的消息在重啟后會(huì)丟失,即使所在的 Queue 已持久化。
channel.queueDeclare("normalQueue", true, false, false, null); // Queue 持久化 channel.exchangeDeclare("normalExchange", "direct", true, false, null); // Exchange 持久化 channel.queueBind("normalQueue", "normalExchange", "key"); AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.deliveryMode(2); // 消息持久化 channel.basicPublish("normalExchange", "key", true, builder.build(), messsage.getBytes(StandardCharsets.UTF_8));Qos
Qos 的作用時(shí)負(fù)載均衡。當(dāng)一個(gè)隊(duì)列有兩個(gè) Consumer ,一個(gè)性能很好 A,另一個(gè)不那么好 B,RabbitMQ 會(huì)輪詢,將消息平均地分給這兩個(gè) Consumer??梢?jiàn) B 上的堆積的消息會(huì)越來(lái)越多,而 A 上的線程可能會(huì)空閑。 Qos 的作用就是防止一個(gè) Consumer 堆積了過(guò)多的消息,把這些消息分給其他 Consumer。
global 參數(shù):
channel.basicQos(3, false); // each Consumer limit 3 channel.basicQos(5, true); // this channel limit to 5
global 參數(shù)會(huì)讓 RabbitMQ 調(diào)用更多資源,盡量不要設(shè)置(默認(rèn)值為 false)。
RelibilityRabbitMQ 支持最少一次和最多一次。
最少一次:
- 啟用 Publisher Confirm 或者 事務(wù)保證消息能夠到達(dá)服務(wù)器。 - 啟用 mandatory 參數(shù)保證消息不回被 Exchange 丟掉。 - 消息和 Queue 開(kāi)啟持久化。 - Consumer autoAck off, 并確保消息在處理完之后再 ackPolicy
Policy 可以很方便的批量設(shè)置 Exchange 以及 Queue 的屬性,但是 Policy 的優(yōu)先級(jí)較低,請(qǐng)注意。
Policy 可以通過(guò) HTTP API, web console,以及 cli 的方式。
rabbitmqctl set_policy [-p vhost] [--priority prirority] [--apply-to apply-to] {name} {pattern} {defination}
vhost : 指定 vhost
proiority : 如果一個(gè) Queue 或者 Exchange 有多個(gè) Policy 的情況下,只有 priority 最大的那個(gè) Policy 才會(huì)生效。
apply-to : 應(yīng)用到
Exchange and Queue
Exchange
Queue
name : Policy 的名字
pattern : Exchange 或者 Queue 名字的正則表達(dá)式
defination : 屬性值,可以通過(guò) management > Admin > Policies 的查看。
ClusterRabbitMQ 會(huì)把所有的元數(shù)據(jù)存儲(chǔ)到所有的節(jié)點(diǎn)上,但是隊(duì)列是分散在集群中所有的節(jié)點(diǎn)上的。
Build A Cluster with docker我們嘗試使用 Docker Compose 創(chuàng)建一個(gè)由 3 個(gè)服務(wù)組成的集群
version: "3" services: node1: image: rabbitmq:3.7-management-alpine container_name: node1 hostname: node1 environment: RABBITMQ_ERLANG_COOKIE: secret_cookie_here ports: - "5673:5672" - "15673:15672" node2: image: rabbitmq:3.7-management-alpine container_name: node2 hostname: node2 environment: RABBITMQ_ERLANG_COOKIE: secret_cookie_here ports: - "5674:5672" - "15674:15672" node3: image: rabbitmq:3.7-management-alpine container_name: node3 hostname: node3 environment: RABBITMQ_ERLANG_COOKIE: secret_cookie_here ports: - "5675:5672" - "15675:15672"
通過(guò)設(shè)置 hostname ,容器內(nèi)部的 rabbitmq 的 nodename 就變成類似 rabbitmq@node1。同時(shí)集群中的 RabbitMQ 需要相同的 RABBITMQ_ERLANG_COOKIE 來(lái)進(jìn)行互相認(rèn)證。
啟動(dòng)服務(wù):
docker-compose up -d
然后將 node2 , node3 加入 node1 ,注意,加入集群之前 RabbitMQ 必須停止:
# 停止 rabbitmq docker-compose exec node2 rabbitmqctl stop_app docker-compose exec node3 rabbitmqctl stop_app # 加入 node1 docker-compose exec node2 rabbitmqctl join_cluster rabbitmq@node1 docker-compose exec node3 rabbitmqctl join_cluster rabbitmq@node1 # 重新啟動(dòng) docker-compose exec node2 rabbitmqctl start_app docker-compose exec node3 rabbitmqctl start_app
在任意一個(gè)節(jié)點(diǎn)上查詢集群狀態(tài):
docker-compose exec node2 rabbitmqctl cluster_status
可以看到如下?tīng)顟B(tài):
Cluster status of node rabbit@node2 ... [{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]}, {running_nodes,[rabbit@node3,rabbit@node1,rabbit@node2]}, {cluster_name,<<"rabbit@node2">>}, {partitions,[]}, {alarms,[{rabbit@node3,[]},{rabbit@node1,[]},{rabbit@node2,[]}]}]手動(dòng)下線節(jié)點(diǎn)
將節(jié)點(diǎn)從在線狀態(tài)下線, 首先停止節(jié)點(diǎn),然后重置節(jié)點(diǎn)。
docker-compose exec node2 rabbitmqctl stop_app docker-compose exec node2 rabbitmqctl reset docker-compose exec node2 rabbitmqctl stop_app
在重新啟動(dòng)服務(wù)器之后可以發(fā)現(xiàn)該節(jié)點(diǎn)已經(jīng)脫離了集群。
Cluster status of node rabbit@node2 ... [{nodes,[{disc,[rabbit@node2]}]}, {running_nodes,[rabbit@node2]}, {cluster_name,<<"rabbit@node2">>}, {partitions,[]}, {alarms,[{rabbit@node2,[]}]}]節(jié)點(diǎn)類型
RabbitMQ 的節(jié)點(diǎn)類型有兩種,一種是 disc , 第二種是 ram。 RabbitMQ 要求集群中至少要有一個(gè)磁盤節(jié)點(diǎn),儲(chǔ)存了所有的元數(shù)據(jù)。當(dāng)集群中的唯一一個(gè)磁盤節(jié)點(diǎn)崩潰后,集群可以繼續(xù)收發(fā)消息,但是不能創(chuàng)建隊(duì)列等操作。
RabbitMQ 在加入集群時(shí)默認(rèn)為磁盤模式,如果要以內(nèi)存模式加入:
docker-compose exec node2 rabbitmqctl join_cluster rabbit@node1 --ram
更改節(jié)點(diǎn)類型:
docker-compose exec node 2 rabbitmqctl change cluster_node_type descMirror Queue
RabbitMQ 提供了 Master/Slave 模式的 Mirror Queue 機(jī)制。請(qǐng)注意,開(kāi)啟 Publisher Confirmed 或者事務(wù)的情況下,只有所有的 Slave 都 ACK 之后才會(huì)返回 ACK 給客戶端。
開(kāi)啟 Mirror Queue 主要通過(guò)設(shè)置 Policy 其中最主要的是 defination:
ha-mode: Mirror Queue 的模式
all : 默認(rèn)的模式,表示在集群中的所有節(jié)點(diǎn)上進(jìn)行鏡像
exactly : 在指定數(shù)量的節(jié)點(diǎn)上進(jìn)行鏡像,數(shù)量由 ha-params 指定。
nodes : 在指定的節(jié)點(diǎn)上進(jìn)行鏡像,節(jié)點(diǎn)名稱由 ha-params 指定。
ha-params : 如上所述
ha-sync-mode : 消息的同步模式
automatic : 當(dāng)新的 Slave 加入集群之后會(huì)自動(dòng)同步消息。
manual: 默認(rèn),當(dāng)加入新的 Slave 之后不會(huì)自動(dòng)把消息同步到新的 Slave 上。指導(dǎo)調(diào)用命令顯式同步。
ha-promote-on-shutdown:
when-synced: 默認(rèn),如果主動(dòng)停止 master ,那么 slave 不會(huì)自動(dòng)接管。也就是說(shuō)會(huì)期望 master 會(huì)重啟啟動(dòng),這可以保證消息不會(huì)丟失。
always: 不管 master 是因?yàn)槭裁丛蛲V沟模瑂lave 會(huì)立刻接管,有可能有一部分?jǐn)?shù)據(jù)沒(méi)有從 master 同步到 slave.
ha-promote-on-failure: 默認(rèn) always ,不推薦設(shè)置為 when-synced
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/75346.html
摘要:慕課網(wǎng)消息中間件極速入門與實(shí)戰(zhàn)學(xué)習(xí)總結(jié)時(shí)間年月日星期三說(shuō)明本文部分內(nèi)容均來(lái)自慕課網(wǎng)。 慕課網(wǎng)《RabbitMQ消息中間件極速入門與實(shí)戰(zhàn)》學(xué)習(xí)總結(jié) 時(shí)間:2018年09月05日星期三 說(shuō)明:本文部分內(nèi)容均來(lái)自慕課網(wǎng)。@慕課網(wǎng):https://www.imooc.com 教學(xué)源碼:無(wú) 學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:RabbitM...
摘要:基礎(chǔ)教程注本文是對(duì)眾多博客的學(xué)習(xí)和總結(jié),可能存在理解錯(cuò)誤。請(qǐng)帶著懷疑的眼光,同時(shí)如果有錯(cuò)誤希望能指出。安裝庫(kù)這里我們首先將消息推入隊(duì)列,然后消費(fèi)者從隊(duì)列中去除消息進(jìn)行消費(fèi)。 RabbitMQ 基礎(chǔ)教程(1) - Hello World 注:本文是對(duì)眾多博客的學(xué)習(xí)和總結(jié),可能存在理解錯(cuò)誤。請(qǐng)帶著懷疑的眼光,同時(shí)如果有錯(cuò)誤希望能指出。 如果你喜歡我的文章,可以關(guān)注我的私人博客:http:...
摘要:添加應(yīng)用啟動(dòng)類通過(guò)半自動(dòng)刷新配置。配置客戶端服務(wù)想要實(shí)現(xiàn)自動(dòng)刷新配置的話,一端是不要做任何處理,只需要在一端處理即可。 SpringCloud(第 037 篇)通過(guò)bus/refresh半自動(dòng)刷新ConfigClient配置 - 一、大致介紹 1、上章節(jié)我們講到了手動(dòng)刷新配置,但是我們假設(shè)如果微服務(wù)一多的話,那么我們是不是需要對(duì)每臺(tái)服務(wù)進(jìn)行手動(dòng)刷新呢? 2、答案肯定是不需要的,我們也可...
摘要:消息丟失分成三種情況,可能出現(xiàn)生產(chǎn)者消費(fèi)者。生產(chǎn)者丟失數(shù)據(jù)生產(chǎn)者丟失數(shù)據(jù)首先要確保寫入的消息別丟,消息隊(duì)列通過(guò)請(qǐng)求確認(rèn)機(jī)制,保證消息的可靠傳輸。只有消息被持久化到磁盤以后,才會(huì)回傳消息。消息丟失分成三種情況,可能出現(xiàn)生產(chǎn)者、RabbitMQ、消費(fèi)者。 生產(chǎn)者丟失數(shù)據(jù) 首先要確保寫入 RabbitMQ 的消息別丟,消息隊(duì)列通過(guò)請(qǐng)求確認(rèn)機(jī)制,保證消息的可靠傳輸。生產(chǎn)開(kāi)啟 comf...
閱讀 855·2021-11-15 17:58
閱讀 3658·2021-11-12 10:36
閱讀 3794·2021-09-22 16:06
閱讀 969·2021-09-10 10:50
閱讀 1333·2019-08-30 11:19
閱讀 3317·2019-08-29 16:26
閱讀 942·2019-08-29 10:55
閱讀 3349·2019-08-26 13:48