成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

線程池源碼分析——ThreadPoolExecutor

xiguadada / 545人閱讀

摘要:提高線程的可管理性線程池可以統(tǒng)一管理分配調(diào)優(yōu)和監(jiān)控。線程池的初始化狀態(tài)是。調(diào)用線程池的接口時(shí),線程池由。當(dāng)所有的任務(wù)已終止,記錄的任務(wù)數(shù)量為,阻塞隊(duì)列為空,線程池會(huì)變?yōu)闋顟B(tài)。線程池徹底終止,就變成狀態(tài)。

序言

我們知道,線程池幫我們重復(fù)管理線程,避免創(chuàng)建大量的線程增加開銷。
合理的使用線程池能夠帶來(lái)3個(gè)很明顯的好處:
1.降低資源消耗:通過(guò)重用已經(jīng)創(chuàng)建的線程來(lái)降低線程創(chuàng)建和銷毀的消耗
2.提高響應(yīng)速度:任務(wù)到達(dá)時(shí)不需要等待線程創(chuàng)建就可以立即執(zhí)行。
3.提高線程的可管理性:線程池可以統(tǒng)一管理、分配、調(diào)優(yōu)和監(jiān)控。
java源生的線程池,實(shí)現(xiàn)于ThreadPoolExecutor類,這也是我們今天討論的重點(diǎn)

ThreadPoolExecutor類構(gòu)造方法

Jdk使用ThreadPoolExecutor類來(lái)創(chuàng)建線程池,我們來(lái)看看它的構(gòu)造方法。

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:
* {@code corePoolSize < 0}
* {@code keepAliveTime < 0}
* {@code maximumPoolSize <= 0}
* {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

int corePoolSize, //核心線程的數(shù)量

int maximumPoolSize, //最大線程數(shù)量

long keepAliveTime, //超出核心線程數(shù)量以外的線程空閑時(shí),線程存活的時(shí)間

TimeUnit unit, //存活時(shí)間的單位,有如下幾種選擇

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小時(shí)
TimeUnit.MINUTES;           //分鐘
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒

BlockingQueue workQueue, //保存待執(zhí)行任務(wù)的隊(duì)列,常見的也有如下幾種:

ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
PriorityBlockingQueue

ThreadFactory threadFactory, //創(chuàng)建新線程使用的工廠

RejectedExecutionHandler handler // 當(dāng)任務(wù)無(wú)法執(zhí)行時(shí)的處理器(線程拒絕策略)

核心類變量 ctl變量

ThreadPoolExecutor中有一個(gè)控制狀態(tài)的屬性叫ctl,它是一個(gè)AtomicInteger類型的變量,它一個(gè)int值可以儲(chǔ)存兩個(gè)概念的信息:

workerCount:表明當(dāng)前池中有效的線程數(shù),通過(guò)workerCountOf方法獲得,workerCount上限是(2^29)-1。(最后存放在ctl的低29bit)

runState:表明當(dāng)前線程池的狀態(tài),通過(guò)workerCountOf方法獲得,最后存放在ctl的高3bit中,他們是整個(gè)線程池的運(yùn)行生命周期,有如下取值,分別的含義是:

RUNNING:可以新加線程,同時(shí)可以處理queue中的線程。線程池的初始化狀態(tài)是RUNNING。換句話說(shuō),線程池被一旦被創(chuàng)建,就處于RUNNING狀態(tài),

SHUTDOWN:不增加新線程,但是處理queue中的線程。調(diào)用線程池的shutdown()方法時(shí),線程池由RUNNING -> SHUTDOWN。

STOP 不增加新線程,同時(shí)不處理queue中的線程。調(diào)用線程池的shutdownNow()接口時(shí),線程池由(RUNNING or SHUTDOWN ) -> STOP。

TIDYING 當(dāng)所有的任務(wù)已終止,ctl記錄的”任務(wù)數(shù)量”為0,阻塞隊(duì)列為空,線程池會(huì)變?yōu)門IDYING狀態(tài)。當(dāng)線程池變?yōu)門IDYING狀態(tài)時(shí),會(huì)執(zhí)行鉤子函數(shù)terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變?yōu)門IDYING時(shí),進(jìn)行相應(yīng)的處理;可以通過(guò)重載terminated()函數(shù)來(lái)實(shí)現(xiàn)。

TERMINATED 線程池徹底終止,就變成TERMINATED狀態(tài)。線程池處在TIDYING狀態(tài)時(shí),執(zhí)行完terminated()之后,就會(huì)由 TIDYING -> TERMINATED。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

COUNT_BITS=32(integer的size)-3=29,于是五種狀態(tài)左移29位分別是:

RUNNING: 11100000000000000000000000000000

SHUTDOWN: 00000000000000000000000000000000

STOP: 00100000000000000000000000000000

TIDYING: 01000000000000000000000000000000

TERMINATED:01100000000000000000000000000000

而ThreadPoolExecutor是通過(guò)runStateOf和workerCountOf獲得者兩個(gè)概念的值的。

runStateOf和workerCountOf方法是如何剝離出ctl變量的兩個(gè)有效值呢?這其中我們可以看到CAPACITY是實(shí)現(xiàn)一個(gè)字段存兩個(gè)值的最重要的字段。

CAPACITY變量

CAPACITY=(1 << COUNT_BITS) – 1 轉(zhuǎn)成二進(jìn)制為:000 11111111111111111111111111111,他是線程池理論上可以允許的最大的線程數(shù)。
所以很明顯,它的重點(diǎn)在于,其高3bit為0,低29bit為1;
這樣,workderCountOf方法中,CAPACITY和ctl進(jìn)行&運(yùn)算時(shí),它能獲得高3位都是0,低29位和ctl低29位相同的值,這個(gè)值就是workerCount;
同理,runStateOf方法,CAPACITY的取反和ctl進(jìn)行&操作,獲得高3位和ctl高三位相等,低29位都為0的值,這個(gè)值就是runState

workQueue
/**
     * The queue used for holding tasks and handing off to worker
     * threads.  We do not require that workQueue.poll() returning
     * null necessarily means that workQueue.isEmpty(), so rely
     * solely on isEmpty to see if the queue is empty (which we must
     * do for example when deciding whether to transition from
     * SHUTDOWN to TIDYING).  This accommodates special-purpose
     * queues such as DelayQueues for which poll() is allowed to
     * return null even if it may later return non-null when delays
     * expire.
     */
    private final BlockingQueue workQueue;

一個(gè)BlockingQueue隊(duì)列,本身的結(jié)構(gòu)可以保證訪問的線程安全(這里不展開了)。這是一個(gè)排隊(duì)等待隊(duì)列。當(dāng)我們線程池里線程達(dá)到corePoolSize的時(shí)候,一些需要等待執(zhí)行的線程就放在這個(gè)隊(duì)列里等待。

workers
/**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet workers = new HashSet();

一個(gè)HashSet的集合。線程池里所有可以立即執(zhí)行的線程都放在這個(gè)集合里。這也是我們直觀理解的線程的池子。

mainLock
    private final ReentrantLock mainLock = new ReentrantLock();

mainLock是線程池的主鎖,是可重入鎖,當(dāng)要操作workers set這個(gè)保持線程的HashSet時(shí),需要先獲取mainLock,還有當(dāng)要處理largestPoolSize、completedTaskCount這類統(tǒng)計(jì)數(shù)據(jù)時(shí)需要先獲取mainLock

其他重要屬性
private int largestPoolSize;   //用來(lái)記錄線程池中曾經(jīng)出現(xiàn)過(guò)的最大線程數(shù)
 
private long completedTaskCount;   //用來(lái)記錄已經(jīng)執(zhí)行完畢的任務(wù)個(gè)數(shù)

private volatile boolean allowCoreThreadTimeOut;   //是否允許為核心線程設(shè)置存活時(shí)間
核心內(nèi)部類 Worker

Worker類是線程池中具化一個(gè)線程的對(duì)象,是線程池的核心,我們來(lái)看看源碼:

/**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution.  This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run.  We implement a simple
     * non-reentrant mutual exclusion lock rather than use
     * ReentrantLock because we do not want worker tasks to be able to
     * reacquire the lock when they invoke pool control methods like
     * setCorePoolSize.  Additionally, to suppress interrupts until
     * the thread actually starts running tasks, we initialize lock
     * state to a negative value, and clear it upon start (in
     * runWorker).
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            //設(shè)置AQS的同步狀態(tài)private volatile int state,是一個(gè)計(jì)數(shù)器,大于0代表鎖已經(jīng)被獲取
            // 在調(diào)用runWorker()前,禁止interrupt中斷,在interruptIfStarted()方法中會(huì)判斷 getState()>=0
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);//根據(jù)當(dāng)前worker創(chuàng)建一個(gè)線程對(duì)象
            //當(dāng)前worker本身就是一個(gè)runnable任務(wù),也就是不會(huì)用參數(shù)的firstTask創(chuàng)建線程,而是調(diào)用當(dāng)前worker.run()時(shí)調(diào)用firstTask.run()
            //后面在addworker中,我們會(huì)啟動(dòng)worker對(duì)象中組合的Thread,而我們的執(zhí)行邏輯runWorker方法是在worker的run方法中被調(diào)用。
            //為什么執(zhí)行thread的run方法會(huì)調(diào)用worker的run方法呢,原因就是在這里進(jìn)行了注入,將worker本身this注入到了thread中
        }
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }//runWorker()是ThreadPoolExecutor的方法

        // Lock methods
        //
        // The value 0 represents the unlocked state. 0代表“沒被鎖定”狀態(tài)
        // The value 1 represents the locked state. 1代表“鎖定”狀態(tài)
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        /**
         * 嘗試獲取鎖
         * 重寫AQS的tryAcquire(),AQS本來(lái)就是讓子類來(lái)實(shí)現(xiàn)的
         */
        protected boolean tryAcquire(int unused) {
            //嘗試一次將state從0設(shè)置為1,即“鎖定”狀態(tài),但由于每次都是state 0->1,而不是+1,那么說(shuō)明不可重入
            //且state==-1時(shí)也不會(huì)獲取到鎖
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        /**
         * 嘗試釋放鎖
         * 不是state-1,而是置為0
         */
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
        /**
         * 中斷(如果運(yùn)行)
         * shutdownNow時(shí)會(huì)循環(huán)對(duì)worker線程執(zhí)行
         * 且不需要獲取worker鎖,即使在worker運(yùn)行時(shí)也可以中斷
         */
        void interruptIfStarted() {
            Thread t;
            //如果state>=0、t!=null、且t沒有被中斷
            //new Worker()時(shí)state==-1,說(shuō)明不能中斷
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

我們看worker類時(shí),會(huì)發(fā)現(xiàn)最重要的幾個(gè)部分在于它里面定義了一個(gè)Thread thread和Runnable firstTask??吹竭@里,我們可能會(huì)比較奇怪,我們只是要一個(gè)可以執(zhí)行的線程,這里放一個(gè)Thread和一個(gè)Runnable的變量做什么呢?
其實(shí)之所以Worker自己實(shí)現(xiàn)Runnable,并創(chuàng)建Thread,在firstTask外包一層,是因?yàn)橐ㄟ^(guò)Worker負(fù)責(zé)控制中斷,而firstTask這個(gè)工作任務(wù)只是負(fù)責(zé)執(zhí)行業(yè)務(wù),worker的run方法調(diào)用了runWorker方法,在這里面,worker里的firstTask的run方法被執(zhí)行。稍后我們會(huì)聚焦這個(gè)執(zhí)行任務(wù)的runWorker方法。

核心方法

好了,基本上我們將線程池的幾個(gè)主角,ctl,workQueue,workers,Worker簡(jiǎn)單介紹了一遍,現(xiàn)在,我們來(lái)看看線程池是怎么玩的。

線程的運(yùn)行 execute方法

這是線程池實(shí)現(xiàn)類外露供給外部實(shí)現(xiàn)提交線程任務(wù)command的核心方法,對(duì)于無(wú)需了解線程池內(nèi)部的使用者來(lái)說(shuō),這個(gè)方法就是把某個(gè)任務(wù)交給線程池,正常情況下,這個(gè)任務(wù)會(huì)在未來(lái)某個(gè)時(shí)刻被執(zhí)行,實(shí)現(xiàn)和注釋如下:

    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     * * 在未來(lái)的某個(gè)時(shí)刻執(zhí)行給定的任務(wù)。這個(gè)任務(wù)用一個(gè)新線程執(zhí)行,或者用一個(gè)線程池中已經(jīng)存在的線程執(zhí)行
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     * 如果任務(wù)無(wú)法被提交執(zhí)行,要么是因?yàn)檫@個(gè)Executor已經(jīng)被shutdown關(guān)閉,要么是已經(jīng)達(dá)到其容量上限,任務(wù)會(huì)被當(dāng)前的RejectedExecutionHandler處理
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn"t, by returning false.
         * 如果運(yùn)行的線程少于corePoolSize,嘗試開啟一個(gè)新線程去運(yùn)行command,command作為這個(gè)線程的第一個(gè)任務(wù)
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *  如果任務(wù)成功放入隊(duì)列,我們?nèi)孕枰粋€(gè)雙重校驗(yàn)去確認(rèn)是否應(yīng)該新建一個(gè)線程(因?yàn)榭赡艽嬖谟行┚€程在我們上次檢查后死了)
         *  或者 從我們進(jìn)入這個(gè)方法后,pool被關(guān)閉了
         *  所以我們需要再次檢查state,如果線程池停止了需要回滾入隊(duì)列,如果池中沒有線程了,新開啟 一個(gè)線程
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         * 如果無(wú)法將任務(wù)入隊(duì)列(可能隊(duì)列滿了),需要新開區(qū)一個(gè)線程(自己:往maxPoolSize發(fā)展)
        * 如果失敗了,說(shuō)明線程池shutdown 或者 飽和了,所以我們拒絕任務(wù)
         */
        int c = ctl.get();
        // 1、如果當(dāng)前線程數(shù)少于corePoolSize(可能是由于addWorker()操作已經(jīng)包含對(duì)線程池狀態(tài)的判斷,如此處沒加,而入workQueue前加了)
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;

            /**
             * 沒有成功addWorker(),再次獲取c(凡是需要再次用ctl做判斷時(shí),都會(huì)再次調(diào)用ctl.get())
             * 失敗的原因可能是:
             * 1、線程池已經(jīng)shutdown,shutdown的線程池不再接收新任務(wù)
             * 2、workerCountOf(c) < corePoolSize 判斷后,由于并發(fā),別的線程先創(chuàng)建了worker線程,導(dǎo)致workerCount>=corePoolSize
             */
            c = ctl.get();
        }
        /**
         * 2、如果線程池RUNNING狀態(tài),且入隊(duì)列成功
         */
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();

            /**
             * 再次校驗(yàn)放入workerQueue中的任務(wù)是否能被執(zhí)行
             * 1、如果線程池不是運(yùn)行狀態(tài)了,應(yīng)該拒絕添加新任務(wù),從workQueue中刪除任務(wù)
             * 2、如果線程池是運(yùn)行狀態(tài),或者從workQueue中刪除任務(wù)失?。▌偤糜幸粋€(gè)線程執(zhí)行完畢,并消耗了這個(gè)任務(wù)),
             * 確保還有線程執(zhí)行任務(wù)(只要有一個(gè)就夠了)
             */
            //如果再次校驗(yàn)過(guò)程中,線程池不是RUNNING狀態(tài),并且remove(command)--workQueue.remove()成功,拒絕當(dāng)前command
            if (! isRunning(recheck) && remove(command))
                reject(command);

            //如果當(dāng)前worker數(shù)量為0,通過(guò)addWorker(null, false)創(chuàng)建一個(gè)線程,其任務(wù)為null
            //為什么只檢查運(yùn)行的worker數(shù)量是不是0呢?? 為什么不和corePoolSize比較呢??
            //只保證有一個(gè)worker線程可以從queue中獲取任務(wù)執(zhí)行就行了??
            //因?yàn)橹灰€有活動(dòng)的worker線程,就可以消費(fèi)workerQueue中的任務(wù)
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);//第一個(gè)參數(shù)為null,說(shuō)明只為新建一個(gè)worker線程,沒有指定firstTask
                                       ////第二個(gè)參數(shù)為true代表占用corePoolSize,false占用maxPoolSize
        }
        /**
         * 3、如果線程池不是running狀態(tài) 或者 無(wú)法入隊(duì)列
         *   嘗試開啟新線程,擴(kuò)容至maxPoolSize,如果addWork(command, false)失敗了,拒絕當(dāng)前command
         */
        else if (!addWorker(command, false))
            reject(command);
    }

我們可以簡(jiǎn)單歸納如下(注:圖來(lái)源見水印,謝謝大神的歸納):

addWorker

在execute方法中,我們看到核心的邏輯是由addWorker方法來(lái)實(shí)現(xiàn)的,當(dāng)我們將一個(gè)任務(wù)提交給線程池,線程池會(huì)如何處理,就是主要由這個(gè)方法加以規(guī)范:

該方法有兩個(gè)參數(shù):

firstTask: worker線程的初始任務(wù),可以為空

core: true:將corePoolSize作為上限,false:將maximumPoolSize作為上限

排列組合,addWorker方法有4種傳參的方式:

1、addWorker(command, true)
2、addWorker(command, false)
3、addWorker(null, false)
4、addWorker(null, true)

在execute方法中就使用了前3種,結(jié)合這個(gè)核心方法進(jìn)行以下分析

第一個(gè):線程數(shù)小于corePoolSize時(shí),放一個(gè)需要處理的task進(jìn)Workers Set。如果Workers Set長(zhǎng)度超過(guò)corePoolSize,就返回false
第二個(gè):當(dāng)隊(duì)列被放滿時(shí),就嘗試將這個(gè)新來(lái)的task直接放入Workers Set,而此時(shí)Workers Set的長(zhǎng)度限制是maximumPoolSize。如果線程池也滿了的話就返回false
第三個(gè):放入一個(gè)空的task進(jìn)workers Set,長(zhǎng)度限制是maximumPoolSize。這樣一個(gè)task為空的worker在線程執(zhí)行的時(shí)候會(huì)去任務(wù)隊(duì)列里拿任務(wù),這樣就相當(dāng)于創(chuàng)建了一個(gè)新的線程,只是沒有馬上分配任務(wù)
第四個(gè):這個(gè)方法就是放一個(gè)null的task進(jìn)Workers Set,而且是在小于corePoolSize時(shí),如果此時(shí)Set中的數(shù)量已經(jīng)達(dá)到corePoolSize那就返回false,什么也不干。實(shí)際使用中是在prestartAllCoreThreads()方法,這個(gè)方法用來(lái)為線程池預(yù)先啟動(dòng)corePoolSize個(gè)worker等待從workQueue中獲取任務(wù)執(zhí)行
    /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     * 檢查根據(jù)當(dāng)前線程池的狀態(tài)和給定的邊界(core or maximum)是否可以創(chuàng)建一個(gè)新的worker
     * 如果是這樣的話,worker的數(shù)量做相應(yīng)的調(diào)整,如果可能的話,創(chuàng)建一個(gè)新的worker并啟動(dòng),參數(shù)中的firstTask作為worker的第一個(gè)任務(wù)
     * 如果方法返回false,可能因?yàn)閜ool已經(jīng)關(guān)閉或者調(diào)用過(guò)了shutdown
     * 如果線程工廠創(chuàng)建線程失敗,也會(huì)失敗,返回false
     * 如果線程創(chuàng)建失敗,要么是因?yàn)榫€程工廠返回null,要么是發(fā)生了OutOfMemoryError
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        //外層循環(huán),負(fù)責(zé)判斷線程池狀態(tài)
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            /**
             * 線程池的state越小越是運(yùn)行狀態(tài),runnbale=-1,shutdown=0,stop=1,tidying=2,terminated=3
             * 要想這個(gè)if為true,線程池state必須已經(jīng)至少是shutdown狀態(tài)了
             * 這時(shí)候以下3個(gè)條件任意一個(gè)是false都會(huì)進(jìn)入if語(yǔ)句,即無(wú)法addWorker():
             *   1,rs == SHUTDOWN         (隱含:rs>=SHUTDOWN)false情況: 線程池狀態(tài)已經(jīng)超過(guò)shutdown,
             *                               可能是stop、tidying、terminated其中一個(gè),即線程池已經(jīng)終止
             *  2,firstTask == null      (隱含:rs==SHUTDOWN)false情況: firstTask不為空,rs==SHUTDOWN 且 firstTask不為空,
             *                               return false,場(chǎng)景是在線程池已經(jīng)shutdown后,還要添加新的任務(wù),拒絕
             *  3,! workQueue.isEmpty()  (隱含:rs==SHUTDOWN,firstTask==null)false情況: workQueue為空,
             *                               當(dāng)firstTask為空時(shí)是為了創(chuàng)建一個(gè)沒有任務(wù)的線程,再?gòu)膚orkQueue中獲取任務(wù),
             *                               如果workQueue已經(jīng)為空,那么就沒有添加新worker線程的必要了
             * return false,
             */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            //內(nèi)層循環(huán),負(fù)責(zé)worker數(shù)量+1
            for (;;) {
                int wc = workerCountOf(c);
                //入?yún)ore在這里起作用,表示加入的worker是加入corePool還是非corepool,換句話說(shuō),受到哪個(gè)size的約束
                //如果worker數(shù)量>線程池最大上限CAPACITY(即使用int低29位可以容納的最大值)
                //或者( worker數(shù)量>corePoolSize 或  worker數(shù)量>maximumPoolSize ),即已經(jīng)超過(guò)了給定的邊界,不添加worker
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //CAS嘗試增加線程數(shù),,如果成功加了wc,那么break跳出檢查
                //如果失敗,證明有競(jìng)爭(zhēng),那么重新到retry。
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //如果不成功,重新獲取狀態(tài)繼續(xù)檢查
                c = ctl.get();  // Re-read ctl
                //如果狀態(tài)不等于之前獲取的state,跳出內(nèi)層循環(huán),繼續(xù)去外層循環(huán)判斷
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
                // else CAS失敗時(shí)因?yàn)閣orkerCount改變了,繼續(xù)內(nèi)層循環(huán)嘗試CAS對(duì)worker數(shù)量+1
            }
        }
         //worker數(shù)量+1成功的后續(xù)操作
         // 添加到workers Set集合,并啟動(dòng)worker線程
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //新建worker//構(gòu)造方法做了三件事//1、設(shè)置worker這個(gè)AQS鎖的同步狀態(tài)state=-1
            w = new Worker(firstTask);  //2、將firstTask設(shè)置給worker的成員變量firstTask
                                        //3、使用worker自身這個(gè)runnable,調(diào)用ThreadFactory創(chuàng)建一個(gè)線程,并設(shè)置給worker的成員變量thread
            final Thread t = w.thread;
            if (t != null) {
                //獲取重入鎖,并且鎖上
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                     // rs!=SHUTDOWN ||firstTask!=null
                     // 如果線程池在運(yùn)行running largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {//如果往HashSet中添加worker成功,啟動(dòng)線程
                    //通過(guò)t.start()方法正式執(zhí)行線程。在這里一個(gè)線程才算是真正的執(zhí)行起來(lái)了。
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            //如果啟動(dòng)線程失敗
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

同樣的,我們可以歸納一下:

runWorker方法

在addWorker方法中,我們將一個(gè)新增進(jìn)去的worker所組合的線程屬性thread啟動(dòng)了,但我們知道,在worker的構(gòu)造方法中,它將自己本身注入到了thread的target屬性里,所以繞了一圈,線程啟動(dòng)后,調(diào)用的還是worker的run方法,而在這里面,runWorker定義了線程執(zhí)行的邏輯:

/**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
     *
     * 1. We may start out with an initial task, in which case we
     * don"t need to get the first one. Otherwise, as long as pool is
     * running, we get tasks from getTask. If it returns null then the
     * worker exits due to changed pool state or configuration
     * parameters.  Other exits result from exception throws in
     * external code, in which case completedAbruptly holds, which
     * usually leads processWorkerExit to replace this thread.
     * 我們可能使用一個(gè)初始化任務(wù)開始,即firstTask為null
     * 然后只要線程池在運(yùn)行,我們就從getTask()獲取任務(wù)
     * 如果getTask()返回null,則worker由于改變了線程池狀態(tài)或參數(shù)配置而退出
     * 其它退出因?yàn)橥獠看a拋異常了,這會(huì)使得completedAbruptly為true,這會(huì)導(dǎo)致在processWorkerExit()方法中替換當(dāng)前線程
     *
     * 2. Before running any task, the lock is acquired to prevent
     * other pool interrupts while the task is executing, and then we
     * ensure that unless pool is stopping, this thread does not have
     * its interrupt set.
     * 在任何任務(wù)執(zhí)行之前,都需要對(duì)worker加鎖去防止在任務(wù)運(yùn)行時(shí),其它的線程池中斷操作
     * clearInterruptsForTaskRun保證除非線程池正在stoping,線程不會(huì)被設(shè)置中斷標(biāo)示
     *
     * 3. Each task run is preceded by a call to beforeExecute, which
     * might throw an exception, in which case we cause thread to die
     * (breaking loop with completedAbruptly true) without processing
     * the task.
     * 每個(gè)任務(wù)執(zhí)行前會(huì)調(diào)用beforeExecute(),其中可能拋出一個(gè)異常,這種情況下會(huì)導(dǎo)致線程die(跳出循環(huán),且completedAbruptly==true),沒有執(zhí)行任務(wù)
     * 因?yàn)閎eforeExecute()的異常沒有cache住,會(huì)上拋,跳出循環(huán)
     *
     * 4. Assuming beforeExecute completes normally, we run the task,
     * gathering any of its thrown exceptions to send to afterExecute.
     * We separately handle RuntimeException, Error (both of which the
     * specs guarantee that we trap) and arbitrary Throwables.
     * Because we cannot rethrow Throwables within Runnable.run, we
     * wrap them within Errors on the way out (to the thread"s
     * UncaughtExceptionHandler).  Any thrown exception also
     * conservatively causes thread to die.
     *
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, which will also cause thread to
     * die. According to JLS Sec 14.20, this exception is the one that
     * will be in effect even if task.run throws.
     *
     * The net effect of the exception mechanics is that afterExecute
     * and the thread"s UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by
     * user code.
     *
     * @param w the worker
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        //標(biāo)識(shí)線程是不是異常終止的
        boolean completedAbruptly = true;
        try {
            //task不為null情況是初始化worker時(shí),如果task為null,則去隊(duì)列中取線程--->getTask()
            //可以看到,只要getTask方法被調(diào)用且返回null,那么worker必定被銷毀,而確定一個(gè)線程是否應(yīng)該被銷毀的邏輯,在getTask方法中
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //線程開始執(zhí)行之前執(zhí)行此方法,可以實(shí)現(xiàn)Worker未執(zhí)行退出,本類中未實(shí)現(xiàn)
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//runWorker方法最本質(zhì)的存在意義,就是調(diào)用task的run方法
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //線程執(zhí)行后執(zhí)行,可以實(shí)現(xiàn)標(biāo)識(shí)Worker異常中斷的功能,本類中未實(shí)現(xiàn)
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;//運(yùn)行過(guò)的task標(biāo)null
                    w.completedTasks++;
                    w.unlock();
                }
            }
            //標(biāo)識(shí)線程不是異常終止的,是因?yàn)椴粷M足while條件,被迫銷毀的
            completedAbruptly = false;
        } finally {
            //處理worker退出的邏輯
            processWorkerExit(w, completedAbruptly);
        }
    }

我們歸納:

getTask方法

runWorker方法中的getTask()方法是線程處理完一個(gè)任務(wù)后,從隊(duì)列中獲取新任務(wù)的實(shí)現(xiàn),也是處理判斷一個(gè)線程是否應(yīng)該被銷毀的邏輯所在:

    /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:  以下情況會(huì)返回null
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     *    超過(guò)了maximumPoolSize設(shè)置的線程數(shù)量(因?yàn)檎{(diào)用了setMaximumPoolSize())
     * 2. The pool is stopped.
     *    線程池被stop
     * 3. The pool is shutdown and the queue is empty.
     *    線程池被shutdown,并且workQueue空了
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait.
     *    線程等待任務(wù)超時(shí)
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     *         返回null表示這個(gè)worker要結(jié)束了,這種情況下workerCount-1
     */
    private Runnable getTask() {
        // timedOut 主要是判斷后面的poll是否要超時(shí)
        boolean timedOut = false; // Did the last poll() time out?

        /**
         * 用于判斷線程池狀態(tài)
         */
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            /**
             * 對(duì)線程池狀態(tài)的判斷,兩種情況會(huì)workerCount-1,并且返回null
             * 1,線程池狀態(tài)為shutdown,且workQueue為空(反映了shutdown狀態(tài)的線程池還是要執(zhí)行workQueue中剩余的任務(wù)的)
             * 2,線程池狀態(tài)為>=stop(只有TIDYING和TERMINATED會(huì)大于stop)(shutdownNow()會(huì)導(dǎo)致變成STOP)(此時(shí)不用考慮workQueue的情況)
             */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();//循環(huán)的CAS減少worker數(shù)量,直到成功
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?

            //allowCoreThreadTimeOut字段,表示是否允許核心線程超過(guò)閑置時(shí)間后被摧毀,默認(rèn)為false
            //我們前面說(shuō)過(guò),如果getTask方法返回null,那么這個(gè)worker只有被銷毀一途
            //于是這個(gè)timed有3種情況
            //(1)當(dāng)線程數(shù)沒有超過(guò)核心線程數(shù),且默認(rèn)allowCoreThreadTimeOut為false時(shí)
            //          timed值為false??聪旅鎖f的判斷邏輯,除非目前線程數(shù)大于最大值,否則下面的if始終進(jìn)不去,該方法不可能返回null,worker也就不會(huì)被銷毀。
            //          因?yàn)榍疤?線程數(shù)不超過(guò)核心線程數(shù)"與"線程數(shù)大于最大值"兩個(gè)命題互斥,所以(1)情況,邏輯進(jìn)入下面的if(返回null的線程銷毀邏輯)的可能性不存在。
            //          也就是說(shuō),當(dāng)線程數(shù)沒有超過(guò)核心線程數(shù)時(shí),線程不會(huì)被銷毀。
            //(2)當(dāng)當(dāng)前線程數(shù)超過(guò)核心線程數(shù),且默認(rèn)allowCoreThreadTimeOut為false時(shí)//timed值為true。
            //(3)如果allowCoreThreadTimeOut為true,則timed始終為true
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            //wc > maximumPoolSize則必銷毀,因?yàn)檫@情況下,wc>1也肯定為true
            //wc <= maximumPoolSize,且(timed && timedOut) = true,這種情況下一般也意味著worker要被銷毀,因?yàn)槌瑫r(shí)一般是由阻塞隊(duì)列為空造成的,所以workQueue.isEmpty()也大概率為真,進(jìn)入if邏輯。
            
            //一般情況是這樣,那不一般的情況呢?阻塞隊(duì)列沒有為空,但是因?yàn)橐恍┰颍€是超時(shí)了,這時(shí)候取決于wc > 1,它為真就銷毀,為假就不銷毀。
            // 也就是說(shuō),如果阻塞隊(duì)列還有任務(wù),但是wc=1,線程池里只剩下自己這個(gè)線程了,那么就不能銷毀,這個(gè)if不滿足,我們的代碼繼續(xù)往下走
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //如果timed為true那么使用poll取線程。否則使用take()
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    //workQueue.poll():如果在keepAliveTime時(shí)間內(nèi),阻塞隊(duì)列還是沒有任務(wù),返回null
                    workQueue.take();
                    //workQueue.take():如果阻塞隊(duì)列為空,當(dāng)前線程會(huì)被掛起等待;當(dāng)隊(duì)列中有任務(wù)加入時(shí),線程被喚醒,take方法返回任務(wù)
                //如果正常返回,那么返回取到的task。
                if (r != null)
                    return r;
                //否則,設(shè)為超時(shí),重新執(zhí)行循環(huán),
                timedOut = true;
            } catch (InterruptedException retry) {
            //在阻塞從workQueue中獲取任務(wù)時(shí),可以被interrupt()中斷,代碼中捕獲了InterruptedException,重置timedOut為初始值false,再次執(zhí)行第1步中的判斷,滿足就繼續(xù)獲取任務(wù),不滿足return null,會(huì)進(jìn)入worker退出的流程
                timedOut = false;
            }
        }

歸納:

processWorkerExit方法

在runWorker方法中,我們看到當(dāng)不滿足while條件后,線程池會(huì)執(zhí)行退出線程的操作,這個(gè)操作,就封裝在processWorkerExit方法中。

/**
 * Performs cleanup and bookkeeping for a dying worker. Called
 * only from worker threads. Unless completedAbruptly is set,
 * assumes that workerCount has already been adjusted to account
 * for exit.  This method removes thread from worker set, and
 * possibly terminates the pool or replaces the worker if either
 * it exited due to user task exception or if fewer than
 * corePoolSize workers are running or queue is non-empty but
 * there are no workers.
 *
 * @param w the worker
 * @param completedAbruptly if the worker died due to user exception
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //參數(shù):
        //worker:                      要結(jié)束的worker
        //completedAbruptly: 是否突然完成(是否因?yàn)楫惓M顺觯?        
    /**
     * 1、worker數(shù)量-1
     * 如果是突然終止,說(shuō)明是task執(zhí)行時(shí)異常情況導(dǎo)致,即run()方法執(zhí)行時(shí)發(fā)生了異常,那么正在工作的worker線程數(shù)量需要-1
     * 如果不是突然終止,說(shuō)明是worker線程沒有task可執(zhí)行了,不用-1,因?yàn)橐呀?jīng)在getTask()方法中-1了
     */
    if (completedAbruptly) // If abrupt, then workerCount wasn"t adjusted 代碼和注釋正好相反啊
        decrementWorkerCount();
 
    /**
     * 2、從Workers Set中移除worker
     */
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks; //把worker的完成任務(wù)數(shù)加到線程池的完成任務(wù)數(shù)
        workers.remove(w); //從HashSet中移除
    } finally {
        mainLock.unlock();
    }
 
    /**
     * 3、在對(duì)線程池有負(fù)效益的操作時(shí),都需要“嘗試終止”線程池
     * 主要是判斷線程池是否滿足終止的狀態(tài)
     * 如果狀態(tài)滿足,但線程池還有線程,嘗試對(duì)其發(fā)出中斷響應(yīng),使其能進(jìn)入退出流程
     * 沒有線程了,更新狀態(tài)為tidying->terminated
     */
    tryTerminate();
 
    /**
     * 4、是否需要增加worker線程
     * 線程池狀態(tài)是running 或 shutdown
     * 如果當(dāng)前線程是突然終止的,addWorker()
     * 如果當(dāng)前線程不是突然終止的,但當(dāng)前線程數(shù)量 < 要維護(hù)的線程數(shù)量,addWorker()
     * 故如果調(diào)用線程池shutdown(),直到workQueue為空前,線程池都會(huì)維持corePoolSize個(gè)線程,然后再逐漸銷毀這corePoolSize個(gè)線程
     */
    int c = ctl.get();
    //如果狀態(tài)是running、shutdown,即tryTerminate()沒有成功終止線程池,嘗試再添加一個(gè)worker
    if (runStateLessThan(c, STOP)) {
        //不是突然完成的,即沒有task任務(wù)可以獲取而完成的,計(jì)算min,并根據(jù)當(dāng)前worker數(shù)量判斷是否需要addWorker()
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //allowCoreThreadTimeOut默認(rèn)為false,即min默認(rèn)為corePoolSize
             
            //如果min為0,即不需要維持核心線程數(shù)量,且workQueue不為空,至少保持一個(gè)線程
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
             
            //如果線程數(shù)量大于最少數(shù)量,直接返回,否則下面至少要addWorker一個(gè)
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
         
        //添加一個(gè)沒有firstTask的worker
        //只要worker是completedAbruptly突然終止的,或者線程數(shù)量小于要維護(hù)的數(shù)量,就新添一個(gè)worker線程,即使是shutdown狀態(tài)
        addWorker(null, false);
    }
}

總而言之:如果線程池還沒有完全終止,就仍需要保持一定數(shù)量的線程。

線程池狀態(tài)是running 或 shutdown的情況下:

A、如果當(dāng)前線程是突然終止的,addWorker()
B、如果當(dāng)前線程不是突然終止的,但當(dāng)前線程數(shù)量 < 要維護(hù)的線程數(shù)量,addWorker()
故如果調(diào)用線程池shutdown(),直到workQueue為空前,線程池都會(huì)維持corePoolSize個(gè)線程,然后再逐漸銷毀這corePoolSize個(gè)線程
submit方法

前面我們講過(guò)execute方法,其作用是將一個(gè)任務(wù)提交給線程池,以期在未來(lái)的某個(gè)時(shí)間點(diǎn)被執(zhí)行。
submit方法在作用上,和execute方法是一樣的,將某個(gè)任務(wù)提交給線程池,讓線程池調(diào)度線程去執(zhí)行它。
那么它和execute方法有什么區(qū)別呢?我們來(lái)看看submit方法的源碼:
submit方法的實(shí)現(xiàn)在ThreadPoolExecutor的父類AbstractExecutorService類中,有三種重載方法:

    /**
     * 提交一個(gè) Runnable 任務(wù)用于執(zhí)行,并返回一個(gè)表示該任務(wù)的 Future。該Future的get方法在成功完成時(shí)將會(huì)返回null。
     * submit 參數(shù): task - 要提交的任務(wù) 返回:表示任務(wù)等待完成的 Future
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * 提交一個(gè)Runnable 任務(wù)用于執(zhí)行,并返回一個(gè)表示該任務(wù)的 Future。該 Future 的 get 方法在成功完成時(shí)將會(huì)返回給定的結(jié)果。
     * submit 參數(shù): task - 要提交的任務(wù) result - 完成任務(wù)時(shí)要求返回的結(jié)果 
     * 返回: 表示任務(wù)等待完成的 Future
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public  Future submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * 提交一個(gè)Callable的任務(wù)用于執(zhí)行,返回一個(gè)表示任務(wù)的未決結(jié)果的 Future。該 Future 的 get 
方法在成功完成時(shí)將會(huì)返回該任務(wù)的結(jié)果。 
     * 如果想立即阻塞任務(wù)的等待,則可以使用 result = 
exec.submit(aCallable).get(); 形式的構(gòu)造。
     * 參數(shù): task - 要提交的任務(wù) 返回: 表示任務(wù)等待完成的Future
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public  Future submit(Callable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

源碼很簡(jiǎn)單,submit方法,將任務(wù)task封裝成FutureTask(newTaskFor方法中就是new了一個(gè)FutureTask),然后調(diào)用execute。所以submit方法和execute的所有區(qū)別,都在這FutureTask所帶來(lái)的差異化實(shí)現(xiàn)上。

總而言之,submit方法將一個(gè)任務(wù)task用future模式封裝成FutureTask對(duì)象,提交給線程執(zhí)行,并將這個(gè)FutureTask對(duì)象返回,以供主線程該任務(wù)被線程池執(zhí)行之后得到執(zhí)行結(jié)果

注意,獲得執(zhí)行結(jié)果的方法FutureTask.get(),會(huì)阻塞執(zhí)行該方法的線程,尤其是當(dāng)任務(wù)被DiscardPolicy策略和DiscardOldestPolicy拒絕的時(shí)候,get方法會(huì)一直阻塞在那里,所以我們最好使用自帶超時(shí)時(shí)間的future。

線程池的關(guān)閉 shutdown方法

講完了線程池的基本運(yùn)轉(zhuǎn)過(guò)程,在方法章的最后,我們來(lái)看看負(fù)責(zé)線程池生命周期最后收尾工作的幾個(gè)重要方法,首先是shutdown方法。

/**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * 

This method does not wait for previously submitted tasks to * complete execution. Use {@link #awaitTermination awaitTermination} * to do that. * 開始一個(gè)順序的shutdown操作,shutdown之前被執(zhí)行的已提交任務(wù),新的任務(wù)不會(huì)再被接收了。如果線程池已經(jīng)被shutdown了,該方法的調(diào)用沒有其他任何效果了。 * **該方法不會(huì)等待之前已經(jīng)提交的任務(wù)執(zhí)行完畢**,awaitTermination方法才有這個(gè)效果。 * * @throws SecurityException {@inheritDoc} */ public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //判斷是否可以操作關(guān)閉目標(biāo)線程。 checkShutdownAccess(); //advanceRunState方法,參數(shù):目標(biāo)狀態(tài);作用:一直執(zhí)行,直到成功利用CAS將狀態(tài)置為目標(biāo)值。 //設(shè)置線程池狀態(tài)為SHUTDOWN,此處之后,線程池中不會(huì)增加新Task advanceRunState(SHUTDOWN); //中斷所有的空閑線程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } //嘗試進(jìn)行terminate操作,但其實(shí)我們上面將狀態(tài)置為shutdown,就已經(jīng)算是“中止”了一個(gè)線程池了,它不會(huì)再執(zhí)行任務(wù),于外部而言,已經(jīng)失去了作用。而這里,也只是嘗試去將線程池的狀態(tài)一擼到底而已,并不是一定要terminate掉。該方法我們后面會(huì)說(shuō)到。 tryTerminate(); }

我們可以看到,shutdown方法只不過(guò)是中斷喚醒了所有阻塞的線程,并且把線程池狀態(tài)置為shutdown,正如注釋所說(shuō)的,它沒有等待所有正在執(zhí)行任務(wù)的線程執(zhí)行完任務(wù),把狀態(tài)置為shutdown,已經(jīng)足夠線程池喪失基本的功能了。

在該方法中,線程池如何中斷線程是我們最需要關(guān)心的,我們來(lái)看一下interruptIdleWorkers方法:

private void interruptIdleWorkers(boolean onlyOne) {//參數(shù)onlyOne表示是否值中斷一個(gè)線程就退出,在shutdown中該值為false。
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //遍歷workers 對(duì)所有worker做中斷處理。
            for (Worker w : workers) {
                Thread t = w.thread;
                // w.tryLock()對(duì)Worker獲取鎖,因?yàn)檎趫?zhí)行的worker已經(jīng)加鎖了(見runWorker方法,w.lock()語(yǔ)句)
                //所以這保證了正在運(yùn)行執(zhí)行Task的Worker不會(huì)被中斷。只有阻塞在getTask方法的空閑線程才會(huì)進(jìn)這個(gè)if判斷(被中斷),但中斷不代表線程立刻停止,它要繼續(xù)處理到阻塞隊(duì)列為空時(shí)才會(huì)被銷毀。
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

我們可以看到,在中斷方法中,我們調(diào)用了worker的tryLock方法去嘗試獲取worker的鎖,所以我們說(shuō),worker類這一層的封裝,是用來(lái)控制線程中斷的,正在執(zhí)行任務(wù)的線程已經(jīng)上了鎖,無(wú)法被中斷,只有在獲取阻塞隊(duì)列中的任務(wù)的線程(我們稱為空閑線程)才會(huì)有被中斷的可能。
之前我們看過(guò)getTask方法,在這個(gè)方法中, worker是不加鎖的,所以可以被中斷。我們?yōu)槭裁凑f(shuō)“中斷不代表線程立刻停止,它要繼續(xù)處理到阻塞隊(duì)列為空時(shí)才會(huì)被銷毀”呢?具體邏輯,我們?cè)賮?lái)看一下getTask的源碼,以及我們的注釋(我們模擬中斷發(fā)生時(shí)的場(chǎng)景):

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        /**
         * 當(dāng)執(zhí)行過(guò)程中拋出InterruptedException 的時(shí)候,該異常被catch住,邏輯重新回到這個(gè)for循環(huán)
         * catch塊在getTask方法的最后。
         */
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            /**
             * 因?yàn)檫壿嬍窃趻伋鲋袛喈惓:髞?lái)到這里的,那說(shuō)明線程池的狀態(tài)已經(jīng)在shutdown方法中被置為shutdown了,rs >= SHUTDOWN為true,rs >=STOP為false(只有TIDYING和TERMINATED狀態(tài)會(huì)大于stop)
             * 這時(shí)候,如果workQueue為空,判斷為真,線程被銷毀。
             * 否則,workQueue為非空,判斷為假,線程不會(huì)進(jìn)入銷毀邏輯。
             */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();//循環(huán)的CAS減少worker數(shù)量,直到成功
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?

            //因?yàn)樵赾atch塊中,timeOut已經(jīng)為false了。
            //所以只要不發(fā)生當(dāng)前線程數(shù)超過(guò)最大線程數(shù)這種極端情況,命題(wc > maximumPoolSize || (timed && timedOut)一定為false,線程依舊不被銷毀。
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //繼續(xù)執(zhí)行正常的從阻塞隊(duì)列中取任務(wù)的邏輯,直到阻塞隊(duì)列徹底為空,這時(shí)候,上面第一個(gè)if判斷符合,線程被銷毀,壽命徹底結(jié)束。
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                //如果正常返回,那么返回取到的task。
                if (r != null)
                    return r;
                //否則,設(shè)為超時(shí),重新執(zhí)行循環(huán),
                timedOut = true;
            } catch (InterruptedException retry) {
                //捕獲中斷異常
                timedOut = false;
            }
        }
    }

總結(jié):正阻塞在getTask()獲取任務(wù)的worker在被中斷后,會(huì)拋出InterruptedException,不再阻塞獲取任務(wù)。捕獲中斷異常后,將繼續(xù)循環(huán)到getTask()最開始的判斷線程池狀態(tài)的邏輯,當(dāng)線程池是shutdown狀態(tài),且workQueue.isEmpty時(shí),return null,進(jìn)行worker線程退出邏輯

所以,這就是我們?yōu)槭裁凑f(shuō),shutdown方法不會(huì)立刻停止線程池,它的作用是阻止新的任務(wù)被添加進(jìn)來(lái)(邏輯在addWorker方法的第一個(gè)if判斷中,可以返回去看一下),并且繼續(xù)處理完剩下的任務(wù),然后tryTerminated,嘗試關(guān)閉。

tryTerminate方法
    /**
     * Transitions to TERMINATED state if either (SHUTDOWN and pool
     * and queue empty) or (STOP and pool empty).  If otherwise
     * eligible to terminate but workerCount is nonzero, interrupts an
     * idle worker to ensure that shutdown signals propagate. This
     * method must be called following any action that might make
     * termination possible -- reducing worker count or removing tasks
     * from the queue during shutdown. The method is non-private to
     * allow access from ScheduledThreadPoolExecutor.
     * 在以下情況將線程池變?yōu)門ERMINATED終止?fàn)顟B(tài)
     * shutdown 且 正在運(yùn)行的worker 和 workQueue隊(duì)列 都empty
     * stop 且  沒有正在運(yùn)行的worker
     * 
     * 這個(gè)方法必須在任何可能導(dǎo)致線程池終止的情況下被調(diào)用,如:
     * 減少worker數(shù)量
     * shutdown時(shí)從queue中移除任務(wù)
     * 
     * 這個(gè)方法不是私有的,所以允許子類ScheduledThreadPoolExecutor調(diào)用
     */
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            /**
             * 線程池是否需要終止
             * 如果以下3中情況任一為true,return,不進(jìn)行終止
             * 1、還在運(yùn)行狀態(tài)
             * 2、狀態(tài)是TIDYING、或 TERMINATED,已經(jīng)終止過(guò)了
             * 3、SHUTDOWN 且 workQueue不為空
             */
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
                /**
                 * 只有shutdown狀態(tài) 且 workQueue為空,或者 stop狀態(tài)能執(zhí)行到這一步
                 * 如果此時(shí)線程池還有線程(正在運(yùn)行任務(wù)或正在等待任務(wù),總之count不等于0)
                 * 中斷喚醒一個(gè)正在等任務(wù)的空閑worker
                 *(中斷喚醒的意思就是讓阻塞在阻塞隊(duì)列中的worker拋出異常,然后重新判斷狀態(tài),getTask方法邏輯)
                 * 線程被喚醒后再次判斷線程池狀態(tài),會(huì)return null,進(jìn)入processWorkerExit()流程(runWorker邏輯)
                 */
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);//中斷workers集合中的空閑任務(wù),參數(shù)為true,只中斷一個(gè)。(該邏輯的意義應(yīng)該在于通知被阻塞在隊(duì)列中的線程:別瞎jb等了,這個(gè)線程池都要倒閉了,趕緊收拾鋪蓋準(zhǔn)備銷毀吧你個(gè)逼玩意兒)。
                //嘗試終止失敗,返回??赡艽蠹視?huì)有疑問,shutdown只調(diào)用了一次tryTerminate方法,如果一次嘗試失敗了,是不是就意味著shutdown方法很可能最終無(wú)法終止線程池?
                //其實(shí)看注釋,我們知道線程池在進(jìn)行所有負(fù)面效益的操作時(shí)都會(huì)調(diào)用該方法嘗試終止,上面我們中斷了一個(gè)阻塞線程讓他被銷毀,他銷毀時(shí)也會(huì)嘗試終止(這其中又喚醒了一個(gè)阻塞線程去銷毀),以此類推,直到最后一個(gè)線程執(zhí)行tryTerminate時(shí),邏輯才有可能走到下面去。
                return;
            }
            /**
             * 如果狀態(tài)是SHUTDOWN,workQueue也為空了,正在運(yùn)行的worker也沒有了,開始terminated
             */
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //CAS:將線程池的ctl變成TIDYING(所有的任務(wù)被終止,workCount為0,為此狀態(tài)時(shí)將會(huì)調(diào)用terminated()方法),期間ctl有變化就會(huì)失敗,會(huì)再次for循環(huán)
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        //方法為空,需子類實(shí)現(xiàn)
                        terminated();
                    } finally {
                        //將狀態(tài)置為TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        //最后執(zhí)行termination.signalAll(),并喚醒所有等待線程池終止這個(gè)Condition的線程(也就是調(diào)用了awaitTermination方法的線程,這個(gè)方法的作用是阻塞調(diào)用它的線程,直到調(diào)用該方法的線程池真的已經(jīng)被終止了。)
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

總結(jié)一下:tryTerminate被調(diào)用的時(shí)機(jī)主要有:
1,shutdown方法時(shí)
2,processWorkerExit方法銷毀一個(gè)線程時(shí)
3,addWorkerFailed方法添加線程失敗或啟動(dòng)線程失敗時(shí)
4,remove方法,從阻塞隊(duì)列中刪掉一個(gè)任務(wù)時(shí)

shutdownNow方法

我們知道,shutdown后線程池將變成shutdown狀態(tài),此時(shí)不接收新任務(wù),但會(huì)處理完正在運(yùn)行的 和 在阻塞隊(duì)列中等待處理的任務(wù)。

我們接下來(lái)要說(shuō)的shutdownNow方法,作用是:shutdownNow后線程池將變成stop狀態(tài),此時(shí)不接收新任務(wù),不再處理在阻塞隊(duì)列中等待的任務(wù),還會(huì)嘗試中斷正在處理中的工作線程。
代碼如下:

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     * 嘗試停止所有活動(dòng)的正在執(zhí)行的任務(wù),停止等待任務(wù)的處理,并返回正在等待被執(zhí)行的任務(wù)列表
     * 這個(gè)任務(wù)列表是從任務(wù)隊(duì)列中排出(刪除)的
     * 

This method does not wait for actively executing tasks to * terminate. Use {@link #awaitTermination awaitTermination} to * do that. * 這個(gè)方法不用等到正在執(zhí)行的任務(wù)結(jié)束,要等待線程池終止可使用awaitTermination() *

There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks. This implementation * cancels tasks via {@link Thread#interrupt}, so any task that * fails to respond to interrupts may never terminate. * 除了盡力嘗試停止運(yùn)行中的任務(wù),沒有任何保證 * 取消任務(wù)是通過(guò)Thread.interrupt()實(shí)現(xiàn)的,所以任何響應(yīng)中斷失敗的任務(wù)可能

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/71020.html

相關(guān)文章

  • 使用 Executors,ThreadPoolExecutor,創(chuàng)建線程源碼分析理解

    摘要:源碼分析創(chuàng)建可緩沖的線程池。源碼分析使用創(chuàng)建線程池源碼分析的構(gòu)造函數(shù)構(gòu)造函數(shù)參數(shù)核心線程數(shù)大小,當(dāng)線程數(shù),會(huì)創(chuàng)建線程執(zhí)行最大線程數(shù),當(dāng)線程數(shù)的時(shí)候,會(huì)把放入中保持存活時(shí)間,當(dāng)線程數(shù)大于的空閑線程能保持的最大時(shí)間。 之前創(chuàng)建線程的時(shí)候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThr...

    Chiclaim 評(píng)論0 收藏0
  • 后端ing

    摘要:當(dāng)活動(dòng)線程核心線程非核心線程達(dá)到這個(gè)數(shù)值后,后續(xù)任務(wù)將會(huì)根據(jù)來(lái)進(jìn)行拒絕策略處理。線程池工作原則當(dāng)線程池中線程數(shù)量小于則創(chuàng)建線程,并處理請(qǐng)求。當(dāng)線程池中的數(shù)量等于最大線程數(shù)時(shí)默默丟棄不能執(zhí)行的新加任務(wù),不報(bào)任何異常。 spring-cache使用記錄 spring-cache的使用記錄,坑點(diǎn)記錄以及采用的解決方案 深入分析 java 線程池的實(shí)現(xiàn)原理 在這篇文章中,作者有條不紊的將 ja...

    roadtogeek 評(píng)論0 收藏0
  • 線程源碼分析

    摘要:線程池的作用線程池能有效的處理多個(gè)線程的并發(fā)問題,避免大量的線程因?yàn)榛ハ鄰?qiáng)占系統(tǒng)資源導(dǎo)致阻塞現(xiàn)象,能夠有效的降低頻繁創(chuàng)建和銷毀線程對(duì)性能所帶來(lái)的開銷。固定的線程數(shù)由系統(tǒng)資源設(shè)置。線程池的排隊(duì)策略與有關(guān)。線程池的狀態(tài)值分別是。 線程池的作用 線程池能有效的處理多個(gè)線程的并發(fā)問題,避免大量的線程因?yàn)榛ハ鄰?qiáng)占系統(tǒng)資源導(dǎo)致阻塞現(xiàn)象,能夠有效的降低頻繁創(chuàng)建和銷毀線程對(duì)性能所帶來(lái)的開銷。 線程池的...

    enda 評(píng)論0 收藏0
  • 一看就懂的Java線程分析詳解

    摘要:任務(wù)性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。線程池在運(yùn)行過(guò)程中已完成的任務(wù)數(shù)量。如等于線程池的最大大小,則表示線程池曾經(jīng)滿了。線程池的線程數(shù)量。獲取活動(dòng)的線程數(shù)。通過(guò)擴(kuò)展線程池進(jìn)行監(jiān)控??蚣馨ň€程池,,,,,,等。 Java線程池 [toc] 什么是線程池 線程池就是有N個(gè)子線程共同在運(yùn)行的線程組合。 舉個(gè)容易理解的例子:有個(gè)線程組合(即線程池,咱可以比喻為一個(gè)公司),里面有3...

    Yangder 評(píng)論0 收藏0
  • Java ThreadPoolExecutor 線程源碼分析

    摘要:線程池常見實(shí)現(xiàn)線程池一般包含三個(gè)主要部分調(diào)度器決定由哪個(gè)線程來(lái)執(zhí)行任務(wù)執(zhí)行任務(wù)所能夠的最大耗時(shí)等線程隊(duì)列存放并管理著一系列線程這些線程都處于阻塞狀態(tài)或休眠狀態(tài)任務(wù)隊(duì)列存放著用戶提交的需要被執(zhí)行的任務(wù)一般任務(wù)的執(zhí)行的即先提交的任務(wù)先被執(zhí)行調(diào)度 線程池常見實(shí)現(xiàn) 線程池一般包含三個(gè)主要部分: 調(diào)度器: 決定由哪個(gè)線程來(lái)執(zhí)行任務(wù), 執(zhí)行任務(wù)所能夠的最大耗時(shí)等 線程隊(duì)列: 存放并管理著一系列線...

    greatwhole 評(píng)論0 收藏0
  • Java調(diào)度線程ScheduledThreadPoolExecutor源碼分析

    摘要:當(dāng)面試官問線程池時(shí),你應(yīng)該知道些什么一執(zhí)行流程與不同,向中提交任務(wù)的時(shí)候,任務(wù)被包裝成對(duì)象加入延遲隊(duì)列并啟動(dòng)一個(gè)線程。當(dāng)我們創(chuàng)建出一個(gè)調(diào)度線程池以后,就可以開始提交任務(wù)了。 最近新接手的項(xiàng)目里大量使用了ScheduledThreadPoolExecutor類去執(zhí)行一些定時(shí)任務(wù),之前一直沒有機(jī)會(huì)研究這個(gè)類的源碼,這次趁著機(jī)會(huì)好好研讀一下。 原文地址:http://www.jianshu....

    kohoh_ 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<