摘要:如何優(yōu)雅的使用和理解線程池線程池中你不容錯(cuò)過(guò)的一些細(xì)節(jié)由于篇幅限制,本次可能會(huì)分為上下兩篇。不接受新的任務(wù),同時(shí)等待現(xiàn)有任務(wù)執(zhí)行完畢后退出線程池。慎用方法關(guān)閉線程池,會(huì)導(dǎo)致任務(wù)丟失除非業(yè)務(wù)允許。
前言
原以為線程池還挺簡(jiǎn)單的(平時(shí)常用,也分析過(guò)原理),這次是想自己動(dòng)手寫(xiě)一個(gè)線程池來(lái)更加深入的了解它;但在動(dòng)手寫(xiě)的過(guò)程中落地到細(xì)節(jié)時(shí)發(fā)現(xiàn)并沒(méi)想的那么容易。結(jié)合源碼對(duì)比后確實(shí)不得不佩服 Doug Lea 。
我覺(jué)得大部分人直接去看 java.util.concurrent.ThreadPoolExecutor 的源碼時(shí)都是看一個(gè)大概,因?yàn)槠渲猩婕暗搅嗽S多細(xì)節(jié)處理,還有部分 AQS 的內(nèi)容,所以想要理清楚具體細(xì)節(jié)并不是那么容易。
與其挨個(gè)分析源碼不如自己實(shí)現(xiàn)一個(gè)簡(jiǎn)版,當(dāng)然簡(jiǎn)版并不意味著功能缺失,需要保證核心邏輯一致。
所以也是本篇文章的目的:
自己動(dòng)手寫(xiě)一個(gè)五臟俱全的線程池,同時(shí)會(huì)了解到線程池的工作原理,以及如何在工作中合理的利用線程池。
再開(kāi)始之前建議對(duì)線程池不是很熟悉的朋友看看這幾篇:
這里我截取了部分內(nèi)容,也許可以埋個(gè)伏筆(坑)。
具體請(qǐng)看這兩個(gè)鏈接。
如何優(yōu)雅的使用和理解線程池
線程池中你不容錯(cuò)過(guò)的一些細(xì)節(jié)
由于篇幅限制,本次可能會(huì)分為上下兩篇。
創(chuàng)建線程池現(xiàn)在進(jìn)入正題,新建了一個(gè) CustomThreadPool 類,它的工作原理如下:
簡(jiǎn)單來(lái)說(shuō)就是往線程池里邊丟任務(wù),丟的任務(wù)會(huì)緩沖到隊(duì)列里;線程池里存儲(chǔ)的其實(shí)就是一個(gè)個(gè)的 Thread ,他們會(huì)一直不停的從剛才緩沖的隊(duì)列里獲取任務(wù)執(zhí)行。
流程還是挺簡(jiǎn)單。
先來(lái)看看我們這個(gè)自創(chuàng)的線程池的效果如何吧:
初始化了一個(gè)核心為3、最大線程數(shù)為5、隊(duì)列大小為 4 的線程池。
先往其中丟了 10 個(gè)任務(wù),由于阻塞隊(duì)列的大小為 4 ,最大線程數(shù)為 5 ,所以由于隊(duì)列里緩沖不了最終會(huì)創(chuàng)建 5 個(gè)線程(上限)。
過(guò)段時(shí)間沒(méi)有任務(wù)提交后(sleep)則會(huì)自動(dòng)縮容到三個(gè)線程(保證不會(huì)小于核心線程數(shù))。
構(gòu)造函數(shù)來(lái)看看具體是如何實(shí)現(xiàn)的。
下面則是這個(gè)線程池的構(gòu)造函數(shù):
會(huì)有以下幾個(gè)核心參數(shù):
miniSize 最小線程數(shù),等效于 ThreadPool 中的核心線程數(shù)。
maxSize 最大線程數(shù)。
keepAliveTime 線程保活時(shí)間。
workQueue 阻塞隊(duì)列。
notify 通知接口。
大致上都和 ThreadPool 中的參數(shù)相同,并且作用也是類似的。
需要注意的是其中初始化了一個(gè) workers 成員變量:
/** * 存放線程池 */ private volatile Setworkers; public CustomThreadPool(int miniSize, int maxSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, Notify notify) { workers = new ConcurrentHashSet<>(); }
workers 是最終存放線程池中運(yùn)行的線程,在 j.u.c 源碼中是一個(gè) HashSet 所以對(duì)他所有的操作都是需要加鎖。
我這里為了簡(jiǎn)便起見(jiàn)就自己定義了一個(gè)線程安全的 Set 稱為 ConcurrentHashSet。
其實(shí)原理也非常簡(jiǎn)單,和 HashSet 類似也是借助于 HashMap 來(lái)存放數(shù)據(jù),利用其 key 不可重復(fù)的特性來(lái)實(shí)現(xiàn) set ,只是這里的 HashMap 是用并發(fā)安全的 ConcurrentHashMap 來(lái)實(shí)現(xiàn)的。
這樣就能保證對(duì)它的寫(xiě)入、刪除都是線程安全的。
不過(guò)由于 ConcurrentHashMap 的 size() 函數(shù)并不準(zhǔn)確,所以我這里多帶帶利用了一個(gè) AtomicInteger 來(lái)統(tǒng)計(jì)容器大小。
創(chuàng)建核心線程往線程池中丟一個(gè)任務(wù)的時(shí)候其實(shí)要做的事情還蠻多的,最重要的事情莫過(guò)于創(chuàng)建線程存放到線程池中了。
當(dāng)然我們不能無(wú)限制的創(chuàng)建線程,不然拿線程池來(lái)就沒(méi)任何意義了。于是 miniSize maxSize 這兩個(gè)參數(shù)就有了它的意義。
但這兩個(gè)參數(shù)再哪一步的時(shí)候才起到作用呢?這就是首先需要明確的。
從這個(gè)流程圖可以看出第一步是需要判斷是否大于核心線程數(shù),如果沒(méi)有則創(chuàng)建。
結(jié)合代碼可以發(fā)現(xiàn)在執(zhí)行任務(wù)的時(shí)候會(huì)判斷是否大于核心線程數(shù),從而創(chuàng)建線程。
worker.startTask() 執(zhí)行任務(wù)部分放到后面分析。
這里的 miniSize 由于會(huì)在多線程場(chǎng)景下使用,所以也用 volatile 關(guān)鍵字來(lái)保證可見(jiàn)性。
隊(duì)列緩沖結(jié)合上面的流程圖,第二步自然是要判斷隊(duì)列是否可以存放任務(wù)(是否已滿)。
優(yōu)先會(huì)往隊(duì)列里存放。
上至封頂一旦寫(xiě)入失敗則會(huì)判斷當(dāng)前線程池的大小是否大于最大線程數(shù),如果沒(méi)有則繼續(xù)創(chuàng)建線程執(zhí)行。
不然則執(zhí)行會(huì)嘗試阻塞寫(xiě)入隊(duì)列(j.u.c 會(huì)在這里執(zhí)行拒絕策略)
以上的步驟和剛才那張流程圖是一樣的,這樣大家是否有看出什么坑嘛?
時(shí)刻小心從上面流程圖的這兩步可以看出會(huì)直接創(chuàng)建新的線程。
這個(gè)過(guò)程相對(duì)于中間直接寫(xiě)入阻塞隊(duì)列的開(kāi)銷是非常大的,主要有以下兩個(gè)原因:
創(chuàng)建線程會(huì)加鎖,雖說(shuō)最終用的是 ConcurrentHashMap 的寫(xiě)入函數(shù),但依然存在加鎖的可能。
會(huì)創(chuàng)建新的線程,創(chuàng)建線程還需要調(diào)用操作系統(tǒng)的 API 開(kāi)銷較大。
所以理想情況下我們應(yīng)該避免這兩步,盡量讓丟入線程池中的任務(wù)進(jìn)入阻塞隊(duì)列中。執(zhí)行任務(wù)
任務(wù)是添加進(jìn)來(lái)了,那是如何執(zhí)行的?
在創(chuàng)建任務(wù)的時(shí)候提到過(guò) worker.startTask() 函數(shù):
/** * 添加任務(wù),需要加鎖 * @param runnable 任務(wù) */ private void addWorker(Runnable runnable) { Worker worker = new Worker(runnable, true); worker.startTask(); workers.add(worker); }
也就是在創(chuàng)建線程執(zhí)行任務(wù)的時(shí)候會(huì)創(chuàng)建 Worker 對(duì)象,利用它的 startTask() 方法來(lái)執(zhí)行任務(wù)。
所以先來(lái)看看 Worker 對(duì)象是長(zhǎng)啥樣的:
其實(shí)他本身也是一個(gè)線程,將接收到需要執(zhí)行的任務(wù)存放到成員變量 task 處。
而其中最為關(guān)鍵的則是執(zhí)行任務(wù) worker.startTask() 這一步驟。
public void startTask() { thread.start(); }
其實(shí)就是運(yùn)行了 worker 線程自己,下面來(lái)看 run 方法。
第一步是將創(chuàng)建線程時(shí)傳過(guò)來(lái)的任務(wù)執(zhí)行(task.run),接著會(huì)一直不停的從隊(duì)列里獲取任務(wù)執(zhí)行,直到獲取不到新任務(wù)了。
任務(wù)執(zhí)行完畢后將內(nèi)置的計(jì)數(shù)器 -1 ,方便后面任務(wù)全部執(zhí)行完畢進(jìn)行通知。
worker 線程獲取不到任務(wù)后退出,需要將自己從線程池中釋放掉(workers.remove(this))。
從隊(duì)列里獲取任務(wù)其實(shí) getTask 也是非常關(guān)鍵的一個(gè)方法,它封裝了從隊(duì)列中獲取任務(wù),同時(shí)對(duì)不需要?;畹木€程進(jìn)行回收。
很明顯,核心作用就是從隊(duì)列里獲取任務(wù);但有兩個(gè)地方需要注意:
當(dāng)線程數(shù)超過(guò)核心線程數(shù)時(shí),在獲取任務(wù)的時(shí)候需要通過(guò)?;顣r(shí)間從隊(duì)列里獲取任務(wù);一旦獲取不到任務(wù)則隊(duì)列肯定是空的,這樣返回 null 之后在上文的 run() 中就會(huì)退出這個(gè)線程;從而達(dá)到了回收線程的目的,也就是我們之前演示的效果
這里需要加鎖,加鎖的原因是這里肯定會(huì)出現(xiàn)并發(fā)情況,不加鎖會(huì)導(dǎo)致 workers.size() > miniSize 條件多次執(zhí)行,從而導(dǎo)致線程被全部回收完畢。
關(guān)閉線程池最后來(lái)談?wù)劸€程關(guān)閉的事;
還是以剛才那段測(cè)試代碼為例,如果提交任務(wù)后我們沒(méi)有關(guān)閉線程,會(huì)發(fā)現(xiàn)即便是任務(wù)執(zhí)行完畢后程序也不會(huì)退出。
從剛才的源碼里其實(shí)也很容易看出來(lái),不退出的原因是 Worker 線程一定還會(huì)一直阻塞在 task = workQueue.take(); 處,即便是線程縮容了也不會(huì)小于核心線程數(shù)。
通過(guò)堆棧也能證明:
恰好剩下三個(gè)線程阻塞于此處。
而關(guān)閉線程通常又有以下兩種:
立即關(guān)閉:執(zhí)行關(guān)閉方法后不管現(xiàn)在線程池的運(yùn)行狀況,直接一刀切全部停掉,這樣會(huì)導(dǎo)致任務(wù)丟失。
不接受新的任務(wù),同時(shí)等待現(xiàn)有任務(wù)執(zhí)行完畢后退出線程池。
立即關(guān)閉我們先來(lái)看第一種立即關(guān)閉:
/** * 立即關(guān)閉線程池,會(huì)造成任務(wù)丟失 */ public void shutDownNow() { isShutDown.set(true); tryClose(false); } /** * 關(guān)閉線程池 * * @param isTry true 嘗試關(guān)閉 --> 會(huì)等待所有任務(wù)執(zhí)行完畢 * false 立即關(guān)閉線程池--> 任務(wù)有丟失的可能 */ private void tryClose(boolean isTry) { if (!isTry) { closeAllTask(); } else { if (isShutDown.get() && totalTask.get() == 0) { closeAllTask(); } } } /** * 關(guān)閉所有任務(wù) */ private void closeAllTask() { for (Worker worker : workers) { //LOGGER.info("開(kāi)始關(guān)閉"); worker.close(); } } public void close() { thread.interrupt(); }
很容易看出,最終就是遍歷線程池里所有的 worker 線程挨個(gè)執(zhí)行他們的中斷函數(shù)。
我們來(lái)測(cè)試一下:
可以發(fā)現(xiàn)后面丟進(jìn)去的三個(gè)任務(wù)其實(shí)是沒(méi)有被執(zhí)行的。
完事后關(guān)閉而正常關(guān)閉則不一樣:
/** * 任務(wù)執(zhí)行完畢后關(guān)閉線程池 */ public void shutdown() { isShutDown.set(true); tryClose(true); }
他會(huì)在這里多了一個(gè)判斷,需要所有任務(wù)都執(zhí)行完畢之后才會(huì)去中斷線程。
同時(shí)在線程需要回收時(shí)都會(huì)嘗試關(guān)閉線程:
來(lái)看看實(shí)際效果:
回收線程上文或多或少提到了線程回收的事情,其實(shí)總結(jié)就是以下兩點(diǎn):
一旦執(zhí)行了 shutdown/shutdownNow 方法都會(huì)將線程池的狀態(tài)置為關(guān)閉狀態(tài),這樣只要 worker 線程嘗試從隊(duì)列里獲取任務(wù)時(shí)就會(huì)直接返回空,導(dǎo)致 worker 線程被回收。
一旦線程池大小超過(guò)了核心線程數(shù)就會(huì)使用?;顣r(shí)間來(lái)從隊(duì)列里獲取任務(wù),所以一旦獲取不到返回 null 時(shí)就會(huì)觸發(fā)回收。
但如果我們的隊(duì)列足夠大,導(dǎo)致線程數(shù)都不會(huì)超過(guò)核心線程數(shù),這樣是不會(huì)觸發(fā)回收的。
比如這里我將隊(duì)列大小調(diào)為 10 ,這樣任務(wù)就會(huì)累計(jì)在隊(duì)列里,不會(huì)創(chuàng)建五個(gè) worker 線程。
所以一直都是 Thread-1~3 這三個(gè)線程在反復(fù)調(diào)度任務(wù)。
總結(jié)本次實(shí)現(xiàn)了線程池里大部分核心功能,我相信只要看完并動(dòng)手敲一遍一定會(huì)對(duì)線程池有不一樣的理解。
結(jié)合目前的內(nèi)容來(lái)總結(jié)下:
線程池、隊(duì)列大小要設(shè)計(jì)的合理,盡量的讓任務(wù)從隊(duì)列中獲取執(zhí)行。
慎用 shutdownNow() 方法關(guān)閉線程池,會(huì)導(dǎo)致任務(wù)丟失(除非業(yè)務(wù)允許)。
如果任務(wù)多,線程執(zhí)行時(shí)間短可以調(diào)大 keepalive 值,使得線程盡量不被回收從而可以復(fù)用線程。
同時(shí)下次會(huì)分享一些線程池的新特性,如:
執(zhí)行帶有返回值的線程。
異常處理怎么辦?
所有任務(wù)執(zhí)行完怎么通知我?
本文所有源碼:
https://github.com/crossoverJie/JCSprout/blob/master/src/main/java/com/crossoverjie/concurrent/CustomThreadPool.java
你的點(diǎn)贊與分享是對(duì)我最大的支持
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/74567.html
摘要:前言前段時(shí)間寫(xiě)過(guò)一篇線程池沒(méi)你想的那么簡(jiǎn)單,和大家一起擼了一個(gè)基本的線程池,具備線程池基本調(diào)度功能。線程池自動(dòng)擴(kuò)容縮容?;卣{(diào)以上就是線程池的構(gòu)造函數(shù)以及接口的定義。所以我們?cè)谑褂镁€程池時(shí),其中的任務(wù)一定要做好異常處理。線程異常捕獲的重要性。 showImg(https://segmentfault.com/img/remote/1460000019403163?w=1904&h=108...
摘要:如何優(yōu)雅的使用和理解線程池線程池中你不容錯(cuò)過(guò)的一些細(xì)節(jié)由于篇幅限制,本次可能會(huì)分為上下兩篇。不接受新的任務(wù),同時(shí)等待現(xiàn)有任務(wù)執(zhí)行完畢后退出線程池。慎用方法關(guān)閉線程池,會(huì)導(dǎo)致任務(wù)丟失除非業(yè)務(wù)允許。前言 原以為線程池還挺簡(jiǎn)單的(平時(shí)常用,也分析過(guò)原理),這次是想自己動(dòng)手寫(xiě)一個(gè)線程池來(lái)更加深入的了解它;但在動(dòng)手寫(xiě)的過(guò)程中落地到細(xì)節(jié)時(shí)發(fā)現(xiàn)并沒(méi)想的那么容易。結(jié)合源碼對(duì)比后確實(shí)不得不佩服 Doug Le...
摘要:列入全國(guó)計(jì)算機(jī)二級(jí)取代,部分城市試點(diǎn),引入高中。建議通過(guò)視頻學(xué)習(xí),這樣不但節(jié)省時(shí)間,而且效果很好。能否回憶起那個(gè)陡峭的學(xué)習(xí)曲線問(wèn)題越多,學(xué)的越快。出報(bào)告每完成一個(gè)項(xiàng)目,總結(jié)報(bào)告,必不可少。結(jié)構(gòu)化學(xué)習(xí),才是你我需要真正培養(yǎng)的能力。 編程就如同你學(xué)習(xí)開(kāi)車,即使,你可以一口氣,說(shuō)出一輛車的全部零部件,以及內(nèi)燃機(jī)進(jìn)氣、壓縮、做功和排氣過(guò)程,但你就是不去練如何開(kāi)車,怎么上路。你確定,你敢開(kāi)嗎?你...
摘要:但我認(rèn)為談不上的毛病,而是編程模型和之間的一種模式差異。相比類,更貼近編程模型,使得這種差異更加突出。聲明本文采用循序漸進(jìn)的示例來(lái)解釋問(wèn)題。本文假設(shè)讀者已經(jīng)使用超過(guò)一個(gè)小時(shí)。這是通過(guò)組件生命周期上綁定與的組合完成的。 本文由云+社區(qū)發(fā)表作者:Dan Abramov 接觸 React Hooks 一定時(shí)間的你,也許會(huì)碰到一個(gè)神奇的問(wèn)題: setInterval 用起來(lái)沒(méi)你想的簡(jiǎn)單。 R...
閱讀 1587·2021-10-18 13:35
閱讀 2370·2021-10-09 09:44
閱讀 824·2021-10-08 10:05
閱讀 2723·2021-09-26 09:47
閱讀 3577·2021-09-22 15:22
閱讀 441·2019-08-29 12:24
閱讀 2004·2019-08-29 11:06
閱讀 2862·2019-08-26 12:23