摘要:提高線程的可管理性線程池可以統(tǒng)一管理分配調(diào)優(yōu)和監(jiān)控。線程池的初始化狀態(tài)是。調(diào)用線程池的接口時(shí),線程池由。當(dāng)所有的任務(wù)已終止,記錄的任務(wù)數(shù)量為,阻塞隊(duì)列為空,線程池會(huì)變?yōu)闋顟B(tài)。線程池徹底終止,就變成狀態(tài)。
序言
我們知道,線程池幫我們重復(fù)管理線程,避免創(chuàng)建大量的線程增加開銷。
合理的使用線程池能夠帶來(lái)3個(gè)很明顯的好處:
1.降低資源消耗:通過(guò)重用已經(jīng)創(chuàng)建的線程來(lái)降低線程創(chuàng)建和銷毀的消耗
2.提高響應(yīng)速度:任務(wù)到達(dá)時(shí)不需要等待線程創(chuàng)建就可以立即執(zhí)行。
3.提高線程的可管理性:線程池可以統(tǒng)一管理、分配、調(diào)優(yōu)和監(jiān)控。
java源生的線程池,實(shí)現(xiàn)于ThreadPoolExecutor類,這也是我們今天討論的重點(diǎn)
Jdk使用ThreadPoolExecutor類來(lái)創(chuàng)建線程池,我們來(lái)看看它的構(gòu)造方法。
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:
* {@code corePoolSize < 0}
* {@code keepAliveTime < 0}
* {@code maximumPoolSize <= 0}
* {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
int corePoolSize, //核心線程的數(shù)量
int maximumPoolSize, //最大線程數(shù)量
long keepAliveTime, //超出核心線程數(shù)量以外的線程空閑時(shí),線程存活的時(shí)間
TimeUnit unit, //存活時(shí)間的單位,有如下幾種選擇
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小時(shí) TimeUnit.MINUTES; //分鐘 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //納秒
BlockingQueue
ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue; PriorityBlockingQueue
ThreadFactory threadFactory, //創(chuàng)建新線程使用的工廠
RejectedExecutionHandler handler // 當(dāng)任務(wù)無(wú)法執(zhí)行時(shí)的處理器(線程拒絕策略)
核心類變量 ctl變量ThreadPoolExecutor中有一個(gè)控制狀態(tài)的屬性叫ctl,它是一個(gè)AtomicInteger類型的變量,它一個(gè)int值可以儲(chǔ)存兩個(gè)概念的信息:
workerCount:表明當(dāng)前池中有效的線程數(shù),通過(guò)workerCountOf方法獲得,workerCount上限是(2^29)-1。(最后存放在ctl的低29bit)
runState:表明當(dāng)前線程池的狀態(tài),通過(guò)workerCountOf方法獲得,最后存放在ctl的高3bit中,他們是整個(gè)線程池的運(yùn)行生命周期,有如下取值,分別的含義是:
RUNNING:可以新加線程,同時(shí)可以處理queue中的線程。線程池的初始化狀態(tài)是RUNNING。換句話說(shuō),線程池被一旦被創(chuàng)建,就處于RUNNING狀態(tài),
SHUTDOWN:不增加新線程,但是處理queue中的線程。調(diào)用線程池的shutdown()方法時(shí),線程池由RUNNING -> SHUTDOWN。
STOP 不增加新線程,同時(shí)不處理queue中的線程。調(diào)用線程池的shutdownNow()接口時(shí),線程池由(RUNNING or SHUTDOWN ) -> STOP。
TIDYING 當(dāng)所有的任務(wù)已終止,ctl記錄的”任務(wù)數(shù)量”為0,阻塞隊(duì)列為空,線程池會(huì)變?yōu)門IDYING狀態(tài)。當(dāng)線程池變?yōu)門IDYING狀態(tài)時(shí),會(huì)執(zhí)行鉤子函數(shù)terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變?yōu)門IDYING時(shí),進(jìn)行相應(yīng)的處理;可以通過(guò)重載terminated()函數(shù)來(lái)實(shí)現(xiàn)。
TERMINATED 線程池徹底終止,就變成TERMINATED狀態(tài)。線程池處在TIDYING狀態(tài)時(shí),執(zhí)行完terminated()之后,就會(huì)由 TIDYING -> TERMINATED。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
COUNT_BITS=32(integer的size)-3=29,于是五種狀態(tài)左移29位分別是:
RUNNING: 11100000000000000000000000000000
SHUTDOWN: 00000000000000000000000000000000
STOP: 00100000000000000000000000000000
TIDYING: 01000000000000000000000000000000
TERMINATED:01100000000000000000000000000000
而ThreadPoolExecutor是通過(guò)runStateOf和workerCountOf獲得者兩個(gè)概念的值的。
runStateOf和workerCountOf方法是如何剝離出ctl變量的兩個(gè)有效值呢?這其中我們可以看到CAPACITY是實(shí)現(xiàn)一個(gè)字段存兩個(gè)值的最重要的字段。
CAPACITY變量CAPACITY=(1 << COUNT_BITS) – 1 轉(zhuǎn)成二進(jìn)制為:000 11111111111111111111111111111,他是線程池理論上可以允許的最大的線程數(shù)。
所以很明顯,它的重點(diǎn)在于,其高3bit為0,低29bit為1;
這樣,workderCountOf方法中,CAPACITY和ctl進(jìn)行&運(yùn)算時(shí),它能獲得高3位都是0,低29位和ctl低29位相同的值,這個(gè)值就是workerCount;
同理,runStateOf方法,CAPACITY的取反和ctl進(jìn)行&操作,獲得高3位和ctl高三位相等,低29位都為0的值,這個(gè)值就是runState;
/** * The queue used for holding tasks and handing off to worker * threads. We do not require that workQueue.poll() returning * null necessarily means that workQueue.isEmpty(), so rely * solely on isEmpty to see if the queue is empty (which we must * do for example when deciding whether to transition from * SHUTDOWN to TIDYING). This accommodates special-purpose * queues such as DelayQueues for which poll() is allowed to * return null even if it may later return non-null when delays * expire. */ private final BlockingQueueworkQueue;
一個(gè)BlockingQueue
/** * Set containing all worker threads in pool. Accessed only when * holding mainLock. */ private final HashSetworkers = new HashSet ();
一個(gè)HashSet
private final ReentrantLock mainLock = new ReentrantLock();
mainLock是線程池的主鎖,是可重入鎖,當(dāng)要操作workers set這個(gè)保持線程的HashSet時(shí),需要先獲取mainLock,還有當(dāng)要處理largestPoolSize、completedTaskCount這類統(tǒng)計(jì)數(shù)據(jù)時(shí)需要先獲取mainLock
其他重要屬性private int largestPoolSize; //用來(lái)記錄線程池中曾經(jīng)出現(xiàn)過(guò)的最大線程數(shù) private long completedTaskCount; //用來(lái)記錄已經(jīng)執(zhí)行完畢的任務(wù)個(gè)數(shù) private volatile boolean allowCoreThreadTimeOut; //是否允許為核心線程設(shè)置存活時(shí)間核心內(nèi)部類 Worker
Worker類是線程池中具化一個(gè)線程的對(duì)象,是線程池的核心,我們來(lái)看看源碼:
/** * Class Worker mainly maintains interrupt control state for * threads running tasks, along with other minor bookkeeping. * This class opportunistically extends AbstractQueuedSynchronizer * to simplify acquiring and releasing a lock surrounding each * task execution. This protects against interrupts that are * intended to wake up a worker thread waiting for a task from * instead interrupting a task being run. We implement a simple * non-reentrant mutual exclusion lock rather than use * ReentrantLock because we do not want worker tasks to be able to * reacquire the lock when they invoke pool control methods like * setCorePoolSize. Additionally, to suppress interrupts until * the thread actually starts running tasks, we initialize lock * state to a negative value, and clear it upon start (in * runWorker). */ private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { //設(shè)置AQS的同步狀態(tài)private volatile int state,是一個(gè)計(jì)數(shù)器,大于0代表鎖已經(jīng)被獲取 // 在調(diào)用runWorker()前,禁止interrupt中斷,在interruptIfStarted()方法中會(huì)判斷 getState()>=0 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this);//根據(jù)當(dāng)前worker創(chuàng)建一個(gè)線程對(duì)象 //當(dāng)前worker本身就是一個(gè)runnable任務(wù),也就是不會(huì)用參數(shù)的firstTask創(chuàng)建線程,而是調(diào)用當(dāng)前worker.run()時(shí)調(diào)用firstTask.run() //后面在addworker中,我們會(huì)啟動(dòng)worker對(duì)象中組合的Thread,而我們的執(zhí)行邏輯runWorker方法是在worker的run方法中被調(diào)用。 //為什么執(zhí)行thread的run方法會(huì)調(diào)用worker的run方法呢,原因就是在這里進(jìn)行了注入,將worker本身this注入到了thread中 } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); }//runWorker()是ThreadPoolExecutor的方法 // Lock methods // // The value 0 represents the unlocked state. 0代表“沒被鎖定”狀態(tài) // The value 1 represents the locked state. 1代表“鎖定”狀態(tài) protected boolean isHeldExclusively() { return getState() != 0; } /** * 嘗試獲取鎖 * 重寫AQS的tryAcquire(),AQS本來(lái)就是讓子類來(lái)實(shí)現(xiàn)的 */ protected boolean tryAcquire(int unused) { //嘗試一次將state從0設(shè)置為1,即“鎖定”狀態(tài),但由于每次都是state 0->1,而不是+1,那么說(shuō)明不可重入 //且state==-1時(shí)也不會(huì)獲取到鎖 if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } /** * 嘗試釋放鎖 * 不是state-1,而是置為0 */ protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } /** * 中斷(如果運(yùn)行) * shutdownNow時(shí)會(huì)循環(huán)對(duì)worker線程執(zhí)行 * 且不需要獲取worker鎖,即使在worker運(yùn)行時(shí)也可以中斷 */ void interruptIfStarted() { Thread t; //如果state>=0、t!=null、且t沒有被中斷 //new Worker()時(shí)state==-1,說(shuō)明不能中斷 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
我們看worker類時(shí),會(huì)發(fā)現(xiàn)最重要的幾個(gè)部分在于它里面定義了一個(gè)Thread thread和Runnable firstTask??吹竭@里,我們可能會(huì)比較奇怪,我們只是要一個(gè)可以執(zhí)行的線程,這里放一個(gè)Thread和一個(gè)Runnable的變量做什么呢?
其實(shí)之所以Worker自己實(shí)現(xiàn)Runnable,并創(chuàng)建Thread,在firstTask外包一層,是因?yàn)橐ㄟ^(guò)Worker負(fù)責(zé)控制中斷,而firstTask這個(gè)工作任務(wù)只是負(fù)責(zé)執(zhí)行業(yè)務(wù),worker的run方法調(diào)用了runWorker方法,在這里面,worker里的firstTask的run方法被執(zhí)行。稍后我們會(huì)聚焦這個(gè)執(zhí)行任務(wù)的runWorker方法。
好了,基本上我們將線程池的幾個(gè)主角,ctl,workQueue,workers,Worker簡(jiǎn)單介紹了一遍,現(xiàn)在,我們來(lái)看看線程池是怎么玩的。
線程的運(yùn)行 execute方法這是線程池實(shí)現(xiàn)類外露供給外部實(shí)現(xiàn)提交線程任務(wù)command的核心方法,對(duì)于無(wú)需了解線程池內(nèi)部的使用者來(lái)說(shuō),這個(gè)方法就是把某個(gè)任務(wù)交給線程池,正常情況下,這個(gè)任務(wù)會(huì)在未來(lái)某個(gè)時(shí)刻被執(zhí)行,實(shí)現(xiàn)和注釋如下:
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * 在未來(lái)的某個(gè)時(shí)刻執(zhí)行給定的任務(wù)。這個(gè)任務(wù)用一個(gè)新線程執(zhí)行,或者用一個(gè)線程池中已經(jīng)存在的線程執(zhí)行 * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * 如果任務(wù)無(wú)法被提交執(zhí)行,要么是因?yàn)檫@個(gè)Executor已經(jīng)被shutdown關(guān)閉,要么是已經(jīng)達(dá)到其容量上限,任務(wù)會(huì)被當(dāng)前的RejectedExecutionHandler處理 * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn"t, by returning false. * 如果運(yùn)行的線程少于corePoolSize,嘗試開啟一個(gè)新線程去運(yùn)行command,command作為這個(gè)線程的第一個(gè)任務(wù) * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * 如果任務(wù)成功放入隊(duì)列,我們?nèi)孕枰粋€(gè)雙重校驗(yàn)去確認(rèn)是否應(yīng)該新建一個(gè)線程(因?yàn)榭赡艽嬖谟行┚€程在我們上次檢查后死了) * 或者 從我們進(jìn)入這個(gè)方法后,pool被關(guān)閉了 * 所以我們需要再次檢查state,如果線程池停止了需要回滾入隊(duì)列,如果池中沒有線程了,新開啟 一個(gè)線程 * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. * 如果無(wú)法將任務(wù)入隊(duì)列(可能隊(duì)列滿了),需要新開區(qū)一個(gè)線程(自己:往maxPoolSize發(fā)展) * 如果失敗了,說(shuō)明線程池shutdown 或者 飽和了,所以我們拒絕任務(wù) */ int c = ctl.get(); // 1、如果當(dāng)前線程數(shù)少于corePoolSize(可能是由于addWorker()操作已經(jīng)包含對(duì)線程池狀態(tài)的判斷,如此處沒加,而入workQueue前加了) if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; /** * 沒有成功addWorker(),再次獲取c(凡是需要再次用ctl做判斷時(shí),都會(huì)再次調(diào)用ctl.get()) * 失敗的原因可能是: * 1、線程池已經(jīng)shutdown,shutdown的線程池不再接收新任務(wù) * 2、workerCountOf(c) < corePoolSize 判斷后,由于并發(fā),別的線程先創(chuàng)建了worker線程,導(dǎo)致workerCount>=corePoolSize */ c = ctl.get(); } /** * 2、如果線程池RUNNING狀態(tài),且入隊(duì)列成功 */ if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); /** * 再次校驗(yàn)放入workerQueue中的任務(wù)是否能被執(zhí)行 * 1、如果線程池不是運(yùn)行狀態(tài)了,應(yīng)該拒絕添加新任務(wù),從workQueue中刪除任務(wù) * 2、如果線程池是運(yùn)行狀態(tài),或者從workQueue中刪除任務(wù)失?。▌偤糜幸粋€(gè)線程執(zhí)行完畢,并消耗了這個(gè)任務(wù)), * 確保還有線程執(zhí)行任務(wù)(只要有一個(gè)就夠了) */ //如果再次校驗(yàn)過(guò)程中,線程池不是RUNNING狀態(tài),并且remove(command)--workQueue.remove()成功,拒絕當(dāng)前command if (! isRunning(recheck) && remove(command)) reject(command); //如果當(dāng)前worker數(shù)量為0,通過(guò)addWorker(null, false)創(chuàng)建一個(gè)線程,其任務(wù)為null //為什么只檢查運(yùn)行的worker數(shù)量是不是0呢?? 為什么不和corePoolSize比較呢?? //只保證有一個(gè)worker線程可以從queue中獲取任務(wù)執(zhí)行就行了?? //因?yàn)橹灰€有活動(dòng)的worker線程,就可以消費(fèi)workerQueue中的任務(wù) else if (workerCountOf(recheck) == 0) addWorker(null, false);//第一個(gè)參數(shù)為null,說(shuō)明只為新建一個(gè)worker線程,沒有指定firstTask ////第二個(gè)參數(shù)為true代表占用corePoolSize,false占用maxPoolSize } /** * 3、如果線程池不是running狀態(tài) 或者 無(wú)法入隊(duì)列 * 嘗試開啟新線程,擴(kuò)容至maxPoolSize,如果addWork(command, false)失敗了,拒絕當(dāng)前command */ else if (!addWorker(command, false)) reject(command); }
我們可以簡(jiǎn)單歸納如下(注:圖來(lái)源見水印,謝謝大神的歸納):
在execute方法中,我們看到核心的邏輯是由addWorker方法來(lái)實(shí)現(xiàn)的,當(dāng)我們將一個(gè)任務(wù)提交給線程池,線程池會(huì)如何處理,就是主要由這個(gè)方法加以規(guī)范:
該方法有兩個(gè)參數(shù):
firstTask: worker線程的初始任務(wù),可以為空
core: true:將corePoolSize作為上限,false:將maximumPoolSize作為上限
排列組合,addWorker方法有4種傳參的方式:
1、addWorker(command, true) 2、addWorker(command, false) 3、addWorker(null, false) 4、addWorker(null, true)
在execute方法中就使用了前3種,結(jié)合這個(gè)核心方法進(jìn)行以下分析
第一個(gè):線程數(shù)小于corePoolSize時(shí),放一個(gè)需要處理的task進(jìn)Workers Set。如果Workers Set長(zhǎng)度超過(guò)corePoolSize,就返回false 第二個(gè):當(dāng)隊(duì)列被放滿時(shí),就嘗試將這個(gè)新來(lái)的task直接放入Workers Set,而此時(shí)Workers Set的長(zhǎng)度限制是maximumPoolSize。如果線程池也滿了的話就返回false 第三個(gè):放入一個(gè)空的task進(jìn)workers Set,長(zhǎng)度限制是maximumPoolSize。這樣一個(gè)task為空的worker在線程執(zhí)行的時(shí)候會(huì)去任務(wù)隊(duì)列里拿任務(wù),這樣就相當(dāng)于創(chuàng)建了一個(gè)新的線程,只是沒有馬上分配任務(wù) 第四個(gè):這個(gè)方法就是放一個(gè)null的task進(jìn)Workers Set,而且是在小于corePoolSize時(shí),如果此時(shí)Set中的數(shù)量已經(jīng)達(dá)到corePoolSize那就返回false,什么也不干。實(shí)際使用中是在prestartAllCoreThreads()方法,這個(gè)方法用來(lái)為線程池預(yù)先啟動(dòng)corePoolSize個(gè)worker等待從workQueue中獲取任務(wù)執(zhí)行
/** * Checks if a new worker can be added with respect to current * pool state and the given bound (either core or maximum). If so, * the worker count is adjusted accordingly, and, if possible, a * new worker is created and started, running firstTask as its * first task. This method returns false if the pool is stopped or * eligible to shut down. It also returns false if the thread * factory fails to create a thread when asked. If the thread * creation fails, either due to the thread factory returning * null, or due to an exception (typically OutOfMemoryError in * Thread.start()), we roll back cleanly. * 檢查根據(jù)當(dāng)前線程池的狀態(tài)和給定的邊界(core or maximum)是否可以創(chuàng)建一個(gè)新的worker * 如果是這樣的話,worker的數(shù)量做相應(yīng)的調(diào)整,如果可能的話,創(chuàng)建一個(gè)新的worker并啟動(dòng),參數(shù)中的firstTask作為worker的第一個(gè)任務(wù) * 如果方法返回false,可能因?yàn)閜ool已經(jīng)關(guān)閉或者調(diào)用過(guò)了shutdown * 如果線程工廠創(chuàng)建線程失敗,也會(huì)失敗,返回false * 如果線程創(chuàng)建失敗,要么是因?yàn)榫€程工廠返回null,要么是發(fā)生了OutOfMemoryError * * @param firstTask the task the new thread should run first (or * null if none). Workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corePoolSize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * Initially idle threads are usually created via * prestartCoreThread or to replace other dying workers. * * @param core if true use corePoolSize as bound, else * maximumPoolSize. (A boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * @return true if successful */ private boolean addWorker(Runnable firstTask, boolean core) { //外層循環(huán),負(fù)責(zé)判斷線程池狀態(tài) retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /** * 線程池的state越小越是運(yùn)行狀態(tài),runnbale=-1,shutdown=0,stop=1,tidying=2,terminated=3 * 要想這個(gè)if為true,線程池state必須已經(jīng)至少是shutdown狀態(tài)了 * 這時(shí)候以下3個(gè)條件任意一個(gè)是false都會(huì)進(jìn)入if語(yǔ)句,即無(wú)法addWorker(): * 1,rs == SHUTDOWN (隱含:rs>=SHUTDOWN)false情況: 線程池狀態(tài)已經(jīng)超過(guò)shutdown, * 可能是stop、tidying、terminated其中一個(gè),即線程池已經(jīng)終止 * 2,firstTask == null (隱含:rs==SHUTDOWN)false情況: firstTask不為空,rs==SHUTDOWN 且 firstTask不為空, * return false,場(chǎng)景是在線程池已經(jīng)shutdown后,還要添加新的任務(wù),拒絕 * 3,! workQueue.isEmpty() (隱含:rs==SHUTDOWN,firstTask==null)false情況: workQueue為空, * 當(dāng)firstTask為空時(shí)是為了創(chuàng)建一個(gè)沒有任務(wù)的線程,再?gòu)膚orkQueue中獲取任務(wù), * 如果workQueue已經(jīng)為空,那么就沒有添加新worker線程的必要了 * return false, */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; //內(nèi)層循環(huán),負(fù)責(zé)worker數(shù)量+1 for (;;) { int wc = workerCountOf(c); //入?yún)ore在這里起作用,表示加入的worker是加入corePool還是非corepool,換句話說(shuō),受到哪個(gè)size的約束 //如果worker數(shù)量>線程池最大上限CAPACITY(即使用int低29位可以容納的最大值) //或者( worker數(shù)量>corePoolSize 或 worker數(shù)量>maximumPoolSize ),即已經(jīng)超過(guò)了給定的邊界,不添加worker if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //CAS嘗試增加線程數(shù),,如果成功加了wc,那么break跳出檢查 //如果失敗,證明有競(jìng)爭(zhēng),那么重新到retry。 if (compareAndIncrementWorkerCount(c)) break retry; //如果不成功,重新獲取狀態(tài)繼續(xù)檢查 c = ctl.get(); // Re-read ctl //如果狀態(tài)不等于之前獲取的state,跳出內(nèi)層循環(huán),繼續(xù)去外層循環(huán)判斷 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop // else CAS失敗時(shí)因?yàn)閣orkerCount改變了,繼續(xù)內(nèi)層循環(huán)嘗試CAS對(duì)worker數(shù)量+1 } } //worker數(shù)量+1成功的后續(xù)操作 // 添加到workers Set集合,并啟動(dòng)worker線程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //新建worker//構(gòu)造方法做了三件事//1、設(shè)置worker這個(gè)AQS鎖的同步狀態(tài)state=-1 w = new Worker(firstTask); //2、將firstTask設(shè)置給worker的成員變量firstTask //3、使用worker自身這個(gè)runnable,調(diào)用ThreadFactory創(chuàng)建一個(gè)線程,并設(shè)置給worker的成員變量thread final Thread t = w.thread; if (t != null) { //獲取重入鎖,并且鎖上 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // rs!=SHUTDOWN ||firstTask!=null // 如果線程池在運(yùn)行runninglargestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) {//如果往HashSet中添加worker成功,啟動(dòng)線程 //通過(guò)t.start()方法正式執(zhí)行線程。在這里一個(gè)線程才算是真正的執(zhí)行起來(lái)了。 t.start(); workerStarted = true; } } } finally { //如果啟動(dòng)線程失敗 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
同樣的,我們可以歸納一下:
在addWorker方法中,我們將一個(gè)新增進(jìn)去的worker所組合的線程屬性thread啟動(dòng)了,但我們知道,在worker的構(gòu)造方法中,它將自己本身注入到了thread的target屬性里,所以繞了一圈,線程啟動(dòng)后,調(diào)用的還是worker的run方法,而在這里面,runWorker定義了線程執(zhí)行的邏輯:
/** * Main worker run loop. Repeatedly gets tasks from queue and * executes them, while coping with a number of issues: * * 1. We may start out with an initial task, in which case we * don"t need to get the first one. Otherwise, as long as pool is * running, we get tasks from getTask. If it returns null then the * worker exits due to changed pool state or configuration * parameters. Other exits result from exception throws in * external code, in which case completedAbruptly holds, which * usually leads processWorkerExit to replace this thread. * 我們可能使用一個(gè)初始化任務(wù)開始,即firstTask為null * 然后只要線程池在運(yùn)行,我們就從getTask()獲取任務(wù) * 如果getTask()返回null,則worker由于改變了線程池狀態(tài)或參數(shù)配置而退出 * 其它退出因?yàn)橥獠看a拋異常了,這會(huì)使得completedAbruptly為true,這會(huì)導(dǎo)致在processWorkerExit()方法中替換當(dāng)前線程 * * 2. Before running any task, the lock is acquired to prevent * other pool interrupts while the task is executing, and then we * ensure that unless pool is stopping, this thread does not have * its interrupt set. * 在任何任務(wù)執(zhí)行之前,都需要對(duì)worker加鎖去防止在任務(wù)運(yùn)行時(shí),其它的線程池中斷操作 * clearInterruptsForTaskRun保證除非線程池正在stoping,線程不會(huì)被設(shè)置中斷標(biāo)示 * * 3. Each task run is preceded by a call to beforeExecute, which * might throw an exception, in which case we cause thread to die * (breaking loop with completedAbruptly true) without processing * the task. * 每個(gè)任務(wù)執(zhí)行前會(huì)調(diào)用beforeExecute(),其中可能拋出一個(gè)異常,這種情況下會(huì)導(dǎo)致線程die(跳出循環(huán),且completedAbruptly==true),沒有執(zhí)行任務(wù) * 因?yàn)閎eforeExecute()的異常沒有cache住,會(huì)上拋,跳出循環(huán) * * 4. Assuming beforeExecute completes normally, we run the task, * gathering any of its thrown exceptions to send to afterExecute. * We separately handle RuntimeException, Error (both of which the * specs guarantee that we trap) and arbitrary Throwables. * Because we cannot rethrow Throwables within Runnable.run, we * wrap them within Errors on the way out (to the thread"s * UncaughtExceptionHandler). Any thrown exception also * conservatively causes thread to die. * * 5. After task.run completes, we call afterExecute, which may * also throw an exception, which will also cause thread to * die. According to JLS Sec 14.20, this exception is the one that * will be in effect even if task.run throws. * * The net effect of the exception mechanics is that afterExecute * and the thread"s UncaughtExceptionHandler have as accurate * information as we can provide about any problems encountered by * user code. * * @param w the worker */
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts //標(biāo)識(shí)線程是不是異常終止的 boolean completedAbruptly = true; try { //task不為null情況是初始化worker時(shí),如果task為null,則去隊(duì)列中取線程--->getTask() //可以看到,只要getTask方法被調(diào)用且返回null,那么worker必定被銷毀,而確定一個(gè)線程是否應(yīng)該被銷毀的邏輯,在getTask方法中 while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //線程開始執(zhí)行之前執(zhí)行此方法,可以實(shí)現(xiàn)Worker未執(zhí)行退出,本類中未實(shí)現(xiàn) beforeExecute(wt, task); Throwable thrown = null; try { task.run();//runWorker方法最本質(zhì)的存在意義,就是調(diào)用task的run方法 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //線程執(zhí)行后執(zhí)行,可以實(shí)現(xiàn)標(biāo)識(shí)Worker異常中斷的功能,本類中未實(shí)現(xiàn) afterExecute(task, thrown); } } finally { task = null;//運(yùn)行過(guò)的task標(biāo)null w.completedTasks++; w.unlock(); } } //標(biāo)識(shí)線程不是異常終止的,是因?yàn)椴粷M足while條件,被迫銷毀的 completedAbruptly = false; } finally { //處理worker退出的邏輯 processWorkerExit(w, completedAbruptly); } }
我們歸納:
runWorker方法中的getTask()方法是線程處理完一個(gè)任務(wù)后,從隊(duì)列中獲取新任務(wù)的實(shí)現(xiàn),也是處理判斷一個(gè)線程是否應(yīng)該被銷毀的邏輯所在:
/** * Performs blocking or timed wait for a task, depending on * current configuration settings, or returns null if this worker * must exit because of any of: 以下情況會(huì)返回null * 1. There are more than maximumPoolSize workers (due to * a call to setMaximumPoolSize). * 超過(guò)了maximumPoolSize設(shè)置的線程數(shù)量(因?yàn)檎{(diào)用了setMaximumPoolSize()) * 2. The pool is stopped. * 線程池被stop * 3. The pool is shutdown and the queue is empty. * 線程池被shutdown,并且workQueue空了 * 4. This worker timed out waiting for a task, and timed-out * workers are subject to termination (that is, * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) * both before and after the timed wait. * 線程等待任務(wù)超時(shí) * * @return task, or null if the worker must exit, in which case * workerCount is decremented * 返回null表示這個(gè)worker要結(jié)束了,這種情況下workerCount-1 */ private Runnable getTask() { // timedOut 主要是判斷后面的poll是否要超時(shí) boolean timedOut = false; // Did the last poll() time out? /** * 用于判斷線程池狀態(tài) */ for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /** * 對(duì)線程池狀態(tài)的判斷,兩種情況會(huì)workerCount-1,并且返回null * 1,線程池狀態(tài)為shutdown,且workQueue為空(反映了shutdown狀態(tài)的線程池還是要執(zhí)行workQueue中剩余的任務(wù)的) * 2,線程池狀態(tài)為>=stop(只有TIDYING和TERMINATED會(huì)大于stop)(shutdownNow()會(huì)導(dǎo)致變成STOP)(此時(shí)不用考慮workQueue的情況) */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount();//循環(huán)的CAS減少worker數(shù)量,直到成功 return null; } int wc = workerCountOf(c); // Are workers subject to culling? //allowCoreThreadTimeOut字段,表示是否允許核心線程超過(guò)閑置時(shí)間后被摧毀,默認(rèn)為false //我們前面說(shuō)過(guò),如果getTask方法返回null,那么這個(gè)worker只有被銷毀一途 //于是這個(gè)timed有3種情況 //(1)當(dāng)線程數(shù)沒有超過(guò)核心線程數(shù),且默認(rèn)allowCoreThreadTimeOut為false時(shí) // timed值為false??聪旅鎖f的判斷邏輯,除非目前線程數(shù)大于最大值,否則下面的if始終進(jìn)不去,該方法不可能返回null,worker也就不會(huì)被銷毀。 // 因?yàn)榍疤?線程數(shù)不超過(guò)核心線程數(shù)"與"線程數(shù)大于最大值"兩個(gè)命題互斥,所以(1)情況,邏輯進(jìn)入下面的if(返回null的線程銷毀邏輯)的可能性不存在。 // 也就是說(shuō),當(dāng)線程數(shù)沒有超過(guò)核心線程數(shù)時(shí),線程不會(huì)被銷毀。 //(2)當(dāng)當(dāng)前線程數(shù)超過(guò)核心線程數(shù),且默認(rèn)allowCoreThreadTimeOut為false時(shí)//timed值為true。 //(3)如果allowCoreThreadTimeOut為true,則timed始終為true boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //wc > maximumPoolSize則必銷毀,因?yàn)檫@情況下,wc>1也肯定為true //wc <= maximumPoolSize,且(timed && timedOut) = true,這種情況下一般也意味著worker要被銷毀,因?yàn)槌瑫r(shí)一般是由阻塞隊(duì)列為空造成的,所以workQueue.isEmpty()也大概率為真,進(jìn)入if邏輯。 //一般情況是這樣,那不一般的情況呢?阻塞隊(duì)列沒有為空,但是因?yàn)橐恍┰颍€是超時(shí)了,這時(shí)候取決于wc > 1,它為真就銷毀,為假就不銷毀。 // 也就是說(shuō),如果阻塞隊(duì)列還有任務(wù),但是wc=1,線程池里只剩下自己這個(gè)線程了,那么就不能銷毀,這個(gè)if不滿足,我們的代碼繼續(xù)往下走 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //如果timed為true那么使用poll取線程。否則使用take() Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //workQueue.poll():如果在keepAliveTime時(shí)間內(nèi),阻塞隊(duì)列還是沒有任務(wù),返回null workQueue.take(); //workQueue.take():如果阻塞隊(duì)列為空,當(dāng)前線程會(huì)被掛起等待;當(dāng)隊(duì)列中有任務(wù)加入時(shí),線程被喚醒,take方法返回任務(wù) //如果正常返回,那么返回取到的task。 if (r != null) return r; //否則,設(shè)為超時(shí),重新執(zhí)行循環(huán), timedOut = true; } catch (InterruptedException retry) { //在阻塞從workQueue中獲取任務(wù)時(shí),可以被interrupt()中斷,代碼中捕獲了InterruptedException,重置timedOut為初始值false,再次執(zhí)行第1步中的判斷,滿足就繼續(xù)獲取任務(wù),不滿足return null,會(huì)進(jìn)入worker退出的流程 timedOut = false; } }
歸納:
processWorkerExit方法在runWorker方法中,我們看到當(dāng)不滿足while條件后,線程池會(huì)執(zhí)行退出線程的操作,這個(gè)操作,就封裝在processWorkerExit方法中。
/** * Performs cleanup and bookkeeping for a dying worker. Called * only from worker threads. Unless completedAbruptly is set, * assumes that workerCount has already been adjusted to account * for exit. This method removes thread from worker set, and * possibly terminates the pool or replaces the worker if either * it exited due to user task exception or if fewer than * corePoolSize workers are running or queue is non-empty but * there are no workers. * * @param w the worker * @param completedAbruptly if the worker died due to user exception */ private void processWorkerExit(Worker w, boolean completedAbruptly) { //參數(shù): //worker: 要結(jié)束的worker //completedAbruptly: 是否突然完成(是否因?yàn)楫惓M顺觯? /** * 1、worker數(shù)量-1 * 如果是突然終止,說(shuō)明是task執(zhí)行時(shí)異常情況導(dǎo)致,即run()方法執(zhí)行時(shí)發(fā)生了異常,那么正在工作的worker線程數(shù)量需要-1 * 如果不是突然終止,說(shuō)明是worker線程沒有task可執(zhí)行了,不用-1,因?yàn)橐呀?jīng)在getTask()方法中-1了 */ if (completedAbruptly) // If abrupt, then workerCount wasn"t adjusted 代碼和注釋正好相反啊 decrementWorkerCount(); /** * 2、從Workers Set中移除worker */ final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; //把worker的完成任務(wù)數(shù)加到線程池的完成任務(wù)數(shù) workers.remove(w); //從HashSet中移除 } finally { mainLock.unlock(); } /** * 3、在對(duì)線程池有負(fù)效益的操作時(shí),都需要“嘗試終止”線程池 * 主要是判斷線程池是否滿足終止的狀態(tài) * 如果狀態(tài)滿足,但線程池還有線程,嘗試對(duì)其發(fā)出中斷響應(yīng),使其能進(jìn)入退出流程 * 沒有線程了,更新狀態(tài)為tidying->terminated */ tryTerminate(); /** * 4、是否需要增加worker線程 * 線程池狀態(tài)是running 或 shutdown * 如果當(dāng)前線程是突然終止的,addWorker() * 如果當(dāng)前線程不是突然終止的,但當(dāng)前線程數(shù)量 < 要維護(hù)的線程數(shù)量,addWorker() * 故如果調(diào)用線程池shutdown(),直到workQueue為空前,線程池都會(huì)維持corePoolSize個(gè)線程,然后再逐漸銷毀這corePoolSize個(gè)線程 */ int c = ctl.get(); //如果狀態(tài)是running、shutdown,即tryTerminate()沒有成功終止線程池,嘗試再添加一個(gè)worker if (runStateLessThan(c, STOP)) { //不是突然完成的,即沒有task任務(wù)可以獲取而完成的,計(jì)算min,并根據(jù)當(dāng)前worker數(shù)量判斷是否需要addWorker() if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //allowCoreThreadTimeOut默認(rèn)為false,即min默認(rèn)為corePoolSize //如果min為0,即不需要維持核心線程數(shù)量,且workQueue不為空,至少保持一個(gè)線程 if (min == 0 && ! workQueue.isEmpty()) min = 1; //如果線程數(shù)量大于最少數(shù)量,直接返回,否則下面至少要addWorker一個(gè) if (workerCountOf(c) >= min) return; // replacement not needed } //添加一個(gè)沒有firstTask的worker //只要worker是completedAbruptly突然終止的,或者線程數(shù)量小于要維護(hù)的數(shù)量,就新添一個(gè)worker線程,即使是shutdown狀態(tài) addWorker(null, false); } }
總而言之:如果線程池還沒有完全終止,就仍需要保持一定數(shù)量的線程。
線程池狀態(tài)是running 或 shutdown的情況下:
A、如果當(dāng)前線程是突然終止的,addWorker() B、如果當(dāng)前線程不是突然終止的,但當(dāng)前線程數(shù)量 < 要維護(hù)的線程數(shù)量,addWorker() 故如果調(diào)用線程池shutdown(),直到workQueue為空前,線程池都會(huì)維持corePoolSize個(gè)線程,然后再逐漸銷毀這corePoolSize個(gè)線程submit方法
前面我們講過(guò)execute方法,其作用是將一個(gè)任務(wù)提交給線程池,以期在未來(lái)的某個(gè)時(shí)間點(diǎn)被執(zhí)行。
submit方法在作用上,和execute方法是一樣的,將某個(gè)任務(wù)提交給線程池,讓線程池調(diào)度線程去執(zhí)行它。
那么它和execute方法有什么區(qū)別呢?我們來(lái)看看submit方法的源碼:
submit方法的實(shí)現(xiàn)在ThreadPoolExecutor的父類AbstractExecutorService類中,有三種重載方法:
/** * 提交一個(gè) Runnable 任務(wù)用于執(zhí)行,并返回一個(gè)表示該任務(wù)的 Future。該Future的get方法在成功完成時(shí)將會(huì)返回null。 * submit 參數(shù): task - 要提交的任務(wù) 返回:表示任務(wù)等待完成的 Future * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFutureftask = newTaskFor(task, null); execute(ftask); return ftask; } /** * 提交一個(gè)Runnable 任務(wù)用于執(zhí)行,并返回一個(gè)表示該任務(wù)的 Future。該 Future 的 get 方法在成功完成時(shí)將會(huì)返回給定的結(jié)果。 * submit 參數(shù): task - 要提交的任務(wù) result - 完成任務(wù)時(shí)要求返回的結(jié)果 * 返回: 表示任務(wù)等待完成的 Future * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, result); execute(ftask); return ftask; } /** * 提交一個(gè)Callable的任務(wù)用于執(zhí)行,返回一個(gè)表示任務(wù)的未決結(jié)果的 Future。該 Future 的 get 方法在成功完成時(shí)將會(huì)返回該任務(wù)的結(jié)果。 * 如果想立即阻塞任務(wù)的等待,則可以使用 result = exec.submit(aCallable).get(); 形式的構(gòu)造。 * 參數(shù): task - 要提交的任務(wù) 返回: 表示任務(wù)等待完成的Future * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future submit(Callable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task); execute(ftask); return ftask; }
源碼很簡(jiǎn)單,submit方法,將任務(wù)task封裝成FutureTask(newTaskFor方法中就是new了一個(gè)FutureTask),然后調(diào)用execute。所以submit方法和execute的所有區(qū)別,都在這FutureTask所帶來(lái)的差異化實(shí)現(xiàn)上。
總而言之,submit方法將一個(gè)任務(wù)task用future模式封裝成FutureTask對(duì)象,提交給線程執(zhí)行,并將這個(gè)FutureTask對(duì)象返回,以供主線程在該任務(wù)被線程池執(zhí)行之后得到執(zhí)行結(jié)果。
注意,獲得執(zhí)行結(jié)果的方法FutureTask.get(),會(huì)阻塞執(zhí)行該方法的線程,尤其是當(dāng)任務(wù)被DiscardPolicy策略和DiscardOldestPolicy拒絕的時(shí)候,get方法會(huì)一直阻塞在那里,所以我們最好使用自帶超時(shí)時(shí)間的future。
線程池的關(guān)閉 shutdown方法講完了線程池的基本運(yùn)轉(zhuǎn)過(guò)程,在方法章的最后,我們來(lái)看看負(fù)責(zé)線程池生命周期最后收尾工作的幾個(gè)重要方法,首先是shutdown方法。
/** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. * Invocation has no additional effect if already shut down. * *This method does not wait for previously submitted tasks to * complete execution. Use {@link #awaitTermination awaitTermination} * to do that. * 開始一個(gè)順序的shutdown操作,shutdown之前被執(zhí)行的已提交任務(wù),新的任務(wù)不會(huì)再被接收了。如果線程池已經(jīng)被shutdown了,該方法的調(diào)用沒有其他任何效果了。 * **該方法不會(huì)等待之前已經(jīng)提交的任務(wù)執(zhí)行完畢**,awaitTermination方法才有這個(gè)效果。 * * @throws SecurityException {@inheritDoc} */ public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //判斷是否可以操作關(guān)閉目標(biāo)線程。 checkShutdownAccess(); //advanceRunState方法,參數(shù):目標(biāo)狀態(tài);作用:一直執(zhí)行,直到成功利用CAS將狀態(tài)置為目標(biāo)值。 //設(shè)置線程池狀態(tài)為SHUTDOWN,此處之后,線程池中不會(huì)增加新Task advanceRunState(SHUTDOWN); //中斷所有的空閑線程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } //嘗試進(jìn)行terminate操作,但其實(shí)我們上面將狀態(tài)置為shutdown,就已經(jīng)算是“中止”了一個(gè)線程池了,它不會(huì)再執(zhí)行任務(wù),于外部而言,已經(jīng)失去了作用。而這里,也只是嘗試去將線程池的狀態(tài)一擼到底而已,并不是一定要terminate掉。該方法我們后面會(huì)說(shuō)到。 tryTerminate(); }
我們可以看到,shutdown方法只不過(guò)是中斷喚醒了所有阻塞的線程,并且把線程池狀態(tài)置為shutdown,正如注釋所說(shuō)的,它沒有等待所有正在執(zhí)行任務(wù)的線程執(zhí)行完任務(wù),把狀態(tài)置為shutdown,已經(jīng)足夠線程池喪失基本的功能了。
在該方法中,線程池如何中斷線程是我們最需要關(guān)心的,我們來(lái)看一下interruptIdleWorkers方法:
private void interruptIdleWorkers(boolean onlyOne) {//參數(shù)onlyOne表示是否值中斷一個(gè)線程就退出,在shutdown中該值為false。 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //遍歷workers 對(duì)所有worker做中斷處理。 for (Worker w : workers) { Thread t = w.thread; // w.tryLock()對(duì)Worker獲取鎖,因?yàn)檎趫?zhí)行的worker已經(jīng)加鎖了(見runWorker方法,w.lock()語(yǔ)句) //所以這保證了正在運(yùn)行執(zhí)行Task的Worker不會(huì)被中斷。只有阻塞在getTask方法的空閑線程才會(huì)進(jìn)這個(gè)if判斷(被中斷),但中斷不代表線程立刻停止,它要繼續(xù)處理到阻塞隊(duì)列為空時(shí)才會(huì)被銷毀。 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
我們可以看到,在中斷方法中,我們調(diào)用了worker的tryLock方法去嘗試獲取worker的鎖,所以我們說(shuō),worker類這一層的封裝,是用來(lái)控制線程中斷的,正在執(zhí)行任務(wù)的線程已經(jīng)上了鎖,無(wú)法被中斷,只有在獲取阻塞隊(duì)列中的任務(wù)的線程(我們稱為空閑線程)才會(huì)有被中斷的可能。
之前我們看過(guò)getTask方法,在這個(gè)方法中, worker是不加鎖的,所以可以被中斷。我們?yōu)槭裁凑f(shuō)“中斷不代表線程立刻停止,它要繼續(xù)處理到阻塞隊(duì)列為空時(shí)才會(huì)被銷毀”呢?具體邏輯,我們?cè)賮?lái)看一下getTask的源碼,以及我們的注釋(我們模擬中斷發(fā)生時(shí)的場(chǎng)景):
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? /** * 當(dāng)執(zhí)行過(guò)程中拋出InterruptedException 的時(shí)候,該異常被catch住,邏輯重新回到這個(gè)for循環(huán) * catch塊在getTask方法的最后。 */ for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /** * 因?yàn)檫壿嬍窃趻伋鲋袛喈惓:髞?lái)到這里的,那說(shuō)明線程池的狀態(tài)已經(jīng)在shutdown方法中被置為shutdown了,rs >= SHUTDOWN為true,rs >=STOP為false(只有TIDYING和TERMINATED狀態(tài)會(huì)大于stop) * 這時(shí)候,如果workQueue為空,判斷為真,線程被銷毀。 * 否則,workQueue為非空,判斷為假,線程不會(huì)進(jìn)入銷毀邏輯。 */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount();//循環(huán)的CAS減少worker數(shù)量,直到成功 return null; } int wc = workerCountOf(c); // Are workers subject to culling? //因?yàn)樵赾atch塊中,timeOut已經(jīng)為false了。 //所以只要不發(fā)生當(dāng)前線程數(shù)超過(guò)最大線程數(shù)這種極端情況,命題(wc > maximumPoolSize || (timed && timedOut)一定為false,線程依舊不被銷毀。 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //繼續(xù)執(zhí)行正常的從阻塞隊(duì)列中取任務(wù)的邏輯,直到阻塞隊(duì)列徹底為空,這時(shí)候,上面第一個(gè)if判斷符合,線程被銷毀,壽命徹底結(jié)束。 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //如果正常返回,那么返回取到的task。 if (r != null) return r; //否則,設(shè)為超時(shí),重新執(zhí)行循環(huán), timedOut = true; } catch (InterruptedException retry) { //捕獲中斷異常 timedOut = false; } } }
總結(jié):正阻塞在getTask()獲取任務(wù)的worker在被中斷后,會(huì)拋出InterruptedException,不再阻塞獲取任務(wù)。捕獲中斷異常后,將繼續(xù)循環(huán)到getTask()最開始的判斷線程池狀態(tài)的邏輯,當(dāng)線程池是shutdown狀態(tài),且workQueue.isEmpty時(shí),return null,進(jìn)行worker線程退出邏輯。
所以,這就是我們?yōu)槭裁凑f(shuō),shutdown方法不會(huì)立刻停止線程池,它的作用是阻止新的任務(wù)被添加進(jìn)來(lái)(邏輯在addWorker方法的第一個(gè)if判斷中,可以返回去看一下),并且繼續(xù)處理完剩下的任務(wù),然后tryTerminated,嘗試關(guān)閉。
tryTerminate方法/** * Transitions to TERMINATED state if either (SHUTDOWN and pool * and queue empty) or (STOP and pool empty). If otherwise * eligible to terminate but workerCount is nonzero, interrupts an * idle worker to ensure that shutdown signals propagate. This * method must be called following any action that might make * termination possible -- reducing worker count or removing tasks * from the queue during shutdown. The method is non-private to * allow access from ScheduledThreadPoolExecutor. * 在以下情況將線程池變?yōu)門ERMINATED終止?fàn)顟B(tài) * shutdown 且 正在運(yùn)行的worker 和 workQueue隊(duì)列 都empty * stop 且 沒有正在運(yùn)行的worker * * 這個(gè)方法必須在任何可能導(dǎo)致線程池終止的情況下被調(diào)用,如: * 減少worker數(shù)量 * shutdown時(shí)從queue中移除任務(wù) * * 這個(gè)方法不是私有的,所以允許子類ScheduledThreadPoolExecutor調(diào)用 */ final void tryTerminate() { for (;;) { int c = ctl.get(); /** * 線程池是否需要終止 * 如果以下3中情況任一為true,return,不進(jìn)行終止 * 1、還在運(yùn)行狀態(tài) * 2、狀態(tài)是TIDYING、或 TERMINATED,已經(jīng)終止過(guò)了 * 3、SHUTDOWN 且 workQueue不為空 */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; /** * 只有shutdown狀態(tài) 且 workQueue為空,或者 stop狀態(tài)能執(zhí)行到這一步 * 如果此時(shí)線程池還有線程(正在運(yùn)行任務(wù)或正在等待任務(wù),總之count不等于0) * 中斷喚醒一個(gè)正在等任務(wù)的空閑worker *(中斷喚醒的意思就是讓阻塞在阻塞隊(duì)列中的worker拋出異常,然后重新判斷狀態(tài),getTask方法邏輯) * 線程被喚醒后再次判斷線程池狀態(tài),會(huì)return null,進(jìn)入processWorkerExit()流程(runWorker邏輯) */ if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE);//中斷workers集合中的空閑任務(wù),參數(shù)為true,只中斷一個(gè)。(該邏輯的意義應(yīng)該在于通知被阻塞在隊(duì)列中的線程:別瞎jb等了,這個(gè)線程池都要倒閉了,趕緊收拾鋪蓋準(zhǔn)備銷毀吧你個(gè)逼玩意兒)。 //嘗試終止失敗,返回??赡艽蠹視?huì)有疑問,shutdown只調(diào)用了一次tryTerminate方法,如果一次嘗試失敗了,是不是就意味著shutdown方法很可能最終無(wú)法終止線程池? //其實(shí)看注釋,我們知道線程池在進(jìn)行所有負(fù)面效益的操作時(shí)都會(huì)調(diào)用該方法嘗試終止,上面我們中斷了一個(gè)阻塞線程讓他被銷毀,他銷毀時(shí)也會(huì)嘗試終止(這其中又喚醒了一個(gè)阻塞線程去銷毀),以此類推,直到最后一個(gè)線程執(zhí)行tryTerminate時(shí),邏輯才有可能走到下面去。 return; } /** * 如果狀態(tài)是SHUTDOWN,workQueue也為空了,正在運(yùn)行的worker也沒有了,開始terminated */ final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //CAS:將線程池的ctl變成TIDYING(所有的任務(wù)被終止,workCount為0,為此狀態(tài)時(shí)將會(huì)調(diào)用terminated()方法),期間ctl有變化就會(huì)失敗,會(huì)再次for循環(huán) if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { //方法為空,需子類實(shí)現(xiàn) terminated(); } finally { //將狀態(tài)置為TERMINATED ctl.set(ctlOf(TERMINATED, 0)); //最后執(zhí)行termination.signalAll(),并喚醒所有等待線程池終止這個(gè)Condition的線程(也就是調(diào)用了awaitTermination方法的線程,這個(gè)方法的作用是阻塞調(diào)用它的線程,直到調(diào)用該方法的線程池真的已經(jīng)被終止了。) termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
總結(jié)一下:tryTerminate被調(diào)用的時(shí)機(jī)主要有:
1,shutdown方法時(shí)
2,processWorkerExit方法銷毀一個(gè)線程時(shí)
3,addWorkerFailed方法添加線程失敗或啟動(dòng)線程失敗時(shí)
4,remove方法,從阻塞隊(duì)列中刪掉一個(gè)任務(wù)時(shí)
我們知道,shutdown后線程池將變成shutdown狀態(tài),此時(shí)不接收新任務(wù),但會(huì)處理完正在運(yùn)行的 和 在阻塞隊(duì)列中等待處理的任務(wù)。
我們接下來(lái)要說(shuō)的shutdownNow方法,作用是:shutdownNow后線程池將變成stop狀態(tài),此時(shí)不接收新任務(wù),不再處理在阻塞隊(duì)列中等待的任務(wù),還會(huì)嘗試中斷正在處理中的工作線程。
代碼如下:
/** * Attempts to stop all actively executing tasks, halts the * processing of waiting tasks, and returns a list of the tasks * that were awaiting execution. These tasks are drained (removed) * from the task queue upon return from this method. * 嘗試停止所有活動(dòng)的正在執(zhí)行的任務(wù),停止等待任務(wù)的處理,并返回正在等待被執(zhí)行的任務(wù)列表 * 這個(gè)任務(wù)列表是從任務(wù)隊(duì)列中排出(刪除)的 *This method does not wait for actively executing tasks to * terminate. Use {@link #awaitTermination awaitTermination} to * do that. * 這個(gè)方法不用等到正在執(zhí)行的任務(wù)結(jié)束,要等待線程池終止可使用awaitTermination() *
There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks. This implementation * cancels tasks via {@link Thread#interrupt}, so any task that * fails to respond to interrupts may never terminate. * 除了盡力嘗試停止運(yùn)行中的任務(wù),沒有任何保證 * 取消任務(wù)是通過(guò)Thread.interrupt()實(shí)現(xiàn)的,所以任何響應(yīng)中斷失敗的任務(wù)可能
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/71020.html
摘要:源碼分析創(chuàng)建可緩沖的線程池。源碼分析使用創(chuàng)建線程池源碼分析的構(gòu)造函數(shù)構(gòu)造函數(shù)參數(shù)核心線程數(shù)大小,當(dāng)線程數(shù),會(huì)創(chuàng)建線程執(zhí)行最大線程數(shù),當(dāng)線程數(shù)的時(shí)候,會(huì)把放入中保持存活時(shí)間,當(dāng)線程數(shù)大于的空閑線程能保持的最大時(shí)間。 之前創(chuàng)建線程的時(shí)候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThr...
摘要:當(dāng)活動(dòng)線程核心線程非核心線程達(dá)到這個(gè)數(shù)值后,后續(xù)任務(wù)將會(huì)根據(jù)來(lái)進(jìn)行拒絕策略處理。線程池工作原則當(dāng)線程池中線程數(shù)量小于則創(chuàng)建線程,并處理請(qǐng)求。當(dāng)線程池中的數(shù)量等于最大線程數(shù)時(shí)默默丟棄不能執(zhí)行的新加任務(wù),不報(bào)任何異常。 spring-cache使用記錄 spring-cache的使用記錄,坑點(diǎn)記錄以及采用的解決方案 深入分析 java 線程池的實(shí)現(xiàn)原理 在這篇文章中,作者有條不紊的將 ja...
摘要:線程池的作用線程池能有效的處理多個(gè)線程的并發(fā)問題,避免大量的線程因?yàn)榛ハ鄰?qiáng)占系統(tǒng)資源導(dǎo)致阻塞現(xiàn)象,能夠有效的降低頻繁創(chuàng)建和銷毀線程對(duì)性能所帶來(lái)的開銷。固定的線程數(shù)由系統(tǒng)資源設(shè)置。線程池的排隊(duì)策略與有關(guān)。線程池的狀態(tài)值分別是。 線程池的作用 線程池能有效的處理多個(gè)線程的并發(fā)問題,避免大量的線程因?yàn)榛ハ鄰?qiáng)占系統(tǒng)資源導(dǎo)致阻塞現(xiàn)象,能夠有效的降低頻繁創(chuàng)建和銷毀線程對(duì)性能所帶來(lái)的開銷。 線程池的...
摘要:任務(wù)性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。線程池在運(yùn)行過(guò)程中已完成的任務(wù)數(shù)量。如等于線程池的最大大小,則表示線程池曾經(jīng)滿了。線程池的線程數(shù)量。獲取活動(dòng)的線程數(shù)。通過(guò)擴(kuò)展線程池進(jìn)行監(jiān)控??蚣馨ň€程池,,,,,,等。 Java線程池 [toc] 什么是線程池 線程池就是有N個(gè)子線程共同在運(yùn)行的線程組合。 舉個(gè)容易理解的例子:有個(gè)線程組合(即線程池,咱可以比喻為一個(gè)公司),里面有3...
摘要:線程池常見實(shí)現(xiàn)線程池一般包含三個(gè)主要部分調(diào)度器決定由哪個(gè)線程來(lái)執(zhí)行任務(wù)執(zhí)行任務(wù)所能夠的最大耗時(shí)等線程隊(duì)列存放并管理著一系列線程這些線程都處于阻塞狀態(tài)或休眠狀態(tài)任務(wù)隊(duì)列存放著用戶提交的需要被執(zhí)行的任務(wù)一般任務(wù)的執(zhí)行的即先提交的任務(wù)先被執(zhí)行調(diào)度 線程池常見實(shí)現(xiàn) 線程池一般包含三個(gè)主要部分: 調(diào)度器: 決定由哪個(gè)線程來(lái)執(zhí)行任務(wù), 執(zhí)行任務(wù)所能夠的最大耗時(shí)等 線程隊(duì)列: 存放并管理著一系列線...
摘要:當(dāng)面試官問線程池時(shí),你應(yīng)該知道些什么一執(zhí)行流程與不同,向中提交任務(wù)的時(shí)候,任務(wù)被包裝成對(duì)象加入延遲隊(duì)列并啟動(dòng)一個(gè)線程。當(dāng)我們創(chuàng)建出一個(gè)調(diào)度線程池以后,就可以開始提交任務(wù)了。 最近新接手的項(xiàng)目里大量使用了ScheduledThreadPoolExecutor類去執(zhí)行一些定時(shí)任務(wù),之前一直沒有機(jī)會(huì)研究這個(gè)類的源碼,這次趁著機(jī)會(huì)好好研讀一下。 原文地址:http://www.jianshu....
閱讀 2671·2021-10-14 09:47
閱讀 4975·2021-09-22 15:52
閱讀 3380·2019-08-30 15:53
閱讀 1477·2019-08-30 15:44
閱讀 712·2019-08-29 16:41
閱讀 1683·2019-08-29 16:28
閱讀 467·2019-08-29 15:23
閱讀 1650·2019-08-26 12:20