摘要:一關(guān)鍵字和之間的連接關(guān)系實(shí)際存儲(chǔ)消息。生產(chǎn)者進(jìn)行接受應(yīng)答,用來確定這條消息是否正常的發(fā)送到了,這種方式也是消息的可靠性投遞的核心保障。支持消息的過期時(shí)間,在消息發(fā)送時(shí)可以進(jìn)行指定??梢员O(jiān)聽這個(gè)隊(duì)列中消息做相應(yīng)的處理。
一、rabbitmq關(guān)鍵字
Binding:Exchange和Exchange、Queue之間的連接關(guān)系二、rabbitmq高級(jí)特性
Queue:實(shí)際存儲(chǔ)消息。
Durability:是否持久化,Durable:是,Transient:否。
Auto delete:如選yes,代表當(dāng)最后一個(gè)監(jiān)聽被移除之后,該Queue會(huì)自動(dòng)刪除。
Message:服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù)。本質(zhì)上就是一段數(shù)據(jù),由Properties和Payload(Body)組成。常用屬性:delivery mode、headers(自定義屬性)
Virtual host:虛擬主機(jī),用于進(jìn)行邏輯隔離,最上層的消息路由。一個(gè)Virtual Host里面可以有若干個(gè)Exchange和Queue,同一個(gè)Virtual Host里面不能有相同名稱的Exchange或者Queue。
消息如何保證100%的投遞成功?
生產(chǎn)端可靠性投遞
保障消息的成功發(fā)出
保障MQ節(jié)點(diǎn)的成功接收
發(fā)送端收到MQ節(jié)點(diǎn)(Broker)確認(rèn)應(yīng)答
完善的消息進(jìn)行補(bǔ)償機(jī)制
可靠性投遞解決方案
消息落庫(kù),對(duì)消息狀態(tài)進(jìn)行打標(biāo)
消息的延遲投遞,做二次確認(rèn),回調(diào)檢查
消息集群鏡像隊(duì)列:rabbitmq集群模式非常經(jīng)典的就是Mirror鏡像模式,保證100%數(shù)據(jù)不丟失,在實(shí)際工作中也是用的最多的。并且實(shí)現(xiàn)集群非常簡(jiǎn)單,一般互聯(lián)網(wǎng)公司都會(huì)構(gòu)建這種鏡像集群模式。https://segmentfault.com/a/11...
冪等性概念詳解
在海量訂單產(chǎn)生的業(yè)務(wù)高峰期,如何避免消息的重復(fù)消費(fèi)問題?
Confirm確認(rèn)消息、Return返回消息
理解Confirm消息確認(rèn)機(jī)制:
消息的確認(rèn),是指生產(chǎn)者投遞消息后,如果Broker收到消息,則會(huì)給我們生產(chǎn)者一個(gè)應(yīng)答。
生產(chǎn)者進(jìn)行接受應(yīng)答,用來確定這條消息是否正常的發(fā)送到了Broker,這種方式也是消息的可靠性投遞的核心保障。
如何實(shí)現(xiàn)Confirm確認(rèn)消息?
第一步:在channel上開啟確認(rèn)模式:channel.confirmSelect()
第二步:在channel上添加監(jiān)聽:addConfirmListener,監(jiān)聽成功和失敗的返回結(jié)果,根據(jù)具體的結(jié)果對(duì)消息進(jìn)行重新發(fā)送、或者記錄日志等后續(xù)處理。
Return消息機(jī)制
Return Listener用于處理一些不可路由的消息
我們的消息生產(chǎn)者,通過制定一個(gè)Exchange和Routing Key,把消息送達(dá)到某一個(gè)隊(duì)列中去,然后我們的消費(fèi)者監(jiān)聽隊(duì)列,進(jìn)行消費(fèi)處理。
但是在某寫情況下,如果我們?cè)诎l(fā)送消息的時(shí)候,當(dāng)前的exchange不存在或者指定的路由key路由不到,這個(gè)時(shí)候如果我們需要監(jiān)聽這種不可達(dá)的消息,就要使用Return Listener。
Mandatory:如果為true,則監(jiān)聽器會(huì)接收到路由不可達(dá)的消息,然后進(jìn)行后續(xù)處理,如果為false,那么broker端自動(dòng)刪除該消息。
自定義消費(fèi)者
public class MyConsumer extends DefaultConsumer implements Consumer { public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String arg0, Envelope arg1, BasicProperties arg2, byte[] arg3) throws IOException { } }
消息的ACK與重回隊(duì)列
消費(fèi)端手工ACK(應(yīng)答成功)和NACK(應(yīng)答失?。?/b>
消費(fèi)端進(jìn)行消費(fèi)的時(shí)候,如果由于業(yè)務(wù)異常我們可以進(jìn)行日志的記錄,然后進(jìn)行補(bǔ)償。
由于服務(wù)器宕機(jī)等嚴(yán)重問題,我們就要手工進(jìn)行ACK保障消費(fèi)端消費(fèi)成功。比如:消費(fèi)一半的時(shí)候宕機(jī)了,broker端沒有收到應(yīng)答,重發(fā)消息。
消費(fèi)端重回隊(duì)列
消費(fèi)端重回隊(duì)列是為了對(duì)沒有處理成功的消息,把消息重新回遞給broker。
一般我們?cè)趯?shí)際應(yīng)用中,都會(huì)關(guān)閉重回隊(duì)列,也就是設(shè)置為false。
channel.basciNack(envelope.getDeliveryTag(),false,true) // 為true的話,在消費(fèi)失敗的情況下會(huì)重回隊(duì)列放入隊(duì)列末端。
消息的限流
假設(shè)有一個(gè)場(chǎng)景,RabbitMq服務(wù)器上有上萬條未處理的消息,我們隨便打開一個(gè)消費(fèi)者客戶端,會(huì)出現(xiàn)下面的情況: 巨量的消息瞬間全部推送過來,但是我們單個(gè)客戶端無法同時(shí)處理這么多數(shù)據(jù)。RabbitMq提供了一種qos(服務(wù)質(zhì)量保證)功能,即在非自動(dòng)確認(rèn)消息的前提下,如果一定數(shù)目的消息(通過基于consume或者channel設(shè)置Qos的值)未被確認(rèn)前,不進(jìn)行消費(fèi)新的消息。
void BasicQos(unit prefetchSize, ushort prefetchCount, bool global);
prefetchSize:0
prefetchCount: 會(huì)告訴RabbitMq不要同時(shí)給一個(gè)消費(fèi)者推送多于N個(gè)消息,即一旦有N個(gè)消息還沒有被ack,則該consumer將block掉,知道有消息ack。
global: true/false,是否將上面設(shè)置應(yīng)用于channel,簡(jiǎn)答點(diǎn)說,就是上面限制是channel級(jí)別的還是consumer級(jí)別。
// 限流方式,第一件事就是autoAck設(shè)置為false,關(guān)閉自動(dòng)簽收,必須手動(dòng)簽收 channel.basicQos(0,3,false); // 3表示如果消息積壓了1000條,先給我推3條,這三條消費(fèi)結(jié)束后,我會(huì)給你一個(gè)ack表示這三條我已經(jīng)處理完了,然后再給我推送3條... channel.basicConsume(queueName,false,new MyConsumer(channel)) //在MyConsumer中對(duì)消息進(jìn)行簽收ack channel.basicAck(envelope,getDeliveryTag(), false);
TTL消息
TTL:是Time To Live。就是生存時(shí)間。
RabbitMQ支持消息的過期時(shí)間,在消息發(fā)送時(shí)可以進(jìn)行指定。
RabbitMQ支持隊(duì)列的過期時(shí)間,從消息入隊(duì)開始計(jì)算,只要超過了隊(duì)列的超時(shí)時(shí)間配置,那么消息會(huì)自動(dòng)清除。
死信隊(duì)列
死信隊(duì)列:DLX,Dead-Letter-Exchange。
利用DLX,當(dāng)消息在一個(gè)隊(duì)列中變成了死信(dead message:這條消息沒有消費(fèi)者去消費(fèi))之后,他能被重新publish到另外一個(gè)Exchange,這個(gè)Exchange就是DLX。
進(jìn)入死信隊(duì)列(進(jìn)入死信的三種方式)1.消息被拒絕(basic.reject or basic.nack)并且requeue=false
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
2.消息TTL過期
3.隊(duì)列達(dá)到最大長(zhǎng)度
備注說明
DLX也是一個(gè)正常的Exchange,和一般的Exchange沒區(qū)別,他能在任何的隊(duì)列上被指定,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性。
當(dāng)這個(gè)隊(duì)列中有死信時(shí),RabbitMQ會(huì)自動(dòng)將這個(gè)消息重新發(fā)送到已經(jīng)設(shè)置了的Exchange上去,進(jìn)而被路由到另外一個(gè)隊(duì)列上去。
可以監(jiān)聽這個(gè)隊(duì)列中消息做相應(yīng)的處理。
死信隊(duì)列設(shè)置
首先需要設(shè)置死信隊(duì)列的exchange和queue,然后進(jìn)行綁定:Exchange:dlx.exchange、Queue:dlx.queue、RoutingKey:#
然后進(jìn)行正常聲明交換機(jī)、隊(duì)列、綁定,只不過我們需要在隊(duì)列加上一個(gè)參數(shù)即可:arguments.put("x-dead-letter-exchange","dlx.exchange");
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/77668.html
摘要:第一步安裝因?yàn)槭钦Z言編寫的,所以我們首先需要安裝第二步安裝官網(wǎng)提供的安裝方式本人安裝成功的方式第三步查看是否已經(jīng)安裝好了,能查到說明已經(jīng)安裝完成了。 第一步:安裝Erlang 因?yàn)閞abbitMQ是Erlang語言編寫的,所以我們首先需要安裝Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...
摘要:第一步安裝因?yàn)槭钦Z言編寫的,所以我們首先需要安裝第二步安裝官網(wǎng)提供的安裝方式本人安裝成功的方式第三步查看是否已經(jīng)安裝好了,能查到說明已經(jīng)安裝完成了。 第一步:安裝Erlang 因?yàn)閞abbitMQ是Erlang語言編寫的,所以我們首先需要安裝Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...
摘要:本文基于的插件,針對(duì)進(jìn)行簡(jiǎn)單的測(cè)試。包括協(xié)議的介紹,的安裝配置開啟插件及基于進(jìn)行的測(cè)試。協(xié)議是基于發(fā)布訂閱模型的物聯(lián)網(wǎng)消息傳遞協(xié)議。對(duì)傳輸消息有三種服務(wù)質(zhì)量最多一次,這一級(jí)別會(huì)發(fā)生消息丟失或重復(fù),消息發(fā)布依賴于底層網(wǎng)絡(luò)。 ...
閱讀 2040·2023-04-25 14:50
閱讀 2920·2021-11-17 09:33
閱讀 2628·2019-08-30 13:07
閱讀 2850·2019-08-29 16:57
閱讀 918·2019-08-29 15:26
閱讀 3563·2019-08-29 13:08
閱讀 2003·2019-08-29 12:32
閱讀 3398·2019-08-26 13:57