摘要:完全無狀態(tài),可集群部署與集群中的其中一個節(jié)點隨機選擇建立長連接,定期從取路由信息,并向提供服務(wù)的建立長連接,且定時向發(fā)送心跳。既可以從訂閱消息,也可以從訂閱消息,訂閱規(guī)則由配置決定。
問題列表:
Name Server 的作用是什么?
Name Server 存儲了Broker的什么信息?
Name Server 為Producer的提供些什么信息?
Name Server 為Consuner的提供些什么信息?
Name Server 作用Name Server在RocketMQ中猶如如它名字一樣,是提供Broker發(fā)現(xiàn)服務(wù)的.
Producer 與 Name Server 集群中的其中一個節(jié)點(隨機選擇)建立長連接,定期從 Name Server 取 Topic 路由信息,并向提供 Topic 服務(wù)的 Master 建立長連接,且定時向 Master 發(fā)送心跳。Producer 完全無狀態(tài),可集群部署
Consumer 與 Name Server 集群中的其中一個節(jié)點(隨機選擇)建立長連接,定期從 Name Server 取 Topic 路由信息,并向提供 Topic 服務(wù)的 Master、Slave 建立長連接,且定時向 Master、Slave 發(fā)送心跳。Consumer 既可以從 Master 訂閱消息,也可以從 Slave 訂閱消息,訂閱規(guī)則由 Broker 配置決定。
Name Server 存儲了Broker的什么信息?RouteInfoManager
//主題信息 private final HashMap> topicQueueTable; //broker信息 private final HashMap brokerAddrTable; //集群信息 private final HashMap > clusterAddrTable; //活躍broker信息 private final HashMap brokerLiveTable; //過濾器信息 private final HashMap /* Filter Server */> filterServerTable;
我們注意到保存broker的Map有兩個,即brokerAddrTable用來保存所有的broker列表和brokerLiveTable用來保存當(dāng)前活躍的broker列表,而BrokerData用來保存broker的主要新增,而BrokerLiveInfo只用來保存上次更新(心跳)時間,我們可以直接看看RouteInfoManager中掃描非活躍broker的方法:
public void scanNotActiveBroker() { Iterator> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { RemotingUtil.closeChannel(next.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME); this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); } } }
這個方法由在initialize的定時線程池加載,每十秒執(zhí)行一次.可以看出,如果兩分鐘內(nèi)都沒收到一個broker的心跳數(shù)據(jù),則直接將其從brokerLiveTable中移除,注意,這還會導(dǎo)致該broker從brokerAddrTable被刪除,當(dāng)然,如果該broker是Master,則它的所有Slave的broker都將被刪除。具體細(xì)節(jié)可以參看RouteInfoManager的onChannelDestroy方法.
Name Server 為Producer的提供些什么信息?HashMap> topicQueueTable;
private String brokerName; // broker的名稱 private int readQueueNums; // 讀隊列數(shù)量 private int writeQueueNums; // 寫隊列數(shù)量 private int perm; // 讀寫權(quán)限 private int topicSynFlag; // 同步復(fù)制還是異步復(fù)制標(biāo)記
NameServer 維護(hù)了key為topic,List
RouteInfoManager.pickupTopicRouteData
public TopicRouteData pickupTopicRouteData(final String topic) { TopicRouteData topicRouteData = new TopicRouteData(); boolean foundQueueData = false; boolean foundBrokerData = false; SetName Server 為Consuner的提供些什么信息?brokerNameSet = new HashSet (); List brokerDataList = new LinkedList (); topicRouteData.setBrokerDatas(brokerDataList); HashMap > filterServerMap = new HashMap >(); topicRouteData.setFilterServerTable(filterServerMap); try { try { this.lock.readLock().lockInterruptibly(); //根據(jù)topic獲取QueueData信息 List queueDataList = this.topicQueueTable.get(topic); if (queueDataList != null) { topicRouteData.setQueueDatas(queueDataList); foundQueueData = true; Iterator it = queueDataList.iterator(); while (it.hasNext()) { QueueData qd = it.next(); brokerNameSet.add(qd.getBrokerName()); } for (String brokerName : brokerNameSet) { BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null != brokerData) { //根據(jù)broker名稱獲取其地址信息 BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap ) brokerData .getBrokerAddrs().clone()); brokerDataList.add(brokerDataClone); foundBrokerData = true; for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) { List filterServerList = this.filterServerTable.get(brokerAddr); filterServerMap.put(brokerAddr, filterServerList); } } } } } finally { this.lock.readLock().unlock(); } } catch (Exception e) { log.error("pickupTopicRouteData Exception", e); } if (log.isDebugEnabled()) { log.debug("pickupTopicRouteData {} {}", topic, topicRouteData); } if (foundBrokerData && foundQueueData) { return topicRouteData; } return null; }
Consumer需要哪些信息?
1. Consumer需要的topic的broker信息 2. 每一個consumer group都有哪些consumer,對應(yīng)的topic是誰
答
1.如上節(jié)所述 2.此信息保存在Broker中總結(jié)
Name Server比較簡單,如同一個簡單的web服務(wù),提供配置信息,只不過CRUD的不是數(shù)據(jù)庫而是json文件.
此次RocketMQ學(xué)習(xí)就告一段落了,只描述了我比較關(guān)心的流程,很多細(xì)節(jié)沒能涉及到,有時間再寫吧,如有疑問和錯誤請在評論中指出,thx!
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/70362.html
摘要:每個與集群中的所有節(jié)點建立長連接,定時注冊信息到所有。完全無狀態(tài),可集群部署。本系列源碼解析主要參照原理簡介來追尋其代碼實現(xiàn)雖然版本不太一致但這也是能找到的最詳細(xì)的資料了接下來根據(jù)其模塊來源碼閱讀目錄如下 為什么選擇讀RocketMQ? 對MQ的理解一直不深,上周看了,還是覺得不夠深入,找個成熟的產(chǎn)品來學(xué)習(xí)吧,RabbitMQ是erLang寫的,Kafka是Scala寫的,非Java寫...
摘要:前提好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲抱歉了。熟悉我的人都知道我寫博客的時間比較早,而且堅持的時間也比較久,一直到現(xiàn)在也是一直保持著更新狀態(tài)。 showImg(https://segmentfault.com/img/remote/1460000014076586?w=1920&h=1080); 前提 好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲:抱歉了!。自己這段時...
摘要:但是服務(wù)器又確實是收到了這條消息的,只是給客戶端的響應(yīng)丟失了,所以導(dǎo)致的結(jié)果就是扣款失敗,成功發(fā)貨。所有的半消息都會寫在為的半消息隊列里,并且每條半消息,在整個鏈路里會被寫多次,如果并發(fā)很大且大部分消息都是事務(wù)消息的話,可靠性會存在問題。 前言 得益于MQ削峰填谷,系統(tǒng)解耦,操作異步等功能特性,在互聯(lián)網(wǎng)行業(yè),可以說有分布式服務(wù)的地方,MQ都往往不會缺席。由阿里自研的RocketMQ更是...
摘要:但是服務(wù)器又確實是收到了這條消息的,只是給客戶端的響應(yīng)丟失了,所以導(dǎo)致的結(jié)果就是扣款失敗,成功發(fā)貨。既然消息的發(fā)送不能和本地事務(wù)寫在一起,那如何來保證其整體具有原子性的需求呢答案就是今天我們介紹的主角事務(wù)消息。 前言 得益于MQ削峰填谷,系統(tǒng)解耦,操作異步等功能特性,在互聯(lián)網(wǎng)行業(yè),可以說有分布式服務(wù)的地方,MQ都往往不會缺席。由阿里自研的RocketMQ更是經(jīng)歷了多年的雙十一高并發(fā)挑戰(zhàn)...
閱讀 673·2021-11-24 09:39
閱讀 2342·2021-11-22 13:54
閱讀 2210·2021-09-23 11:46
閱讀 3254·2019-08-30 15:55
閱讀 2690·2019-08-30 15:54
閱讀 2414·2019-08-30 14:18
閱讀 1554·2019-08-29 14:15
閱讀 2743·2019-08-29 13:49