成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

RocketMQ源碼學(xué)習(xí)(六)-Name Server

Joyven / 2436人閱讀

摘要:完全無狀態(tài),可集群部署與集群中的其中一個節(jié)點隨機選擇建立長連接,定期從取路由信息,并向提供服務(wù)的建立長連接,且定時向發(fā)送心跳。既可以從訂閱消息,也可以從訂閱消息,訂閱規(guī)則由配置決定。

問題列表:

Name Server 的作用是什么?

Name Server 存儲了Broker的什么信息?

Name Server 為Producer的提供些什么信息?

Name Server 為Consuner的提供些什么信息?

Name Server 作用

Name Server在RocketMQ中猶如如它名字一樣,是提供Broker發(fā)現(xiàn)服務(wù)的.

Producer 與 Name Server 集群中的其中一個節(jié)點(隨機選擇)建立長連接,定期從 Name Server 取 Topic 路由信息,并向提供 Topic 服務(wù)的 Master 建立長連接,且定時向 Master 發(fā)送心跳。Producer 完全無狀態(tài),可集群部署

Consumer 與 Name Server 集群中的其中一個節(jié)點(隨機選擇)建立長連接,定期從 Name Server 取 Topic 路由信息,并向提供 Topic 服務(wù)的 Master、Slave 建立長連接,且定時向 Master、Slave 發(fā)送心跳。Consumer 既可以從 Master 訂閱消息,也可以從 Slave 訂閱消息,訂閱規(guī)則由 Broker 配置決定。

Name Server 存儲了Broker的什么信息?

RouteInfoManager

//主題信息
 private final HashMap> topicQueueTable;
//broker信息
    private final HashMap brokerAddrTable;
//集群信息
    private final HashMap> clusterAddrTable;
//活躍broker信息
    private final HashMap brokerLiveTable;
//過濾器信息
    private final HashMap/* Filter Server */> filterServerTable;

我們注意到保存broker的Map有兩個,即brokerAddrTable用來保存所有的broker列表和brokerLiveTable用來保存當(dāng)前活躍的broker列表,而BrokerData用來保存broker的主要新增,而BrokerLiveInfo只用來保存上次更新(心跳)時間,我們可以直接看看RouteInfoManager中掃描非活躍broker的方法:

    public void scanNotActiveBroker() {
        Iterator> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }

這個方法由在initialize的定時線程池加載,每十秒執(zhí)行一次.可以看出,如果兩分鐘內(nèi)都沒收到一個broker的心跳數(shù)據(jù),則直接將其從brokerLiveTable中移除,注意,這還會導(dǎo)致該broker從brokerAddrTable被刪除,當(dāng)然,如果該broker是Master,則它的所有Slave的broker都將被刪除。具體細(xì)節(jié)可以參看RouteInfoManager的onChannelDestroy方法.

Name Server 為Producer的提供些什么信息?
    HashMap> topicQueueTable;
    private String brokerName;  // broker的名稱
    private int readQueueNums;  // 讀隊列數(shù)量
    private int writeQueueNums; // 寫隊列數(shù)量
    private int perm;           // 讀寫權(quán)限
    private int topicSynFlag;   // 同步復(fù)制還是異步復(fù)制標(biāo)記

NameServer 維護(hù)了key為topic,List的數(shù)據(jù)為Producer提供服務(wù).返回TopicRouteData信息
RouteInfoManager.pickupTopicRouteData

public TopicRouteData pickupTopicRouteData(final String topic) {
        TopicRouteData topicRouteData = new TopicRouteData();
        boolean foundQueueData = false;
        boolean foundBrokerData = false;
        Set brokerNameSet = new HashSet();
        List brokerDataList = new LinkedList();
        topicRouteData.setBrokerDatas(brokerDataList);

        HashMap> filterServerMap = new HashMap>();
        topicRouteData.setFilterServerTable(filterServerMap);

        try {
            try {
                this.lock.readLock().lockInterruptibly();
                //根據(jù)topic獲取QueueData信息
                List queueDataList = this.topicQueueTable.get(topic);
                if (queueDataList != null) {
                    topicRouteData.setQueueDatas(queueDataList);
                    foundQueueData = true;

                    Iterator it = queueDataList.iterator();
                    while (it.hasNext()) {
                        QueueData qd = it.next();
                        brokerNameSet.add(qd.getBrokerName());
                    }

                    for (String brokerName : brokerNameSet) {
                        BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                        if (null != brokerData) {
                        //根據(jù)broker名稱獲取其地址信息
                            BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap) brokerData
                                .getBrokerAddrs().clone());
                            brokerDataList.add(brokerDataClone);
                            foundBrokerData = true;
                            for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                                List filterServerList = this.filterServerTable.get(brokerAddr);
                                filterServerMap.put(brokerAddr, filterServerList);
                            }
                        }
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("pickupTopicRouteData Exception", e);
        }

        if (log.isDebugEnabled()) {
            log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
        }

        if (foundBrokerData && foundQueueData) {
            return topicRouteData;
        }

        return null;
    }
Name Server 為Consuner的提供些什么信息?

Consumer需要哪些信息?

   
1. Consumer需要的topic的broker信息
2. 每一個consumer group都有哪些consumer,對應(yīng)的topic是誰

1.如上節(jié)所述
2.此信息保存在Broker中

總結(jié)

Name Server比較簡單,如同一個簡單的web服務(wù),提供配置信息,只不過CRUD的不是數(shù)據(jù)庫而是json文件.

此次RocketMQ學(xué)習(xí)就告一段落了,只描述了我比較關(guān)心的流程,很多細(xì)節(jié)沒能涉及到,有時間再寫吧,如有疑問和錯誤請在評論中指出,thx!

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/70362.html

相關(guān)文章

  • RocketMQ源碼學(xué)習(xí)(一)-概述

    摘要:每個與集群中的所有節(jié)點建立長連接,定時注冊信息到所有。完全無狀態(tài),可集群部署。本系列源碼解析主要參照原理簡介來追尋其代碼實現(xiàn)雖然版本不太一致但這也是能找到的最詳細(xì)的資料了接下來根據(jù)其模塊來源碼閱讀目錄如下 為什么選擇讀RocketMQ? 對MQ的理解一直不深,上周看了,還是覺得不夠深入,找個成熟的產(chǎn)品來學(xué)習(xí)吧,RabbitMQ是erLang寫的,Kafka是Scala寫的,非Java寫...

    godlong_X 評論0 收藏0
  • 寫這么多系列博客,怪不得找不到女朋友

    摘要:前提好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲抱歉了。熟悉我的人都知道我寫博客的時間比較早,而且堅持的時間也比較久,一直到現(xiàn)在也是一直保持著更新狀態(tài)。 showImg(https://segmentfault.com/img/remote/1460000014076586?w=1920&h=1080); 前提 好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲:抱歉了!。自己這段時...

    JerryWangSAP 評論0 收藏0
  • 讓你看懂的RocketMQ事務(wù)消息源碼分析(干貨)

    摘要:但是服務(wù)器又確實是收到了這條消息的,只是給客戶端的響應(yīng)丟失了,所以導(dǎo)致的結(jié)果就是扣款失敗,成功發(fā)貨。所有的半消息都會寫在為的半消息隊列里,并且每條半消息,在整個鏈路里會被寫多次,如果并發(fā)很大且大部分消息都是事務(wù)消息的話,可靠性會存在問題。 前言 得益于MQ削峰填谷,系統(tǒng)解耦,操作異步等功能特性,在互聯(lián)網(wǎng)行業(yè),可以說有分布式服務(wù)的地方,MQ都往往不會缺席。由阿里自研的RocketMQ更是...

    zsirfs 評論0 收藏0
  • 一定能看懂的RocketMQ事務(wù)消息源碼分析(干貨)

    摘要:但是服務(wù)器又確實是收到了這條消息的,只是給客戶端的響應(yīng)丟失了,所以導(dǎo)致的結(jié)果就是扣款失敗,成功發(fā)貨。既然消息的發(fā)送不能和本地事務(wù)寫在一起,那如何來保證其整體具有原子性的需求呢答案就是今天我們介紹的主角事務(wù)消息。 前言 得益于MQ削峰填谷,系統(tǒng)解耦,操作異步等功能特性,在互聯(lián)網(wǎng)行業(yè),可以說有分布式服務(wù)的地方,MQ都往往不會缺席。由阿里自研的RocketMQ更是經(jīng)歷了多年的雙十一高并發(fā)挑戰(zhàn)...

    myshell 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<