摘要:它是阿里巴巴于年開源的第三代分布式消息中間件。是一個分布式消息中間件,具有低延遲高性能和可靠性萬億級別的容量和靈活的可擴展性,它是阿里巴巴于年開源的第三代分布式消息中間件。
上篇文章消息隊列那么多,為什么建議深入了解下RabbitMQ?我們講到了消息隊列的發(fā)展史:
并且詳細介紹了RabbitMQ,其功能也是挺強大的,那么,為啥又要搞一個RocketMQ出來呢?是重復造輪子嗎?本文我們就帶大家來詳細探討RocketMQ究竟好在哪里。
RocketMQ是一個分布式消息中間件,具有低延遲、高性能和可靠性、萬億級別的容量和靈活的可擴展性。它是阿里巴巴于2012年開源的第三代分布式消息中間件。
隨著阿里巴巴的電商業(yè)務不斷發(fā)展,需要一款更高性能的消息中間件,RocketMQ就是這個業(yè)務背景的產(chǎn)物。RocketMQ是一個分布式消息中間件,具有低延遲、高性能和可靠性、萬億級別的容量和靈活的可擴展性,它是阿里巴巴于2012年開源的第三代分布式消息中間件。RocketMQ經(jīng)歷了多年雙十一的洗禮,在可用性、可靠性以及穩(wěn)定性等方面都有出色的表現(xiàn)。值得一提的是,RocketMQ最初就是借鑒了Kafka進行改造開發(fā)而來的,所以熟悉Kafka的朋友,會發(fā)現(xiàn)RocketMQ的原理和Kafka有很多相似之處。
RocketMQ前身叫做MetaQ,在MeataQ發(fā)布3.0版本的時候改名為RocketMQ,其本質(zhì)上的設計思路和Kafka類似,因為最初就是基于Kafka改造而來,經(jīng)過不斷的迭代與版本升級,2016年11月21日,阿里巴巴向Apache軟件基金會捐贈了RocketMQ 。近年來被越來越多的國內(nèi)企業(yè)使用。
本文帶大家從以下幾個方面詳細了解RocketMQ:
RocketMQ的架構主要分為四部分,如下圖所示:
Producer
:消息生產(chǎn)者,支持集群方式部署;Consumer
:消息消費者,支持集群方式部署,支持pull,push模式獲取消息進行消費,支持集群和廣播方式消費;NameServer
:Topic路由注冊中心,類似于Dubbo中的zookeeper,支持Broker的動態(tài)注冊與發(fā)現(xiàn);BrokerServer
:主要負責消息的存儲、投遞和查詢,以及服務高可用保證。BrokerServer包含以下幾個重要的子模塊:RocketMQ執(zhí)行原理如下圖所示:
brokerRole
,可選值:ASYNC_MASTER
:異步復制方式(異步雙寫),生產(chǎn)者寫入消息到Master之后,無需等到消息復制到Slave即可返回,消息的復制由旁路線程進行異步復制;SYNC_MASTER
:同步復制方式(同步雙寫),生產(chǎn)者寫入消息到Master之后,需要等到Slave復制成功才可以返回。如果有多個Slave,只需要有一個Slave復制成功,并成功應答,就算復制成功了。這里是否持久化到磁盤依賴于另一個參數(shù):flushDiskType
;SLAVE
:從節(jié)點本節(jié)我們來看看一個雙主雙從的RocketMQ是如何搭建的。
集群配置參數(shù)說明:
在討論集群前,我們需要了解兩個關鍵的集群配置參數(shù):brokerRole,flushDiskType。brokerRole在前一節(jié)已經(jīng)介紹了,而flushDiskType則是刷盤方式的配置,主要有:
- ASYNC_FLUSH: 異步刷盤
- SYNC_FLUSH: 同步刷盤
brokerRole確定了主從同步是異步的還是同步的,flushDiskType確定了數(shù)據(jù)刷盤的方式是同步的還是異步的。
如果業(yè)務場景對消息丟失容忍度很低,可以采用SYNC_MASTER + ASYNC_FLUSH的方式,這樣只有master和slave在刷盤前同時掛掉,消息才會丟失,也就是說即使有一臺機器出故障,仍然能保證數(shù)據(jù)不丟;
如果業(yè)務場景對消息丟失容忍度比較高,則可以采用ASYNC_MASTER + ASYNC_FLUSH的方式,這樣可以盡可能的提高消息的吞吐量。
Master Broker支持讀和寫,Slave Broker只支持讀。
當Master不可用的時候,Consumer會自動切換到Slave進行讀,也就是說,當Master節(jié)點的機器出現(xiàn)故障后,Consumer仍然可以從Slave節(jié)點讀取消息,不影響消費端的消費程序。
集群配置參數(shù)說明:
- brokerName: broker的名稱,需要把Master和Slave節(jié)點配置成相同的名稱,表示他們的主從關系,相同的brokerName的一組broker,組成一個broker組;
- brokerId: broker的id,0表示Master節(jié)點的id,大于0表示Slave節(jié)點的id。
在RocketMQ中,機器的主從節(jié)點關系是提前配置好的,沒有類似Kafka的Master動態(tài)選主功能。
如果一個Master宕機了,要讓生產(chǎn)端程序繼續(xù)可以生產(chǎn)消息,您需要部署多個Master節(jié)點,組成多個broker組。這樣在創(chuàng)建Topic的時候,就可以把Topic的不同消息隊列分布在多個broker組中,即使某一個broker組的Master節(jié)點不可用了,其他組的Master節(jié)點仍然可用,保證了Producer可以繼續(xù)發(fā)送消息。
為了盡可能的保證消息不丟失
,并且保證生產(chǎn)者和消費者的可用性
,我們可以構建一個雙主雙從的集群,搭建的架構圖如下所示:
部署架構說明:
以下是關鍵的配置參數(shù):
# NameServer地址namesrvAddr=192.168.1.100:9876;192.168.1.101:9876# 集群名稱brokerClusterName=itzhai-com-cluster# brokerIP地址brokerIP1=192.168.1.100# broker通信端口listenPort=10911# broker名稱brokerName=broker‐1# 0表示主節(jié)點brokerId=0# 2點進行消息刪除deleteWhen=02# 消息在磁盤上保留48小時fileReservedTime=48# 主從同步復制brokerRole=SYNC_MASTER# 異步刷盤flushDiskType=ASYNC_FLUSH# 自動創(chuàng)建TopicautoCreateTopicEnable=true# 消息存儲根目錄storePathRootDir=/data/rocketmq/store‐m
# NameServer地址namesrvAddr=192.168.1.100:9876;192.168.1.101:9876# 集群名稱brokerClusterName=itzhai-com-cluster# brokerIP地址brokerIP1=192.168.1.101# broker通信端口listenPort=10911# broker名稱brokerName=broker‐1 # 非0表示從節(jié)點brokerId=1# 2點進行消息刪除deleteWhen=02# 消息在磁盤上保留48小時fileReservedTime=48# 從節(jié)點brokerRole=SLAVE# 異步刷盤flushDiskType=ASYNC_FLUSH# 自動創(chuàng)建TopicautoCreateTopicEnable=true # 消息存儲根目錄storePathRootDir=/data/rocketmq/store‐s
# NameServer地址namesrvAddr=192.168.1.100:9876;192.168.1.101:9876# 集群名稱brokerClusterName=itzhai-com-cluster# brokerIP地址brokerIP1=192.168.1.102# broker通信端口listenPort=10911# broker名稱brokerName=broker‐2# 0表示主節(jié)點brokerId=0# 2點進行消息刪除deleteWhen=02# 消息在磁盤上保留48小時fileReservedTime=48# 主從同步復制brokerRole=SYNC_MASTER# 異步刷盤flushDiskType=ASYNC_FLUSH# 自動創(chuàng)建TopicautoCreateTopicEnable=true# 消息存儲根目錄storePathRootDir=/data/rocketmq/store‐m
# NameServer地址namesrvAddr=192.168.1.100:9876;192.168.1.101:9876# 集群名稱brokerClusterName=itzhai-com-cluster# brokerIP地址brokerIP1=192.168.1.103# broker通信端口listenPort=10911# broker名稱brokerName=broker‐2# 非0表示從節(jié)點brokerId=1# 2點進行消息刪除deleteWhen=02# 消息在磁盤上保留48小時fileReservedTime=48# 從節(jié)點brokerRole=SLAVE# 異步刷盤flushDiskType=ASYNC_FLUSH# 自動創(chuàng)建TopicautoCreateTopicEnable=true# 消息存儲根目錄storePathRootDir=/data/rocketmq/store‐s
寫了那么多頂層架構圖,不寫寫底層內(nèi)幕,就不是IT宅(itzhai.com)的文章風格,接下來,我們就來看看底層存儲架構。
我們在broker.conf
文件中配置了消息存儲的根目錄:
# 消息存儲根目錄storePathRootDir=/data/rocketmq/store‐m
進入這個目錄,我們可以發(fā)現(xiàn)如下的目錄結構:
其中:
下面我們來看看關鍵的commitlog以及consumequeue:
消息投遞到Broker之后,是先把實際的消息內(nèi)容存放到CommitLog中的,然后再把消息寫入到對應主題的ConsumeQueue中。其中:
CommitLog:消息的物理存儲文件,存儲實際的消息內(nèi)容。每個Broker上面的CommitLog被該Broker上所有的ConsumeQueue共享。
單個文件大小默認為1G,文件名長度為20位,左邊補零,剩余為起始偏移量。預分配好空間,消息順序?qū)懭肴罩疚募?/strong>。當文件滿了,則寫入下一個文件,下一個文件的文件名基于文件第一條消息的偏移量進行命名;
ConsumeQueue:消息的邏輯隊列,相當于CommitLog的索引文件。RocketMQ是基于Topic主題訂閱模式實現(xiàn)的,每個Topic下會創(chuàng)建若干個邏輯上的消息隊列ConsumeQueue,在消息寫入到CommitLog之后,通過Broker的后臺服務線程(ReputMessageService)不停地分發(fā)請求并異步構建ConsumeQueue和IndexFile(索引文件,后面介紹),然后把每個ConsumeQueue需要的消息記錄到各個ConsumeQueue中。
ConsumeQueue主要記錄8個字節(jié)的commitLogOffset(消息在CommitLog中的物理偏移量), 4個字節(jié)的msgSize(消息大小), 8個字節(jié)的TagHashcode,每個元素固定20個字節(jié)。
ConsumeQueue相當于CommitLog文件的索引,可以通過ConsumeQueue快速從很大的CommitLog文件中快速定位到需要的消息。
主題消息隊列:在consumequeue目錄下,按照topic的維度存儲消息隊列。
重試消息隊列:如果topic中的消息消費失敗,則會把消息發(fā)到重試隊列,重新隊列按照消費端的GroupName來分組,命名規(guī)則:%RETRY%ConsumerGroupName
死信消息隊列:如果topic中的消息消費失敗,并且超過了指定重試次數(shù)之后,則會把消息發(fā)到死信隊列,死信隊列按照消費端的GroupName來分組,命名規(guī)則:%DLQ%ConsumerGroupName
假設我們現(xiàn)在有一個topic:itzhai-test
,消費分組:itzhai_consumer_group
,當消息消費失敗之后,我們查看consumequeue目錄,會發(fā)現(xiàn)多處了一個重試隊列:
我們可以在RocketMQ的控制臺看到這個重試消息隊列的主題和消息:
如果一直重試失敗,達到一定次數(shù)之后(默認是16次,重試時間:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h),就會把消息投遞到死信隊列:
每條消息的長度是不固定的,為了提高寫入的效率,RocketMQ預先分配好1G空間的CommitLog文件,采用順序?qū)?/strong>的方式寫入消息,大大的提高寫入的速度。
RocketMQ中消息刷盤主要可以分為同步刷盤和異步刷盤兩種,通過flushDiskType參數(shù)進行配置。如果需要提高寫消息的效率,降低延遲,提高MQ的性能和吞吐量,并且不要求消息數(shù)據(jù)存儲的高可靠性,可以把刷盤策略設置為異步刷盤。
為了提高讀取的效率,RocketMQ使用ConsumeQueue作為消費消息的索引,使用IndexFile作為基于消息key的查詢的索引。下面來詳細介紹下。
讀取消息是隨機讀的,為此,RocketMQ專門建立了ConsumeQueue索引文件,每次先從ConsumeQueue中獲取需要的消息的地址,消息大小,然后從CommitLog文件中根據(jù)地址直接讀取消息內(nèi)容。在讀取消息內(nèi)容的過程中,也盡量利用到了操作系統(tǒng)的頁緩存機制,進一步加速讀取速度。
ConsumeQueue由于每個元素大小是固定的,因此可以像訪問數(shù)組一樣訪問每個消息元素。并且占用空間很小,大部分的ConsumeQueue能夠被全部載入內(nèi)存,所以這個索引查找的速度很快。每個ConsumeQueue文件由30w個元素組成,占用空間在6M以內(nèi)。每個文件默認大小為600萬個字節(jié),當一個ConsumeQueue類型的文件寫滿之后,則寫入下一個文件。
我們在RocketMQ的store目錄中可以發(fā)現(xiàn)有一個index目錄,這個是一個用于輔助提高查詢消息效率的索引文件。通過該索引文件實現(xiàn)基于消息key來查詢消息的功能。
IndexFile索引文件物理存儲結構如下圖所示:
beginTimestamp
:索引文件中第一個索引消息存入Broker的時間戳;endTimestamp
:索引文件中最后一個索引消息存入Broker的時間戳beginPHYOffset
:索引文件中第一個索引消息在CommitLog中的偏移量;endPhyOffset
:索引文件中最后一個索引消息在CommitLog中的偏移量;hashSlotCount
:構建索引使用的slot數(shù)量;indexCount
:索引的總數(shù);Key Hash
:消息的哈希值;Commit Log Offset
:消息在CommitLog中的偏移量;Timestamp
:消息存儲的時間戳;Next Index Offset
:下一個索引的位置,如果消息取模后發(fā)生槽位槽位碰撞,則通過此字段把碰撞的消息構成鏈表。每個IndexFile文件的大?。?0b + 4b * 5000000 + 20b * 20000000 = 420000040b,約為400M。
IndexFile索引文件的邏輯存儲結構如下圖所示:
IndexFile邏輯上是基于哈希表來實現(xiàn)的,Slot Table為哈希鍵,Index Linked List中存儲的為哈希值。
RocketMQ中的MessageId的長度總共有16字節(jié),其中包含了:消息存儲主機地址(IP地址和端口),消息Commit Log offset。“
按照MessageId查詢消息的流程:Client端從MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封裝成一個RPC請求后通過Remoting通信層發(fā)送(業(yè)務請求碼:VIEW_MESSAGE_BY_ID)。Broker端走的是QueryMessageProcessor,讀取消息的過程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的記錄并解析成一個完整的消息返回。
我們繼續(xù)看看在集群模式下,RocketMQ的Topic數(shù)據(jù)是如何做分區(qū)的。IT宅(itzhai.com)提醒大家,實踐出真知。這里我們部署兩個Master節(jié)點:
我們通過手動配置每個Broker中的Topic,以及ConsumeQueue數(shù)量,來實現(xiàn)Topic的數(shù)據(jù)分片,如,我們到集群中手動配置這樣的Topic:
broker-a
創(chuàng)建itzhai-com-test-1
,4個隊列;broker-b
創(chuàng)建itzhai-com-test-1
,2個隊列。創(chuàng)建完成之后,Topic分片集群分布如下:
即:
可以發(fā)現(xiàn),RocketMQ是把Topic分片存儲到各個Broker節(jié)點中,然后在把Broker節(jié)點中的Topic繼續(xù)分片為若干等分的ConsumeQueue,從而提高消息的吞吐量。ConsumeQueue是作為負載均衡資源分配的基本單元。
這樣把Topic的消息分區(qū)到了不同的Broker上,從而增加了消息隊列的數(shù)量,從而能夠支持更塊的并發(fā)消費速度(只要有足夠的消費者)。
假設設置為通過Broker自動創(chuàng)建Topic(autoCreateTopicEnable=true),并且Producer端設置Topic消息隊列數(shù)量設置為4,也就是默認值:
producer.setDefaultTopicQueueNums(4);
嘗試往一個新的 topic itzhai-test-queue-1
連續(xù)發(fā)送10條消息,發(fā)送完畢之后,查看Topic狀態(tài):
我們可以發(fā)現(xiàn),在兩個broker上面都創(chuàng)建了itzhai-test-queue-a
,并且每個broker上的消息隊列數(shù)量都為4。怎么回事,我配置的明明是期望創(chuàng)建4個隊列,為什么加起來會變成了8個?如下圖所示:
由于時間關系,本文我們不會帶大家從源碼方面去解讀為啥會出現(xiàn)這種情況,接下來我們通過一種更加直觀的方式來驗證下這個問題:繼續(xù)做實驗。
我們繼續(xù)嘗試往一個新的 topic itzhai-test-queue-10
發(fā)送1條消息,注意,這一次不做并發(fā)發(fā)送了,只發(fā)送一條,發(fā)送完畢之后,查看Topic狀態(tài):
可以發(fā)現(xiàn),這次創(chuàng)建的消息隊列數(shù)量又是對的了,并且都是在broker-a上面創(chuàng)建的。接下來,無論怎么并發(fā)發(fā)送消息,消息隊列的數(shù)量都不會繼續(xù)增加了。
其實這也是并發(fā)請求Broker,觸發(fā)自動創(chuàng)建Topic的bug。
為了更加嚴格的管理Topic的創(chuàng)建和分片配置,一般在生產(chǎn)環(huán)境都是配置為手動創(chuàng)建Topic,通過提交運維工單申請創(chuàng)建Topic以及Topic的數(shù)據(jù)分配。
接下來我們來看看RocketMQ的特性。更多其他技術的底層架構內(nèi)幕分析,請訪問我的博客IT宅(itzhai.com)或者關注Java架構雜談公眾號。
RocketMQ中定義了如下三種消息通信的方式:
public enum CommunicationMode { SYNC, ASYNC, ONEWAY,}
SYNC
:同步發(fā)送,生產(chǎn)端會阻塞等待發(fā)送結果;ASYNC
:異步發(fā)送,生產(chǎn)端調(diào)用發(fā)送API之后,立刻返回,在拿到Broker的響應結果后,觸發(fā)對應的SendCallback回調(diào);ONEWAY
:單向發(fā)送,發(fā)送方只負責發(fā)送消息,不等待服務器回應且沒有回調(diào)函數(shù)觸發(fā),即只發(fā)送請求不等待應答。 此方式發(fā)送消息的過程耗時非常短,一般在微秒級別;SYNC和ASYNC關注發(fā)送結果,ONEWAY不關注發(fā)送結果。發(fā)送結果如下:
public enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE,}
SEND_OK
:消息發(fā)送成功。SEND_OK并不意味著投遞是可靠的,要確保消息不丟失,需要開啟SYNC_MASTER同步或者SYNC_FLUSH同步寫;FLUSH_DISK_TIMEOUT
:消息發(fā)送成功,但是刷盤超時。如果Broker的flushDiskType=SYNC_FLUSH,并且5秒內(nèi)沒有完成消息的刷盤,則會返回這個狀態(tài);FLUSH_SLAVE_TIMEOUT
:消息發(fā)送成功,但是服務器同步到Slave時超時。如果Broker的brokerRole=SYNC_MASTER,并且5秒內(nèi)沒有完成同步,則會返回這個狀態(tài);SLAVE_NOT_AVAILABLE
:消息發(fā)送成功,但是無可用的Slave節(jié)點。如果Broker的brokerRole=SYNC_MASTER,但是沒有發(fā)現(xiàn)SLAVE節(jié)點或者SLAVE節(jié)點掛掉了,那么會返回這個狀態(tài)。源碼內(nèi)容更精彩,歡迎大家進一步閱讀源碼詳細了解消息發(fā)送的內(nèi)幕:
- 同步發(fā)送:org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)
- 異步發(fā)送:org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message, org.apache.rocketmq.client.producer.SendCallback)
- 單向發(fā)送:org.apache.rocketmq.client.producer.DefaultMQProducer#sendOneway(org.apache.rocketmq.common.message.Message)
消息的有序性指的是一類消息消費的時候,可以按照發(fā)送順序來消費,比如:在Java架構雜談
茶餐廳吃飯產(chǎn)生的消息:進入餐廳、點餐、下單、上菜、付款,消息要按照這個順序消費才有意義,但是多個顧客產(chǎn)生的消息是可以并行消費的。順序消費又分為全局順序消費和分區(qū)順序消費:
全局順序
:同一個Topic下的消息,所有消息按照嚴格的FIFO順序進行發(fā)布和消費。適用于:性能要求不高,所有消息嚴格按照FIFO進行發(fā)布和消費的場景;分區(qū)順序
:同一個Topic下,根據(jù)消息的特定業(yè)務ID進行sharding key分區(qū),同一個分區(qū)內(nèi)的消息按照嚴格的FIFO順序進行發(fā)布和消費。適用于:性能要求高,在同一個分區(qū)中嚴格按照FIFO進行發(fā)布和消費的場景。一般情況下,生產(chǎn)者是會以輪訓的方式把消息發(fā)送到Topic的消息隊列中的:
在同一個Queue里面,消息的順序性是可以得到保證的,但是如果一個Topic有多個Queue,以輪訓的方式投遞消息,那么就會導致消息亂序了。
為了保證消息的順序性,需要把保持順序性的消息投遞到同一個Queue中。
RocketMQ提供了MessageQueueSelector
接口,可以用來實現(xiàn)自定義的選擇投遞的消息隊列的算法:
for (int i = 0; i < orderList.size(); i++) { String content = "Hello itzhai.com. Java架構雜談," + new Date(); Message msg = new Message("topic-itzhai-com", tags[i % tags.length], "KEY" + i, content.getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List mqs, Message msg, Object arg) { Long orderId = (Long) arg; // 訂單號與消息隊列個數(shù)取模,保證讓同一個訂單號的消息落入同一個消息隊列 long index = orderId % mqs.size(); return mqs.get((int) index); } }, orderList.get(i).getOrderId()); System.out.printf("content: %s, sendResult: %s%n", content, sendResult);}
如上圖,我們實現(xiàn)了MessageQueueSelector
接口,并在實現(xiàn)的select方法里面,指定了選擇消息隊列的算法:訂單號與消息隊列個數(shù)取模,保證讓同一個訂單號的消息落入同一個消息隊列:
有個異常場景需要考慮:假設某一個Master節(jié)點掛掉了,導致Topic的消息隊列數(shù)量發(fā)生了變化,那么繼續(xù)使用以上的選擇算法,就會導致在這個過程中同一個訂單的消息會分散到不同的消息隊列里面,最終導致消息不能順序消費。
為了避免這種情況,只能選擇犧牲failover特性了。
現(xiàn)在投遞到消息隊列中的消息保證了順序,那如何保證消費也是順序的呢?
RocketMQ中提供了MessageListenerOrderly
,該對象用于有順序收異步傳遞的消息,一個隊列對應一個消費線程,使用方法如下:
consumer.registerMessageListener(new MessageListenerOrderly() { // 消費次數(shù),用于輔助模擬各種消費結果 AtomicLong consumeTimes = new AtomicLong(0); @Override public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); this.consumeTimes.incrementAndGet(); if ((this.consumeTimes.get() % 2) == 0) { return ConsumeOrderlyStatus.SUCCESS; } else if ((this.consumeTimes.get() % 3) == 0) { return ConsumeOrderlyStatus.ROLLBACK; } else if ((this.consumeTimes.get() % 4) == 0) { return ConsumeOrderlyStatus.COMMIT; } else if ((this.consumeTimes.get() % 5) == 0) { context.setSuspendCurrentQueueTimeMillis(3000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; }});
如果您使用的是MessageListenerConcurrently
,表示并發(fā)消費,為了保證消息消費的順序性,需要設置為單線程模式。
使用
MessageListenerOrderly
的問題:如果遇到某條消息消費失敗,并且無法跳過,那么消息隊列的消費進度就會停滯。
定時消費是指消息發(fā)送到Broker之后不會立即被消費,而是等待特定的時間之后才投遞到Topic中。定時消息會暫存在名為SCHEDULE_TOPIC_XXXX
的topic中,并根據(jù)delayTimeLevel存入特定的queue,queueId=delayTimeLevel-1,一個queue只存相同延遲的消息,保證具有相同延遲的消息能夠順序消費。比如,我們設置1秒后把消息投遞到topic-itzhai-com
topic,則存儲的文件目錄如下所示:
Broker會調(diào)度地消費SCHEDULE_TOPIC_XXXX,將消息寫入真實的topic。
定時消息的副作用:定時消息會在第一次寫入Topic和調(diào)度寫入實際的topic都會進行計數(shù),因此發(fā)送數(shù)量,tps都會變高。
使用延遲隊列的場景:提交了訂單之后,如果等待超過約定的時間還未支付,則把訂單設置為超時狀態(tài)。
RocketMQ提供了以下幾個固定的延遲級別:
public class MessageStoreConfig { ... // 10個level,level:1~18 private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; ...}
level = 0 表示不使用延遲消息。
另外,消息消費失敗也會進入延遲隊列,消息發(fā)送時間與設置的延遲級別和重試次數(shù)有關。
以下是發(fā)送延遲消息的代碼:
public class ScheduledMessageProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("TestProducerGroup"); producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); // 指定該消息在10秒后被消費者消費 message.setDelayTimeLevel(3); producer.send(message); } producer.shutdown(); }}
通過消息對系統(tǒng)進行解耦之后,勢必會遇到分布式系統(tǒng)數(shù)據(jù)完整性的問題。
我們可以通過以下手段解決分布式系統(tǒng)數(shù)據(jù)最終一致性問題:
2PC(Two-phase commit protocol)
,二階段提交,同步阻塞,效率低下,存在協(xié)調(diào)者單點故障問題,極端情況下存在數(shù)據(jù)不一致的風險。對應技術上的XA、JTA/JTS。這是分布式環(huán)境下事務處理的典型模式;3PC
,三階段提交,引入了參與者超時機制,增加了預提交階段,使得故障恢復之后協(xié)調(diào)者的決策復雜度降低,但整體的交互過程變得更長了,性能有所下降,仍舊會存在數(shù)據(jù)不一致的問題;Try - Confirm - Cancel
。對業(yè)務的侵入較大,和業(yè)務緊耦合,對于每一個操作都需要定義三個動作分別對應:Try - Confirm - Cancel
,將資源層的兩階段提交協(xié)議轉(zhuǎn)換到業(yè)務層,成為業(yè)務模型中的一部分;RocketMQ事務消息(Transactional Message)則是通過事務消息來實現(xiàn)分布式事務的最終一致性。下面看看RocketMQ是如何實現(xiàn)事務消息的。
如下圖:
事務消息有兩個流程:
補償階段主要用于解決消息的Commit或者Rollback發(fā)生超時或者失敗的情況。
half消息:并不是發(fā)送了一半的消息,而是指消息已經(jīng)發(fā)送到了MQ Server,但是該消息未收到生產(chǎn)者的二次確認,此時該消息暫時不能投遞到具體的ConsumeQueue中,這種狀態(tài)的消息稱為half消息。
發(fā)送到MQ Server的half消息對消費者是不可見的,為此,RocketMQ會先把half消息的Topic和Queue信息存儲到消息的屬性中,然后把該half消息投遞到一個專門的處理事務消息的隊列中:RMQ_SYS_TRANS_HALF_TOPIC
,由于消費者沒有訂閱該Topic,所以無法消息half類型的消息。
生產(chǎn)者執(zhí)行Commit half消息的時候,會存儲一條專門的Op消息,用于標識事務消息已確定的狀態(tài),如果一條事務消息還沒有對應的Op消息,說明這個事務的狀態(tài)還無法確定。RocketMQ會開啟一個定時任務,對于pending狀態(tài)的消息,會先向生產(chǎn)者發(fā)送回查事務狀態(tài)請求,根據(jù)事務狀態(tài)來決定是否提交或者回滾消息。
當消息被標記為Commit狀態(tài)之后,會把half消息的Topic和Queue相關屬性還原為原來的值,最終構建實際的消費索引(ConsumeQueue)。
RocketMQ并不會無休止的嘗試消息事務狀態(tài)回查,默認查找15次,超過了15次還是無法獲取事務狀態(tài),RocketMQ默認回滾該消息。并打印錯誤日志,可以通過重寫AbstractTransactionalMessageCheckListener類修改這個行為。
可以通過Broker的配置參數(shù):transactionCheckMax來修改此值。
如果消息發(fā)布方式是同步發(fā)送會重投,如果是異步發(fā)送會重試。
消息重投可以盡可能保證消息投遞成功,但是可能會造成消息重復。
什么情況會造成重復消費消息?
可以使用的消息重試策略:
retryTimesWhenSendFailed
:設置同步發(fā)送失敗的重投次數(shù),默認為2。所以生產(chǎn)者最多會嘗試發(fā)送retryTimesWhenSendFailed+1次。retryTimesWhenSendAsyncFailed
:設置異步發(fā)送失敗重試次數(shù),異步重試不會選擇其他Broker,不保證消息不丟失;retryAnotherBrokerWhenNotStoreOK
:消息刷盤(主或備)超時或slave不可用(返回狀態(tài)非SEND_OK),是否嘗試發(fā)送到其他broker,默認false。重要的消息可以開啟此選項。oneway發(fā)布方式不支持重投。
為了提高系統(tǒng)的吞吐量,提高發(fā)送效率,可以使用批量發(fā)送消息。
批量發(fā)送消息的限制:
發(fā)送批量消息的例子:
String topic = "itzhai-test-topic";List messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world itzhai.com 0".getBytes()));messages.add(new Message(topic, "TagA", "OrderID002", "Hello world itzhai.com 1".getBytes()));messages.add(new Message(topic, "TagA", "OrderID003", "Hello world itzhai.com 2".getBytes()));producer.send(messages);
如果發(fā)送的消息比較多,會增加復雜性,為此,可以對大消息進行拆分。以下是拆分的例子:
public class ListSplitter implements Iterator> { // 限制最大大小 private final int SIZE_LIMIT = 1024 * 1024 * 4; private final List messages; private int currIndex; public ListSplitter(List messages) { this.messages = messages; } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List next() { int startIndex = getStartIndex(); int nextIndex = startIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { Message message = messages.get(nextIndex); int tmpSize = calcMessageSize(message); if (tmpSize + totalSize > SIZE_LIMIT) { break; } else { totalSize += tmpSize; } } List subList = messages.subList(startIndex, nextIndex); currIndex = nextIndex; return subList; } private int getStartIndex() { Message currMessage = messages.get(currIndex); int tmpSize = calcMessageSize(currMessage); while(tmpSize > SIZE_LIMIT) { currIndex += 1; Message message = messages.get(curIndex); tmpSize = calcMessageSize(message); } return currIndex; } private int calcMessageSize(Message message) { int tmpSize = message.getTopic().length() + message.getBody().length(); Map properties = message.getProperties(); for (Map.Entry entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } tmpSize = tmpSize + 20; // Increase the log overhead by 20 bytes return tmpSize; }}// then you could split the large list into small ones:ListSplitter splitter = new ListSplitter(messages);while (splitter.hasNext()) { try { List listItem = splitter.next(); producer.send(listItem); } catch (Exception e) { e.printStackTrace(); // handle the error }}
RocketMQ的消費者可以根據(jù)Tag進行消息過濾來獲取自己感興趣的消息,也支持自定義屬性過濾。
Tags是Topic下的次級消息類型/二級類型(注:Tags也支持TagA || TagB
這樣的表達式),可以在同一個Topic下基于Tags進行消息過濾。
消息過濾是在Broker端實現(xiàn)的,減少了對Consumer無用消息的網(wǎng)絡傳輸,缺點是增加了Broker負擔,實現(xiàn)相對復雜。
消費端有兩周消費模型:集群消費和廣播消費。
集群消費模式下,相同Consumer Group的每個Consumer實例平均分攤消息。
廣播消費模式下,相同Consumer Group的每個Consumer實例都接收全量的消息。
RocketMQ會為每個消費組都設置一個Topic名稱為%RETRY%consumerGroupName
的重試隊列(這里需要注意的是,這個Topic的重試隊列是針對消費組,而不是針對每個Topic設置的),用于暫時保存因為各種異常而導致Consumer端無法消費的消息。
考慮到異?;謴推饋硇枰恍r間,會為重試隊列設置多個重試級別,每個重試級別都有與之對應的重新投遞延時,重試次數(shù)越多投遞延時就越大。
RocketMQ對于重試消息的處理是先保存至Topic名稱為SCHEDULE_TOPIC_XXXX
的延遲隊列中,后臺定時任務按照對應的時間進行Delay后重新保存至%RETRY%consumerGroupName
的重試隊列中。
比如,我們設置1秒后把消息投遞到topic-itzhai-com
topic,則存儲的文件目錄如下所示:
當一條消息初次消費失敗,消息隊列會自動進行消息重試;達到最大重試次數(shù)后,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息,此時,消息隊列不會立刻將消息丟棄,而是將其發(fā)送到該消費者對應的特殊隊列中。
RocketMQ將這種正常情況下無法被消費的消息稱為死信消息(Dead-Letter Message)
,將存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue)
。
在RocketMQ中,可以通過使用console控制臺對死信隊列中的消息進行重發(fā)來使得消費者實例再次進行消費。
由于RocketMQ是使用Java寫的,所以它的代碼特別適合拿來閱讀消遣,我們繼續(xù)來看看RocketMQ的源碼結構...
不不,還是算了,一下子又到周末晚上了,時間差不多了,今天就寫到這里了。有空再聊。
我精心整理了一份Redis寶典給大家,涵蓋了Redis的方方面面,面試官懂的里面有,面試官不懂的里面也有,有了它,不怕面試官連環(huán)問,就怕面試官一上來就問你Redis的Redo Log是干啥的?畢竟這種問題我也不會。
在Java架構雜談
公眾號發(fā)送Redis
關鍵字獲取pdf文件:
本文作者: arthinking
博客鏈接: https://www.itzhai.com/articles/deep-understanding-of-rocketmq.html
高并發(fā)異步解耦利器:RocketMQ究竟強在哪里?
版權聲明: 版權歸作者所有,未經(jīng)許可不得轉(zhuǎn)載,侵權必究!聯(lián)系作者請加公眾號。
apache/rocketmq. Retrieved from https://github.com/apache/rocketmq
文章版權歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/124531.html
摘要:熱點隨筆入門實現(xiàn)跨框架組件復用葡萄城技術團隊二工作三年的一些感悟百萬級大數(shù)據(jù)插入更新,支持多種數(shù)據(jù)庫果糖大數(shù)據(jù)科技被下屬罵,記一次矛盾升級有心無心,蝴蝶效應葉小釵中的鑒權授權正確方式包子推薦一款顏值逆天且功能齊全的開源工具鉑賽東開源免費圖熱點隨筆:·?Svelte入門——Web Components實現(xiàn)跨框架組件復用?(葡萄城技術團隊)·?(二)工作三年的一些感悟?(Craftsman-L)...
摘要:故事中的下屬們,就是消息生產(chǎn)者角色,屋子右面墻根那塊地就是消息持久化,呂秀才就是消息調(diào)度中心,而你就是消息消費者角色。下屬們匯報的消息,應該疊放在哪里,這個消息又應該在哪里才能找到,全靠呂秀才的驚人記憶力,才可以讓消息準確的被投放以及消費。 微信公眾號:IT一刻鐘大型現(xiàn)實非嚴肅主義現(xiàn)場一刻鐘與你分享優(yōu)質(zhì)技術架構與見聞,做一個有劇情的程序員關注可了解更多精彩內(nèi)容。問題或建議,請公眾號留言...
摘要:故事中的下屬們,就是消息生產(chǎn)者角色,屋子右面墻根那塊地就是消息持久化,呂秀才就是消息調(diào)度中心,而你就是消息消費者角色。下屬們匯報的消息,應該疊放在哪里,這個消息又應該在哪里才能找到,全靠呂秀才的驚人記憶力,才可以讓消息準確的被投放以及消費。 微信公眾號:IT一刻鐘大型現(xiàn)實非嚴肅主義現(xiàn)場一刻鐘與你分享優(yōu)質(zhì)技術架構與見聞,做一個有劇情的程序員關注可了解更多精彩內(nèi)容。問題或建議,請公眾號留言...
摘要:數(shù)量對吞吐量的影響可以達到幾百幾千個的級別,吞吐量會有小幅度的下降。這是的一大優(yōu)勢,可在同等數(shù)量機器下支撐大量的從幾十個到幾百個的時候,吞吐量會大幅下降。下一篇如何保證消息隊列的高可用 1.為什么使用消息隊列? (1)解耦:可以在多個系統(tǒng)之間進行解耦,將原本通過網(wǎng)絡之間的調(diào)用的方式改為使用MQ進行消息的異步通訊,只要該操作不是需要同步的,就可以改為使用MQ進行不同系統(tǒng)之間的聯(lián)系,這樣項目之間...
閱讀 3694·2021-11-23 09:51
閱讀 2018·2021-11-16 11:42
閱讀 3303·2021-11-08 13:20
閱讀 1116·2019-08-30 15:55
閱讀 2223·2019-08-30 10:59
閱讀 1263·2019-08-29 14:04
閱讀 1046·2019-08-29 12:41
閱讀 2093·2019-08-26 12:22