摘要:知識(shí)庫(kù)前言舍棄了繁重的事務(wù)消息而使用了消息確認(rèn)機(jī)制實(shí)現(xiàn)了分布式事務(wù),實(shí)在是解耦之一大神器。但是其配置起來挺麻煩,各種參數(shù),各種調(diào)整。消費(fèi)確認(rèn)消費(fèi)確認(rèn)用來確保消費(fèi)者是否成功的消費(fèi)了消息。
rabbitmq知識(shí)庫(kù): http://rabbitmq.org.cn/
前言:Rabbitmq舍棄了繁重的事務(wù)消息而使用了消息確認(rèn)機(jī)制實(shí)現(xiàn)了分布式事務(wù),實(shí)在是解耦之一大神器。但是其配置起來挺麻煩,各種參數(shù),各種調(diào)整。但國(guó)內(nèi)貌似資料很少,找來找去都找不到,自己擼一發(fā)先
1 發(fā)送確認(rèn)發(fā)送確認(rèn)用來確保消息是否已送達(dá)消息隊(duì)列。消息一旦到達(dá)消息服務(wù),就會(huì)觸發(fā)確認(rèn)機(jī)制,可以分為兩種情況:
對(duì)于無法被路由的消息,一旦沒法找到一個(gè)隊(duì)列來消費(fèi)它,就會(huì)觸發(fā)確認(rèn)無法消費(fèi),此時(shí)ack=false。旦有一種情況例外,就是連exchange都沒法找到,如果設(shè)置了mandatory, 此時(shí)就會(huì)先觸發(fā)basic.return,就是會(huì)先觸發(fā)returncallback回調(diào)。
對(duì)于可以被路由的消息,當(dāng)消息被(所有的?)queue接受時(shí),會(huì)觸發(fā)ack=true;對(duì)于設(shè)置了持久化(persistent)的消息,當(dāng)消息成功的持久化到硬盤上才會(huì)觸發(fā);對(duì)于設(shè)置了鏡像(mirror)的消息,那么是當(dāng)所有的mirror接受到這個(gè)消息。
2 消費(fèi)確認(rèn)(Delivery Acknowledgements)消費(fèi)確認(rèn)用來確保消費(fèi)者是否成功的消費(fèi)了消息。一旦有消費(fèi)者成功注冊(cè)到相應(yīng)的消息服務(wù),消息將會(huì)被消息服務(wù)通過basic.deliver推(push)給消費(fèi)者,此時(shí)消息會(huì)包含一個(gè)deliver tag用來唯一的標(biāo)識(shí)消息。如果此時(shí)是手動(dòng)模式,就需要手動(dòng)的確認(rèn)消息已經(jīng)被成功消費(fèi),否則消息服務(wù)將會(huì)重發(fā)消息(因?yàn)橄⒁呀?jīng)持久化到了硬盤上,所以無論消息服務(wù)是不是可能掛掉,都會(huì)重發(fā)消息)。而且必須確認(rèn),無論是成功或者失敗,否則會(huì)引起非常嚴(yán)重的問題
3 死信交換機(jī)(Dead Letter Exchanges)有三種情況可能進(jìn)死信交換機(jī)
被reject或者nack,并且requeue設(shè)置為false
消息最大存活時(shí)間(TTL)超時(shí)
消息數(shù)量超過最大隊(duì)列長(zhǎng)度
只需要設(shè)置一個(gè)args,就ok拉
channel.exchangeDeclare("some.exchange.name", "direct"); Map4 Qosargs = new HashMap (); args.put("x-dead-letter-exchange", "some.exchange.name"); channel.queueDeclare("myqueue", false, false, false, args);
Channel Prefetch Setting (QoS),表示當(dāng)前channel中未應(yīng)答消息的數(shù)目,如果超過了,隊(duì)列中將不再接受新的消息。這里所謂的channel就是指從消息服務(wù)到消費(fèi)者的一個(gè)通道,簡(jiǎn)單來說就是指消息從消息隊(duì)列發(fā)送到消費(fèi)者了,如果沒收到應(yīng)答,就算是一個(gè)Qos
加大這個(gè)值會(huì)增加消息的發(fā)送速度(Throughput),但是會(huì)加重消息隊(duì)列的內(nèi)存,所以100-300之間是一個(gè)比較理想的狀態(tài),可參考:http://next.rabbitmq.com/conf... :Channel Prefetch Setting (QoS)。
暴力的設(shè)置微100-300是存在一些問題的,如果太大,可能消息全部都?jí)涸谙M(fèi)者中而得不到消費(fèi),看起來隊(duì)列是空的,實(shí)際上全部積壓在客戶端;如果太小則得不到消費(fèi),浪費(fèi)資源。具體該怎么設(shè)置要根據(jù)實(shí)際的網(wǎng)絡(luò)吞吐量、以及消費(fèi)者的消費(fèi)能力。比如果消費(fèi)者很快,是內(nèi)存操作,那么你設(shè)置很大,甚至不設(shè)置都可以;但是如果消費(fèi)者很慢,比如是個(gè)數(shù)據(jù)庫(kù)操作,那么很可能將消息全部積壓到消費(fèi)者而得不到響應(yīng)
。可以參考http://www.rabbitmq.com/blog/...
Qos同時(shí)也會(huì)存在一個(gè)問題,一個(gè)channel是會(huì)被多個(gè)消費(fèi)者的,所以必須計(jì)算出所有消費(fèi)者中未應(yīng)答的數(shù)目,這顯然是非常不合理的,而且很麻煩,所以可以改為設(shè)置每個(gè)消費(fèi)者緩存(Prefetch Buf)可以允許的最大的數(shù)目。
例子:
// 1. 一下子設(shè)置所有的queue都是10條unacknowledged Channel channel = ...; Consumer consumer = ...; channel.basicQos(10); // Per consumer limit channel.basicConsume("my-queue", false, consumer); //2. 分別設(shè)置10條 Channel channel = ...; Consumer consumer1 = ...; Consumer consumer2 = ...; channel.basicQos(10); // Per consumer limit channel.basicConsume("my-queue1", false, consumer1); channel.basicConsume("my-queue2", false, consumer2); //3. 分別設(shè)置channel和consume,個(gè)人不推薦 Channel channel = ...; Consumer consumer1 = ...; Consumer consumer2 = ...; channel.basicQos(10, false); // Per consumer limit channel.basicQos(15, true); // Per channel limit channel.basicConsume("my-queue1", false, consumer1); channel.basicConsume("my-queue2", false, consumer2);5 channelCacheSize
當(dāng)前最大允許空閑的最大channel數(shù)。如果在高并發(fā)的環(huán)境中,如果值過小的話channel會(huì)關(guān)關(guān)開開非常頻繁。所以在1.6的版本中,spring amqp將這個(gè)值從1提高到了25。同時(shí)你也可以從RabbitMQ Admin中心觀察到channel關(guān)關(guān)開開,那么就可以考慮增大cache的值了。
當(dāng)你遇到 connetion error的錯(cuò)誤時(shí),就可以考慮增大channel cache size了。
6 其它參數(shù)和配置 delivery tags通道(channel)中的消息標(biāo)志,按照正數(shù)遞增,消息隊(duì)列中用來標(biāo)識(shí)消息的唯一標(biāo)識(shí)
Blocked Connection Notifications當(dāng)消息服務(wù)器資源不足時(shí),會(huì)向所有的生產(chǎn)者發(fā)送這個(gè)消息,我們可以捕獲這個(gè)消息并做處理,資源可以是內(nèi)存不足,cpu負(fù)載過重等等。
ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); connection.addBlockedListener(new BlockedListener() { public void handleBlocked(String reason) throws IOException { // Connection is now blocked } public void handleUnblocked() throws IOException { // Connection is now unblocked } });
話說真出現(xiàn)block了也應(yīng)該是在管理臺(tái)直接給出警告,不過也可以做一下避免異常
Multiple全應(yīng)答標(biāo)識(shí)。如果設(shè)置為true,一條消息應(yīng)答了,那么之前的全部消息將被應(yīng)答。比如目前channel中有delivery tags為5,6,7,8的消息,那么一旦8被應(yīng)答,那么5,6,7將都被應(yīng)答,如果設(shè)置為false,那么5,6,7將不會(huì)被應(yīng)答。(不建議設(shè)置,畢竟一個(gè)channel中會(huì)綁定好多consumer)
basic.nack當(dāng)消息服務(wù)發(fā)生異常時(shí),不會(huì)發(fā)送basic.ack,反而會(huì)發(fā)送一個(gè)basic.nack,而且不會(huì)自動(dòng)requeue,此時(shí)需要消息發(fā)送方手動(dòng)處理,進(jìn)行重發(fā)。只有一種情況會(huì)發(fā)送nack:“basic.nack will only be delivered if an internal error occurs in the Erlang process responsible for a queue”
1 mq配置的listener到什么地方,是配置到每個(gè)微服務(wù)、或者是配置到mq中?如果在每個(gè)服務(wù)里面都配置concurency,是不是隨著節(jié)點(diǎn)的增加,listener數(shù)量也會(huì)無限增加?
2 ssl和max-queue-length的配置
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/70800.html
摘要:本章主要是貼出一些相關(guān)的配置參數(shù),如果需要修改添加對(duì)應(yīng)的參數(shù)配置即可。 本章主要是貼出一些SpringBoot相關(guān)的配置參數(shù),如果需要修改添加對(duì)應(yīng)的參數(shù)配置即可。 application.properties # ---------------------------------------- # CORE PROPERTIES # --------------------------...
摘要:官方鏡像倉(cāng)庫(kù)地址本地運(yùn)行訪問可視化面板地址默認(rèn)賬號(hào)默認(rèn)密碼集成基本參數(shù)配置配置配置定義優(yōu)先級(jí)隊(duì)列定義交換器定義參考官方文檔應(yīng)用啟動(dòng)后,會(huì)自動(dòng)創(chuàng)建和,并相互綁定,優(yōu)先級(jí)隊(duì)列會(huì)有如圖所示標(biāo)識(shí)。 showImg(https://upload-images.jianshu.io/upload_images/3424642-6085f3f9e43c7a4c.png?imageMogr2/auto...
摘要:一關(guān)鍵字和之間的連接關(guān)系實(shí)際存儲(chǔ)消息。生產(chǎn)者進(jìn)行接受應(yīng)答,用來確定這條消息是否正常的發(fā)送到了,這種方式也是消息的可靠性投遞的核心保障。支持消息的過期時(shí)間,在消息發(fā)送時(shí)可以進(jìn)行指定??梢员O(jiān)聽這個(gè)隊(duì)列中消息做相應(yīng)的處理。 一、rabbitmq關(guān)鍵字 Binding:Exchange和Exchange、Queue之間的連接關(guān)系Queue:實(shí)際存儲(chǔ)消息。Durability:是否持久化,Du...
摘要:而調(diào)用后端服務(wù)就應(yīng)用了的高級(jí)特分布式配置管理平臺(tái)后端掘金輕量的分布式配置管理平臺(tái)。關(guān)于網(wǎng)絡(luò)深度解讀后端掘金什么是網(wǎng)絡(luò)呢總的來說,網(wǎng)絡(luò)中的容器們可以相互通信,網(wǎng)絡(luò)外的又訪問不了這些容器。 在 Java 路上,我看過的一些書、源碼和框架(持續(xù)更新) - 后端 - 掘金簡(jiǎn)書 占小狼轉(zhuǎn)載請(qǐng)注明原創(chuàng)出處,謝謝!如果讀完覺得有收獲的話,歡迎點(diǎn)贊加關(guān)注 物有本末,事有終始,知所先后,則近道矣 ......
閱讀 2346·2021-11-23 09:51
閱讀 1152·2021-11-22 13:52
閱讀 3623·2021-11-10 11:35
閱讀 1203·2021-10-25 09:47
閱讀 3008·2021-09-07 09:58
閱讀 1073·2019-08-30 15:54
閱讀 2830·2019-08-29 14:21
閱讀 3041·2019-08-29 12:20