摘要:介紹線程池一般包含三個主要部分調(diào)度器決定由哪個線程來執(zhí)行任務(wù)執(zhí)行任務(wù)所能夠的最大耗時等線程隊列存放并管理著一系列線程這些線程都處于阻塞狀態(tài)或休眠狀態(tài)任務(wù)隊列存放著用戶提交的需要被執(zhí)行的任務(wù)一般任務(wù)的執(zhí)行的即先提交的任務(wù)先被執(zhí)行調(diào)度器并非是必
介紹
線程池一般包含三個主要部分:
調(diào)度器: 決定由哪個線程來執(zhí)行任務(wù), 執(zhí)行任務(wù)所能夠的最大耗時等
線程隊列: 存放并管理著一系列線程, 這些線程都處于阻塞狀態(tài)或休眠狀態(tài)
任務(wù)隊列: 存放著用戶提交的需要被執(zhí)行的任務(wù). 一般任務(wù)的執(zhí)行 FIFO 的, 即先提交的任務(wù)先被執(zhí)行
調(diào)度器并非是必須的, 例如 Java 中實現(xiàn)的 ThreadPoolExecutor 就沒有調(diào)度器, 而是所有的線程都不斷從任務(wù)隊列中取出任務(wù), 然后執(zhí)行.線程池模型可以用下圖簡單地表示
構(gòu)造函數(shù)public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
一般構(gòu)造函數(shù)包含了最主要的成員變量,我們來看看幾個參數(shù)
corePoolSize:線程池的最小線程數(shù)量
maximumPoolSize:線程池的最大線程數(shù)量
workQueue:任務(wù)隊列
threadFactory:產(chǎn)生線程的工廠
keepAliveTime:允許的最大idle時間
handler:拒絕執(zhí)行處理器
這些是可控制的線程池的參數(shù)
RUNNING: 接受新任務(wù)并處理隊列中的任務(wù)
SHUTDOWN: 不接受新任務(wù),但是處理隊列中的任務(wù)
STOP: 不接受新任務(wù),也不處理隊列中的任務(wù),還會中斷已經(jīng)進行中的任務(wù)
TIDYING: 所有任務(wù)已經(jīng)執(zhí)行完畢,工作線程數(shù)量為0,線程池狀態(tài)轉(zhuǎn)換為TIDYING,terminate方法被出觸發(fā)。
TERMINATED: terminated() 執(zhí)行完畢
狀態(tài)轉(zhuǎn)換
RUNNING -> SHUTDOWN:shutdown()
(RUNNING or SHUTDOWN) -> STOP:shutdownNow()
SHUTDOWN -> TIDYING:線程池和任務(wù)隊列都為空
STOP -> TIDYING:線程池為空
TIDYING -> TERMINATED:terminated()
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
狀態(tài)碼是由一個32位的原子Int的前三位表示的,后三位表示工作線程的數(shù)量
線程存在哪里?真正的工作線程封裝成一個內(nèi)部類Worker,存放在HashSet中
private final HashSetsubmit()workers = new HashSet (); private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** 真正的工作線程 */ final Thread thread; /** 要執(zhí)行的任務(wù) */ Runnable firstTask; /** 已經(jīng)完成的任務(wù)計數(shù)器 */ volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // 阻止中斷 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** 只是一個家的代理,真實的執(zhí)行方法在runWorker里 */ public void run() { runWorker(this); } //下面就是一個不可重入的排他鎖 protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
ThreadPoolExecutor實現(xiàn)了ExecutorService接口
// Class:ExecutorService // 提交一個待執(zhí)行的Runnable任務(wù),并返回FutureFuture submit(Callable task); Future submit(Runnable task, T result); Future> submit(Runnable task);
public Future> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFutureftask = newTaskFor(task, null); execute(ftask); return ftask; } public Future submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, result); execute(ftask); return ftask; } public Future submit(Callable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task); execute(ftask); return ftask; } protected RunnableFuture newTaskFor(Runnable runnable, T value) { return new FutureTask (runnable, value); }
再看ThreadPoolExcutor的實現(xiàn),submit方法都委托給execute執(zhí)行了。
當(dāng)我們通過 execute(Runnable) 提交一個任務(wù)時:
如果此時線程池中線程個數(shù)小于 corePoolSize, 則此任務(wù)不會插入到任務(wù)隊列中, 而是直接創(chuàng)建一個新的線程來執(zhí)行此任務(wù), 即使當(dāng)前線程池中有空閑的線程.
如果線程數(shù)大于 corePoolSize 但是小于 maximumPoolSize:
如果任務(wù)隊列還未滿, 則會將此任務(wù)插入到任務(wù)隊列末尾;
如果此時任務(wù)隊列已滿, 則會創(chuàng)建新的線程來執(zhí)行此任務(wù).
如果線程數(shù)等于 maximumPoolSize:
如果任務(wù)隊列還未滿, 則會將此任務(wù)插入到任務(wù)隊列末尾;
如果此時任務(wù)隊列已滿, 則會又 RejectedExecutionHandler 處理, 默認情況下是拋出 RejectedExecutionException 異常.
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
上面的代碼有三個步驟, 首先第一步是檢查當(dāng)前線程池的線程數(shù)是否小于 corePoolSize, 如果小于, 那么由我們前面提到的規(guī)則, 線程池會創(chuàng)建一個新的線程來執(zhí)行此任務(wù), 因此在第一個 if 語句中, 會調(diào)用 addWorker(command, true) 來創(chuàng)建一個新 Worker 線程, 并執(zhí)行此任務(wù). addWorker 的第二個參數(shù)是一個 boolean 類型的, 它的作用是用于標(biāo)識是否需要使用 corePoolSize 字段, 如果它為真, 則添加新任務(wù)時, 需要考慮到 corePoolSize 字段的影響. 這里至于 addWorker 內(nèi)部的實現(xiàn)細節(jié)我們暫且不管, 先把整個提交任務(wù)的大體脈絡(luò)理清了再說.
如果前面的判斷不滿足, 那么會將此任務(wù)插入到工作隊列中, 即 workQueue.offer(command). 當(dāng)然, 為了健壯性考慮, 當(dāng)插入到 workQueue 后, 我們還需要再次檢查一下此時線程池是否還是 RUNNING 狀態(tài), 如果不是的話就會將原來插入隊列中的那個任務(wù)刪除, 然后調(diào)用 reject 方法拒絕此任務(wù)的提交; 接著考慮到在我們插入任務(wù)到 workQueue 中的同時, 如果此時線程池中的線程都執(zhí)行完畢并終止了, 在這樣的情況下剛剛插入到 workQueue 中的任務(wù)就永遠不會得到執(zhí)行了. 為了避免這樣的情況, 因此我們由再次檢查一下線程池中的線程數(shù), 如果為零, 則調(diào)用 addWorker(null, false) 來添加一個線程.
如果前面所分析的情況都不滿足, 那么就會進入到第三個 if 判斷, 在這里會調(diào)用 addWorker(command, false) 來將此任務(wù)提交到線程池中. 注意到這個方法的第二個參數(shù)是 false, 表示我們在此次調(diào)用 addWorker 時, 不考慮 corePoolSize 的影響, 即忽略 corePoolSize 字段.
前面我們大體分析了一下 execute 提交任務(wù)的流程, 不過省略了一個關(guān)鍵步驟, 即 addWorker 方法. 現(xiàn)在我們就來揭開它的神秘面紗吧.
首先看一下 addWorker 方法的簽名:
private boolean addWorker(Runnable firstTask, boolean core)
這個方法接收兩個參數(shù), 第一個是一個 Runnable 類型的, 一般來說是我們調(diào)用 execute 方法所傳輸?shù)膮?shù), 不過也有可能是 null 值, 這樣的情況我們在前面一小節(jié)中也見到過.
那么第二個參數(shù)是做什么的呢? 第二個參數(shù)是一個 boolean 類型的變量, 它的作用是標(biāo)識是否使用 corePoolSize 屬性. 我們知道, ThreadPoolExecutor 中, 有一個 corePoolSize 屬性, 用于動態(tài)調(diào)整線程池中的核心線程數(shù). 那么當(dāng) core 這個參數(shù)是 true 時, 則表示在添加新任務(wù)時, 需要考慮到 corePoolSzie 的影響(例如如果此時線程數(shù)已經(jīng)大于 corePoolSize 了, 那么就不能再添加新線程了); 當(dāng) core 為 false 時, 就不考慮 corePoolSize 的影響(其實代碼中是以 maximumPoolSize 作為 corePoolSize 來做判斷條件的), 一有新任務(wù), 就對應(yīng)地生成一個新的線程.
說了這么多, 還不如來看一下 addWorker 的源碼吧:
private boolean addWorker(Runnable firstTask, boolean core) { // 這里一大段的 for 語句, 其實就是判斷和處理 core 參數(shù)的. // 當(dāng)經(jīng)過判斷, 如果當(dāng)前的線程大于 corePoolSize 或 maximumPoolSize 時(根據(jù) core 的值來判斷), // 則表示不能新建新的 Worker 線程, 此時返回 false. retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 當(dāng) core 為真, 那么就判斷當(dāng)前線程是否大于 corePoolSize // 當(dāng) core 為假, 那么就判斷當(dāng)前線程數(shù)是否大于 maximumPoolSize // 這里的 for 循環(huán)是一個自旋CAS(CompareAndSwap)操作, 用于確保多線程環(huán)境下的正確性 if (wc >= CAPACITY || wc >= (core ? corePoolSize : ma)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
首先在 addWorker 的一開始, 有一個 for 循環(huán), 用于判斷當(dāng)前是否可以添加新的 Worker 線程. 它的邏輯如下:
如果傳入的 core 為真, 那么判斷當(dāng)前的線程數(shù)是否大于 corePoolSize, 如果大于, 則不能新建 Worker 線程, 返回 false.
如果傳入的 core 為假, 那么判斷當(dāng)前的線程數(shù)是否大于 maximumPoolSize, 如果大于, 則不能新建 Worker 線程, 返回 false.
如果條件符合, 那么在 for 循環(huán)內(nèi), 又有一個自旋CAS 更新邏輯, 用于遞增當(dāng)前的線程數(shù), 即 compareAndIncrementWorkerCount(c), 這個方法會原子地更新 ctl 的值, 將當(dāng)前線程數(shù)的值遞增一.
addWorker 接下來有一個 try...finally 語句塊, 這里就是實際上的創(chuàng)建線程、啟動線程、添加線程到線程池中的工作了.
接下來我們看任務(wù)的真正執(zhí)行runWorker
語義:執(zhí)行runWorker,調(diào)用Runnable的run方法,再其外圍包裝了中斷的策略
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // 如果線程池正在停止,確保其中斷 // 如果不是,確保其不中斷 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }reject
語義:提交不了線程池時,拒絕策略
接下來我們看線程提交被拒絕的策略reject
final void reject(Runnable command) { handler.rejectedExecution(command, this); }
RejectedExecutionHandler 接口有四種實現(xiàn)
AbortPolicy;拒絕,拋出RejectedExecutionException
CallerRunsPolicy:如果被拒絕,在當(dāng)前線程執(zhí)行任務(wù),
RejectedExecutionHandler:靜靜的拒絕,什么都不做
DiscardOldestPolicy:丟棄最老的未處理的請求,重試提交當(dāng)前請求
shutdown語義:不接受新任務(wù),但是處理隊列中的任務(wù)
shutdown方法主要就是設(shè)置線程池狀態(tài),設(shè)置空閑的worker的中斷
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }shutdownNow
shutdownNow:不接受新任務(wù),也不處理隊列中的任務(wù),還會中斷已經(jīng)進行中的任務(wù)
shutdownNow方法主要就是設(shè)置線程池狀態(tài),設(shè)置所有worker的中斷
public ListshutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
本文嚴重參考Java ThreadPoolExecutor 線程池源碼分析
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/70095.html
摘要:那么線程池到底是怎么利用類來實現(xiàn)持續(xù)不斷地接收提交的任務(wù)并執(zhí)行的呢接下來,我們通過的源代碼來一步一步抽絲剝繭,揭開線程池運行模型的神秘面紗。 在上一篇文章《從0到1玩轉(zhuǎn)線程池》中,我們了解了線程池的使用方法,以及向線程池中提交任務(wù)的完整流程和ThreadPoolExecutor.execute方法的源代碼。在這篇文章中,我們將會從頭閱讀線程池ThreadPoolExecutor類的源代...
摘要:最后,我們會通過對源代碼的剖析深入了解線程池的運行過程和具體設(shè)計,真正達到知其然而知其所以然的水平。創(chuàng)建線程池既然線程池是一個類,那么最直接的使用方法一定是一個類的對象,例如。單線程線程池單線程線程 我們一般不會選擇直接使用線程類Thread進行多線程編程,而是使用更方便的線程池來進行任務(wù)的調(diào)度和管理。線程池就像共享單車,我們只要在我們有需要的時候去獲取就可以了。甚至可以說線程池更棒,...
摘要:提交任務(wù)當(dāng)創(chuàng)建了一個線程池之后我們就可以將任務(wù)提交到線程池中執(zhí)行了。提交任務(wù)到線程池中相當(dāng)簡單,我們只要把原來傳入類構(gòu)造器的對象傳入線程池的方法或者方法就可以了。 我們一般不會選擇直接使用線程類Thread進行多線程編程,而是使用更方便的線程池來進行任務(wù)的調(diào)度和管理。線程池就像共享單車,我們只要在我們有需要的時候去獲取就可以了。甚至可以說線程池更棒,我們只需要把任務(wù)提交給它,它就會在合...
摘要:參數(shù)說明,線程池保留的最小線程數(shù)。,線程池中允許擁有的最大線程數(shù)。,線程池的運行狀態(tài)。除非線程池狀態(tài)發(fā)生了變化,發(fā)退回到外層循環(huán)重新執(zhí)行,判斷線程池的狀態(tài)。是線程池的核心控制狀態(tài),包含的線程池運行狀態(tài)和有效線程數(shù)。 Java是一門多線程的語言,基本上生產(chǎn)環(huán)境的Java項目都離不開多線程。而線程則是其中最重要的系統(tǒng)資源之一,如果這個資源利用得不好,很容易導(dǎo)致程序低效率,甚至是出問題。 有...
閱讀 3886·2021-10-08 10:05
閱讀 2973·2021-09-27 13:57
閱讀 2696·2019-08-29 11:32
閱讀 1021·2019-08-28 18:18
閱讀 1314·2019-08-28 18:05
閱讀 1997·2019-08-26 13:39
閱讀 877·2019-08-26 11:37
閱讀 2058·2019-08-26 10:37