摘要:支持通過(guò)調(diào)整構(gòu)造參數(shù)來(lái)配置不同的處理策略,本文主要介紹常用的策略配置方法以及應(yīng)用場(chǎng)景。對(duì)于這種場(chǎng)景,我們可以設(shè)置使用帶有長(zhǎng)度限制的隊(duì)列以及限定最大線程個(gè)數(shù)的線程池,同時(shí)通過(guò)設(shè)置處理任務(wù)被拒絕的情況。
ThreadPoolExecutor 是用來(lái)處理異步任務(wù)的一個(gè)接口,可以將其理解成為一個(gè)線程池和一個(gè)任務(wù)隊(duì)列,提交到 ExecutorService 對(duì)象的任務(wù)會(huì)被放入任務(wù)隊(duì)或者直接被線程池中的線程執(zhí)行。ThreadPoolExecutor 支持通過(guò)調(diào)整構(gòu)造參數(shù)來(lái)配置不同的處理策略,本文主要介紹常用的策略配置方法以及應(yīng)用場(chǎng)景。
ThreadPoolExecutor 的處理邏輯首先看一下 ThreadPoolExecutor 構(gòu)造函數(shù)的定義:
public ThreadPoolExecutor(int corePoolSize, //線程池核心線程數(shù)量 int maximumPoolSize, //線程池最大線程數(shù)量 long keepAliveTime, //線程KeepAlive時(shí)間,當(dāng)線程池?cái)?shù)量超過(guò)核心線程數(shù)量以后,idle時(shí)間超過(guò)這個(gè)值的線程會(huì)被終止 TimeUnit unit, //線程KeepAlive時(shí)間單位 BlockingQueueworkQueue, //任務(wù)隊(duì)列 ThreadFactory threadFactory, //創(chuàng)建線程的工廠對(duì)象 RejectedExecutionHandler handler) //任務(wù)被拒絕后調(diào)用的handler
ThreadPoolExecutor 對(duì)線程池和隊(duì)列的使用方式如下:
從線程池中獲取可用線程執(zhí)行任務(wù),如果沒(méi)有可用線程則使用ThreadFactory創(chuàng)建新的線程,直到線程數(shù)達(dá)到corePoolSize限制
線程池線程數(shù)達(dá)到corePoolSize以后,新的任務(wù)將被放入隊(duì)列,直到隊(duì)列不能再容納更多的任務(wù)
當(dāng)隊(duì)列不能再容納更多的任務(wù)以后,會(huì)創(chuàng)建新的線程,直到線程數(shù)達(dá)到maxinumPoolSize限制
線程數(shù)達(dá)到maxinumPoolSize限制以后新任務(wù)會(huì)被拒絕執(zhí)行,調(diào)用 RejectedExecutionHandler 進(jìn)行處理
三種常用的 ThreadPoolExecutorExecutors 是提供了一組工廠方法用于創(chuàng)建常用的 ExecutorService ,分別是 FixedThreadPool,CachedThreadPool 以及 SingleThreadExecutor。這三種ThreadPoolExecutor都是調(diào)用 ThreadPoolExecutor 構(gòu)造函數(shù)進(jìn)行創(chuàng)建,區(qū)別在于參數(shù)不同。
FixedThreadPool - 線程池大小固定,任務(wù)隊(duì)列無(wú)界下面是 Executors 類 newFixedThreadPool 方法的源碼:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
可以看到 corePoolSize 和 maximumPoolSize 設(shè)置成了相同的值,此時(shí)不存在線程數(shù)量大于核心線程數(shù)量的情況,所以KeepAlive時(shí)間設(shè)置不會(huì)生效。任務(wù)隊(duì)列使用的是不限制大小的 LinkedBlockingQueue ,由于是無(wú)界隊(duì)列所以容納的任務(wù)數(shù)量沒(méi)有上限。
因此,F(xiàn)ixedThreadPool的行為如下:
從線程池中獲取可用線程執(zhí)行任務(wù),如果沒(méi)有可用線程則使用ThreadFactory創(chuàng)建新的線程,直到線程數(shù)達(dá)到nThreads
線程池線程數(shù)達(dá)到nThreads以后,新的任務(wù)將被放入隊(duì)列
FixedThreadPool的優(yōu)點(diǎn)是能夠保證所有的任務(wù)都被執(zhí)行,永遠(yuǎn)不會(huì)拒絕新的任務(wù);同時(shí)缺點(diǎn)是隊(duì)列數(shù)量沒(méi)有限制,在任務(wù)執(zhí)行時(shí)間無(wú)限延長(zhǎng)的這種極端情況下會(huì)造成內(nèi)存問(wèn)題。
SingleThreadExecutor - 線程池大小固定為1,任務(wù)隊(duì)列無(wú)界public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }
這個(gè)工廠方法中使用無(wú)界LinkedBlockingQueue,并的將線程數(shù)設(shè)置成1,除此以外還使用FinalizableDelegatedExecutorService類進(jìn)行了包裝。這個(gè)包裝類的主要目的是為了屏蔽ThreadPoolExecutor中動(dòng)態(tài)修改線程數(shù)量的功能,僅保留ExecutorService中提供的方法。雖然是單線程處理,一旦線程因?yàn)樘幚懋惓5仍蚪K止的時(shí)候,ThreadPoolExecutor會(huì)自動(dòng)創(chuàng)建一個(gè)新的線程繼續(xù)進(jìn)行工作。
SingleThreadExecutor 適用于在邏輯上需要單線程處理任務(wù)的場(chǎng)景,同時(shí)無(wú)界的LinkedBlockingQueue保證新任務(wù)都能夠放入隊(duì)列,不會(huì)被拒絕;缺點(diǎn)和FixedThreadPool相同,當(dāng)處理任務(wù)無(wú)限等待的時(shí)候會(huì)造成內(nèi)存問(wèn)題。
CachedThreadPool - 線程池?zé)o限大(MAX INT),等待隊(duì)列長(zhǎng)度為1public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
SynchronousQueue是一個(gè)只有1個(gè)元素的隊(duì)列,入隊(duì)的任務(wù)需要一直等待直到隊(duì)列中的元素被移出。核心線程數(shù)是0,意味著所有任務(wù)會(huì)先入隊(duì)列;最大線程數(shù)是Integer.MAX_VALUE,可以認(rèn)為線程數(shù)量是沒(méi)有限制的。KeepAlive時(shí)間被設(shè)置成60秒,意味著在沒(méi)有任務(wù)的時(shí)候線程等待60秒以后退出。CachedThreadPool對(duì)任務(wù)的處理策略是提交的任務(wù)會(huì)立即分配一個(gè)線程進(jìn)行執(zhí)行,線程池中線程數(shù)量會(huì)隨著任務(wù)數(shù)的變化自動(dòng)擴(kuò)張和縮減,在任務(wù)執(zhí)行時(shí)間無(wú)限延長(zhǎng)的極端情況下會(huì)創(chuàng)建過(guò)多的線程。
三種ExecutorService特性總結(jié)類型 | 核心線程數(shù) | 最大線程數(shù) | Keep Alive 時(shí)間 | 任務(wù)隊(duì)列 | 任務(wù)處理策略 |
---|---|---|---|---|---|
FixedThreadPool | 固定大小 | 固定大?。ㄅc核心線程數(shù)相同) | 0 | LinkedBlockingQueue | 線程池大小固定,沒(méi)有可用線程的時(shí)候任務(wù)會(huì)放入隊(duì)列等待,隊(duì)列長(zhǎng)度無(wú)限制 |
SingleThreadExecutor | 1 | 1 | 0 | LinkedBlockingQueue | 與 FixedThreadPool 相同,區(qū)別在于線程池的大小為1,適用于業(yè)務(wù)邏輯上只允許1個(gè)線程進(jìn)行處理的場(chǎng)景 |
CachedThreadPool | 0 | Integer.MAX_VALUE | 1分鐘 | SynchronousQueue | 線程池的數(shù)量無(wú)限大,新任務(wù)會(huì)直接分配或者創(chuàng)建一個(gè)線程進(jìn)行執(zhí)行 |
我們也可以通過(guò)修改 ThreadPoolExecutor 的構(gòu)造函數(shù)來(lái)自定義任務(wù)處理策略。例如面對(duì)的業(yè)務(wù)是將數(shù)據(jù)異步寫入HBase,當(dāng)HBase嚴(yán)重超時(shí)的時(shí)候允許寫入失敗并記錄日志以便事后補(bǔ)寫。對(duì)于這種應(yīng)用場(chǎng)景,如果使用FixedThreadPool,在HBase服務(wù)嚴(yán)重超時(shí)的時(shí)候會(huì)導(dǎo)致隊(duì)列無(wú)限增長(zhǎng),引發(fā)內(nèi)存問(wèn)題;如果使用CachedThreadPool,會(huì)導(dǎo)致線程數(shù)量無(wú)限增長(zhǎng)。對(duì)于這種場(chǎng)景,我們可以設(shè)置ExecutorService使用帶有長(zhǎng)度限制的隊(duì)列以及限定最大線程個(gè)數(shù)的線程池,同時(shí)通過(guò)設(shè)置RejectedExecutionHandler處理任務(wù)被拒絕的情況。
首先定義 RejectedExecutionHandler:
public class MyRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 處理任務(wù)被拒絕的情況,例如記錄日志等 } }
創(chuàng)建 ThreadPoolExecutor:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 10, //核心線程數(shù)設(shè)置成10 30, //線程池最大線程數(shù)為30 30, TimeUnit.SECONDS, //超過(guò)核心線程數(shù)量的線程idle 30秒之后會(huì)退出 new ArrayBlockingQueue(100), //隊(duì)列長(zhǎng)度為100 new MyRejectedExecutionHandler() //任務(wù)被拒絕以后的處理類 );
這樣設(shè)置以后,如果任務(wù)處理函數(shù)出現(xiàn)長(zhǎng)時(shí)間掛起的情況,會(huì)依次發(fā)生下列現(xiàn)象:
線程池線程數(shù)量達(dá)到核心線程數(shù),向ArrayBlockingQueue中放入任務(wù)
ArrayBlockingQueue達(dá)到上限,創(chuàng)建新的線程進(jìn)行處理
線程池中的線程數(shù)量達(dá)到30個(gè),調(diào)用MyRejectedExecutionHandler處理新提交的任務(wù)
總結(jié)對(duì)于需要保證所有提交的任務(wù)都要被執(zhí)行的情況,使用FixedThreadPool
如果限定只能使用一個(gè)線程進(jìn)行任務(wù)處理,使用SingleThreadExecutor
如果希望提交的任務(wù)盡快分配線程執(zhí)行,使用CachedThreadPool
如果業(yè)務(wù)上允許任務(wù)執(zhí)行失敗,或者任務(wù)執(zhí)行過(guò)程可能出現(xiàn)執(zhí)行時(shí)間過(guò)長(zhǎng)進(jìn)而影響其他業(yè)務(wù)的應(yīng)用場(chǎng)景,可以通過(guò)使用限定線程數(shù)量的線程池以及限定長(zhǎng)度的隊(duì)列進(jìn)行容錯(cuò)處理。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/69864.html
摘要:限制線程池運(yùn)行線程以及等待線程數(shù)量的策略對(duì)于所提供的,可以保證可以在內(nèi)存中有固定數(shù)量的線程數(shù)運(yùn)行。指的是當(dāng)線程池拒絕該任務(wù)的時(shí)候,線程在本地線程直接。由此限制了線程池的等待線程數(shù)與執(zhí)行線程數(shù) 限制Java線程池運(yùn)行線程以及等待線程數(shù)量的策略 對(duì)于java.util.concurrent.Executors所提供的FixedThreadPool,可以保證可以在內(nèi)存中有固定數(shù)量的線程數(shù)運(yùn)行...
摘要:提交任務(wù)當(dāng)創(chuàng)建了一個(gè)線程池之后我們就可以將任務(wù)提交到線程池中執(zhí)行了。提交任務(wù)到線程池中相當(dāng)簡(jiǎn)單,我們只要把原來(lái)傳入類構(gòu)造器的對(duì)象傳入線程池的方法或者方法就可以了。 我們一般不會(huì)選擇直接使用線程類Thread進(jìn)行多線程編程,而是使用更方便的線程池來(lái)進(jìn)行任務(wù)的調(diào)度和管理。線程池就像共享單車,我們只要在我們有需要的時(shí)候去獲取就可以了。甚至可以說(shuō)線程池更棒,我們只需要把任務(wù)提交給它,它就會(huì)在合...
摘要:中的線程池運(yùn)用場(chǎng)景非常廣泛,幾乎所有的一步或者并發(fā)執(zhí)行程序都可以使用。代碼中如果執(zhí)行了方法,線程池會(huì)提前創(chuàng)建并啟動(dòng)所有核心線程。線程池最大數(shù)量線程池允許創(chuàng)建的線程最大數(shù)量。被稱為是可重用固定線程數(shù)的線程池。 Java中的線程池運(yùn)用場(chǎng)景非常廣泛,幾乎所有的一步或者并發(fā)執(zhí)行程序都可以使用。那么線程池有什么好處呢,以及他的實(shí)現(xiàn)原理是怎么樣的呢? 使用線程池的好處 在開發(fā)過(guò)程中,合理的使用線程...
摘要:去美團(tuán)面試,問(wèn)到了什么是線程池,如何使用,為什么要用以下做個(gè)總結(jié)。二線程池線程池的作用線程池作用就是限制系統(tǒng)中執(zhí)行線程的數(shù)量。真正的線程池接口是。創(chuàng)建固定大小的線程池。此線程池支持定時(shí)以及周期性執(zhí)行任務(wù)的需求。 去美團(tuán)面試,問(wèn)到了什么是線程池,如何使用,為什么要用,以下做個(gè)總結(jié)。關(guān)于線程之前也寫過(guò)一篇文章《高級(jí)面試題總結(jié)—線程池還能這么玩?》 1、什么是線程池:? java.util...
摘要:去美團(tuán)面試,問(wèn)到了什么是線程池,如何使用,為什么要用以下做個(gè)總結(jié)。二線程池線程池的作用線程池作用就是限制系統(tǒng)中執(zhí)行線程的數(shù)量。真正的線程池接口是。創(chuàng)建固定大小的線程池。此線程池支持定時(shí)以及周期性執(zhí)行任務(wù)的需求。 去美團(tuán)面試,問(wèn)到了什么是線程池,如何使用,為什么要用,以下做個(gè)總結(jié)。關(guān)于線程之前也寫過(guò)一篇文章《高級(jí)面試題總結(jié)—線程池還能這么玩?》 1、什么是線程池:? java.util...
閱讀 2386·2021-11-24 10:31
閱讀 3438·2021-11-23 09:51
閱讀 2252·2021-11-15 18:11
閱讀 2398·2021-09-02 15:15
閱讀 2463·2019-08-29 17:02
閱讀 2296·2019-08-29 15:04
閱讀 843·2019-08-29 12:27
閱讀 2867·2019-08-28 18:15