摘要:的前位數(shù)用來表示線程的數(shù)量,后面三位用來表示線程池的狀態(tài)。線程池的狀態(tài)有五種,分別是,根據(jù)單詞就能猜出大概。并且為了考慮性能問題,線程池的設(shè)計(jì)沒有使用悲觀鎖關(guān)鍵字,而是大量使用了和機(jī)制。
零 前期準(zhǔn)備 0 FBI WARNING
文章異常啰嗦且繞彎。
1 版本JDK 版本 : OpenJDK 11.0.1
IDE : idea 2018.3
2 ThreadPoolExecutor 簡(jiǎn)介ThreadPoolExecutor 是 jdk4 中加入的工具,被封裝在 jdk 自帶的 Executors 框架中,是 java 中最經(jīng)典的線程池技術(shù)。
ThreadPoolExecutor 類在 concurrent 包下,和其它線程工具類一樣都由 Doug Lea 大神操刀完成。
[ 在看完 Spring ioc 和 Gson 之后有點(diǎn)乏了,換換口味看一些 jdk 的源碼 ]
3 Demoimport java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadPoolDemo { public static void main(String[] args){ //創(chuàng)建線程池 //這里使用固定線程數(shù)的線程池,線程數(shù)為 5 ExecutorService executorService = Executors.newFixedThreadPool(5); for(int i = 0 ; i < 100 ; i ++){ final int ii = i; //創(chuàng)建 Runnable 作為線程池的任務(wù) Runnable r = () -> System.out.println(ii); //執(zhí)行 executorService.execute(r); } } }一 線程池的初始化
線程池的初始化調(diào)用的 Executors 框架的靜態(tài)方法:
//Executors.class public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
繼續(xù)追蹤這個(gè)構(gòu)造方法:
//ThreadPoolExecutor.class public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
繼續(xù)追蹤:
//ThreadPoolExecutor.class public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue二 WorkerworkQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { //驗(yàn)證參數(shù)的有效性 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); //本例中不涉及權(quán)限 this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); //線程數(shù) this.corePoolSize = corePoolSize; //最大線程數(shù) //本例中使用固定線程數(shù)的線程池,所以線程數(shù)和最大線程數(shù)相等 this.maximumPoolSize = maximumPoolSize; //用于存儲(chǔ)任務(wù)的隊(duì)列 //此處使用 LinkedBlockingQueue 來儲(chǔ)存任務(wù),其線程安全 this.workQueue = workQueue; //keepAliveTime 參數(shù)用于表示: //對(duì)于超出線程和隊(duì)列緩存總和的任務(wù),是否要臨時(shí)增加線程來處理 //超出的線程的存在時(shí)間是多少 //這里使用的是定長(zhǎng)線程池,所以 keepAliveTime = 0,即不增加線程 this.keepAliveTime = unit.toNanos(keepAliveTime); //用于創(chuàng)建線程的工廠類 this.threadFactory = threadFactory; //handler 用來處理 task 太多時(shí)候的拒絕策略 //此例中使用的是默認(rèn)的,即定義在 ThreadPoolExecutor 中的 defaultHandler 對(duì)象 this.handler = handler; }
Worker 是 ThreadPoolExecutor 的內(nèi)部類,可以看做是 Runnable 的代理類:
//ThreadPoolExecutor.class private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ private static final long serialVersionUID = 6138294804551838833L; final Thread thread; Runnable firstTask; //完成 task 數(shù)量的計(jì)數(shù)器 volatile long completedTasks; Worker(Runnable firstTask) { //這個(gè)方法是 AbstractQueuedSynchronizer 中的方法,功能相當(dāng)于加鎖 //-1 的意思是后續(xù)的任務(wù)會(huì)處于阻塞狀態(tài),即為已經(jīng)加鎖 setState(-1); //在創(chuàng)建的時(shí)候存入一個(gè)要處理的 task //需要注意的是每個(gè) worker 對(duì)象被創(chuàng)建出來之后是可以重復(fù)利用來處理多個(gè) task 的 this.firstTask = firstTask; //worker 會(huì)用自身作為 Runnable 對(duì)象去創(chuàng)建一個(gè)線程 //這里調(diào)用線程工廠進(jìn)行線程創(chuàng)建 this.thread = getThreadFactory().newThread(this); } //對(duì)于線程變量來說,其啟動(dòng)的就是 worker 的 run() 方法 public void run() { //runWorker(...) 方法在 ThreadPoolExecutor 里 runWorker(this); } //獲取鎖的狀態(tài) protected boolean isHeldExclusively() { return getState() != 0; } //重寫了 AbstractQueuedSynchronizer 中的 tryAcquire(...) 方法 //嘗試加鎖 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } //重寫了 AbstractQueuedSynchronizer 中的 tryRelease(...) 方法 //嘗試釋放鎖 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } //真正的加鎖方法 public void lock() { acquire(1); } //嘗試加鎖 public boolean tryLock() { return tryAcquire(1); } //真正的釋放鎖方法 public void unlock() { release(1); } //判斷是否在鎖中 public boolean isLocked() { return isHeldExclusively(); } //中斷線程 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
追蹤一下 runWorker(...) 方法:
//ThreadPoolExecutor.class final void runWorker(Worker w) { //獲取當(dāng)前所在的線程的實(shí)例對(duì)象 Thread wt = Thread.currentThread(); //獲取 task Runnable task = w.firstTask; //取出來之后把 task 置空 w.firstTask = null; //此處釋放鎖 w.unlock(); //指示器,此變量為 true 的時(shí)候確認(rèn)該方法已經(jīng)執(zhí)行完畢 boolean completedAbruptly = true; try { //此處為一個(gè) while 循環(huán),用于不斷的執(zhí)行 task //getTask() 方法會(huì)從隊(duì)列里不斷抓取 task 并進(jìn)行執(zhí)行 //當(dāng) task 為 null,且隊(duì)列里已經(jīng)沒有更多 task 的時(shí)候,就會(huì)終止循環(huán) while (task != null || (task = getTask()) != null) { //加鎖,獨(dú)占線程 w.lock(); //在這里會(huì)判斷線程的狀態(tài),如果存在符合中斷的情況,就會(huì)直接中斷掉 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //beforeExecute(...) 和 afterExecute(...) 方法在 ThreadPoolExecutor 中并沒有實(shí)現(xiàn) //是預(yù)留出來給使用者重寫,以達(dá)到業(yè)務(wù)需求的方法 beforeExecute(wt, task); try { //此處執(zhí)行 task task.run(); afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { //將執(zhí)行的 task 置空 task = null; //每完成一個(gè) task 就會(huì)加 1 w.completedTasks++; //釋放鎖 w.unlock(); } } completedAbruptly = false; } finally { //這個(gè)方法會(huì)銷毀掉 worker //同時(shí)如果檢測(cè)到有新的 task 又會(huì)重新創(chuàng)建 Worker processWorkerExit(w, completedAbruptly); } }
Worker 是線程池中真正起完成業(yè)務(wù)邏輯的組件,是任務(wù)和線程的封裝。
三 線程池的狀態(tài)控制線程池的狀態(tài)主要由 ctl 變量來進(jìn)行控制:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl 是一個(gè) AtomicInteger 類型的變量,其實(shí)可以簡(jiǎn)單理解為一個(gè) int 值,AtomicInteger 只是能夠適應(yīng)高并發(fā)的原子化操作的需要。
ctl 的前 29 位數(shù)用來表示線程(Worker)的數(shù)量,后面三位用來表示線程池的狀態(tài)。
線程池的狀態(tài)有五種,分別是 Running、Shutdown、Stop、Tidying、Terminate,根據(jù)單詞就能猜出大概。
注意的是,這五種狀態(tài)在線程池中都以 int 變量的形式存在,從前到后依次變大,對(duì)狀態(tài)的比較有一系列方法:
//ThreadPoolExecutor.class private static boolean runStateLessThan(int c, int s) { //c 的狀態(tài)值要小于 s return c < s; } //ThreadPoolExecutor.class private static boolean runStateAtLeast(int c, int s) { //c 的狀態(tài)值要大于或等于 s return c >= s; } //ThreadPoolExecutor.class private static boolean isRunning(int c) { //狀態(tài)里只有 RUNNING 是小于 SHUTDOWN 的 return c < SHUTDOWN; }
在這些方法里,傳入的參數(shù) c 一般指的是當(dāng)前線程池狀態(tài),s 是用來對(duì)比的參照狀態(tài)。
四 線程池的執(zhí)行該 part 的起點(diǎn):
executorService.execute(r);
來追蹤 execute(...) 方法:
public void execute(Runnable command) { //有效性驗(yàn)證 if (command == null) throw new NullPointerException(); //ctl 是一個(gè) AtomicInteger 類型的變量,用來記錄線程池的狀態(tài) int c = ctl.get(); //workerCountOf(...) 方法會(huì)返回當(dāng)前運(yùn)行的 Worker 的數(shù)量 if (workerCountOf(c) < corePoolSize) { //Worker 的數(shù)量小于線程池容量的情況下 //直接增加 Worker 并取出 task 去運(yùn)行 if (addWorker(command, true)) return; //如果 Worker 已經(jīng)順利執(zhí)行了 task,應(yīng)該會(huì)直接返回掉 //如果執(zhí)行中出現(xiàn)了其它情況,則會(huì)繼續(xù)往下走 //此處刷新狀態(tài) c = ctl.get(); } //當(dāng) Worker 數(shù)量已經(jīng)達(dá)到線程池的指定數(shù)量,或者添加 Worker 的時(shí)候出問題的時(shí)候,會(huì)進(jìn)入此判斷語(yǔ)句 //先判斷線程池是否處于活躍狀態(tài),且 task 是否已經(jīng)被成功添加到隊(duì)列中 //如果不滿足,會(huì)進(jìn)入 else 語(yǔ)句中,先最后嘗試一次 addWorker(...) 方法,如果不成功就拒絕 task //reject(...) 方法會(huì)調(diào)用 handler 的拒絕策略 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); }else if (!addWorker(command, false)) reject(command); }1 reject
這里先提及一下 reject(...) 方法:
//ThreadPoolExecutor.class final void reject(Runnable command) { handler.rejectedExecution(command, this); }
本質(zhì)是調(diào)用了 handler 對(duì)象的相關(guān)方法。在本例中,handler 對(duì)象指向了 defaultHandler:
//ThreadPoolExecutor.class private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
defaultHandler 是一個(gè) AbortPolicy 類型的對(duì)象,而 AbortPolicy 是 ThreadPoolExecutor 的靜態(tài)內(nèi)部類。
AbortPolicy 起作用的方法為 rejectedExecution(...) 方法:
//AbortPolicy.class public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
也就是說,在 task 過多的情況下,AbortPolicy 的應(yīng)對(duì)策略是拋出異常。
2 addWorker來看一下核心方法 addWorker(...):
//ThreadPoolExecutor.class private boolean addWorker(Runnable firstTask, boolean core) { //先標(biāo)記這個(gè) for 循環(huán),方便退出循環(huán) retry: //在每一次循環(huán)開始之前會(huì)刷新一次狀態(tài)標(biāo)識(shí) for (int c = ctl.get();;) { //這里先進(jìn)行判斷,如果線程池已經(jīng)關(guān)閉了,或者沒有 task 了,就會(huì)返回 false if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false; for (;;) { //如果 Worker 數(shù)量已經(jīng)超出了最大值就會(huì)直接返回 false if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; //將 ctl 變量的值加 1,如果成功了就會(huì)跳出循環(huán) if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); //在狀態(tài)值比 SHUTDOWN 大的時(shí)候會(huì)直接跳到最外頭的循環(huán)里 //需要注意的是最外面的 for 循環(huán)會(huì)判斷狀態(tài)值是否大于 SHUTDOWN //如果大于 SHUTDOWN 的話就返回 false 了 if (runStateAtLeast(c, SHUTDOWN)) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //創(chuàng)建一個(gè) Worker w = new Worker(firstTask); //獲取線程對(duì)象 final Thread t = w.thread; if (t != null) { //加鎖,此處加的是一把全局的鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int c = ctl.get(); //如果狀態(tài)值 c 是 RUNNING,或者 [c 是 RUNNING 或者 SHUTDOWN 且 firstTask 是 null] 就會(huì)進(jìn)入這個(gè)判斷語(yǔ)句 // if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { //如果這個(gè)線程已經(jīng)處于運(yùn)作狀態(tài),會(huì)拋出異常 if (t.isAlive()) throw new IllegalThreadStateException(); //workers 是一個(gè)列表,用于存儲(chǔ) Worker 對(duì)象 workers.add(w); //獲取 Worker 的數(shù)量 int s = workers.size(); //largestPoolSize 用來記錄線程池達(dá)到過的最大線程數(shù) if (s > largestPoolSize) largestPoolSize = s; //標(biāo)記 Worker 已經(jīng)被添加 workerAdded = true; } } finally { //釋放鎖 mainLock.unlock(); } //先判斷 Worker 是否已經(jīng)被添加到 workers 內(nèi)了 if (workerAdded) { //這是該方法核心的啟動(dòng)線程方法 t.start(); //標(biāo)記 Worker 已經(jīng)開始運(yùn)行了 workerStarted = true; } } } finally { //如果沒有標(biāo)記 Worker 已經(jīng)開始工作,會(huì)在這里銷毀掉 Worker if (!workerStarted) addWorkerFailed(w); } return workerStarted; }五 一點(diǎn)嘮叨
先總結(jié)一下線程池的業(yè)務(wù)邏輯:
1 接收到 task (即實(shí)現(xiàn)了 Runnable 接口的實(shí)例對(duì)象) [execute(...) 方法] 2 用 task 去嘗試創(chuàng)建一個(gè) Worker 實(shí)例 [execute(...) 方法] 2.1 如果 Worker 數(shù)量沒有達(dá)到線程池的指定最大值 -> 新建 2.2 如果 Worker 數(shù)量達(dá)到了線程池的指定最大值 -> 不會(huì)再創(chuàng)建,而是把 task 儲(chǔ)存起來等待空閑的 Worker 去提取 2.3 如果 task 隊(duì)列也已經(jīng)滿了,無法再添加 -> 觸發(fā)拒絕機(jī)制(handler) 3 Worker 在執(zhí)行的時(shí)候調(diào)用其內(nèi)部的 Thread 實(shí)例對(duì)象的 start() 方法 [addWorker(...) 方法] 4 該 start() 方法會(huì)調(diào)用到 Worker 的 run() 方法 [Worker.class 內(nèi)的 run() 方法] 5 Worker 的 run() 方法本質(zhì)上是封裝了 task 的 run() 方法 [runWorker(...) 方法]
主線業(yè)務(wù)邏輯不算復(fù)雜,比較艱難的是為了保證數(shù)據(jù)的一致性,線程池代碼中充斥著大量的狀態(tài)判斷和鎖機(jī)制。
并且為了考慮性能問題,線程池的設(shè)計(jì)沒有使用悲觀鎖(synchronized 關(guān)鍵字),而是大量使用了 ASQ 和 ReetrentLock 機(jī)制。
本文僅為個(gè)人的學(xué)習(xí)筆記,可能存在錯(cuò)誤或者表述不清的地方,有緣補(bǔ)充
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/73156.html
摘要:零前期準(zhǔn)備文章異常啰嗦且繞彎。版本版本簡(jiǎn)介是中默認(rèn)的實(shí)現(xiàn)類,常與結(jié)合進(jìn)行多線程并發(fā)操作。所以方法的主體其實(shí)就是去喚醒被阻塞的線程。本文僅為個(gè)人的學(xué)習(xí)筆記,可能存在錯(cuò)誤或者表述不清的地方,有緣補(bǔ)充 零 前期準(zhǔn)備 0 FBI WARNING 文章異常啰嗦且繞彎。 1 版本 JDK 版本 : OpenJDK 11.0.1 IDE : idea 2018.3 2 ThreadLocal 簡(jiǎn)介 ...
摘要:那么線程池到底是怎么利用類來實(shí)現(xiàn)持續(xù)不斷地接收提交的任務(wù)并執(zhí)行的呢接下來,我們通過的源代碼來一步一步抽絲剝繭,揭開線程池運(yùn)行模型的神秘面紗。 在上一篇文章《從0到1玩轉(zhuǎn)線程池》中,我們了解了線程池的使用方法,以及向線程池中提交任務(wù)的完整流程和ThreadPoolExecutor.execute方法的源代碼。在這篇文章中,我們將會(huì)從頭閱讀線程池ThreadPoolExecutor類的源代...
摘要:將線程池狀態(tài)置為并不會(huì)立即停止,停止接收外部的任務(wù),內(nèi)部正在跑的任務(wù)和隊(duì)列里等待的任務(wù),會(huì)執(zhí)行完,才真正停止。將線程池狀態(tài)置為。 在Java中,我們經(jīng)常使用的線程池就是ThreadPoolExecutor,此外還有定時(shí)的線程池ScheduledExecutorService(),但是需要注意的是Executors.newCachedThreadPool()的線程是沒有上屆的,在使用時(shí),...
摘要:創(chuàng)建一個(gè)線程池,具有固定線程數(shù),運(yùn)行在共享的無界隊(duì)列中。固定線程數(shù)源碼如下是的實(shí)現(xiàn)類。線程池中允許最大的線程數(shù)。如果線程數(shù)超過了核心線程數(shù),過量的線程在關(guān)閉前等待新任務(wù)的最大時(shí)間。處理因?yàn)榫€程邊界和隊(duì)列容量導(dǎo)致的堵塞。 1.Executors.newFixedThreadPool(int nThreads):創(chuàng)建一個(gè)線程池,具有固定線程數(shù),運(yùn)行在共享的無界隊(duì)列中。在大多數(shù)時(shí)候,線程會(huì)主...
摘要:當(dāng)活動(dòng)線程核心線程非核心線程達(dá)到這個(gè)數(shù)值后,后續(xù)任務(wù)將會(huì)根據(jù)來進(jìn)行拒絕策略處理。線程池工作原則當(dāng)線程池中線程數(shù)量小于則創(chuàng)建線程,并處理請(qǐng)求。當(dāng)線程池中的數(shù)量等于最大線程數(shù)時(shí)默默丟棄不能執(zhí)行的新加任務(wù),不報(bào)任何異常。 spring-cache使用記錄 spring-cache的使用記錄,坑點(diǎn)記錄以及采用的解決方案 深入分析 java 線程池的實(shí)現(xiàn)原理 在這篇文章中,作者有條不紊的將 ja...
閱讀 687·2021-09-30 09:47
閱讀 2876·2021-09-04 16:40
閱讀 864·2019-08-30 13:18
閱讀 3457·2019-08-29 16:22
閱讀 1563·2019-08-29 12:36
閱讀 593·2019-08-29 11:11
閱讀 1482·2019-08-26 13:47
閱讀 1134·2019-08-26 13:32