摘要:發(fā)現(xiàn)問題在使用中設(shè)備異常斷開中的。在中事件都是在鏈中依次傳遞的。事件最后傳遞到。解決方法添加會(huì)導(dǎo)致調(diào)用兩次解釋會(huì)在該從鏈中移除掉時(shí)被調(diào)用,一般的話沒有手動(dòng)從鏈中刪除時(shí),會(huì)在連接斷開后回調(diào)該方法。
發(fā)現(xiàn)問題
在使用中設(shè)備異常斷開,InterceptHandler中的onConnectionLost()。經(jīng)過調(diào)試發(fā)現(xiàn)是MoquetteIdleTimeoutHandler中的代碼導(dǎo)致的,代碼如下:
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleState e = ((IdleStateEvent) evt).state(); if (e == IdleState.READER_IDLE) { LOG.info("Firing channel inactive event. MqttClientId = {}.", NettyUtils.clientID(ctx.channel())); // fire a channelInactive to trigger publish of Will ctx.fireChannelInactive(); ctx.close().addListener(CLOSE_ON_FAILURE); } } ...... }
這部分代碼的大致含義是:當(dāng)在一段時(shí)間內(nèi)沒有收到任何數(shù)據(jù)后,就會(huì)調(diào)用觸發(fā)ChannelInactive事件然后關(guān)掉連接。
在netty中事件都是在handler鏈中依次傳遞的。ChannelInactive事件最后傳遞到NettyMQTTHandler。處理邏輯如下:
public void channelInactive(ChannelHandlerContext ctx) { String clientID = NettyUtils.clientID(ctx.channel()); if (clientID != null && !clientID.isEmpty()) { LOG.info("N otifying connection lost event. MqttClientId = {}", clientID); m_processor.processConnectionLost(clientID, ctx.channel()); } ctx.close().addListener(CLOSE_ON_FAILURE); }
如果條件成立,會(huì)調(diào)用一次m_processor.processConnectionLost(clientID, ctx.channel());這會(huì)導(dǎo)致InterceptHandler中的onConnectionLost()調(diào)用一次。因?yàn)檫B接緊接著又被關(guān)閉了,連接關(guān)閉同樣會(huì)導(dǎo)致ChannelInactive事件,因此以上方法又會(huì)被觸發(fā)一次,因此這樣就會(huì)造成異常斷開會(huì)調(diào)用兩次onConnectionLost()。
解決方法添加handlerRemove
@Override public void channelInactive(ChannelHandlerContext ctx) { /** modify by ljq 2018.6.11 會(huì)導(dǎo)致processConnectionLost調(diào)用兩次*/ // String clientID = NettyUtils.clientID(ctx.channel()); // if (clientID != null && !clientID.isEmpty()) { // LOG.info("N otifying connection lost event. MqttClientId = {}", clientID); // m_processor.processConnectionLost(clientID, ctx.channel()); // } ctx.close().addListener(CLOSE_ON_FAILURE); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { String clientID = NettyUtils.clientID(ctx.channel()); if (clientID != null && !clientID.isEmpty()) { LOG.info("Notifying connection lost event. MqttClientId = {}", clientID); m_processor.processConnectionLost(clientID, ctx.channel()); } }
解釋:
handler remove會(huì)在該handler從鏈中移除掉時(shí)被調(diào)用,一般的話沒有手動(dòng)從鏈中刪除時(shí),會(huì)在連接斷開后回調(diào)該方法。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/77222.html
摘要:修改的實(shí)現(xiàn),實(shí)現(xiàn)接口在改造筆記一整合到中修改實(shí)現(xiàn),添加對(duì)的實(shí)現(xiàn)如下負(fù)載過大,處理不過來時(shí),會(huì)回調(diào)該方法例如可以發(fā)生郵件通知相關(guān)人員改造筆記四解決中的調(diào)用兩次 發(fā)現(xiàn)問題 在io.moquette.spi.impl.BrokerInterceptor的構(gòu)造函數(shù)中,新建了一個(gè)線程池,代碼如下: private BrokerInterceptor(int poolSize, List hand...
摘要:優(yōu)化邏輯優(yōu)化方向向啟動(dòng)方法一樣,每次調(diào)用的方法都是在線程池中新建一個(gè)任務(wù)具體代碼解釋新建一個(gè)用來實(shí)現(xiàn)調(diào)用方法。改造筆記三優(yōu)化中的線程池 發(fā)現(xiàn)問題 下面部分是io.moquette.spi.impl.BrokerInterceptor.java部分源碼 @Override public void notifyClientConnected(final MqttConnectMes...
摘要:整合到本文更加注重代碼實(shí)踐,對(duì)于配置相關(guān)的知識(shí)會(huì)一筆帶過,不做過多的詳解。筆者是上傳到私服,然后通過導(dǎo)入。接口是預(yù)留給開發(fā)者根據(jù)不同事件處理業(yè)務(wù)邏輯的接口。改造筆記二優(yōu)化邏輯 Moquette簡介 Mqtt作為物聯(lián)網(wǎng)比較流行的協(xié)議現(xiàn)在已經(jīng)被大范圍使用,其中也有很多開源的MQTT BROKEN。Moquette是用java基于netty實(shí)現(xiàn)的輕量級(jí)的MQTT BROKEN. Moquet...
摘要:發(fā)現(xiàn)問題在使用中發(fā)現(xiàn)在設(shè)備頻繁上下線和兩個(gè)設(shè)備一樣相互頂替連接的情況下,的和的方法調(diào)用沒有先后順序,如果在這兩個(gè)方法里面來記錄設(shè)備上下線狀態(tài),會(huì)造成狀態(tài)不對(duì)。因?yàn)橄嗷ロ斕娴那闆r并不多見,因此兩個(gè)也可以接受,在性能上并不會(huì)造成多大影響。 發(fā)現(xiàn)問題 在moquette使用中發(fā)現(xiàn)在設(shè)備頻繁上下線和兩個(gè)設(shè)備ClientId一樣相互頂替連接的情況下,InterceptHandler的onConn...
閱讀 1772·2021-11-24 09:39
閱讀 1570·2021-11-16 11:54
閱讀 3509·2021-11-11 16:55
閱讀 1682·2021-10-14 09:43
閱讀 1456·2019-08-30 15:55
閱讀 1247·2019-08-30 15:54
閱讀 3434·2019-08-30 15:53
閱讀 1352·2019-08-30 14:18