摘要:線程池常見(jiàn)實(shí)現(xiàn)線程池一般包含三個(gè)主要部分調(diào)度器決定由哪個(gè)線程來(lái)執(zhí)行任務(wù)執(zhí)行任務(wù)所能夠的最大耗時(shí)等線程隊(duì)列存放并管理著一系列線程這些線程都處于阻塞狀態(tài)或休眠狀態(tài)任務(wù)隊(duì)列存放著用戶提交的需要被執(zhí)行的任務(wù)一般任務(wù)的執(zhí)行的即先提交的任務(wù)先被執(zhí)行調(diào)度
線程池常見(jiàn)實(shí)現(xiàn)
線程池一般包含三個(gè)主要部分:
調(diào)度器: 決定由哪個(gè)線程來(lái)執(zhí)行任務(wù), 執(zhí)行任務(wù)所能夠的最大耗時(shí)等
線程隊(duì)列: 存放并管理著一系列線程, 這些線程都處于阻塞狀態(tài)或休眠狀態(tài)
任務(wù)隊(duì)列: 存放著用戶提交的需要被執(zhí)行的任務(wù). 一般任務(wù)的執(zhí)行 FIFO 的, 即先提交的任務(wù)先被執(zhí)行
調(diào)度器并非是必須的, 例如 Java 中實(shí)現(xiàn)的 ThreadPoolExecutor 就沒(méi)有調(diào)度器, 而是所有的線程都不斷從任務(wù)隊(duì)列中取出任務(wù), 然后執(zhí)行.
線程池模型可以用下圖簡(jiǎn)單地表示:
ThreadPoolExecutor 有兩個(gè)參數(shù)用于控制線程池中線程的個(gè)數(shù): corePoolSize 和 maximumPoolSize, 根據(jù)這兩個(gè)參數(shù), ThreadPoolExecutor 會(huì)自適應(yīng)地調(diào)整線程個(gè)數(shù), 以適應(yīng)不同的任務(wù)數(shù).
當(dāng)我們通過(guò) execute(Runnable) 提交一個(gè)任務(wù)時(shí):
如果此時(shí)線程池中線程個(gè)數(shù)小于 corePoolSize, 則此任務(wù)不會(huì)插入到任務(wù)隊(duì)列中, 而是直接創(chuàng)建一個(gè)新的線程來(lái)執(zhí)行此任務(wù), 即使當(dāng)前線程池中有空閑的線程.
如果線程數(shù)大于 corePoolSize 但是小于 maximumPoolSize:
如果任務(wù)隊(duì)列還未滿, 則會(huì)將此任務(wù)插入到任務(wù)隊(duì)列末尾;
如果此時(shí)任務(wù)隊(duì)列已滿, 則會(huì)創(chuàng)建新的線程來(lái)執(zhí)行此任務(wù).
如果線程數(shù)等于 maximumPoolSize:
如果任務(wù)隊(duì)列還未滿, 則會(huì)將此任務(wù)插入到任務(wù)隊(duì)列末尾;
如果此時(shí)任務(wù)隊(duì)列已滿, 則會(huì)又 RejectedExecutionHandler 處理, 默認(rèn)情況下是拋出 RejectedExecutionException 異常.
線程的 Keep-Alive 時(shí)間在創(chuàng)建一個(gè)線程池時(shí), 我們可以指定線程池中的線程的最大空閑(Idle)時(shí)間, 線程池會(huì)根據(jù)我們?cè)O(shè)置的這個(gè)值來(lái)動(dòng)態(tài)的減少不必要的線程, 釋放系統(tǒng)資源.
當(dāng)我們的線程池中的線程數(shù)大于 corePoolSize 時(shí), 如果此時(shí)有線程處于空閑(Idle)狀態(tài)超過(guò)指定的時(shí)間(keepAliveTime), 那么線程池會(huì)將此線程銷(xiāo)毀.
工作隊(duì)列(WorkQueue) 是 一個(gè) BlockingQueue, 它時(shí)用于存放那些已經(jīng)提交的, 但是還沒(méi)有空余線程來(lái)執(zhí)行的任務(wù). 例如我們?cè)谇懊?線程池大小 一節(jié)中討論的情況, 如果當(dāng)前的線程數(shù)大于 corePoolSize 并且工作隊(duì)列的還有剩余空間, 那么新提交的任務(wù)就會(huì)先放到工作隊(duì)列中.
根據(jù) Java Docs, 有三種常見(jiàn)的工作隊(duì)列的使用場(chǎng)景:
直接切換(Direct handoffs): 一個(gè)不錯(cuò)并且是默認(rèn)的工作隊(duì)列的選擇時(shí)
無(wú)界隊(duì)列(Unbounded queues)
有界隊(duì)列(Bounded queues)
任務(wù)提交失敗處理因?yàn)榫€程池中維護(hù)有一個(gè)工作隊(duì)列, 我們自然地會(huì)想到, 當(dāng)線程池中的工作隊(duì)列滿了, 不能再添加新的任務(wù)了, 此時(shí)線程池會(huì)怎么處理呢?
一般來(lái)說(shuō), 當(dāng)我們提交一個(gè)任務(wù)到線程池中, 如果此時(shí)線程池不能再添加任務(wù)了, 那么通常會(huì)返回一個(gè)錯(cuò)誤, 或者是調(diào)用我們預(yù)先設(shè)置的一個(gè)錯(cuò)誤處理 handler, 例如在 Java ThreadPoolExecutor 中, 我們可以通過(guò)如下方式實(shí)例化一個(gè)帶有任務(wù)提交失敗 handler 的線程池:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 100, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("Task " + r.toString() + " failed!"); } });活躍線程數(shù)與線程池狀態(tài)
ThreadPoolExecutor 中有一個(gè)名為 ctl 的字段, 它是一個(gè) AtomicInteger 類(lèi)型, ThreadPoolExecutor 復(fù)用了此字段來(lái)表示兩個(gè)信息:
當(dāng)前活躍的線程數(shù)
線程池狀態(tài)
ctl 是一個(gè) AtomicInteger 類(lèi)型, 它的 低29位 用于存放當(dāng)前的線程數(shù), 因此一個(gè)線程池在理論上最大的線程數(shù)是 536870911; 高 3 位是用于表示當(dāng)前線程池的狀態(tài), 其中高三位的值和狀態(tài)對(duì)應(yīng)如下:
111: RUNNING
000: SHUTDOWN
001: STOP
010: TIDYING
110: TERMINATED
線程池的基本使用 創(chuàng)建線程池前面我們提到, 一個(gè)線程池中有 corePoolSize, maximumPoolSize, keepAliveTime, workQueue 之類(lèi)的概念, 這些屬性我們必須在實(shí)例化線程池時(shí)通過(guò)構(gòu)造器傳入. Java 線程池實(shí)現(xiàn)類(lèi) ThreadPoolExecutor 中提供了不少構(gòu)造方法, 我們來(lái)看一下其中兩個(gè)常用的構(gòu)造器:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
可以看到, 在實(shí)例化一個(gè) ThreadPoolExecutor 線程池時(shí), 我們需要指定一些線程池的基本屬性, 并且可選地, 我們還可以指定當(dāng)任務(wù)提交失敗時(shí)的處理 handler.
例如我們可以通過(guò)如下方式實(shí)例化一個(gè)帶有任務(wù)提交失敗 handler 的線程池:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 100, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("Task " + r.toString() + " failed!"); } });
當(dāng)然, 除了上述使用構(gòu)造器來(lái)直接創(chuàng)建線程池, Java 還提供了幾個(gè)簡(jiǎn)便地創(chuàng)建線程池的方法:
Executors.newCachedThreadPool
Executors.newFixedThreadPool
Executors.newWorkStealingPool
Executors.newSingleThreadExecutor
Executors.newScheduledThreadPool
例如我們想創(chuàng)建一個(gè)有五個(gè)線程的線程池, 那么可以調(diào)用 Executors.newFixedThreadPool, 這個(gè)方法等效于:
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue提交任務(wù)())
提交任務(wù)到線程池中比較簡(jiǎn)單, 如果是 ThreadPoolExecutor 類(lèi)型的線程池, 我們直接調(diào)用它的 execute 方法即可, 例如:
ExecutorService executorService = ... executorService.execute(new Runnable() { @Override public void run() { System.out.println("OK, thread name: " + Thread.currentThread().getName()); } });
如果我們獲取到一個(gè) ScheduledThreadPoolExecutor 類(lèi)型的線程池, 那么除了調(diào)用 execute 方法外, 我們還可以通過(guò)調(diào)用 schedule 方法提交一個(gè)定時(shí)任務(wù), 例如:
ScheduledExecutorService executorService = xxx executorService.schedule(new Runnable() { @Override public void run() { System.out.println("OK, thread name: " + Thread.currentThread().getName()); } }, 1, TimeUnit.SECONDS);
上面代代碼就會(huì)在1秒后執(zhí)行我們的定時(shí)任務(wù).
關(guān)閉線程池Java 線程池提供了兩個(gè)方法用于關(guān)閉一個(gè)線程池, 一個(gè)是 shutdownNow(), 另一個(gè)是 shutdown(). 我們可以看一下這兩個(gè)方法的簽名:
void shutdown(); ListshutdownNow();
這兩個(gè)方法除了名字不一樣外(廢話), 它們的返回值也不太一樣.
那么這兩個(gè)方法到底有什么區(qū)別呢? 它們的區(qū)別有:
當(dāng)線程池調(diào)用該方法時(shí),線程池的狀態(tài)則立刻變成 SHUTDOWN 狀態(tài). 我們不能再往線程池中添加任何任務(wù), 否則將會(huì)拋出RejectedExecutionException異常; 但是, 此時(shí)線程池不會(huì)立刻退出, 直到添加到線程池中的任務(wù)都已經(jīng)處理完成后才會(huì)退出.
當(dāng)執(zhí)行該方法, 線程池的狀態(tài)立刻變成STOP狀態(tài), 并試圖停止所有正在執(zhí)行的線程, 不再處理還在池隊(duì)列中等待的任務(wù), 并以返回值的形式返回那些未執(zhí)行的任務(wù).
此方法會(huì)通過(guò)調(diào)用 Thread.interrupt() 方法來(lái)試圖停止正在運(yùn)行的 Worker 線程, 但是這種方法的作用有限, 如果線程中沒(méi)有 sleep 、wait、Condition、定時(shí)鎖 等操作時(shí), interrupt() 方法是無(wú)法中斷當(dāng)前的線程的. 所以, ShutdownNow() 并不代表線程池就一定立即就能退出, 可能必須要等待所有正在執(zhí)行的任務(wù)都執(zhí)行完成了才能退出.
廢話了一大堆, 我們來(lái)看一下具體的例子吧:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5); executorService.schedule(new Runnable() { @Override public void run() { System.out.println("OK, thread name: " + Thread.currentThread().getName()); } }, 1, TimeUnit.SECONDS); // 調(diào)用此方法關(guān)閉線程時(shí), 我們提交的定時(shí)任務(wù)不會(huì)被執(zhí)行 // executorService.shutdownNow(); executorService.shutdown();
可以看到, 如果我們調(diào)用的是 executorService.shutdownNow(), 那么原先提交的未執(zhí)行的定時(shí)任務(wù)并不會(huì)再被執(zhí)行, 但是如果我們調(diào)用的是 executorService.shutdown(), 那么此調(diào)用會(huì)阻塞住, 直到所有提交的任務(wù)都執(zhí)行完畢才會(huì)返回.
代碼分析 線程池的屬性字段在開(kāi)始深入了解 ThreadPoolExecutor 代碼之前, 我們先來(lái)簡(jiǎn)單地看一下 ThreadPoolExecutor 類(lèi)中到底有哪些重要的字段.
public class ThreadPoolExecutor extends AbstractExecutorService { // 這個(gè)是一個(gè)復(fù)用字段, 它復(fù)用地表示了當(dāng)前線程池的狀態(tài), 當(dāng)前線程數(shù)信息. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 用于存放提交到線程池中, 但是還未執(zhí)行的那些任務(wù). private final BlockingQueueworkQueue; // 線程池內(nèi)部鎖, 對(duì)線程池內(nèi)部操作加鎖, 防止競(jìng)態(tài)條件 private final ReentrantLock mainLock = new ReentrantLock(); // 一個(gè) Set 結(jié)構(gòu), 包含了當(dāng)前線程池中的所有工作線程. // 對(duì) workers 字段的操作前, 需要獲取到這個(gè)鎖. private final HashSet workers = new HashSet (); // 條件變量, 用于支持 awaitTermination 操作 private final Condition termination = mainLock.newCondition(); // 記錄線程池中曾經(jīng)到達(dá)過(guò)的最大的線程數(shù). // 這個(gè)字段在獲取 mainLock 鎖的前提下才能操作. private int largestPoolSize; // 記錄已經(jīng)完成的任務(wù)數(shù). 僅僅當(dāng)工作線程結(jié)束時(shí)才更新此字段. // 這個(gè)字段在獲取 mainLock 鎖的前提下才能操作. private long completedTaskCount; // 線程工廠. 當(dāng)需要一個(gè)新的線程時(shí), 這里生成. private volatile ThreadFactory threadFactory; // 任務(wù)提交失敗后的處理 handler private volatile RejectedExecutionHandler handler; // 空閑線程的等待任務(wù)時(shí)間, 以納秒為單位. // 當(dāng)當(dāng)前線程池中的線程數(shù)大于 corePoolSize 時(shí), // 或者 allowCoreThreadTimeOut 為真時(shí), 線程才有 idle 等待超時(shí)時(shí)間, // 如果超時(shí)則此線程會(huì)停止.; // 反之線程會(huì)一直等待新任務(wù)到來(lái). private volatile long keepAliveTime; // 默認(rèn)為 false. // 當(dāng)為 false 時(shí), keepAliveTime 不起作用, 線程池中的 core 線程會(huì)一直存活, // 即使這些線程是 idle 狀態(tài). // 當(dāng)為 true 時(shí), core 線程使用 keepAliveTime 作為 idle 超時(shí) // 時(shí)間來(lái)等待新的任務(wù). private volatile boolean allowCoreThreadTimeOut; // 核心線程數(shù). private volatile int corePoolSize; // 最大線程數(shù). private volatile int maximumPoolSize; }
ThreadPoolExecutor 中, 使用到 ctl 這個(gè)字段來(lái)維護(hù)線程池中當(dāng)前線程數(shù)和線程池的狀態(tài). ctl 是一個(gè) AtomicInteger 類(lèi)型, 它的 低29位 用于存放當(dāng)前的線程數(shù), 因此一個(gè)線程池在理論上最大的線程數(shù)是 536870911; 高 3 位是用于表示當(dāng)前線程池的狀態(tài), 其中高三位的值和狀態(tài)對(duì)應(yīng)如下:
111: RUNNING
000: SHUTDOWN
001: STOP
010: TIDYING
110: TERMINATED
提交任務(wù)到線程池public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
上面的代碼有三個(gè)步驟, 首先第一步是檢查當(dāng)前線程池的線程數(shù)是否小于 corePoolSize, 如果小于, 那么由我們前面提到的規(guī)則, 線程池會(huì)創(chuàng)建一個(gè)新的線程來(lái)執(zhí)行此任務(wù), 因此在第一個(gè) if 語(yǔ)句中, 會(huì)調(diào)用 addWorker(command, true) 來(lái)創(chuàng)建一個(gè)新 Worker 線程, 并執(zhí)行此任務(wù). addWorker 的第二個(gè)參數(shù)是一個(gè) boolean 類(lèi)型的, 它的作用是用于標(biāo)識(shí)是否需要使用 corePoolSize 字段, 如果它為真, 則添加新任務(wù)時(shí), 需要考慮到 corePoolSize 字段的影響. 這里至于 addWorker 內(nèi)部的實(shí)現(xiàn)細(xì)節(jié)我們暫且不管, 先把整個(gè)提交任務(wù)的大體脈絡(luò)理清了再說(shuō).
如果前面的判斷不滿足, 那么會(huì)將此任務(wù)插入到工作隊(duì)列中, 即 workQueue.offer(command). 當(dāng)然, 為了健壯性考慮, 當(dāng)插入到 workQueue 后, 我們還需要再次檢查一下此時(shí)線程池是否還是 RUNNING 狀態(tài), 如果不是的話就會(huì)將原來(lái)插入隊(duì)列中的那個(gè)任務(wù)刪除, 然后調(diào)用 reject 方法拒絕此任務(wù)的提交; 接著考慮到在我們插入任務(wù)到 workQueue 中的同時(shí), 如果此時(shí)線程池中的線程都執(zhí)行完畢并終止了, 在這樣的情況下剛剛插入到 workQueue 中的任務(wù)就永遠(yuǎn)不會(huì)得到執(zhí)行了. 為了避免這樣的情況, 因此我們由再次檢查一下線程池中的線程數(shù), 如果為零, 則調(diào)用 addWorker(null, false) 來(lái)添加一個(gè)線程.
如果前面所分析的情況都不滿足, 那么就會(huì)進(jìn)入到第三個(gè) if 判斷, 在這里會(huì)調(diào)用 addWorker(command, false) 來(lái)將此任務(wù)提交到線程池中. 注意到這個(gè)方法的第二個(gè)參數(shù)是 false, 表示我們?cè)诖舜握{(diào)用 addWorker 時(shí), 不考慮 corePoolSize 的影響, 即忽略 corePoolSize 字段.
前面我們大體分析了一下 execute 提交任務(wù)的流程, 不過(guò)省略了一個(gè)關(guān)鍵步驟, 即 addWorker 方法. 現(xiàn)在我們就來(lái)揭開(kāi)它的神秘面紗吧.
首先看一下 addWorker 方法的簽名:
private boolean addWorker(Runnable firstTask, boolean core)
這個(gè)方法接收兩個(gè)參數(shù), 第一個(gè)是一個(gè) Runnable 類(lèi)型的, 一般來(lái)說(shuō)是我們調(diào)用 execute 方法所傳輸?shù)膮?shù), 不過(guò)也有可能是 null 值, 這樣的情況我們?cè)谇懊嬉恍」?jié)中也見(jiàn)到過(guò).
那么第二個(gè)參數(shù)是做什么的呢? 第二個(gè)參數(shù)是一個(gè) boolean 類(lèi)型的變量, 它的作用是標(biāo)識(shí)是否使用 corePoolSize 屬性. 我們知道, ThreadPoolExecutor 中, 有一個(gè) corePoolSize 屬性, 用于動(dòng)態(tài)調(diào)整線程池中的核心線程數(shù). 那么當(dāng) core 這個(gè)參數(shù)是 true 時(shí), 則表示在添加新任務(wù)時(shí), 需要考慮到 corePoolSzie 的影響(例如如果此時(shí)線程數(shù)已經(jīng)大于 corePoolSize 了, 那么就不能再添加新線程了); 當(dāng) core 為 false 時(shí), 就不考慮 corePoolSize 的影響(其實(shí)代碼中是以 maximumPoolSize 作為 corePoolSize 來(lái)做判斷條件的), 一有新任務(wù), 就對(duì)應(yīng)地生成一個(gè)新的線程.
說(shuō)了這么多, 還不如來(lái)看一下 addWorker 的源碼吧:
private boolean addWorker(Runnable firstTask, boolean core) { // 這里一大段的 for 語(yǔ)句, 其實(shí)就是判斷和處理 core 參數(shù)的. // 當(dāng)經(jīng)過(guò)判斷, 如果當(dāng)前的線程大于 corePoolSize 或 maximumPoolSize 時(shí)(根據(jù) core 的值來(lái)判斷), // 則表示不能新建新的 Worker 線程, 此時(shí)返回 false. retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 當(dāng) core 為真, 那么就判斷當(dāng)前線程是否大于 corePoolSize // 當(dāng) core 為假, 那么就判斷當(dāng)前線程數(shù)是否大于 maximumPoolSize // 這里的 for 循環(huán)是一個(gè)自旋CAS(CompareAndSwap)操作, 用于確保多線程環(huán)境下的正確性 if (wc >= CAPACITY || wc >= (core ? corePoolSize : ma)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
首先在 addWorker 的一開(kāi)始, 有一個(gè) for 循環(huán), 用于判斷當(dāng)前是否可以添加新的 Worker 線程. 它的邏輯如下:
如果傳入的 core 為真, 那么判斷當(dāng)前的線程數(shù)是否大于 corePoolSize, 如果大于, 則不能新建 Worker 線程, 返回 false.
如果傳入的 core 為假, 那么判斷當(dāng)前的線程數(shù)是否大于 maximumPoolSize, 如果大于, 則不能新建 Worker 線程, 返回 false.
如果條件符合, 那么在 for 循環(huán)內(nèi), 又有一個(gè)自旋CAS 更新邏輯, 用于遞增當(dāng)前的線程數(shù), 即 compareAndIncrementWorkerCount(c), 這個(gè)方法會(huì)原子地更新 ctl 的值, 將當(dāng)前線程數(shù)的值遞增一.
addWorker 接下來(lái)有一個(gè) try...finally 語(yǔ)句塊, 這里就是實(shí)際上的創(chuàng)建線程、啟動(dòng)線程、添加線程到線程池中的工作了.
首先可以看到 w = new Worker(firstTask); 這里是實(shí)例化一個(gè) Worker 對(duì)象, 這個(gè)類(lèi)其實(shí)就是 ThreadPoolExecutor 中對(duì)工作線程的封裝. Worker 類(lèi)繼承于 AbstractQueuedSynchronizer 并實(shí)現(xiàn)了 Runnable 接口, 我們來(lái)看一下它的構(gòu)造器:
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
它會(huì)把我們提交的任務(wù)(firstTask) 設(shè)置為自己的內(nèi)部屬性 firstTask, 然后呢, 使用 ThreadPoolExecutor 中的 threadFactory 來(lái)創(chuàng)建一個(gè)新的線程, 并保存在 thread 字段中, 而且注意到, 創(chuàng)建線程時(shí), 我們傳遞給新線程城的 Runnable 其實(shí)是 Worker 對(duì)象本身(this), 因此當(dāng)這個(gè)線程啟動(dòng)時(shí), 實(shí)際上運(yùn)行的是 Worker.run() 中的代碼.
回過(guò)頭來(lái)再看一下 addWorker 方法. 當(dāng)創(chuàng)建好 Worker 線程后, 就會(huì)將這個(gè) worker 線程存放在 workers 這個(gè) HashSet
最后別忘啦, 新建了一個(gè)線程后, 需要調(diào)用它的 start() 方法后, 這個(gè)線程才真正地運(yùn)行, 因此我們可以看到, 在 addWorker 方法的最后, 調(diào)用了 t.start(); 來(lái)啟動(dòng)這個(gè)新建的線程.
任務(wù)的分配與調(diào)度我們已經(jīng)分析了工作線程的創(chuàng)建和任務(wù)插入到 wokerQuque 的過(guò)程, 那么根據(jù)本文最開(kāi)頭的線程池工作模型可知, 光有工作線程和工作隊(duì)列還不行啊, 還需要有一個(gè)調(diào)度器, 把任務(wù)和工作線程關(guān)聯(lián)起來(lái)才是一個(gè)真正的線程池.
在 ThreadPoolExecutor 中, 調(diào)度器的實(shí)現(xiàn)很簡(jiǎn)單, 其實(shí)就是每個(gè)工作線程在執(zhí)行完一個(gè)任務(wù)后, 會(huì)再次中 workQueue 中拿出下一個(gè)任務(wù), 如果獲取到了任務(wù), 那么就再次執(zhí)行.
我們來(lái)看一下具體的代碼實(shí)現(xiàn)吧.
在前面一小節(jié)中, 我們講到 addWorker 中會(huì)新建一個(gè) Worker 對(duì)象來(lái)代表一個(gè) worker 線程, 接著會(huì)調(diào)用線程的 start() 來(lái)啟動(dòng)這個(gè)線程, 我們也提到了當(dāng)啟動(dòng)這個(gè)線程后, 會(huì)運(yùn)行到 Worker 中的 run 方法, 那么這里我們就來(lái)看一下 Worker.run有什么玄機(jī)吧:
public void run() { runWorker(this); }
Worker.run 方法很簡(jiǎn)單, 只是調(diào)用了 ThreadPoolExecutor.runWorker 方法而已.
runWorker 方法比較關(guān)鍵, 它是整個(gè)線程池任務(wù)分配的核心:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
runWorker 方法是整個(gè)工作線程的核心循環(huán), 在這個(gè)循環(huán)中, 工作線程會(huì)不斷的從 workerQuque 中獲取新的 task, 然后執(zhí)行它.
我們注意到在 runWorker 一開(kāi)始, 有一個(gè) w.unlock();, 咦, 這是為什么呢? 其實(shí)這是 Worker 類(lèi)玩的一個(gè)小把戲. 回想一下, Worker 類(lèi)繼承于 AbstractQueuedSynchronizer 并實(shí)現(xiàn)了 Runnable 接口, 它的構(gòu)造器如下:
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
setState(-1); 方法是 AbstractQueuedSynchronizer 提供的, 初始化 Worker 時(shí), 會(huì)先設(shè)置 state 為 -1, 根據(jù)注釋, 這樣做的原因是為了抑制工作線程的 interrupt 信號(hào), 直到此工作線程正是開(kāi)始執(zhí)行 task. 那么在 addWorker 中的 w.unlock(); 就是允許 Worker 的 interrupt 信號(hào).
接著在 addWorker 中會(huì)進(jìn)入一個(gè) while 循環(huán), 在這里此工作線程會(huì)不斷地從 workQueue 中取出一個(gè)任務(wù), 然后調(diào)用 task.run() 來(lái)執(zhí)行這個(gè)任務(wù), 因此就執(zhí)行到了用戶所提交的 Runnable 中的 run() 方法了.
工作線程的 idle 超出處理在底層依賴(lài)于 BlockingQueue 帶超時(shí)的 poll 方法, 即工作線程會(huì)不斷地從 workQueue 這個(gè) BlockingQueue 中獲取任務(wù), 如果 allowCoreThreadTimeOut 字段為 true, 或者當(dāng)前的工作線程數(shù)大于 corePoolSize, 那么線程的 idle 超時(shí)機(jī)制就生效了, 此時(shí)工作線程會(huì)以帶超時(shí)的 poll 方式從 workQueue 中獲取任務(wù). 當(dāng)超時(shí)了還沒(méi)有獲取到任務(wù), 那么我們就知道此線程一個(gè)到達(dá) idle 超時(shí)時(shí)間, 因此終止此工作線程.
具體源碼如下:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
從源碼中就可以看到, 一開(kāi)始會(huì)判斷當(dāng)前的線程池狀態(tài), 如果不是 SHUTDOWN 或 STOP 之類(lèi)的狀態(tài), 那么接著獲取當(dāng)前的工作線程數(shù), 然后判斷工作線程數(shù)量是否已經(jīng)大于了 corePoolSize. 當(dāng) allowCoreThreadTimeOut 字段為 true, 或者當(dāng)前的工作線程數(shù)大于 corePoolSize, 那么線程的 idle 超時(shí)機(jī)制就生效, 此時(shí)工作線程會(huì)以帶超時(shí)的 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方式從 workQueue 中獲取任務(wù); 反之會(huì)以 workQueue.take() 方式阻塞等待任務(wù), 直到獲取一個(gè)新的任務(wù).
當(dāng)從 workQueue 獲取新任務(wù)超時(shí)時(shí), 那么就會(huì)調(diào)用 compareAndDecrementWorkerCount 將當(dāng)前的工作線程數(shù)減一, 并返回 null. getTask 方法返回 null 后, 那么 runWorker 中的 while 循環(huán)自然也就結(jié)束了, 因此也導(dǎo)致了 runWorker 方法的返回, 最后自然整個(gè)工作線程的 run() 方法執(zhí)行完畢, 工作線程自然就終止了.
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/66811.html
摘要:當(dāng)活動(dòng)線程核心線程非核心線程達(dá)到這個(gè)數(shù)值后,后續(xù)任務(wù)將會(huì)根據(jù)來(lái)進(jìn)行拒絕策略處理。線程池工作原則當(dāng)線程池中線程數(shù)量小于則創(chuàng)建線程,并處理請(qǐng)求。當(dāng)線程池中的數(shù)量等于最大線程數(shù)時(shí)默默丟棄不能執(zhí)行的新加任務(wù),不報(bào)任何異常。 spring-cache使用記錄 spring-cache的使用記錄,坑點(diǎn)記錄以及采用的解決方案 深入分析 java 線程池的實(shí)現(xiàn)原理 在這篇文章中,作者有條不紊的將 ja...
摘要:源碼分析創(chuàng)建可緩沖的線程池。源碼分析使用創(chuàng)建線程池源碼分析的構(gòu)造函數(shù)構(gòu)造函數(shù)參數(shù)核心線程數(shù)大小,當(dāng)線程數(shù),會(huì)創(chuàng)建線程執(zhí)行最大線程數(shù),當(dāng)線程數(shù)的時(shí)候,會(huì)把放入中保持存活時(shí)間,當(dāng)線程數(shù)大于的空閑線程能保持的最大時(shí)間。 之前創(chuàng)建線程的時(shí)候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThr...
摘要:任務(wù)性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開(kāi)處理。線程池在運(yùn)行過(guò)程中已完成的任務(wù)數(shù)量。如等于線程池的最大大小,則表示線程池曾經(jīng)滿了。線程池的線程數(shù)量。獲取活動(dòng)的線程數(shù)。通過(guò)擴(kuò)展線程池進(jìn)行監(jiān)控。框架包括線程池,,,,,,等。 Java線程池 [toc] 什么是線程池 線程池就是有N個(gè)子線程共同在運(yùn)行的線程組合。 舉個(gè)容易理解的例子:有個(gè)線程組合(即線程池,咱可以比喻為一個(gè)公司),里面有3...
摘要:當(dāng)面試官問(wèn)線程池時(shí),你應(yīng)該知道些什么一執(zhí)行流程與不同,向中提交任務(wù)的時(shí)候,任務(wù)被包裝成對(duì)象加入延遲隊(duì)列并啟動(dòng)一個(gè)線程。當(dāng)我們創(chuàng)建出一個(gè)調(diào)度線程池以后,就可以開(kāi)始提交任務(wù)了。 最近新接手的項(xiàng)目里大量使用了ScheduledThreadPoolExecutor類(lèi)去執(zhí)行一些定時(shí)任務(wù),之前一直沒(méi)有機(jī)會(huì)研究這個(gè)類(lèi)的源碼,這次趁著機(jī)會(huì)好好研讀一下。 原文地址:http://www.jianshu....
摘要:當(dāng)面試官問(wèn)線程池時(shí),你應(yīng)該知道些什么一執(zhí)行流程與不同,向中提交任務(wù)的時(shí)候,任務(wù)被包裝成對(duì)象加入延遲隊(duì)列并啟動(dòng)一個(gè)線程。當(dāng)我們創(chuàng)建出一個(gè)調(diào)度線程池以后,就可以開(kāi)始提交任務(wù)了。 最近新接手的項(xiàng)目里大量使用了ScheduledThreadPoolExecutor類(lèi)去執(zhí)行一些定時(shí)任務(wù),之前一直沒(méi)有機(jī)會(huì)研究這個(gè)類(lèi)的源碼,這次趁著機(jī)會(huì)好好研讀一下。 原文地址:http://www.jianshu....
閱讀 733·2023-04-25 20:32
閱讀 2297·2021-11-24 10:27
閱讀 4537·2021-09-29 09:47
閱讀 2252·2021-09-28 09:36
閱讀 3654·2021-09-22 15:27
閱讀 2773·2019-08-30 15:54
閱讀 382·2019-08-30 11:06
閱讀 1280·2019-08-30 10:58