成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

rocketmq之producer解析

luodongseu / 837人閱讀

摘要:所以基于目前的設計,建議關(guān)閉自動創(chuàng)建的功能,然后根據(jù)消息量的大小,手動創(chuàng)建。如果發(fā)送消息,返回結(jié)果超時,這種超時不會進行重試了如果是方法本身耗時超過,還未來得及調(diào)用發(fā)送消息,此時的超時也不會重試。

先來看下producer核心的類設計,如下圖:

1、核心發(fā)布消息的類DefaultMQProducer,繼承自MQProducer接口,此接口定義了一系列發(fā)送消息的方法,如普通消息,順序消息,延時消息等,最終進行網(wǎng)絡通信會交給MQClientAPIImpl處理。

2、rocketmq從4.1.3版本開始又支持了事務消息,由TransactionMQProducer類提供(之后會有專門的文章進行詳細解讀事務消息)

producer之配置

我們看到DefaultMQProducer繼承了一個客戶端的公共配置類ClientConfig(與consumer公用),其實就是一個普通的javaBean,既可以代碼中設置屬性,也可以集成spring來配置

參數(shù)名 默認值 說明
namesrvAddr nameserver的地址列表,用分號隔開
clientIP 本機ip地址 客戶端ip地址,有時候無法識別,需要手動配置
instanceName DEFAULT 客戶端實例名稱,客戶端創(chuàng)建的多個 Producer、Consumer 實際是共用一個內(nèi)部實例(這個實例包含網(wǎng)絡連接、線程資源等)
clientCallbackExecutorThreads cpu核數(shù) 通信層客戶端處理請求的線程數(shù)
pollNameServerInterval 30000 輪詢nameserver的時間間隔,單位ms
heartbeatBrokerInterval 30000 向broker發(fā)送心跳的時間間隔,單位ms
persistConsumerOffsetInterval 5000 持久化 Consumer 消費進度間隔時間,單位ms

producer獨有的配置:

參數(shù)名 默認值 說明
producerGroup DEFAULT_PRODUCER Producer組名,相同分組的producer應該有相同的發(fā)送消息邏輯
createTopicKey AUTO_CREATE_TOPIC_KEY 自動創(chuàng)建topic時,以此默認topic為模板創(chuàng)建指定topic
defaultTopicQueueNums 4 自動創(chuàng)建topic隊列數(shù)量
sendMsgTimeout 3000 發(fā)送消息的超時時間,單位ms
compressMsgBodyOverHowmuch 4098 消息體超過多大會進行壓縮,單位字節(jié)
retryTimesWhenSendFailed 2 同步發(fā)送消息,發(fā)送失敗重試次數(shù)
retryTimesWhenSendAsyncFailed 2 異步發(fā)送消息,發(fā)送失敗的重試次數(shù)
retryAnotherBrokerWhenNotStoreOK false 同步發(fā)送消息,消息存儲失敗是否重試其他broker
maxMessageSize 4194304 客戶端限制消息的大小,默認4M
TransactionListener 事務消息時,必須設置的回查監(jiān)聽器
producer之group概念

我們在創(chuàng)建producer時必須要指定一個group,這里有兩個作用:

生產(chǎn)者一般會是集群部署的,group用來標識一類生產(chǎn)者,相同group的生產(chǎn)者一般要有相同的發(fā)送邏輯。

在發(fā)送事務消息時,當事務消息異常,broker端來回查事務狀態(tài)時,需要知道是由哪類生產(chǎn)者發(fā)送的事務消息,生產(chǎn)端會根據(jù)group名稱來查找對應的producer來執(zhí)行相應的回查邏輯。

producer的啟動流程

簡單說明下整個啟動流程:

1、首先在DefaultMQProducerImpl中會做一些參數(shù)校驗,如group是否合法;然后會創(chuàng)建MQClientInstance實例,此實例包含網(wǎng)絡連接、線程資源等,相同的clientId會共享此實例,所以通過MQClientManager來管理。

2、核心的啟動流程在MQClientInstance類中,如果nameserver地址沒有配置的話,會先通過靜態(tài)的http服務器地址去抓取nameserver的地址;再則啟動netty客戶端。

3、啟動一些定時任務,跟producer有關(guān)的如下幾個:

如果producer沒有配置nameserver地址,啟動定時抓取nameserver的地址的定時任務,任務延時10s開始,每隔2分支執(zhí)行一次。

輪詢nameserver定時任務,主要是定時更新topic的路由信息,任務延時10ms開始,每隔30s執(zhí)行一次。

清除下線的broker和向broker發(fā)送心跳,任務延時1s執(zhí)行,每隔30s執(zhí)行一次

Producer如何尋址

RocketMQ 有多種配置方式可以令客戶端找到 NameServer, 然后通過 NameServer 再找到 Broker,分別如下,
優(yōu)先級由高到低,高優(yōu)優(yōu)先級會覆蓋低優(yōu)先級

1、代碼中指定 Name Server 地址

producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");

2、啟動參數(shù)指定

-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876

3、環(huán)境變量指定 Name Server 地址

export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876

4、HTTP 靜態(tài)服務器尋址(默認)

如果以上三種都沒有設置name server的地址,客戶端啟動后先會訪問一個靜態(tài)http服務器獲取name server的地址,然后會啟動一個定時任務訪問這個靜態(tài) HTTP 服務器,地址如下:

http://jmenv.tbsite.net:8080/rocketmq/nsaddr

這是默認的地址,當然你也可以更改,做如下設置:

代碼:

System.setProperty("rocketmq.namesrv.domain","localhost");
System.setProperty("rocketmq.namesrv.domain.subgroup","nameServer")

或者啟動參數(shù)指定:

-Drocketmq.namesrv.domain=localhost
-Drocketmq.namesrv.domain.subgroup=nameServer

以上設置后http服務器地址就變成:

http://localhsot:8080/rocketmq/nameServer

這個 URL 的返回內(nèi)容格式如下:

192.168.0.1:9876;192.168.0.2:9876

客戶端每隔 2 分鐘訪問一次這個 HTTP 服務器,并更新本地的 Name Server 地址。

推薦使用 HTTP 靜態(tài)服務器尋址方式,好處是客戶端部署簡單,且 Name Server 集群可以熱升級。

發(fā)送消息時如何獲取路由信息

1、broker在啟動的時候通過參數(shù)autoCreateTopicEnable設置是否自動創(chuàng)建topic,默認為true,此時會創(chuàng)建一個名為TBW102(4.3版本已經(jīng)改名為AUTO_CREATE_TOPIC_KEY)的topic(參見類TopicConfigManager),broker在向namesrv注冊時會把默認的topic注冊上去。如果設置false,則不會注冊。

2、producer在發(fā)送消息時會在本地獲取路由信息,第一次發(fā)送的話本地肯定沒有,就會去namesrv獲取,如果此時namesrv也沒有,則會獲取TBW102的topic信息(參見DefaultMQProducerImpl.tryToFindTopicPublishInfo),以此為模板創(chuàng)建topic,然后選擇topic下的一臺broker發(fā),broker創(chuàng)建后,會通過心跳注冊到namesrv上。

3、如果autoCreateTopicEnable設置false的話,producer發(fā)送消息會報找不到路由的異常,此時必須手動創(chuàng)建topic。

建議autoCreateTopicEnable設置false,基于以上第二步,自動創(chuàng)建topic后,以后所有該TOPIC的消息,都將發(fā)送到剛才選擇的這臺broke上,達不到負載均衡的目的。所以基于目前RocketMQ的設計,建議關(guān)閉自動創(chuàng)建TOPIC的功能,然后根據(jù)消息量的大小,手動創(chuàng)建TOPIC。

可以通過管理工具mqadmin來手動創(chuàng)建topic

sh mqadmin updateTopic -c [集群名稱] -n [nameserver地址] -t [topic名稱] -w [寫隊列數(shù)] -r [讀隊列數(shù)]

手動創(chuàng)建了Topic后,producer就可以輪詢的發(fā)送到不同的broker了。

topic的隊列數(shù)

這里講一下自動創(chuàng)建的topic的隊列數(shù)如何設置,首先broker創(chuàng)建的模板topic=AUTO_CREATE_TOPIC_KEY的隊列是8,參見類TopicConfigManager:

public TopicConfigManager(BrokerController brokerController) { 
    //省略無關(guān)代碼
    if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
        String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
        TopicConfig topicConfig = new TopicConfig(topic);
        this.systemTopicList.add(topic);
        topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
                                     .getDefaultTopicQueueNums());
        topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
                                      .getDefaultTopicQueueNums());
        int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
        topicConfig.setPerm(perm);
        this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
    }
     //省略無關(guān)代碼
}

BrokerConfig:

private int defaultTopicQueueNums = 8;

DefaultMQProducer端默認知道要創(chuàng)建的topic的隊列數(shù)是4

private volatile int defaultTopicQueueNums = 4;

MQClientInstance類的方法updateTopicRouteInfoFromNameServer中有這樣一段邏輯:

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
    //省略無關(guān)代碼
    for (QueueData data : topicRouteData.getQueueDatas()) {
        int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
        data.setReadQueueNums(queueNums);
        data.setWriteQueueNums(queueNums);
    }
    //省略無關(guān)代碼
 }

創(chuàng)建隊列是取兩者最小的一個,也就是4,所以要設置topic的隊列數(shù)量,很明顯了設置broker的defaultTopicQueueNums的值和DefaultMQProducer的defaultTopicQueueNums值就可以了。這是自動創(chuàng)建Topic時隊列數(shù)的設置方法,上面也提到生成環(huán)境一般不會開啟自動創(chuàng)建Topic的功能,可以通過上面的手動創(chuàng)建Topic的指令來設置讀寫隊列數(shù)。你可能注意到了Topic下有讀寫隊兩個隊列數(shù),分別代表上面意思呢?讀寫隊列其實是個邏輯概念,一個broker下topic的總隊列數(shù)是以寫隊列為準,而讀隊列意思是允許多少隊列可以被消費者消費,也就是說讀多寫少的情況下,沒有問題,隊列都可以被消費掉,如果寫多讀少的話,那么就會存在隊列不會被消費的情況。

消息發(fā)送

前面我們講到了如何獲取topic的路由信息,如何創(chuàng)建topic的隊列數(shù),一個topic下有多個隊列,又可以分布在不同的broker上面,所以topic的總隊列數(shù)應該是所有broker上的topic下隊列數(shù)的總和。

備注:如果手動在每個broker上分別創(chuàng)建topic的話,相同topic在不同broker上的隊列數(shù)可以不一樣。

那么問題來了,在發(fā)送消息時根據(jù)怎么樣的策略來選擇一個隊列發(fā)送呢?rocketmq提供了一個MQFaultStrategy策略類來負責選擇隊列,這里會有一個參數(shù)sendLatencyFaultEnable是否開啟延遲故障,

該值默認為false,在不開啟的情況下,相同線程發(fā)送消息是輪詢topic下的所有隊列,不同線程發(fā)送是隨機的,核心代碼如下:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        //省略不必要的代碼......
    }
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
//以上代碼邏輯參見類MQFaultStrategy.selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        //省略不必要的代碼......
    }
}
public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}
//以上代碼邏輯參見類TopicPublishInfo
public int getAndIncrement() {
    Integer index = this.threadLocalIndex.get();//ThreadLocal中獲取
    if (null == index) {//為空,隨機生成一個
        index = Math.abs(random.nextInt());
        if (index < 0)
            index = 0;
        this.threadLocalIndex.set(index);
    }
    index = Math.abs(index + 1);
    if (index < 0)
        index = 0;
    this.threadLocalIndex.set(index);
    return index;
}
//以上代碼參見類ThreadLocalIndex

每次獲取index的時候都是從本地線程變量ThreadLocal中獲取,沒有的情況下就是隨機生成一個,加1取絕對值后返回,再對隊列列表的長度取模,所以在同一線程中,會輪訓的從隊列列表獲取隊列。而如果是不同線程的話,index是隨機生成的,所以就是隨機從隊列列表中獲取。如下圖所示:

可以看到選擇隊列方法的入?yún)⒂幸粋€lastBrokerName的入?yún)?,此參?shù)的目的是在發(fā)送消息失敗的情況下,producer會重試再次發(fā)送,而再次發(fā)送選擇的隊列需要另選一個broker,lastBrokerName就是要過濾掉失敗的broker,選擇下一個broker的隊列進行發(fā)送消息。

開啟延遲故障,每當發(fā)送完一次消息,不管成功還是失敗,都會把此次存儲消息的broker給保存下來,記錄故障情況下此broker需要延長多長時間才能再次發(fā)送,目前看到在代碼里面寫死了,故障下30s之內(nèi)是不能再向此broker發(fā)送消息了。

消息重試

producer的send方法本身支持內(nèi)部重試,重試邏輯如下:

1、最大重試次數(shù)默認2次,可以通過參數(shù)retryTimesWhenSendFailed設置

2、發(fā)送失敗,則輪詢到下一個broker,如果此時只有一個broker在線呢?那就會輪訓這個broker下的其他隊列。

3、這個方法的總耗時時間不超過 sendMsgTimeout 設置的值,默認為3s。

如果發(fā)送消息,broker返回結(jié)果超時,這種超時不會進行重試了;如果是方法本身耗時超過sendMsgTimeout ,還未來得及調(diào)用發(fā)送消息,此時的超時也不會重試。

以上策略其實也很難保證同步發(fā)送消息一定成功,如果應用要保證消息不丟失,最好先把消息存儲到db,后臺啟線程定時重試,確保消息一定存儲到broker。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/77288.html

相關(guān)文章

  • 高并發(fā)異步解耦利器:RocketMQ究竟強在哪里?

    摘要:它是阿里巴巴于年開源的第三代分布式消息中間件。是一個分布式消息中間件,具有低延遲高性能和可靠性萬億級別的容量和靈活的可擴展性,它是阿里巴巴于年開源的第三代分布式消息中間件。上篇文章消息隊列那么多,為什么建議深入了解下RabbitMQ?我們講到了消息隊列的發(fā)展史:并且詳細介紹了RabbitMQ,其功能也是挺強大的,那么,為啥又要搞一個RocketMQ出來呢?是重復造輪子嗎?本文我們就帶大家來詳...

    tainzhi 評論0 收藏0
  • RocketMQ源碼學習(一)-概述

    摘要:每個與集群中的所有節(jié)點建立長連接,定時注冊信息到所有。完全無狀態(tài),可集群部署。本系列源碼解析主要參照原理簡介來追尋其代碼實現(xiàn)雖然版本不太一致但這也是能找到的最詳細的資料了接下來根據(jù)其模塊來源碼閱讀目錄如下 為什么選擇讀RocketMQ? 對MQ的理解一直不深,上周看了,還是覺得不夠深入,找個成熟的產(chǎn)品來學習吧,RabbitMQ是erLang寫的,Kafka是Scala寫的,非Java寫...

    godlong_X 評論0 收藏0
  • 如何解決MQ消息消費順序問題

    摘要:利用的高級特性特性是一種負載均衡的機制。在一個消息被分發(fā)到之前,首先檢查消息屬性。屬性為某個值的消息單個消息或消息集合在描述,和的對應關(guān)系,以及負載均衡策略時。同樣做到了保證消息的順序情況下,均衡消費的消費消息。 通常mq可以保證先到隊列的消息按照順序分發(fā)給消費者消費來保證順序,但是一個隊列有多個消費者消費的時候,那將失去這個保證,因為這些消息被多個線程并發(fā)的消費。但是有的時候消息按照...

    Atom 評論0 收藏0
  • SpringBoot RocketMQ 整合使用和監(jiān)控

    摘要:前提通過前面兩篇文章可以簡單的了解和安裝,今天就將和整合起來使用。然后我運行之前的整合項目,查看監(jiān)控信息如下總結(jié)整篇文章講述了與整合和監(jiān)控平臺的搭建。 showImg(https://segmentfault.com/img/remote/1460000013232432?w=1920&h=1277); 前提 通過前面兩篇文章可以簡單的了解 RocketMQ 和 安裝 RocketMQ...

    Jacendfeng 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<