成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

深度解析RocketMQ Topic的創(chuàng)建機(jī)制

gself / 1278人閱讀

摘要:當(dāng)接收到消息后,會(huì)在方法中調(diào)用方法,將的信息塞進(jìn)緩存中,并且會(huì)定時(shí)發(fā)送心跳將發(fā)送給進(jìn)行注冊(cè)。這也說(shuō)明了當(dāng)用集群模式去創(chuàng)建時(shí),集群里面每個(gè)的的數(shù)量相同,當(dāng)用單個(gè)模式去創(chuàng)建時(shí),每個(gè)的數(shù)量可以不一致。

微信公眾號(hào)「后端進(jìn)階」,專(zhuān)注后端技術(shù)分享:Java、Golang、WEB框架、分布式中間件、服務(wù)治理等等。  
老司機(jī)傾囊相授,帶你一路進(jìn)階,來(lái)不及解釋了快上車(chē)!

我還記得第一次使用rocketmq的時(shí)候,需要去控制臺(tái)預(yù)先創(chuàng)建topic,我當(dāng)時(shí)就想為什么要這么設(shè)計(jì),于是我決定擼一波源碼,帶大家從根源上吃透rocketmq topic的創(chuàng)建機(jī)制。

topic在rocketmq的設(shè)計(jì)思想里,是作為同一個(gè)業(yè)務(wù)邏輯消息的組織形式,它僅僅是一個(gè)邏輯上的概念,而在一個(gè)topic下又包含若干個(gè)邏輯隊(duì)列,即消息隊(duì)列,消息內(nèi)容實(shí)際是存放在隊(duì)列中,而隊(duì)列又存儲(chǔ)在broker中,下面我用一張圖來(lái)說(shuō)明topic的存儲(chǔ)模型:

其實(shí)rocketmq中存在兩種不同的topic創(chuàng)建方式,一種是我剛剛說(shuō)的預(yù)先創(chuàng)建,另一種是自動(dòng)創(chuàng)建,下面我開(kāi)車(chē)帶大家從源碼的角度來(lái)詳細(xì)地解讀這兩種創(chuàng)建機(jī)制。

自動(dòng)創(chuàng)建

默認(rèn)情況下,topic不用手動(dòng)創(chuàng)建,當(dāng)producer進(jìn)行消息發(fā)送時(shí),會(huì)從nameserver拉取topic的路由信息,如果topic的路由信息不存在,那么會(huì)默認(rèn)拉取broker啟動(dòng)時(shí)默認(rèn)創(chuàng)建好名為“TBW102”的Topic:

org.apache.rocketmq.common.MixAll:

// Will be created at broker when isAutoCreateTopicEnable
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";

自動(dòng)創(chuàng)建的開(kāi)關(guān)配置在BrokerConfig中,通過(guò)autoCreateTopicEnable字段進(jìn)行控制,

org.apache.rocketmq.common.BrokerConfig:

@ImportantField
private boolean autoCreateTopicEnable = true;

在broker啟動(dòng)時(shí),會(huì)調(diào)用TopicConfigManager的構(gòu)造方法,autoCreateTopicEnable打開(kāi)后,會(huì)將“TBW102”保存到topicConfigTable中:

org.apache.rocketmq.broker.topic.TopicConfigManager#TopicConfigManager:

// MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
    String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
    TopicConfig topicConfig = new TopicConfig(topic);
    this.systemTopicList.add(topic);
    topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
                                 .getDefaultTopicQueueNums());
    topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
                                  .getDefaultTopicQueueNums());
    int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
    topicConfig.setPerm(perm);
    this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}

broker會(huì)通過(guò)發(fā)送心跳包將topicConfigTable的topic信息發(fā)送給nameserver,nameserver將topic信息注冊(cè)到RouteInfoManager中。

繼續(xù)看消息發(fā)送時(shí)是如何從nameserver獲取topic的路由信息:

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
  TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
  if (null == topicPublishInfo || !topicPublishInfo.ok()) {
    this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
    // 生產(chǎn)者第一次發(fā)送消息,topic在nameserver中并不存在
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
    topicPublishInfo = this.topicPublishInfoTable.get(topic);
  }

  if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
    return topicPublishInfo;
  } else {
    // 第二次請(qǐng)求會(huì)將isDefault=true,開(kāi)啟默認(rèn)“TBW102”從namerserver獲取路由信息
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
    topicPublishInfo = this.topicPublishInfoTable.get(topic);
    return topicPublishInfo;
  }
}

如上方法,topic首次發(fā)送消息,此時(shí)并不能從namserver獲取topic的路由信息,那么接下來(lái)會(huì)進(jìn)行第二次請(qǐng)求namserver,這時(shí)會(huì)將isDefault=true,開(kāi)啟默認(rèn)“TBW102”從namerserver獲取路由信息,此時(shí)的“TBW102”topic已經(jīng)被broker默認(rèn)注冊(cè)到nameserver了:

org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:

if (isDefault && defaultMQProducer != null) {
  // 使用默認(rèn)的“TBW102”topic獲取路由信息
  topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);
  if (topicRouteData != null) {
    for (QueueData data : topicRouteData.getQueueDatas()) {
      int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
      data.setReadQueueNums(queueNums);
      data.setWriteQueueNums(queueNums);
    }
  }
}

如果isDefault=true并且defaultMQProducer不為空,從nameserver中獲取默認(rèn)路由信息,此時(shí)會(huì)獲取所有已開(kāi)啟自動(dòng)創(chuàng)建開(kāi)關(guān)的broker的默認(rèn)“TBW102”topic路由信息,并保存默認(rèn)的topic消息隊(duì)列數(shù)量。

org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:

TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
  changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
  log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}

從本地緩存中取出topic的路由信息,由于topic是第一次發(fā)送消息,這時(shí)本地并沒(méi)有該topic的路由信息,所以對(duì)比該topic路由信息對(duì)比“TBW102”時(shí)changed為true,即有變化,進(jìn)入以下邏輯:

org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:

// Update sub info
{
  Set subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
  Iterator> it = this.consumerTable.entrySet().iterator();
  while (it.hasNext()) {
    Entry entry = it.next();
    MQConsumerInner impl = entry.getValue();
    if (impl != null) {
      impl.updateTopicSubscribeInfo(topic, subscribeInfo);
    }
  }
}

將“TBW102”topic路由信息構(gòu)建TopicPublishInfo,并將用topic為key,TopicPublishInfo為value更新本地緩存,到這里就明白了,原來(lái)broker們千辛萬(wàn)苦創(chuàng)建“TBW102”topic并將其路由信息注冊(cè)到nameserver,被新來(lái)的topic獲取后立即用“TBW102”topic的路由信息構(gòu)建出一個(gè)TopicPublishInfo并且據(jù)為己有,由于TopicPublishInfo的路由信息時(shí)默認(rèn)“TBW102”topic,因此真正要發(fā)送消息的topic也會(huì)被負(fù)載發(fā)送到“TBW102”topic所在的broker中,這里我們可以將其稱(chēng)之為偷梁換柱的做法。

當(dāng)broker接收到消息后,會(huì)在msgCheck方法中調(diào)用createTopicInSendMessageMethod方法,將topic的信息塞進(jìn)topicConfigTable緩存中,并且broker會(huì)定時(shí)發(fā)送心跳將topicConfigTable發(fā)送給nameserver進(jìn)行注冊(cè)。

自動(dòng)創(chuàng)建與消息發(fā)送時(shí)獲取topic信息的時(shí)序圖:

預(yù)先創(chuàng)建

其實(shí)這個(gè)叫預(yù)先創(chuàng)建似乎更加適合,即預(yù)先在broker中創(chuàng)建好topic的相關(guān)信息并注冊(cè)到nameserver中,然后client端發(fā)送消息時(shí)直接從nameserver中獲取topic的路由信息,但是手動(dòng)創(chuàng)建從動(dòng)作上來(lái)將更加形象通俗易懂,直接告訴你,你的topic信息需要在控制臺(tái)上自己手動(dòng)創(chuàng)建。

預(yù)先創(chuàng)建需要通過(guò)mqadmin提供的topic相關(guān)命令進(jìn)行創(chuàng)建,執(zhí)行:

./mqadmin updateTopic

官方給出的各項(xiàng)參數(shù)如下:

usage: mqadmin updateTopic [-b ] [-c ] [-h] [-n ] [-o ] [-p ] [-r ] [-s ]
-t  [-u ] [-w ]
-b,--brokerAddr        create topic to which broker
-c,--clusterName       create topic to which cluster
-h,--help                   Print help
-n,--namesrvAddr       Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
-o,--order             set topic"s order(true|false
-p,--perm              set topic"s permission(2|4|6), intro[2:W 4:R; 6:RW]
-r,--readQueueNums     set read queue nums
-s,--hasUnitSub        has unit sub (true|false
-t,--topic             topic name
-u,--unit              is unit topic (true|false
-w,--writeQueueNums    set write queue nums

我們直接定位到其實(shí)現(xiàn)類(lèi)執(zhí)行命令的方法:

通過(guò)broker模式創(chuàng)建:

org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute:

// -b,--brokerAddr    create topic to which broker
if (commandLine.hasOption("b")) {
  String addr = commandLine.getOptionValue("b").trim();
  defaultMQAdminExt.start();
  defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
  return;
}

從commandLine命令行工具獲取運(yùn)行時(shí)-b參數(shù)重的broker的地址,defaultMQAdminExt是默認(rèn)的rocketmq控制臺(tái)執(zhí)行的API,此時(shí)調(diào)用start方法,該方法創(chuàng)建了一個(gè)mqClientInstance,它封裝了netty通信的細(xì)節(jié),接著就是最重要的一步,調(diào)用createAndUpdateTopicConfig將topic配置信息發(fā)送到指定的broker上,完成topic的創(chuàng)建。

通過(guò)集群模式創(chuàng)建:

org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute:

// -c,--clusterName    create topic to which cluster
else if (commandLine.hasOption("c")) {
  String clusterName = commandLine.getOptionValue("c").trim();
  defaultMQAdminExt.start();
  Set masterSet =
    CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
  for (String addr : masterSet) {
    defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
    System.out.printf("create topic to %s success.%n", addr);
  }
  return;
}

通過(guò)集群模式創(chuàng)建與通過(guò)broker模式創(chuàng)建的邏輯大致相同,多了根據(jù)集群從nameserver獲取集群下所有broker的master地址這個(gè)步驟,然后在循環(huán)發(fā)送topic信息到集群中的每個(gè)broker中,這個(gè)邏輯跟指定單個(gè)broker是一致的。

這也說(shuō)明了當(dāng)用集群模式去創(chuàng)建topic時(shí),集群里面每個(gè)broker的queue的數(shù)量相同,當(dāng)用單個(gè)broker模式去創(chuàng)建topic時(shí),每個(gè)broker的queue數(shù)量可以不一致。

預(yù)先創(chuàng)建時(shí)序圖:

何時(shí)需要預(yù)先創(chuàng)建Topic?

建議線下開(kāi)啟,線上關(guān)閉,不是我說(shuō)的,是官方給出的建議:

rocketmq為什么要這么設(shè)計(jì)呢?經(jīng)過(guò)一波源碼深度解析后,我得到了我想要的答案:

根據(jù)上面的源碼分析,我們得出,rocketmq在發(fā)送消息時(shí),會(huì)先去獲取topic的路由信息,如果topic是第一次發(fā)送消息,由于nameserver沒(méi)有topic的路由信息,所以會(huì)再次以“TBW102”這個(gè)默認(rèn)topic獲取路由信息,假設(shè)broker都開(kāi)啟了自動(dòng)創(chuàng)建開(kāi)關(guān),那么此時(shí)會(huì)獲取所有broker的路由信息,消息的發(fā)送會(huì)根據(jù)負(fù)載算法選擇其中一臺(tái)Broker發(fā)送消息,消息到達(dá)broker后,發(fā)現(xiàn)本地沒(méi)有該topic,會(huì)在創(chuàng)建該topic的信息塞進(jìn)本地緩存中,同時(shí)會(huì)將topic路由信息注冊(cè)到nameserver中,那么這樣就會(huì)造成一個(gè)后果:以后所有該topic的消息,都將發(fā)送到這臺(tái)broker上,如果該topic消息量非常大,會(huì)造成某個(gè)broker上負(fù)載過(guò)大,這樣消息的存儲(chǔ)就達(dá)不到負(fù)載均衡的目的了。

掃面下方二維碼,關(guān)注「Java科代表」,開(kāi)車(chē)帶你臨摹各種源碼,來(lái)不及解釋了快上車(chē)!?

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/74017.html

相關(guān)文章

  • RocketMQ為什么要保證訂閱關(guān)系一致性?

    摘要:微信公眾號(hào)后端進(jìn)階,專(zhuān)注后端技術(shù)分享框架分布式中間件服務(wù)治理等等。 微信公眾號(hào)「后端進(jìn)階」,專(zhuān)注后端技術(shù)分享:Java、Golang、WEB框架、分布式中間件、服務(wù)治理等等。 前段時(shí)間有個(gè)朋友向我提了一個(gè)問(wèn)題,他說(shuō)在搭建 RocketMQ 集群過(guò)程中遇到了關(guān)于消費(fèi)訂閱的問(wèn)題,具體問(wèn)題如下: showImg(https://segmentfault.com/img/remote/1460...

    gekylin 評(píng)論0 收藏0
  • 高并發(fā)異步解耦利器:RocketMQ究竟強(qiáng)在哪里?

    摘要:它是阿里巴巴于年開(kāi)源的第三代分布式消息中間件。是一個(gè)分布式消息中間件,具有低延遲高性能和可靠性萬(wàn)億級(jí)別的容量和靈活的可擴(kuò)展性,它是阿里巴巴于年開(kāi)源的第三代分布式消息中間件。上篇文章消息隊(duì)列那么多,為什么建議深入了解下RabbitMQ?我們講到了消息隊(duì)列的發(fā)展史:并且詳細(xì)介紹了RabbitMQ,其功能也是挺強(qiáng)大的,那么,為啥又要搞一個(gè)RocketMQ出來(lái)呢?是重復(fù)造輪子嗎?本文我們就帶大家來(lái)詳...

    tainzhi 評(píng)論0 收藏0
  • rocketmq之producer解析

    摘要:所以基于目前的設(shè)計(jì),建議關(guān)閉自動(dòng)創(chuàng)建的功能,然后根據(jù)消息量的大小,手動(dòng)創(chuàng)建。如果發(fā)送消息,返回結(jié)果超時(shí),這種超時(shí)不會(huì)進(jìn)行重試了如果是方法本身耗時(shí)超過(guò),還未來(lái)得及調(diào)用發(fā)送消息,此時(shí)的超時(shí)也不會(huì)重試。 先來(lái)看下producer核心的類(lèi)設(shè)計(jì),如下圖: showImg(http://pbdqyl9hh.bkt.clouddn.com/rocketmq/producer%E7%B1%BB%E5%...

    luodongseu 評(píng)論0 收藏0
  • RocketMq消息中間件介紹

    摘要:消息生產(chǎn)者,負(fù)責(zé)發(fā)消息到。消息消費(fèi)者,負(fù)責(zé)從上拉取消息進(jìn)行消費(fèi),消費(fèi)完進(jìn)行。集群部署端完全消費(fèi)正常后在進(jìn)行手動(dòng)確認(rèn)。消息發(fā)送成功后,服務(wù)器返回確認(rèn)消息給生產(chǎn)者。根據(jù)本地事務(wù)執(zhí)行的結(jié)果向發(fā)送提交或回滾消息。 RabbitMQerlang開(kāi)發(fā),對(duì)消息堆積的支持并不好,當(dāng)大量消息積壓的時(shí)候,會(huì)導(dǎo)致RabbitMQ的性能急劇下降。...

    goji 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

gself

|高級(jí)講師

TA的文章

閱讀更多
最新活動(dòng)
閱讀需要支付1元查看
<