成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

eclipse mqttclient 性能&MQTT(32202): 正在發(fā)布過(guò)多的消息

lucas / 1884人閱讀

摘要:性能正在發(fā)布過(guò)多的消息系統(tǒng)性能,注意請(qǐng)使用單線程的萬(wàn)條毫秒萬(wàn)條毫秒萬(wàn)毫秒萬(wàn)條毫秒多線程的正在發(fā)布過(guò)多的消息問(wèn)題異常信息正在進(jìn)行過(guò)多的發(fā)布解決辦法消息發(fā)送發(fā)送限流用多帶帶的一個(gè)線程來(lái)完成消息的推送不用這個(gè),使用就沒(méi)有事增加的值反思筆者出現(xiàn)這個(gè)錯(cuò)

mqttclient性能&MQTT(32202): 正在發(fā)布過(guò)多的消息
org.eclipse.paho.client.mqttv3

2.2 GHz Intel Core i7 mac系統(tǒng)

publish性能,注意請(qǐng)使用單線程的 mqttclinet

1萬(wàn)條 341毫秒
4萬(wàn)條 1163毫秒
5萬(wàn) 1450毫秒
10萬(wàn)條 2700毫秒

多線程的 mqttclinet?MQTT(32202): 正在發(fā)布過(guò)多的消息 問(wèn)題

異常信息
[15:07:21]: publish failed, message: aaaa
正在進(jìn)行過(guò)多的發(fā)布 (32202)
 at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:496)
 at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:132)
 at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:156)
 at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1027)
 at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:399)
 at io.communet.ichater.emq.util.MqttUtil.publishMsg(MqttUtil.java:171)
 at io.communet.ichater.emq.util.MqttUtil.publishMsg(MqttUtil.java:161)
 at io.communet.ichater.emq.sub.MqttSendMsgEventSubscribe.onEvent(MqttSendMsgEventSubscribe.java:28)
 at java.lang.reflect.Method.invoke(Native Method)
 at java.lang.reflect.Method.invoke(Method.java:372)
 at org.greenrobot.eventbus.EventBus.invokeSubscriber(EventBus.java:507)
 at org.greenrobot.eventbus.EventBus.invokeSubscriber(EventBus.java:501)
 at org.greenrobot.eventbus.AsyncPoster.run(AsyncPoster.java:46)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
 at java.lang.Thread.run(Thread.java:818)

解決辦法

消息發(fā)送發(fā)送限流

用多帶帶的一個(gè)線程來(lái)完成 MQ 消息的推送 (不用這個(gè)MqttAsyncClient ,使用MqttClient 就沒(méi)有事)

options.setMaxInflight(1000)?增加?actualInFlight?的值;

反思

筆者出現(xiàn)這個(gè)錯(cuò)誤是因?yàn)槭褂?EventBus, 之前使用多帶帶線程的 Handler 是沒(méi)有問(wèn)題的, 調(diào)查發(fā)現(xiàn), 使用 EventBus 是新建線程運(yùn)行的, 而 Handler 是多帶帶一個(gè)線程.
所以當(dāng)發(fā)送大量消息的時(shí)候, EventBus 幾乎是同一個(gè)點(diǎn)發(fā)出去, 就會(huì)造成這個(gè)錯(cuò)誤

原因

根據(jù)堆棧信息找到報(bào)錯(cuò)地方

if (actualInFlight >= this.maxInflight) {
    //@TRACE 613= sending {0} msgs at max inflight window
    log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)});

    throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
}

其中?actualInFlight?如下

// processed until the inflight window has space. 
if (actualInFlight < this.maxInflight) {
    // The in flight window is not full so process the 
    // first message in the queue
    result = (MqttWireMessage)pendingMessages.elementAt(0);
    pendingMessages.removeElementAt(0);
    actualInFlight++;
    
    //@TRACE 623=+1 actualInFlight={0}
    log.fine(CLASS_NAME,methodName,"623",new Object[]{new Integer(actualInFlight)});
}

從?pendingMessages?中取出消息時(shí),?actualInFlight?加 1,?maxInflight?可以自己設(shè)定, 默認(rèn)值為 10.

public class ClientState {
  ...
  volatile private Vector pendingMessages;
  ...
}

在?ClientState?中:

public void send(MqttWireMessage message, MqttToken token) throws MqttException {
        ... 
        if (message instanceof MqttPublish) {
            synchronized (queueLock) {
                if (actualInFlight >= this.maxInflight) {
                    //@TRACE 613= sending {0} msgs at max inflight window
                    log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)});

                    throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
                }
                
                MqttMessage innerMessage = ((MqttPublish) message).getMessage();
                //@TRACE 628=pending publish key={0} qos={1} message={2}
                log.fine(CLASS_NAME,methodName,"628", new Object[]{new Integer(message.getMessageId()), new Integer(innerMessage.getQos()), message});

                switch(innerMessage.getQos()) {
                    case 2:
                        outboundQoS2.put(new Integer(message.getMessageId()), message);
                        persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
                        break;
                    case 1:
                        outboundQoS1.put(new Integer(message.getMessageId()), message);
                        persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
                        break;
                }
                tokenStore.saveToken(token, message);
                pendingMessages.addElement(message);
                queueLock.notifyAll();
            }
        } else {
        ...
        }
    }

可以看到?pendingMessages?中添加元素的時(shí)候并沒(méi)有做?qos?類型的判斷

private void decrementInFlight() {
    final String methodName = "decrementInFlight";
    synchronized (queueLock) {
        actualInFlight--;
        //@TRACE 646=-1 actualInFlight={0}
        log.fine(CLASS_NAME,methodName,"646",new Object[]{new Integer(actualInFlight)});
            
        if (!checkQuiesceLock()) {
            queueLock.notifyAll();
        }
    }
}

當(dāng)收到消息反饋時(shí)?actualInFlight?減 1.

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/75369.html

相關(guān)文章

  • MQTT 實(shí)戰(zhàn)

    摘要:協(xié)議版本版本介紹是一個(gè)用編寫的客戶端庫(kù)用于開發(fā)在或其他兼容平臺(tái)如上運(yùn)行的應(yīng)用程序提供了兩個(gè)提供完全異步的通過(guò)注冊(cè)的回調(diào)完成是的同步包裝器例子這個(gè)就是官網(wǎng)提供的例子可以直接連上代理做測(cè)試的相關(guān)類介紹此類主要用于連接代理發(fā)布消息訂閱消息斷開連接 mqtt 協(xié)議版本: 3.1.1eclipse paho client 版本: 1.2.0 eclipse paho client 介紹 Pah...

    tyheist 評(píng)論0 收藏0
  • 搭建IM服務(wù) so easy

    摘要:現(xiàn)在很多網(wǎng)站都通過(guò)服務(wù)來(lái)實(shí)現(xiàn)消息推送及數(shù)據(jù)即時(shí)同步功能,即時(shí)通訊組件逐漸成為產(chǎn)品的標(biāo)配。目前國(guó)內(nèi)有很多成熟穩(wěn)定的第三方即時(shí)通訊服務(wù)廠家,比如融云。 現(xiàn)在很多網(wǎng)站、APP都通過(guò)IM服務(wù)來(lái)實(shí)現(xiàn)消息推送及數(shù)據(jù)即時(shí)同步功能,即時(shí)通訊組件逐漸成為產(chǎn)品的標(biāo)配。目前國(guó)內(nèi)有很多成熟穩(wěn)定的第三方即時(shí)通訊服務(wù)廠家,比如:融云。使用這些專業(yè)的服務(wù)可以提高開發(fā)效率而且服務(wù)穩(wěn)定有保障。 如果自己DIY或者需要在...

    imccl 評(píng)論0 收藏0
  • 基于MQTT物聯(lián)網(wǎng)云測(cè)量解決方案

    摘要:本文是其中的一個(gè)解決方案。地址客戶端服務(wù)端前端網(wǎng)頁(yè)介紹,消息隊(duì)列遙測(cè)傳輸是開發(fā)的一個(gè)即時(shí)通訊協(xié)議,有可能成為物聯(lián)網(wǎng)的重要組成部分。必須用于在頂層分隔符之后,除了當(dāng)自己指定時(shí)。 1. 問(wèn)題描述 最近,本實(shí)驗(yàn)室大量上馬云測(cè)量,云監(jiān)控方面的項(xiàng)目,大概是屬于物聯(lián)網(wǎng)應(yīng)用的一個(gè)分支。老板也有將舊有儀器改造的想法,所以要實(shí)現(xiàn)儀器設(shè)備的云控制。本文是其中的一個(gè)解決方案。 2. 技術(shù)選型 消息隊(duì)列:M...

    張金寶 評(píng)論0 收藏0
  • (超簡(jiǎn)單)ESP8266深度睡眠模式下遠(yuǎn)程采集溫濕度信息

    摘要:超簡(jiǎn)單深度睡眠模式下遠(yuǎn)程采集溫濕度信息項(xiàng)目背景相關(guān)技術(shù)深度睡眠模式溫濕度采集數(shù)據(jù)收發(fā)前后端實(shí)現(xiàn)后端前端項(xiàng)目背景自己用收納箱做了一個(gè)用于存放打印耗材的干燥箱,想用閑置的開發(fā)板和溫濕度傳感器做一個(gè)遠(yuǎn)程溫濕度監(jiān)測(cè)的小項(xiàng)目。 ...

    pkhope 評(píng)論0 收藏0
  • 在Node.js下運(yùn)用MQTT協(xié)議實(shí)現(xiàn)即時(shí)通訊及離線推送

    摘要:前言前些日子了解到這樣一個(gè)協(xié)議,可以在上達(dá)到即時(shí)通訊的效果,但網(wǎng)上并不能很方便地找到一篇目前版本的在下正確實(shí)現(xiàn)這個(gè)協(xié)議的博客。 前言 前些日子了解到mqtt這樣一個(gè)協(xié)議,可以在web上達(dá)到即時(shí)通訊的效果,但網(wǎng)上并不能很方便地找到一篇目前版本的在node下正確實(shí)現(xiàn)這個(gè)協(xié)議的博客。 自己搗鼓了一段時(shí)間,理解不深刻,但也算是基本能夠達(dá)到使用目的。 本文目的為對(duì)MQTT有需求的學(xué)習(xí)者提供一定程...

    jlanglang 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

lucas

|高級(jí)講師

TA的文章

閱讀更多
最新活動(dòng)
閱讀需要支付1元查看
<