首先,造成這個(gè)問(wèn)題的 BUG RocketMQ 官方已經(jīng)在 3月16號(hào)這個(gè)提交中修復(fù)了,這里只是探討一下在修復(fù)之前造成問(wèn)題的具體細(xì)節(jié),更多的上下文可以參考我之前寫的 《RocketMQ Consumer 啟動(dòng)時(shí)都干了些啥?》 ,這篇文章講解了 RocketMQ 的 Consumer 啟動(dòng)之后都做了哪些操作,對(duì)理解本次要講解的 BUG 有一定的幫助。

其中講到了:

重復(fù)消費(fèi)自不必說(shuō),你 ClientID 都相同了。本篇著重聊聊為什么會(huì)消息堆積

文章中講到,初始化 Consumer 時(shí),會(huì)初始化 Rebalance 的策略。你可以大致將 Rebalance 策略理解為如何將一個(gè) Topic 下的 m 個(gè) MessageQueue 分配給一個(gè) ConsumerGroup 下的 n 個(gè) Consumer 實(shí)例的策略,看著有些繞,其實(shí)就長(zhǎng)這樣:

而從 Consumer 初始化的源碼中可以看出,默認(rèn)情況下 Consumer 采取的 Rebalance 策略是 AllocateMessageQueueAverage()

默認(rèn)的策略很好理解,將 MessageQueue 平均的分配給 Consumer。舉個(gè)例子,假設(shè)有 8 個(gè) MessageQueue,2 個(gè) Consumer,那么每個(gè) Consumer 就會(huì)被分配到 4 個(gè) MessageQueue。

那如果分配不均勻怎么辦?例如只有 7 個(gè) MessageQueue,但是 Consumer 仍然是 2 個(gè)。此時(shí) RocketMQ 會(huì)將多出來(lái)的部分,對(duì)已經(jīng)排好序的 Consumer 再做平均分配,一個(gè)一個(gè)分發(fā)給 Consumer,直到分發(fā)完。例如剛剛說(shuō)的 7 個(gè) MessageQueue 和 2 個(gè) ConsumerGroup 這種 case,排在第一個(gè)的 Consumer 就會(huì)被分配到 4 個(gè) MessageQueue,而第二個(gè)會(huì)被分配到 3 個(gè) MessageQueue。

大家可以先理解一下 AllocateMessageQueueAveragely 的實(shí)現(xiàn),作為默認(rèn)的 Rebalance 的策略,其實(shí)現(xiàn)位于這里:

接下來(lái)我們看看,AllocateMessageQueueAveragely 內(nèi)部具體都做了哪些事情。

其核心其實(shí)就是實(shí)現(xiàn)的 AllocateMessageQueueStrategy 接口中的 allocate 方法。實(shí)際上,RocketMQ 對(duì)該接口總共有 5 種實(shí)現(xiàn):

  • AllocateMachineRoomNearby
  • AllocateMessageQueueAveragely
  • AllocateMessageQueueAveragelyByCircle
  • AllocateMessageQueueByConfig
  • AllocateMessageQueueByMachineRoom
  • AllocateMessageQueueConsistentHash

其默認(rèn)的 AllocateMessageQueueAveragely 只是其中的一種實(shí)現(xiàn)而已,那執(zhí)行 allocate 它需要什么參數(shù)呢?

需要以下四個(gè):

  • ConsumerGroup 消費(fèi)者組的名字
  • currentCID 當(dāng)前消費(fèi)者的 clientID
  • mqAll 當(dāng)前 ConsumerGroup 所消費(fèi)的 Topic 下的所有的 MessageQueue
  • cidAll 當(dāng)前 ConsumerGroup 下所有消費(fèi)者的 ClientID

實(shí)際上是將某個(gè) Topic 下的所有 MessageQueue 分配給屬于同一個(gè)消費(fèi)者的所有消費(fèi)者實(shí)例,粒度是 By Topic 的。

所以到這里剩下的事情就很簡(jiǎn)單了,無(wú)非就是怎么樣把這一堆 MessageQueue 分配給這一堆 Consumer。這個(gè)怎么樣,就對(duì)應(yīng)了 AllocateMessageQueueStrategy 的不同實(shí)現(xiàn)。

接下來(lái)我們就來(lái)看看 AllocateMessageQueueAveragely 是如何對(duì) MessageQueue 進(jìn)行分配的,之前講源碼我一般都會(huì)一步一步的來(lái),結(jié)合源碼跟圖,但是這個(gè)源碼太短了,我就直接先給出來(lái)吧。

public List allocate(String consumerGroup, String currentCID, List mqAll, List cidAll) {  if (currentCID == null || currentCID.length() < 1) {    throw new IllegalArgumentException("currentCID is empty");  }  if (mqAll == null || mqAll.isEmpty()) {    throw new IllegalArgumentException("mqAll is null or mqAll empty");  }  if (cidAll == null || cidAll.isEmpty()) {    throw new IllegalArgumentException("cidAll is null or cidAll empty");  }  List result = new ArrayList();  // 判斷一下當(dāng)前的客戶端是否在 cidAll 的集合當(dāng)中  if (!cidAll.contains(currentCID)) {    log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",             consumerGroup,             currentCID,             cidAll);    return result;  }  // 拿到當(dāng)前消費(fèi)者在所有的消費(fèi)者實(shí)例數(shù)組中的位置  int index = cidAll.indexOf(currentCID);  // 用 messageQueue 的數(shù)量 對(duì) 消費(fèi)者實(shí)例的數(shù)量取余數(shù), 這個(gè)實(shí)際上就把不夠均勻分的 MessageQueue 的數(shù)量算出來(lái)了  // 舉個(gè)例子, 12 個(gè) MessageQueue, 有 5 個(gè) Consumer, 12 % 5 = 2   int mod = mqAll.size() % cidAll.size();  int averageSize =    mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());  int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;  int range = Math.min(averageSize, mqAll.size() - startIndex);  for (int i = 0; i < range; i++) {    result.add(mqAll.get((startIndex + i) % mqAll.size()));  }  return result;}

其實(shí)前半部分都是些常規(guī)的 check,可以忽略不看,從這里:

int index = cidAll.indexOf(currentCID);

開始,才是核心邏輯。為了避免邏輯混亂,還是假設(shè)有 12 個(gè) MessageQueue,5 個(gè) Consumer,同時(shí)假設(shè) index=0 。

那么 mod 的值就為 12 % 5 = 2 了。

averageSize 的值,稍微有點(diǎn)繞。如果 MessageQueue 的數(shù)量比消費(fèi)者的數(shù)量還少,那么就為 1 ;否則,就走這一堆邏輯(mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size())。我們 index 是 0,而 mod 是 2,index < mod 則是成立的,那么最終 averageSize 的值就為 12 / 5 + 1 = 3。

接下來(lái)是 startIndex,由于這個(gè)三元運(yùn)算符的條件是成立的,所以其值為 0 * 3 ,就為 0。

看了一大堆邏輯,是不是已經(jīng)暈了?直接舉實(shí)例:

12 個(gè) Message Queue

5 個(gè) Consumer 實(shí)例

按照上面的分法:

排在第 1 的消費(fèi)者 分到 3 個(gè)

排在第 2 的消費(fèi)者 分到 3 個(gè)

排在第 3 的消費(fèi)者 分到 2 個(gè)

排在第 4 的消費(fèi)者 分到 2 個(gè)

排在第 5 的消費(fèi)者 分到 2 個(gè)

所以,你可以大致認(rèn)為:

先“均分”,12 / 5 取整為 2。然后“均分”完之后還剩下 2 個(gè),那么就從上往下,挨個(gè)再分配,這樣第 1、第 2 個(gè)消費(fèi)者就會(huì)被多分到 1 個(gè)。

所以如果有 13 個(gè) MessageQueue,5 個(gè) Consumer,那么第 1、第 2、第 3 就會(huì)被分配 3 個(gè)。

但并不準(zhǔn)確,因?yàn)榉峙涞?MessageQueue 是一次性的,例如那 3 個(gè) MessageQueue 是一次性獲取的,不會(huì)先給 2 個(gè),再給 1 個(gè)。

而我們開篇提到的 Consumer 的 ClientID 相同,會(huì)造成什么?

當(dāng)然是 index 的值相同,進(jìn)而造成 mod、averageSize、startIndexrange 全部相同。那么最后 result.add(mqAll.get((startIndex + i) % mqAll.size())); 時(shí),本來(lái)不同的 Consumer,會(huì)取到相同的 MessageQueue(舉個(gè)例子,Consumer 1 和 Consumer 2 都取到了前 3 個(gè) MessageQueue),從而造成有些 MessageQueue(如果有的話) 沒(méi)有 Consumer 對(duì)其消費(fèi),而沒(méi)有被消費(fèi),消息也在不停的投遞進(jìn)來(lái),就會(huì)造成消息的大量堆積

當(dāng)然,現(xiàn)在的新版本從代碼上看已經(jīng)修復(fù)這個(gè)問(wèn)題了,這個(gè)只是對(duì)之前的版本的原因做一個(gè)探索。

本篇文章已放到我的 Github github.com/sh-blog 中,歡迎 Star。微信搜索關(guān)注【SH的全棧筆記】,回復(fù)【隊(duì)列】獲取MQ學(xué)習(xí)資料,包含基礎(chǔ)概念解析和RocketMQ詳細(xì)的源碼解析,持續(xù)更新中。

如果你覺(jué)得這篇文章對(duì)你有幫助,還麻煩點(diǎn)個(gè)贊關(guān)個(gè)注,分個(gè)享,留個(gè)言。