摘要:微信公眾號后端進階,專注后端技術(shù)分享框架分布式中間件服務(wù)治理等等。
微信公眾號「后端進階」,專注后端技術(shù)分享:Java、Golang、WEB框架、分布式中間件、服務(wù)治理等等。
前段時間有個朋友向我提了一個問題,他說在搭建 RocketMQ 集群過程中遇到了關(guān)于消費訂閱的問題,具體問題如下:
然后他發(fā)了報錯的日志給我看:
the consumer"s subscription not exist
我第一時間在源碼里找到了報錯的位置:
org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest:
subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic()); if (null == subscriptionData) { log.warn("the consumer"s subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic()); response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST); response.setRemark("the consumer"s subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC)); return response; }
此處源碼是將該 Topic 的訂閱信息找出來,然而這里卻沒找到,所以報了消費訂閱不存在的錯誤。
朋友還跟我講了他的消費集群中,每個消費者訂閱了自己的 Topic,他的消費組中 有 c1 和 c2 消費者,c1 訂閱了 topicA,而 c2 訂閱了 topicB。
這時我已經(jīng)知道什么原因了,我先說一下消費者的訂閱信息在 broker 中是以 group 來分組的,數(shù)據(jù)結(jié)構(gòu)如下:
org.apache.rocketmq.broker.client.ConsumerManager:
private final ConcurrentMapconsumerTable = new ConcurrentHashMap (1024);
這意味著集群中的每個消費者在向 broker 注冊訂閱信息的時候相互覆蓋掉對方的訂閱信息了,這也是為什么同一個消費組應(yīng)該擁有完全一樣的訂閱關(guān)系的原因,而朋友在同一個消費組的每個消費者訂閱關(guān)系都不一樣,就出現(xiàn)了訂閱信息相互覆蓋的問題。
可是朋友這時又有疑惑了,他覺得每個消費者訂閱自己的主題,貌似沒問題啊,邏輯上也行的通,他不明白為什么 RocketMQ 不允許這樣做,于是秉承著老司機的職業(yè)素養(yǎng),下面我會從源碼的角度深度分析 RocketMQ 消費訂閱注冊,消息拉取,消息隊列負載與重新分布機制,讓大家徹底弄清 RocketMQ 消費訂閱機制。
消費者訂閱信息注冊消費者在啟動時會向所有 broker 注冊訂閱信息,并啟動心跳機制,定時更新訂閱信息,每個消費者都有一個 MQClientInstance,消費者啟動時會啟動這個類,啟動方法中會啟動一些列定時任務(wù),其中:
org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.cleanOfflineBroker(); MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); } catch (Exception e) { log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); } } }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
上面是向集群內(nèi)所有 broker 發(fā)送訂閱心跳信息的定時任務(wù),源碼繼續(xù)跟進去,發(fā)現(xiàn)會給集群中的每個 broker 都發(fā)送自己的 HeartbeatData,HeartbeatData 即是每個客戶端的心跳數(shù)據(jù),它包含了如下數(shù)據(jù):
// 客戶端ID private String clientID; // 生產(chǎn)者信息 private SetproducerDataSet = new HashSet (); // 消費者信息 private Set consumerDataSet = new HashSet ();
其中消費者信息包含了客戶端訂閱的主題信息。
我們繼續(xù)看看 broker 如何處理 HeartbeatData 數(shù)據(jù),客戶端發(fā)送 HeartbeatData 時的請求類型為 HEART_BEAT,我們直接找到 broker 處理 HEART_BEAT 請求類型的邏輯:
org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat:
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) { RemotingCommand response = RemotingCommand.createResponseCommand(null); // 解碼,獲取 HeartbeatData HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); ClientChannelInfo clientChannelInfo = new ClientChannelInfo( ctx.channel(), heartbeatData.getClientID(), request.getLanguage(), request.getVersion() ); // 循環(huán)注冊消費者訂閱信息 for (ConsumerData data : heartbeatData.getConsumerDataSet()) { // 按消費組獲取訂閱配置信息 SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig( data.getGroupName()); boolean isNotifyConsumerIdsChangedEnable = true; if (null != subscriptionGroupConfig) { isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable(); int topicSysFlag = 0; if (data.isUnitMode()) { topicSysFlag = TopicSysFlag.buildSysFlag(false, true); } String newTopic = MixAll.getRetryTopic(data.getGroupName()); this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod( newTopic, subscriptionGroupConfig.getRetryQueueNums(), PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag); } // 注冊消費者訂閱信息 boolean changed = this.brokerController.getConsumerManager().registerConsumer( data.getGroupName(), clientChannelInfo, data.getConsumeType(), data.getMessageModel(), data.getConsumeFromWhere(), data.getSubscriptionDataSet(), isNotifyConsumerIdsChangedEnable ); // ... response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
在這里我們可以看到,broker 收到 HEART_BEAT 請求后,將請求數(shù)據(jù)解壓獲取 HeartbeatData,根據(jù) HeartbeatData 里面的消費訂閱信息,循環(huán)進行注冊:
org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer:
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, final SetsubList, boolean isNotifyConsumerIdsChangedEnable) { // 獲取消費組內(nèi)的消費者信息 ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); // 如果消費組的消費者信息為空,則新建一個 if (null == consumerGroupInfo) { ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere); ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp); consumerGroupInfo = prev != null ? prev : tmp; } boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere); // 更新訂閱信息,訂閱信息是按照消費組存放的,因此這步驟就會導(dǎo)致同一個消費組內(nèi)的各個消費者客戶端的訂閱信息相互被覆蓋 boolean r2 = consumerGroupInfo.updateSubscription(subList); if (r1 || r2) { if (isNotifyConsumerIdsChangedEnable) { this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } } this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList); return r1 || r2; }
這步驟是 broker 更新消費者訂閱信息的核心方法,如果消費組的消費者信息 ConsumerGroupInfo 為空,則新建一個,從名字可知道,訂閱信息是按照消費組進行存放的,因此在更新訂閱信息時,訂閱信息是按照消費組存放的,這步驟就會導(dǎo)致同一個消費組內(nèi)的各個消費者客戶端的訂閱信息相互被覆蓋。
消息拉取在 MQClientInstance 啟動時,會啟動一條線程來處理消息拉取任務(wù):
org.apache.rocketmq.client.impl.factory.MQClientInstance#start:
// Start pull service this.pullMessageService.start();
pullMessageService 繼承了 ServiceThread,而 ServiceThread 實現(xiàn)了 Runnable 接口,它的 run 方法實現(xiàn)如下:
org.apache.rocketmq.client.impl.consumer.PullMessageService#run:
@Override public void run() { while (!this.isStopped()) { try { // 從 pullRequestQueue 中獲取拉取消息請求對象 PullRequest pullRequest = this.pullRequestQueue.take(); // 執(zhí)行消息拉取 this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } }
消費端拿到 PullRequest 對象進行拉取消息,pullRequestQueue 是一個阻塞隊列,如果 pullRequest 數(shù)據(jù)為空,執(zhí)行 take() 方法會一直阻塞,直到有新的 pullRequest 拉取任務(wù)進來,這里是一個很關(guān)鍵的步驟,你可能會想,pullRequest 什么時候被創(chuàng)建然后放入 pullRequestQueue?pullRequest 它是在RebalanceImpl 中創(chuàng)建,它是 RocketMQ 消息隊列負載與重新分布機制的實現(xiàn)。
消息隊列負載與重新分布從上面消息拉取源碼分析可知,pullMessageService 啟動時由于 pullRequestQueue 中沒有 pullRequest 對象,會一直阻塞,而在 MQClientInstance 啟動時,同樣會啟動一條線程來處理消息隊列負載與重新分布任務(wù):
org.apache.rocketmq.client.impl.factory.MQClientInstance#start:
// Start rebalance service this.rebalanceService.start();
rebalanceService 同樣繼承了 ServiceThread,它的 run 方法如下:
@Override public void run() { while (!this.isStopped()) { this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } }
繼續(xù)跟進去:
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance:
public void doRebalance(final boolean isOrder) { // 獲取消費者所有訂閱信息 MapsubTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry entry : subTable.entrySet()) { final String topic = entry.getKey(); try { // 消息隊列負載與重新分布 this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } } this.truncateMessageQueueNotMyTopic(); }
這里主要是獲取客戶端訂閱的主題,并根據(jù)主題進行消息隊列負載與重新分布,subTable 存儲了消費者的訂閱信息,消費者進行消息訂閱時會填充到里面,我們接著往下:
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic:
SetmqSet = this.topicSubscribeInfoTable.get(topic); List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
rebalanceByTopic 方法是實現(xiàn) Consumer 端負載均衡的核心,我們這里以集群模式的消息隊列負載與重新分布,首先從 topicSubscribeInfoTable 中獲取訂閱主題的隊列信息,接著隨機從集群中的一個 broker 中獲取消費組內(nèi)某個 topic 的訂閱客戶端 ID 列表,這里需要注意的是,為什么從集群內(nèi)任意一個 broker 就可以獲取訂閱客戶端信息呢?前面的分析也說了,消費者客戶端啟動時會啟動一個線程,向所有 broker 發(fā)送心跳包。
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic:
// 如果 主題訂閱信息mqSet和主題訂閱客戶端不為空,就執(zhí)行消息隊列負載與重新分布 if (mqSet != null && cidAll != null) { ListmqAll = new ArrayList (); mqAll.addAll(mqSet); // 排序,確保每個消息隊列只分配一個消費者 Collections.sort(mqAll); Collections.sort(cidAll); // 消息隊列分配算法 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; // 執(zhí)行算法,并得到隊列重新分配后的結(jié)果對象allocateResult List allocateResult = null; try { allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), e); return; } // ... }
以上是消息負載均衡的核心邏輯,RocketMQ 本身提供了 5 種負載算法,默認(rèn)使用 AllocateMessageQueueAveragely 平均分配算法,它分配算法特點如下:
假設(shè)有消費組 g1,有消費者 c1 和 c2,c1 訂閱了 topicA,c2 訂閱了 topicB,集群內(nèi)有 broker1 和broker2,假設(shè) topicA 有 8 個消息隊列,broker_a(q0/q1/q2/q3) 和 broker_b(q0/q1/q2/q3),前面我們知道 findConsumerIdList 方法會獲取消費組內(nèi)所有消費者客戶端 ID,topicA 經(jīng)過平均分配算法進行分配之后的消費情況如下:
c1:broker_a(q0/q1/q2/q3)
c2:broker_b(q0/q1/q2/q3)
問題就出現(xiàn)在這里,c2 根本沒有訂閱 topicA,但根據(jù)分配算法,卻要加上 c2 進行分配,這樣就會導(dǎo)致這種情況有一半的消息被分配到 c2 進行消費,被分配到 c2 的消息隊列會延遲十幾秒甚至更久才會被消費,topicB 同理。
下面我用圖表示 topicA 和 topicB 經(jīng)過 rebalance 之后的消費情況:
至于為什么會報 the consumer"s subscription not exist,我們繼續(xù)往下擼:
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic:
if (mqSet != null && cidAll != null) { // ... SetallocateResultSet = new HashSet (); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); } // 用戶重新分配后的結(jié)果allocateResult來更新當(dāng)前消費者負載的消息隊列緩存表processQueueTable,并生成 pullRequestList 放入 pullRequestQueue 阻塞隊列中 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); if (changed) { log.info( "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet); this.messageQueueChanged(topic, mqSet, allocateResultSet); } }
以上代碼邏輯主要是拿 mqSet 和 cidAll 進行消息隊列負載與重新分布,得到結(jié)果 allocateResult,它是一個 MessageQueue 列表,接著用 allocateResult 更新消費者負載的消息隊列緩存表 processQueueTable,生成 pullRequestList 放入 pullRequestQueue 阻塞隊列中:
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:
ListpullRequestList = new ArrayList (); // 循環(huán)執(zhí)行,將mqSet訂閱數(shù)據(jù)封裝成PullRequest對象,并添加到pullRequestList中 for (MessageQueue mq : mqSet) { // 如果緩存列表不存在該訂閱信息,說明這次消息隊列重新分配后新增加的消息隊列 if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } } // 將pullRequestList添加到PullMessageService中的pullRequestQueue阻塞隊列中,以喚醒PullMessageService線程執(zhí)行消息拉取 this.dispatchPullRequest(pullRequestList);
前面我們講到消息拉取是從 pullRequestQueue 阻塞隊列中拿 pullRequest 執(zhí)行拉取的,以上方法就是創(chuàng)建 pullRequest 的地方。
源碼分析到這里,就可以弄清楚為什么會報 the consumer"s subscription not exist 這個錯誤了:
假設(shè)有消費者組 g1,g1下有消費者 c1 和消費者 c2,c1 訂閱了 topicA,c2 訂閱了 topicB,此時c2 先啟動,將 g1 的訂閱信息更新為 topicB,c1 隨后啟動,將 g1 的訂閱信息覆蓋為 topicA,c1 的 Rebalance 負載將 topicA 的 pullRequest 添加到 pullRequestQueue 中,而恰好此時 c2 心跳包又將 g1 的訂閱信息更新為 topicB,那么此時 c1 的 PullMessageService 線程拿到 pullRequestQueue 中 topicA 的 pullRequest 進行消息拉取,然而在 broker 端找不到消費者組 g1 下 topicA 的訂閱信息(因為此時恰好被 c2 心跳包給覆蓋了),就會報消費者訂閱信息不存在的錯誤了。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/75680.html
摘要:和之間的關(guān)系通過來綁定,來定義,即相同的,等于表示節(jié)點,非表示節(jié)點。所有的節(jié)點與集群的所有節(jié)點保持長連接,定時注冊信息到所有的。對磁盤的訪問串行化,避免磁盤竟?fàn)?,不會因為隊列增加?dǎo)致增高。要保證與完全的一致,增加了編程的復(fù)雜度。 Apache RocketMQ?是一個開源的分布式消息和流數(shù)據(jù)平臺。 1、既然是消息系統(tǒng),最核心的功能就是要提供消息的發(fā)布與訂閱功能,最簡單的概念模型如下: ...
摘要:通過以上分析我們可以得出消息隊列具有很好的削峰作用的功能即通過異步處理,將短時間高并發(fā)產(chǎn)生的事務(wù)消息存儲在消息隊列中,從而削平高峰期的并發(fā)事務(wù)。 該文已加入開源項目:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識的文檔類項目,Star 數(shù)接近 16k)。地址:https://github.com/Snailclimb... 本文內(nèi)容思維導(dǎo)圖:showImg(ht...
摘要:它是阿里巴巴于年開源的第三代分布式消息中間件。是一個分布式消息中間件,具有低延遲高性能和可靠性萬億級別的容量和靈活的可擴展性,它是阿里巴巴于年開源的第三代分布式消息中間件。上篇文章消息隊列那么多,為什么建議深入了解下RabbitMQ?我們講到了消息隊列的發(fā)展史:并且詳細介紹了RabbitMQ,其功能也是挺強大的,那么,為啥又要搞一個RocketMQ出來呢?是重復(fù)造輪子嗎?本文我們就帶大家來詳...
閱讀 3371·2021-11-04 16:10
閱讀 3871·2021-09-29 09:43
閱讀 2706·2021-09-24 10:24
閱讀 3370·2021-09-01 10:46
閱讀 2518·2019-08-30 15:54
閱讀 602·2019-08-30 13:19
閱讀 3244·2019-08-29 17:19
閱讀 1065·2019-08-29 16:40