成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Java中線程池ThreadPoolExecutor原理探究

zzir / 1092人閱讀

摘要:類型是位二進(jìn)制標(biāo)示,其中高位用來表示線程池狀態(tài),后面位用來記錄線程池線程個數(shù)。創(chuàng)建一個最小線程個數(shù)為,最大為,阻塞隊列為的線程池。

一、 前言

線程池主要解決兩個問題:一方面當(dāng)執(zhí)行大量異步任務(wù)時候線程池能夠提供較好的性能,這是因為使用線程池可以使每個任務(wù)的調(diào)用開銷減少(因為線程池線程是可以復(fù)用的)。另一方面線程池提供了一種資源限制和管理的手段,比如當(dāng)執(zhí)行一系列任務(wù)時候?qū)€程的管理,每個ThreadPoolExecutor也保留了一些基本的統(tǒng)計數(shù)據(jù),比如當(dāng)前線程池完成的任務(wù)數(shù)目。

二、 類圖結(jié)構(gòu)

Executors其實是個工具類,里面提供了好多靜態(tài)方法,根據(jù)用戶選擇返回不同的線程池實例。
ThreadPoolExecutor繼承了AbstractExecutorService,成員變量ctl是個Integer的原子變量用來記錄線程池狀態(tài) 和 線程池線程個數(shù),類似于ReentrantReadWriteLock使用一個變量存放兩種信息。
Integer類型是32位二進(jìn)制標(biāo)示,其中高3位用來表示線程池狀態(tài),后面 29位用來記錄線程池線程個數(shù)。

線程池狀態(tài)含義:

RUNNING:接受新任務(wù)并且處理阻塞隊列里的任務(wù)
SHUTDOWN:拒絕新任務(wù)但是處理阻塞隊列里的任務(wù)
STOP:拒絕新任務(wù)并且拋棄阻塞隊列里的任務(wù)同時會中斷正在處理的任務(wù)
TIDYING:所有任務(wù)都執(zhí)行完(包含阻塞隊列里面任務(wù))當(dāng)前線程池活動線程為0,將要調(diào)用terminated方法
TERMINATED:終止?fàn)顟B(tài)。terminated方法調(diào)用完成以后的狀態(tài)
線程池狀態(tài)轉(zhuǎn)換:

RUNNING -> SHUTDOWN
顯式調(diào)用shutdown()方法,或者隱式調(diào)用了finalize(),它里面調(diào)用了shutdown()方法。
RUNNING or SHUTDOWN)-> STOP
顯式 shutdownNow()方法
SHUTDOWN -> TIDYING
當(dāng)線程池和任務(wù)隊列都為空的時候
STOP -> TIDYING
當(dāng)線程池為空的時候
TIDYING -> TERMINATED
當(dāng) terminated() hook 方法執(zhí)行完成時候
線程池參數(shù):

corePoolSize:線程池核心線程個數(shù)
workQueue:用于保存等待執(zhí)行的任務(wù)的阻塞隊列。
比如基于數(shù)組的有界ArrayBlockingQueue、,基于鏈表的無界LinkedBlockingQueue,最多只有一個元素的同步隊列SynchronousQueue,優(yōu)先級隊列PriorityBlockingQueue,具體可參考 https://www.atatech.org/artic...
maximunPoolSize:線程池最大線程數(shù)量。
ThreadFactory:創(chuàng)建線程的工廠
RejectedExecutionHandler:飽和策略,當(dāng)隊列滿了并且線程個數(shù)達(dá)到maximunPoolSize后采取的策略,比如AbortPolicy(拋出異常),CallerRunsPolicy(使用調(diào)用者所在線程來運行任務(wù)),DiscardOldestPolicy(調(diào)用poll丟棄一個任務(wù),執(zhí)行當(dāng)前任務(wù)),DiscardPolicy(默默丟棄,不拋出異常)
keeyAliveTime:存活時間。如果當(dāng)前線程池中的線程數(shù)量比基本數(shù)量要多,并且是閑置狀態(tài)的話,這些閑置的線程能存活的最大時間
TimeUnit,存活時間的時間單位
線程池類型:

newFixedThreadPool

創(chuàng)建一個核心線程個數(shù)和最大線程個數(shù)都為nThreads的線程池,并且阻塞隊列長度為Integer.MAX_VALUE,keeyAliveTime=0說明只要線程個數(shù)比核心線程個數(shù)多并且當(dāng)前空閑則回收。

newSingleThreadExecutor
創(chuàng)建一個核心線程個數(shù)和最大線程個數(shù)都為1的線程池,并且阻塞隊列長度為Integer.MAX_VALUE,keeyAliveTime=0說明只要線程個數(shù)比核心線程個數(shù)多并且當(dāng)前空閑則回收。

newCachedThreadPool
創(chuàng)建一個按需創(chuàng)建線程的線程池,初始線程個數(shù)為0,最多線程個數(shù)為Integer.MAX_VALUE,并且阻塞隊列為同步隊列,keeyAliveTime=60說明只要當(dāng)前線程60s內(nèi)空閑則回收。這個特殊在于加入到同步隊列的任務(wù)會被馬上被執(zhí)行,同步隊列里面最多只有一個任務(wù),并且存在后馬上會拿出執(zhí)行。

newSingleThreadScheduledExecutor

創(chuàng)建一個最小線程個數(shù)corePoolSize為1,最大為Integer.MAX_VALUE,阻塞隊列為DelayedWorkQueue的線程池。

其中Worker繼承AQS和Runnable是具體承載任務(wù)的對象,Worker繼承了AQS自己實現(xiàn)了簡單的不可重入獨占鎖,其中status=0標(biāo)示鎖未被獲取狀態(tài)也就是未被鎖住的狀態(tài),state=1標(biāo)示鎖已經(jīng)被獲取的狀態(tài)也就是鎖住的狀態(tài)。

DefaultThreadFactory是線程工廠,newThread方法是對線程的一個分組包裹,其中poolNumber是個靜態(tài)的原子變量,用來統(tǒng)計線程工廠的個數(shù),threadNumber用來記錄每個線程工廠創(chuàng)建了多少線程。

三、 源碼分析

3.1 添加任務(wù)到線程池exectue方法

如果當(dāng)前線程池線程個數(shù)小于corePoolSize則開啟新線程
否則添加任務(wù)到任務(wù)隊列
如果任務(wù)隊列滿了,則嘗試新開啟線程執(zhí)行任務(wù),如果線程個數(shù)>maximumPoolSize則執(zhí)行拒絕策略。

重點看addWorkder方法:

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // 檢查隊列是否只在必要時為空.(1)
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;

    //循環(huán)cas增加線程個數(shù)
    for (;;) {
        int wc = workerCountOf(c);

        //如果線程個數(shù)超限則返回false
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
        //cas增加線程個數(shù),同時只有一個線程成功
        if (compareAndIncrementWorkerCount(c))
            break retry;
        //cas失敗了,則看線程池狀態(tài)是否變化了,變化則跳到外層循環(huán)重試重新獲取線程池狀態(tài),否者內(nèi)層循環(huán)重新cas。
        c = ctl.get();  // Re-read ctl
        if (runStateOf(c) != rs)
            continue retry;
    }
}

//到這里說明cas成功了,(2)
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    //創(chuàng)建worker
    final ReentrantLock mainLock = this.mainLock;
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {

        //加獨占鎖,為了workers同步,因為可能多個線程調(diào)用了線程池的execute方法。
        mainLock.lock();
        try {

            //重新檢查線程池狀態(tài),為了避免在獲取鎖前調(diào)用了shutdown接口(3)
            int c = ctl.get();
            int rs = runStateOf(c);

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                //添加任務(wù)
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        //添加成功則啟動任務(wù)
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;}

代碼比較長,主要分兩部分,第一部分雙重循環(huán)目的是通過cas增加線程池線程個數(shù),第二部分主要是并發(fā)安全的把任務(wù)添加到workers里面,并且啟動任務(wù)執(zhí)行。

先看第一部分的(1)

展開!運算后等價于

也就是說下面幾種情況下會返回false:

當(dāng)前線程池狀態(tài)為STOP,TIDYING,TERMINATED
當(dāng)前線程池狀態(tài)為SHUTDOWN并且已經(jīng)有了第一個任務(wù)
當(dāng)前線程池狀態(tài)為SHUTDOWN并且任務(wù)隊列為空
內(nèi)層循環(huán)作用是使用cas增加線程個數(shù),如果線程個數(shù)超限則返回false,否者進(jìn)行cas,cas成功則退出雙循環(huán),否者cas失敗了,要看當(dāng)前線程池的狀態(tài)是否變化了,如果變了,則重新進(jìn)入外層循環(huán)重新獲取線程池狀態(tài),否者進(jìn)入內(nèi)層循環(huán)繼續(xù)進(jìn)行cas嘗試。

到了第二部分說明CAS成功了,也就是說線程個數(shù)加一了,但是現(xiàn)在任務(wù)還沒開始執(zhí)行,這里使用全局的獨占鎖來控制workers里面添加任務(wù),其實也可以使用并發(fā)安全的set,但是性能沒有獨占鎖好(這個從注釋中知道的)。這里需要注意的是要在獲取鎖后重新檢查線程池的狀態(tài),這是因為其他線程可可能在本方法獲取鎖前改變了線程池的狀態(tài),比如調(diào)用了shutdown方法。添加成功則啟動任務(wù)執(zhí)行。

3.2 工作線程Worker的執(zhí)行

先看下構(gòu)造函數(shù):

這里添加一個新狀態(tài)-1是為了避免當(dāng)前線程worker線程被中斷,比如調(diào)用了線程池的shutdownNow,如果當(dāng)前worker狀態(tài)>=0則會設(shè)置該線程的中斷標(biāo)志。這里設(shè)置了-1所以條件不滿足就不會中斷該線程了。運行runWorker時候會調(diào)用unlock方法,該方法吧status變?yōu)榱?,所以這時候調(diào)用shutdownNow會中斷worker線程。

    final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // status設(shè)置為0,允許中斷
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {

            w.lock();
            // 如果線程池當(dāng)前狀態(tài)至少是stop,則設(shè)置中斷標(biāo)志;
            // 如果線程池當(dāng)前狀態(tài)是RUNNININ,則重置中斷標(biāo)志,重置后需要重新
            //檢查下線程池狀態(tài),因為當(dāng)重置中斷標(biāo)志時候,可能調(diào)用了線程池的shutdown方法
            //改變了線程池狀態(tài)。
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();


            try {
                //任務(wù)執(zhí)行前干一些事情
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();//執(zhí)行任務(wù)
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //任務(wù)執(zhí)行完畢后干一些事情
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                //統(tǒng)計當(dāng)前worker完成了多少個任務(wù)
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {

        //執(zhí)行清了工作
        processWorkerExit(w, completedAbruptly);
    }
}

如果當(dāng)前task為空,則直接執(zhí)行,否者調(diào)用getTask從任務(wù)隊列獲取一個任務(wù)執(zhí)行,如果任務(wù)隊列為空,則worker退出。

private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // 如果當(dāng)前線程池狀態(tài)>=STOP 或者線程池狀態(tài)為shutdown并且工作隊列為空則,減少工作線程個數(shù)
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
        decrementWorkerCount();
        return null;
    }

    boolean timed;      // Are workers subject to culling?

    for (;;) {
        int wc = workerCountOf(c);
        timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if (wc <= maximumPoolSize && ! (timedOut && timed))
            break;
        if (compareAndDecrementWorkerCount(c))
            return null;
        c = ctl.get();  // Re-read ctl
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }

    try {

        //根據(jù)timed選擇調(diào)用poll還是阻塞的take
        Runnable r = timed ?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
        if (r != null)
            return r;
        timedOut = true;
    } catch (InterruptedException retry) {
        timedOut = false;
    }
}}

private void processWorkerExit(Worker w, boolean completedAbruptly){
if (completedAbruptly) // If abrupt, then workerCount wasn"t adjusted
    decrementWorkerCount();

//統(tǒng)計整個線程池完成的任務(wù)個數(shù)
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    completedTaskCount += w.completedTasks;
    workers.remove(w);
} finally {
    mainLock.unlock();
}

//嘗試設(shè)置線程池狀態(tài)為TERMINATED,如果當(dāng)前是shutdonw狀態(tài)并且工作隊列為空
//或者當(dāng)前是stop狀態(tài)當(dāng)前線程池里面沒有活動線程
tryTerminate();

//如果當(dāng)前線程個數(shù)小于核心個數(shù),則增加
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
    if (!completedAbruptly) {
        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
        if (min == 0 && ! workQueue.isEmpty())
            min = 1;
        if (workerCountOf(c) >= min)
            return; // replacement not needed
    }
    addWorker(null, false);
}}

3.3 shutdown操作
調(diào)用shutdown后,線程池就不會在接受新的任務(wù)了,但是工作隊列里面的任務(wù)還是要執(zhí)行的,但是該方法立刻返回的,并不等待隊列任務(wù)完成在返回。

public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    //權(quán)限檢查
    checkShutdownAccess();

    //設(shè)置當(dāng)前線程池狀態(tài)為SHUTDOWN,如果已經(jīng)是SHUTDOWN則直接返回
    advanceRunState(SHUTDOWN);

    //設(shè)置中斷標(biāo)志
    interruptIdleWorkers();
    onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
    mainLock.unlock();
}
//嘗試狀態(tài)變?yōu)門ERMINATED
tryTerminate();
}

如果當(dāng)前狀態(tài)>=targetState則直接返回,否者設(shè)置當(dāng)前狀態(tài)為targetState
private void advanceRunState(int targetState) {

for (;;) {
    int c = ctl.get();
    if (runStateAtLeast(c, targetState) ||
        ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
        break;
}
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}

設(shè)置所有線程的中斷標(biāo)志,主要這里首先加了全局鎖,同時只有一個線程可以調(diào)用shutdown時候設(shè)置中斷標(biāo)志,然后嘗試獲取worker自己的鎖,獲取成功則設(shè)置中斷標(biāo)示

private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    for (Worker w : workers) {
        Thread t = w.thread;
        if (!t.isInterrupted() && w.tryLock()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            } finally {
                w.unlock();
            }
        }
        if (onlyOne)
            break;
    }
} finally {
    mainLock.unlock();
}}

3.4 shutdownNow操作
調(diào)用shutdown后,線程池就不會在接受新的任務(wù)了,并且丟棄工作隊列里面里面的任務(wù),正在執(zhí)行的任務(wù)會被中斷,但是該方法立刻返回的,并不等待激活的任務(wù)執(zhí)行完成在返回。返回隊列里面的任務(wù)列表。

調(diào)用隊列的drainTo一次當(dāng)前隊列的元素到taskList,
可能失敗,如果調(diào)用drainTo后隊列海不為空,則循環(huán)刪除,并添加到taskList
public List shutdownNow() {

List tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    checkShutdownAccess();//權(quán)限檢查
    advanceRunState(STOP);// 設(shè)置線程池狀態(tài)為stop
    interruptWorkers();//中斷線程
    tasks = drainQueue();//移動隊列任務(wù)到tasks
} finally {
    mainLock.unlock();
}
tryTerminate();
return tasks;

}

調(diào)用隊列的drainTo一次當(dāng)前隊列的元素到taskList,
可能失敗,如果調(diào)用drainTo后隊列海不為空,則循環(huán)刪除,并添加到taskList
private List drainQueue() {

BlockingQueue q = workQueue;
List taskList = new ArrayList();
q.drainTo(taskList);
if (!q.isEmpty()) {
    for (Runnable r : q.toArray(new Runnable[0])) {
        if (q.remove(r))
            taskList.add(r);
    }
}
return taskList;

}

3.5 awaitTermination操作

等待線程池狀態(tài)變?yōu)門ERMINATED則返回,或者時間超時。由于整個過程獨占鎖,所以一般調(diào)用shutdown或者shutdownNow后使用。

public boolean awaitTermination(long timeout, TimeUnit unit)

    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}


四、總結(jié)

線程池巧妙的使用一個Integer類型原子變量來記錄線程池狀態(tài)和線程池線程個數(shù),設(shè)計時候考慮到未來(2^29)-1個線程可能不夠用,到時只需要把原子變量變?yōu)長ong類型,然后掩碼位數(shù)變下就可以了,但是為啥現(xiàn)在不一勞永逸的定義為Long那,主要是考慮到使用int類型操作時候速度上比Long類型快些。

通過線程池狀態(tài)來控制任務(wù)的執(zhí)行,每個worker線程可以處理多個任務(wù),線程池通過線程的復(fù)用減少了線程創(chuàng)建和銷毀的開銷,通過使用任務(wù)隊列避免了線程的阻塞從而避免了線程調(diào)度和線程上下文切換的開銷。

另外需要注意的是調(diào)用shutdown方法作用僅僅是修改線程池狀態(tài)讓現(xiàn)在任務(wù)失敗并中斷當(dāng)前線程,這個中斷并不是讓正在運行的線程終止,而是僅僅設(shè)置下線程的中斷標(biāo)志,如果線程內(nèi)沒有使用中斷標(biāo)志做一些事情,那么這個對線程沒有影響。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/19209.html

相關(guān)文章

  • Java線程ThreadPoolExecutor原理探究

    摘要:類型是位二進(jìn)制標(biāo)示,其中高位用來表示線程池狀態(tài),后面位用來記錄線程池線程個數(shù)。創(chuàng)建一個最小線程個數(shù)為,最大為,阻塞隊列為的線程池。 一、 前言 線程池主要解決兩個問題:一方面當(dāng)執(zhí)行大量異步任務(wù)時候線程池能夠提供較好的性能,這是因為使用線程池可以使每個任務(wù)的調(diào)用開銷減少(因為線程池線程是可以復(fù)用的)。另一方面線程池提供了一種資源限制和管理的手段,比如當(dāng)執(zhí)行一系列任務(wù)時候?qū)€程的管理,每個...

    lavor 評論0 收藏0
  • Java線程ThreadPoolExecutor原理探究

    摘要:類型是位二進(jìn)制標(biāo)示,其中高位用來表示線程池狀態(tài),后面位用來記錄線程池線程個數(shù)。創(chuàng)建一個最小線程個數(shù)為,最大為,阻塞隊列為的線程池。 一、 前言 線程池主要解決兩個問題:一方面當(dāng)執(zhí)行大量異步任務(wù)時候線程池能夠提供較好的性能,這是因為使用線程池可以使每個任務(wù)的調(diào)用開銷減少(因為線程池線程是可以復(fù)用的)。另一方面線程池提供了一種資源限制和管理的手段,比如當(dāng)執(zhí)行一系列任務(wù)時候?qū)€程的管理,每個...

    AJie 評論0 收藏0
  • Java線程從使用到閱讀源碼(3/10)

    摘要:最后,我們會通過對源代碼的剖析深入了解線程池的運行過程和具體設(shè)計,真正達(dá)到知其然而知其所以然的水平。創(chuàng)建線程池既然線程池是一個類,那么最直接的使用方法一定是一個類的對象,例如。單線程線程池單線程線程 我們一般不會選擇直接使用線程類Thread進(jìn)行多線程編程,而是使用更方便的線程池來進(jìn)行任務(wù)的調(diào)度和管理。線程池就像共享單車,我們只要在我們有需要的時候去獲取就可以了。甚至可以說線程池更棒,...

    468122151 評論0 收藏0
  • 高并發(fā)

    摘要:表示的是兩個,當(dāng)其中任意一個計算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢娝氖褂茫陂_始分析它的高并發(fā)實現(xiàn)機制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個比較典型的互聯(lián)網(wǎng)高并發(fā)場景。 干貨:深度剖析分布式搜索引擎設(shè)計 分布式,高可用,和機器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個名詞,今天我們首先來說說分布式。 探究...

    supernavy 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<