摘要:修改的實現,實現接口在改造筆記一整合到中修改實現,添加對的實現如下負載過大,處理不過來時,會回調該方法例如可以發(fā)生郵件通知相關人員改造筆記四解決中的調用兩次
發(fā)現問題
在io.moquette.spi.impl.BrokerInterceptor的構造函數中,新建了一個線程池,代碼如下:
private BrokerInterceptor(int poolSize, Listhandlers) { LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers)); this.handlers = new HashMap<>(); for (Class> messageType : InterceptHandler.ALL_MESSAGE_TYPES) { this.handlers.put(messageType, new CopyOnWriteArrayList ()); } for (InterceptHandler handler : handlers) { this.addInterceptHandler(handler); } executor = Executors.newFixedThreadPool(poolSize); }
executor = Executors.newFixedThreadPool(poolSize);這句代碼雖然創(chuàng)建了一個固定線程數量的線程池,但是線程池的任務隊列并沒有做限制,一旦某個InterceptHandler中的某個方法進行了耗時處理,在高并發(fā)的情況下,會很容易導致線程池的隊列堆積大量待處理的任務,進而可能造成內存溢出。
解決問題分別添加以下類和接口
public class ThreadPoolHelper { public static ExecutorService createFixedExecutor(int threadNum,int capacity,String threadFactoryName) { return new ThreadPoolExecutor( threadNum, threadNum, 30, TimeUnit.SECONDS, new LinkedBlockingDeque(capacity), new SimpleThreadFactory(threadFactoryName), new LogDiscardRejectPolicy() ); } } public class SimpleThreadFactory implements ThreadFactory { private static final String NAME_FORMAT = "%s-%s"; private String threadNamePrefix; public SimpleThreadFactory(String threadNamePrefix) { this.threadNamePrefix = threadNamePrefix; } @Override public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable); thread.setName(String.format(NAME_FORMAT, threadNamePrefix, System.currentTimeMillis())); return thread; } } public class LogDiscardRejectPolicy implements RejectedExecutionHandler { private static final Logger LOG = LoggerFactory.getLogger(LogDiscardRejectPolicy.class); @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { LOG.error("executor:{} task queue has full, runnable:{} discarded",executor,r); if (!(r instanceof PublishTask)) { return; } PublishTask publishTask = (PublishTask) r; InterceptHandler interceptHandler = publishTask.getInterceptHandler(); if (!(interceptHandler instanceof RejectHandler)) { return; } ((RejectHandler)interceptHandler).rejectedExecution(r,executor); } } public interface RejectHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
BrokerInterceptor 創(chuàng)建線程池的邏輯改為
private BrokerInterceptor(int poolSize, int capacity, Listhandlers) { LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers)); this.handlers = new HashMap<>(); for (Class> messageType : InterceptHandler.ALL_MESSAGE_TYPES) { this.handlers.put(messageType, new CopyOnWriteArrayList ()); } for (InterceptHandler handler : handlers) { this.addInterceptHandler(handler); } /** modify by liuhh */ executor = ThreadPoolHelper.createFixedExecutor(poolSize, capacity, THREAD_POOL_NAME); //executor = Executors.newFixedThreadPool(poolSize); }
解釋:
(1)ThreadPoolHelper中的createFixedExecutor()方法為新建的線程池指定任務隊列大小和拒絕策略LogDiscardRejectPolicy
(2)在LogDiscardRejectPolicy中,首先將被拒絕的任務log一遍,對于PublishTask(moquette改造筆記(二):優(yōu)化BrokerInterceptor notifyTopicPublished()邏輯)做特殊處理,會交給實現RejectHandler的InterceptHandler處理,由業(yè)務邏輯決定,出現任務太多處理不完被遺棄的任務該如何處理。
在 moquette改造筆記(一):整合到SpringBoot 中修改SafetyInterceptHandler實現,添加對RejectHandler的實現如下
@Slf4j @Component public class SafetyInterceptHandler extends AbstractInterceptHandler{ @Override public String getID() { return SafetyInterceptHandler.class.getName(); } @Override public void onConnect(InterceptConnectMessage msg) { } @Override public void onConnectionLost(InterceptConnectionLostMessage msg) { } @Override public void onPublish(InterceptPublishMessage msg) { } @Override public void onMessageAcknowledged(InterceptAcknowledgedMessage msg) { } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { /**MQTT SERVICE 負載過大,處理不過來時,會回調該方法*/ //例如可以發(fā)生郵件通知相關人員 } }
moquette改造筆記(四):解決InterceptHandler中的onConnectionLost()調用兩次
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://systransis.cn/yun/77202.html
摘要:優(yōu)化邏輯優(yōu)化方向向啟動方法一樣,每次調用的方法都是在線程池中新建一個任務具體代碼解釋新建一個用來實現調用方法。改造筆記三優(yōu)化中的線程池 發(fā)現問題 下面部分是io.moquette.spi.impl.BrokerInterceptor.java部分源碼 @Override public void notifyClientConnected(final MqttConnectMes...
摘要:整合到本文更加注重代碼實踐,對于配置相關的知識會一筆帶過,不做過多的詳解。筆者是上傳到私服,然后通過導入。接口是預留給開發(fā)者根據不同事件處理業(yè)務邏輯的接口。改造筆記二優(yōu)化邏輯 Moquette簡介 Mqtt作為物聯網比較流行的協議現在已經被大范圍使用,其中也有很多開源的MQTT BROKEN。Moquette是用java基于netty實現的輕量級的MQTT BROKEN. Moquet...
摘要:發(fā)現問題在使用中發(fā)現在設備頻繁上下線和兩個設備一樣相互頂替連接的情況下,的和的方法調用沒有先后順序,如果在這兩個方法里面來記錄設備上下線狀態(tài),會造成狀態(tài)不對。因為相互頂替的情況并不多見,因此兩個也可以接受,在性能上并不會造成多大影響。 發(fā)現問題 在moquette使用中發(fā)現在設備頻繁上下線和兩個設備ClientId一樣相互頂替連接的情況下,InterceptHandler的onConn...
摘要:發(fā)現問題在使用中設備異常斷開中的。在中事件都是在鏈中依次傳遞的。事件最后傳遞到。解決方法添加會導致調用兩次解釋會在該從鏈中移除掉時被調用,一般的話沒有手動從鏈中刪除時,會在連接斷開后回調該方法。 發(fā)現問題 在使用中設備異常斷開,InterceptHandler中的onConnectionLost()。經過調試發(fā)現是MoquetteIdleTimeoutHandler中的代碼導致的,代碼...
摘要:常見標高線程上下文切換頻繁線程太多鎖競爭激烈標高如果的占用很高,排查涉及到的程序,比如把改造成。抖動問題原因字節(jié)碼轉為機器碼需要占用時間片,大量的在執(zhí)行字節(jié)碼時,導致長期處于高位現象,占用率最高解決辦法保證編譯線程的占比。 一、并發(fā) Unable to create new native thread …… 問題1:Java中創(chuàng)建一個線程消耗多少內存? 每個線程有獨自的棧內存,共享堆內...
閱讀 2006·2021-11-24 10:45
閱讀 1861·2021-10-09 09:43
閱讀 1303·2021-09-22 15:38
閱讀 1230·2021-08-18 10:19
閱讀 2849·2019-08-30 15:55
閱讀 3069·2019-08-30 12:45
閱讀 2975·2019-08-30 11:25
閱讀 365·2019-08-29 11:30