成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

moquette改造筆記(三):優(yōu)化BrokerInterceptor 中的線程池

cfanr / 2672人閱讀

摘要:修改的實現,實現接口在改造筆記一整合到中修改實現,添加對的實現如下負載過大,處理不過來時,會回調該方法例如可以發(fā)生郵件通知相關人員改造筆記四解決中的調用兩次

發(fā)現問題

在io.moquette.spi.impl.BrokerInterceptor的構造函數中,新建了一個線程池,代碼如下:

private BrokerInterceptor(int poolSize, List handlers) {
        LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers));
        this.handlers = new HashMap<>();
        for (Class messageType : InterceptHandler.ALL_MESSAGE_TYPES) {
            this.handlers.put(messageType, new CopyOnWriteArrayList());
        }
        for (InterceptHandler handler : handlers) {
            this.addInterceptHandler(handler);
        }
        executor = Executors.newFixedThreadPool(poolSize);
    }

executor = Executors.newFixedThreadPool(poolSize);這句代碼雖然創(chuàng)建了一個固定線程數量的線程池,但是線程池的任務隊列并沒有做限制,一旦某個InterceptHandler中的某個方法進行了耗時處理,在高并發(fā)的情況下,會很容易導致線程池的隊列堆積大量待處理的任務,進而可能造成內存溢出。

解決問題

分別添加以下類和接口

public class ThreadPoolHelper {
    public static ExecutorService createFixedExecutor(int threadNum,int capacity,String threadFactoryName) {
        return new ThreadPoolExecutor(
                threadNum,
                threadNum,
                30,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque(capacity),
                new SimpleThreadFactory(threadFactoryName),
                new LogDiscardRejectPolicy()
        );
    }
}

public class SimpleThreadFactory implements ThreadFactory {
    private static final String NAME_FORMAT = "%s-%s";
    private String threadNamePrefix;

    public SimpleThreadFactory(String threadNamePrefix) {
        this.threadNamePrefix = threadNamePrefix;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread thread = new Thread(runnable);
        thread.setName(String.format(NAME_FORMAT, threadNamePrefix, System.currentTimeMillis()));
        return thread;
    }
}


public class LogDiscardRejectPolicy implements RejectedExecutionHandler {
    private static final Logger LOG = LoggerFactory.getLogger(LogDiscardRejectPolicy.class);

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        LOG.error("executor:{} task queue has full, runnable:{} discarded",executor,r);
        if (!(r instanceof PublishTask)) {
            return;
        }

        PublishTask publishTask = (PublishTask) r;
        InterceptHandler interceptHandler = publishTask.getInterceptHandler();
        if (!(interceptHandler instanceof RejectHandler)) {
            return;
        }

        ((RejectHandler)interceptHandler).rejectedExecution(r,executor);
    }
}


public interface RejectHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

BrokerInterceptor 創(chuàng)建線程池的邏輯改為

private BrokerInterceptor(int poolSize, int capacity, List handlers) {
        LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers));
        this.handlers = new HashMap<>();
        for (Class messageType : InterceptHandler.ALL_MESSAGE_TYPES) {
            this.handlers.put(messageType, new CopyOnWriteArrayList());
        }
        for (InterceptHandler handler : handlers) {
            this.addInterceptHandler(handler);
        }

        /** modify by liuhh */
        executor = ThreadPoolHelper.createFixedExecutor(poolSize, capacity, THREAD_POOL_NAME);
        //executor = Executors.newFixedThreadPool(poolSize);

    }

解釋:
(1)ThreadPoolHelper中的createFixedExecutor()方法為新建的線程池指定任務隊列大小和拒絕策略LogDiscardRejectPolicy
(2)在LogDiscardRejectPolicy中,首先將被拒絕的任務log一遍,對于PublishTask(moquette改造筆記(二):優(yōu)化BrokerInterceptor notifyTopicPublished()邏輯)做特殊處理,會交給實現RejectHandler的InterceptHandler處理,由業(yè)務邏輯決定,出現任務太多處理不完被遺棄的任務該如何處理。

修改InterceptHandler的實現,實現RejectHandler接口

在 moquette改造筆記(一):整合到SpringBoot 中修改SafetyInterceptHandler實現,添加對RejectHandler的實現如下

@Slf4j
@Component
public class SafetyInterceptHandler extends AbstractInterceptHandler{

    @Override
    public String getID() {
        return SafetyInterceptHandler.class.getName();
    }

    @Override
    public void onConnect(InterceptConnectMessage msg) {
        
    }

    @Override
    public void onConnectionLost(InterceptConnectionLostMessage msg) {
       
    }


    @Override
    public void onPublish(InterceptPublishMessage msg) {
       
    }


    @Override
    public void onMessageAcknowledged(InterceptAcknowledgedMessage msg) {
        
    }
    
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        /**MQTT SERVICE 負載過大,處理不過來時,會回調該方法*/
        //例如可以發(fā)生郵件通知相關人員
    }

}

moquette改造筆記(四):解決InterceptHandler中的onConnectionLost()調用兩次

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://systransis.cn/yun/77202.html

相關文章

  • moquette改造筆記(二):優(yōu)化BrokerInterceptor notifyTopicPu

    摘要:優(yōu)化邏輯優(yōu)化方向向啟動方法一樣,每次調用的方法都是在線程池中新建一個任務具體代碼解釋新建一個用來實現調用方法。改造筆記三優(yōu)化中的線程池 發(fā)現問題 下面部分是io.moquette.spi.impl.BrokerInterceptor.java部分源碼 @Override public void notifyClientConnected(final MqttConnectMes...

    liangzai_cool 評論0 收藏0
  • moquette改造筆記(一):整合到SpringBoot

    摘要:整合到本文更加注重代碼實踐,對于配置相關的知識會一筆帶過,不做過多的詳解。筆者是上傳到私服,然后通過導入。接口是預留給開發(fā)者根據不同事件處理業(yè)務邏輯的接口。改造筆記二優(yōu)化邏輯 Moquette簡介 Mqtt作為物聯網比較流行的協議現在已經被大范圍使用,其中也有很多開源的MQTT BROKEN。Moquette是用java基于netty實現的輕量級的MQTT BROKEN. Moquet...

    young.li 評論0 收藏0
  • moquette改造筆記(五):設備連接頻繁上下線或者相互頂替出現的設備上下線狀態(tài)錯亂問題

    摘要:發(fā)現問題在使用中發(fā)現在設備頻繁上下線和兩個設備一樣相互頂替連接的情況下,的和的方法調用沒有先后順序,如果在這兩個方法里面來記錄設備上下線狀態(tài),會造成狀態(tài)不對。因為相互頂替的情況并不多見,因此兩個也可以接受,在性能上并不會造成多大影響。 發(fā)現問題 在moquette使用中發(fā)現在設備頻繁上下線和兩個設備ClientId一樣相互頂替連接的情況下,InterceptHandler的onConn...

    betacat 評論0 收藏0
  • moquette改造筆記(四):解決InterceptHandler中的onConnectionLo

    摘要:發(fā)現問題在使用中設備異常斷開中的。在中事件都是在鏈中依次傳遞的。事件最后傳遞到。解決方法添加會導致調用兩次解釋會在該從鏈中移除掉時被調用,一般的話沒有手動從鏈中刪除時,會在連接斷開后回調該方法。 發(fā)現問題 在使用中設備異常斷開,InterceptHandler中的onConnectionLost()。經過調試發(fā)現是MoquetteIdleTimeoutHandler中的代碼導致的,代碼...

    joyqi 評論0 收藏0
  • 程序員筆記|如何編寫高性能的Java代碼

    摘要:常見標高線程上下文切換頻繁線程太多鎖競爭激烈標高如果的占用很高,排查涉及到的程序,比如把改造成。抖動問題原因字節(jié)碼轉為機器碼需要占用時間片,大量的在執(zhí)行字節(jié)碼時,導致長期處于高位現象,占用率最高解決辦法保證編譯線程的占比。 一、并發(fā) Unable to create new native thread …… 問題1:Java中創(chuàng)建一個線程消耗多少內存? 每個線程有獨自的棧內存,共享堆內...

    ky0ncheng 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<