摘要:同時(shí),它會(huì)通過的方法將自己注冊(cè)到線程池中。線程池中的每個(gè)工作線程都有一個(gè)自己的任務(wù)隊(duì)列,工作線程優(yōu)先處理自身隊(duì)列中的任務(wù)或順序,由線程池構(gòu)造時(shí)的參數(shù)決定,自身隊(duì)列為空時(shí),以的順序隨機(jī)竊取其它隊(duì)列中的任務(wù)。
本文首發(fā)于一世流云的專欄:https://segmentfault.com/blog...一、引言
算法領(lǐng)域有一種基本思想叫做“分治”,所謂“分治”就是將一個(gè)難以直接解決的大問題,分割成一些規(guī)模較小的子問題,以便各個(gè)擊破,分而治之。
比如:對(duì)于一個(gè)規(guī)模為N的問題,若該問題可以容易地解決,則直接解決;否則將其分解為K個(gè)規(guī)模較小的子問題,這些子問題互相獨(dú)立且與原問題性質(zhì)相同,遞歸地解這些子問題,然后將各子問題的解合并得到原問題的解,這種算法設(shè)計(jì)策略叫做分治法。
許多基礎(chǔ)算法都運(yùn)用了“分治”的思想,比如二分查找、快速排序等等。
基于“分治”的思想,J.U.C在JDK1.7時(shí)引入了一套Fork/Join框架。Fork/Join框架的基本思想就是將一個(gè)大任務(wù)分解(Fork)成一系列子任務(wù),子任務(wù)可以繼續(xù)往下分解,當(dāng)多個(gè)不同的子任務(wù)都執(zhí)行完成后,可以將它們各自的結(jié)果合并(Join)成一個(gè)大結(jié)果,最終合并成大任務(wù)的結(jié)果:
二、工作竊取算法從上述Fork/Join框架的描述可以看出,我們需要一些線程來執(zhí)行Fork出的任務(wù),在實(shí)際中,如果每次都創(chuàng)建新的線程執(zhí)行任務(wù),對(duì)系統(tǒng)資源的開銷會(huì)很大,所以Fork/Join框架利用了線程池來調(diào)度任務(wù)。
另外,這里可以思考一個(gè)問題,既然由線程池調(diào)度,根據(jù)我們之前學(xué)習(xí)普通/計(jì)劃線程池的經(jīng)驗(yàn),必然存在兩個(gè)要素:
工作線程
任務(wù)隊(duì)列
一般的線程池只有一個(gè)任務(wù)隊(duì)列,但是對(duì)于Fork/Join框架來說,由于Fork出的各個(gè)子任務(wù)其實(shí)是平行關(guān)系,為了提高效率,減少線程競(jìng)爭(zhēng),應(yīng)該將這些平行的任務(wù)放到不同的隊(duì)列中去,如上圖中,大任務(wù)分解成三個(gè)子任務(wù):子任務(wù)1、子任務(wù)2、子任務(wù)3,那么就創(chuàng)建三個(gè)任務(wù)隊(duì)列,然后再創(chuàng)建3個(gè)工作線程與隊(duì)列一一對(duì)應(yīng)。
由于線程處理不同任務(wù)的速度不同,這樣就可能存在某個(gè)線程先執(zhí)行完了自己隊(duì)列中的任務(wù)的情況,這時(shí)為了提升效率,我們可以讓該線程去“竊取”其它任務(wù)隊(duì)列中的任務(wù),這就是所謂的工作竊取算法。
“工作竊取”的示意圖如下,當(dāng)線程1執(zhí)行完自身任務(wù)隊(duì)列中的任務(wù)后,嘗試從線程2的任務(wù)隊(duì)列中“竊取”任務(wù):
對(duì)于一般的隊(duì)列來說,入隊(duì)元素都是在“隊(duì)尾”,出隊(duì)元素在“隊(duì)首”,要滿足“工作竊取”的需求,任務(wù)隊(duì)列應(yīng)該支持從“隊(duì)尾”出隊(duì)元素,這樣可以減少與其它工作線程的沖突(因?yàn)檎G闆r下,其它工作線程從“隊(duì)首”獲取自己任務(wù)隊(duì)列中的任務(wù)),滿足這一需求的任務(wù)隊(duì)列其實(shí)就是我們?cè)趈uc-collections框架中介紹過的雙端阻塞隊(duì)列——LinkedBlockingDeque。三、使用示例
當(dāng)然,出于性能考慮,J.U.C中的Fork/Join框架并沒有直接利用LinkedBlockingDeque作為任務(wù)隊(duì)列,而是自己重新實(shí)現(xiàn)了一個(gè)。
為了給接下來的分析F/J框架組件做鋪墊,我們先通過一個(gè)簡(jiǎn)單示例看下Fork/Join框架的基本使用。
假設(shè)有個(gè)非常大的long[]數(shù)組,通過FJ框架求解數(shù)組所有元素的和。
任務(wù)類定義,因?yàn)樾枰祷亟Y(jié)果,所以繼承RecursiveTask,并覆寫compute方法。任務(wù)的fork通過ForkJoinTask的fork方法執(zhí)行,join方法方法用于等待任務(wù)執(zhí)行后返回:
public class ArraySumTask extends RecursiveTask{ ? private final int[] array; private final int begin; private final int end; ? private static final int THRESHOLD = 100; ? public ArraySumTask(int[] array, int begin, int end) { this.array = array; this.begin = begin; this.end = end; } ? @Override protected Long compute() { long sum = 0; ? if (end - begin + 1 < THRESHOLD) { // 小于閾值, 直接計(jì)算 for (int i = begin; i <= end; i++) { sum += array[i]; } } else { int middle = (end + begin) / 2; ArraySumTask subtask1 = new ArraySumTask(this.array, begin, middle); ArraySumTask subtask2 = new ArraySumTask(this.array, middle + 1, end); ? subtask1.fork(); subtask2.fork(); ? long sum1 = subtask1.join(); long sum2 = subtask2.join(); ? sum = sum1 + sum2; } return sum; } }
調(diào)用方如下:
public class Main { public static void main(String[] args) { ForkJoinPool executor = new ForkJoinPool(); ArraySumTask task = new ArraySumTask(new int[10000], 0, 9999); ? ForkJoinTask future = executor.submit(task); ? // some time passed... ? if (future.isCompletedAbnormally()) { System.out.println(future.getException()); } ? try { System.out.println("result: " + future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } ? } }
注意:ForkJoinTask在執(zhí)行的時(shí)候可能會(huì)拋出異常,但是沒辦法在主線程里直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務(wù)是否已經(jīng)拋出異?;蛞呀?jīng)被取消了,并且可以通過ForkJoinTask的getException方法獲取異常.四、核心組件
在前幾小節(jié)中,我們簡(jiǎn)要介紹了Fork/Join框架和它的使用。本節(jié)我們將更進(jìn)一步,深入F/J框架,了解它的各個(gè)組件的關(guān)系和核心設(shè)計(jì)思想,本節(jié)不會(huì)涉及太多的源碼分析,而是參考 Doug Lea的這篇論文《A Java Fork/Join Framework》,從宏觀上分析F/J框架,然后分析整個(gè)框架的調(diào)度流程,閱讀完本節(jié)后,在下一節(jié)——Fork/Join框架(2) 實(shí)現(xiàn)中,我們?cè)偃ド钊朐创a會(huì)輕松很多。
F/J框架的實(shí)現(xiàn)非常復(fù)雜,內(nèi)部大量運(yùn)用了位操作和無鎖算法,撇開這些實(shí)現(xiàn)細(xì)節(jié)不談,該框架主要涉及三大核心組件:ForkJoinPool(線程池)、ForkJoinTask(任務(wù))、ForkJoinWorkerThread(工作線程),外加WorkQueue(任務(wù)隊(duì)列):
ForkJoinPool:ExecutorService的實(shí)現(xiàn)類,負(fù)責(zé)工作線程的管理、任務(wù)隊(duì)列的維護(hù),以及控制整個(gè)任務(wù)調(diào)度流程;
ForkJoinTask:Future接口的實(shí)現(xiàn)類,fork是其核心方法,用于分解任務(wù)并異步執(zhí)行;而join方法在任務(wù)結(jié)果計(jì)算完畢之后才會(huì)運(yùn)行,用來合并或返回計(jì)算結(jié)果;
ForkJoinWorkerThread:Thread的子類,作為線程池中的工作線程(Worker)執(zhí)行任務(wù);
WorkQueue:任務(wù)隊(duì)列,用于保存任務(wù);
ForkJoinPoolForkJoinPool作為Executors框架的一員,從外部看與其它線程池并沒有什么區(qū)別,僅僅是ExecutorService的一個(gè)實(shí)現(xiàn)類:
ForkJoinPool的主要工作如下:
接受外部任務(wù)的提交(外部調(diào)用ForkJoinPool的invoke/execute/submit方法提交任務(wù));
接受ForkJoinTask自身fork出的子任務(wù)的提交;
任務(wù)隊(duì)列數(shù)組(WorkQueue[])的初始化和管理;
工作線程(Worker)的創(chuàng)建/管理。
注意:ForkJoinPool提供了3類外部提交任務(wù)的方法:invoke、execute、submit,它們的主要區(qū)別在于任務(wù)的執(zhí)行方式上。
通過invoke方法提交的任務(wù),調(diào)用線程直到任務(wù)執(zhí)行完成才會(huì)返回,也就是說這是一個(gè)同步方法,且有返回結(jié)果;
通過execute方法提交的任務(wù),調(diào)用線程會(huì)立即返回,也就是說這是一個(gè)異步方法,且沒有返回結(jié)果;
通過submit方法提交的任務(wù),調(diào)用線程會(huì)立即返回,也就是說這是一個(gè)異步方法,且有返回結(jié)果(返回Future實(shí)現(xiàn)類,可以通過get獲取結(jié)果)。
ForkJoinPool對(duì)象的構(gòu)建有兩種方式:
通過3種構(gòu)造器的任意一種進(jìn)行構(gòu)造;
通過ForkJoinPool.commonPool()靜態(tài)方法構(gòu)造。
JDK8以后,F(xiàn)orkJoinPool又提供了一個(gè)靜態(tài)方法commonPool(),這個(gè)方法返回一個(gè)ForkJoinPool內(nèi)部聲明的靜態(tài)ForkJoinPool實(shí)例,主要是為了簡(jiǎn)化線程池的構(gòu)建,這個(gè)ForkJoinPool實(shí)例可以滿足大多數(shù)的使用場(chǎng)景:
public static ForkJoinPool commonPool() { // assert common != null : "static init error"; return common; }
ForkJoinPool對(duì)外提供的3種構(gòu)造器,其實(shí)最終都調(diào)用了下面這個(gè)構(gòu)造器:
/** * @param parallelism 并行級(jí)別, 默認(rèn)為CPU核心數(shù) * @param factory 工作線程工廠 * @param handler 異常處理器 * @param mode 調(diào)度模式: true表示FIFO_QUEUE; false表示LIFO_QUEUE * @param workerNamePrefix 工作線程的名稱前綴 */ private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; this.config = (parallelism & SMASK) | mode; long np = (long) (-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); }
parallelism:默認(rèn)值為CPU核心數(shù),F(xiàn)orkJoinPool里工作線程數(shù)量與該參數(shù)有關(guān),但它不表示最大線程數(shù);
factory:工作線程工廠,默認(rèn)是DefaultForkJoinWorkerThreadFactory,其實(shí)就是用來創(chuàng)建工作線程對(duì)象——ForkJoinWorkerThread;
handler:異常處理器;
config:保存parallelism和mode信息,供后續(xù)讀??;
ctl:線程池的核心控制字段
這些入?yún)⒛壳安挥藐P(guān)注,我們重點(diǎn)是mode這個(gè)字段,F(xiàn)orkJoinPool支持兩種模式:
同步模式(默認(rèn)方式)
異步模式
mode = asyncMode ? FIFO_QUEUE : LIFO_QUEUE
注意:這里的同步/異步并不是指F/J框架本身是采用同步模式還是采用異步模式工作,而是指其中的工作線程的工作方式。在F/J框架中,每個(gè)工作線程(Worker)都有一個(gè)屬于自己的任務(wù)隊(duì)列(WorkQueue),這是一個(gè)底層采用數(shù)組實(shí)現(xiàn)的雙向隊(duì)列。ForkJoinTask
同步是指:對(duì)于工作線程(Worker)自身隊(duì)列中的任務(wù),采用后進(jìn)先出(LIFO)的方式執(zhí)行;異步是指:對(duì)于工作線程(Worker)自身隊(duì)列中的任務(wù),采用先進(jìn)先出(FIFO)的方式執(zhí)行。
從Fork/Join框架的描述上來看,“任務(wù)”必須要滿足一定的條件:
支持Fork,即任務(wù)自身的分解
支持Join,即任務(wù)結(jié)果的合并
因此,J.U.C提供了一個(gè)抽象類——ForkJoinTask,來作為該類Fork/Join任務(wù)的抽象定義:
ForkJoinTask實(shí)現(xiàn)了Future接口,是一個(gè)異步任務(wù),我們?cè)谑褂肍ork/Join框架時(shí),一般需要使用線程池來調(diào)度任務(wù),線程池內(nèi)部調(diào)度的其實(shí)都是ForkJoinTask任務(wù)(即使提交的是一個(gè)Runnable或Callable任務(wù),也會(huì)被適配成ForkJoinTask)。
除了ForkJoinTask,F(xiàn)ork/Join框架還提供了兩個(gè)它的抽象實(shí)現(xiàn),我們?cè)谧远xForkJoin任務(wù)時(shí),一般繼承這兩個(gè)類:
RecursiveAction:表示具有返回結(jié)果的ForkJoin任務(wù)
RecursiveTask:表示沒有返回結(jié)果的ForkJoin任務(wù)
public abstract class RecursiveAction extends ForkJoinTask{ /** * 該任務(wù)的執(zhí)行,子類覆寫該方法 */ protected abstract void compute(); ? public final Void getRawResult() { return null; } ? protected final void setRawResult(Void mustBeNull) { } ? protected final boolean exec() { compute(); return true; } }
public abstract class RecursiveTaskextends ForkJoinTask { ? /** * 該任務(wù)的執(zhí)行結(jié)果. */ V result; ? /** * 該任務(wù)的執(zhí)行,子類覆寫該方法 */ protected abstract V compute(); ? public final V getRawResult() { return result; } ? protected final void setRawResult(V value) { result = value; } ? protected final boolean exec() { result = compute(); return true; } }
ForkJoinTask除了和ForkJoinPool 結(jié)合使用外,也可以多帶帶使用,當(dāng)我們調(diào)用ForkJoinTask的fork方法時(shí),其內(nèi)部會(huì)通過ForkJoinPool.commonPool()方法創(chuàng)建線程池,然后將自己作為任務(wù)提交給線程池。ForkJoinWorkerThread
Fork/Join框架中,每個(gè)工作線程(Worker)都有一個(gè)自己的任務(wù)隊(duì)列(WorkerQueue), 所以需要對(duì)一般的Thread做些特性化處理,J.U.C提供了ForkJoinWorkerThread類作為ForkJoinPool中的工作線程:
public class ForkJoinWorkerThread extends Thread { final ForkJoinPool pool; // 該工作線程歸屬的線程池 final ForkJoinPool.WorkQueue workQueue; // 對(duì)應(yīng)的任務(wù)隊(duì)列 ? protected ForkJoinWorkerThread(ForkJoinPool pool) { super("aForkJoinWorkerThread"); // 指定工作線程名稱 this.pool = pool; this.workQueue = pool.registerWorker(this); } ?? // ... }
ForkJoinWorkerThread 在構(gòu)造過程中,會(huì)保存所屬線程池信息和與自己綁定的任務(wù)隊(duì)列信息。同時(shí),它會(huì)通過ForkJoinPool的registerWorker方法將自己注冊(cè)到線程池中。
線程池中的每個(gè)工作線程(ForkJoinWorkerThread)都有一個(gè)自己的任務(wù)隊(duì)列(WorkQueue),工作線程優(yōu)先處理自身隊(duì)列中的任務(wù)(LIFO或FIFO順序,由線程池構(gòu)造時(shí)的參數(shù) mode 決定),自身隊(duì)列為空時(shí),以FIFO的順序隨機(jī)竊取其它隊(duì)列中的任務(wù)。WorkQueue
任務(wù)隊(duì)列(WorkQueue)是ForkJoinPool與其它線程池區(qū)別最大的地方,在ForkJoinPool內(nèi)部,維護(hù)著一個(gè)WorkQueue[]數(shù)組,它會(huì)在外部首次提交任務(wù))時(shí)進(jìn)行初始化:
volatile WorkQueue[] workQueues; // main registry
?
當(dāng)通過線程池的外部方法(submit、invoke、execute)提交任務(wù)時(shí),如果WorkQueue[]沒有初始化,則會(huì)進(jìn)行初始化;然后根據(jù)數(shù)組大小和線程隨機(jī)數(shù)(ThreadLocalRandom.probe)等信息,計(jì)算出任務(wù)隊(duì)列所在的數(shù)組索引(這個(gè)索引一定是偶數(shù)),如果索引處沒有任務(wù)隊(duì)列,則初始化一個(gè),再將任務(wù)入隊(duì)。也就是說,通過外部方法提交的任務(wù)一定是在偶數(shù)隊(duì)列,沒有綁定工作線程。
WorkQueue作為ForkJoinPool的內(nèi)部類,表示一個(gè)雙端隊(duì)列。雙端隊(duì)列既可以作為棧使用(LIFO),也可以作為隊(duì)列使用(FIFO)。ForkJoinPool的“工作竊取”正是利用了這個(gè)特點(diǎn),當(dāng)工作線程從自己的隊(duì)列中獲取任務(wù)時(shí),默認(rèn)總是以棧操作(LIFO)的方式從棧頂取任務(wù);當(dāng)工作線程嘗試竊取其它任務(wù)隊(duì)列中的任務(wù)時(shí),則是FIFO的方式。
我們?cè)贔orkJoinPool一節(jié)中曾講過,可以指定線程池的同步/異步模式(mode參數(shù)),其作用就在于此。同步模式就是“棧操作”,異步模式就是“隊(duì)列操作”,影響的就是工作線程從自己隊(duì)列中取任務(wù)的方式。
ForkJoinPool中的工作隊(duì)列可以分為兩類:
有工作線程(Worker)綁定的任務(wù)隊(duì)列:數(shù)組下標(biāo)始終是奇數(shù),稱為task queue,該隊(duì)列中的任務(wù)均由工作線程調(diào)用產(chǎn)生(工作線程調(diào)用FutureTask.fork方法);
沒有工作線程(Worker)綁定的任務(wù)隊(duì)列:數(shù)組下標(biāo)始終是偶數(shù),稱為submissions queue,該隊(duì)列中的任務(wù)全部由其它線程提交(也就是非工作線程調(diào)用execute/submit/invoke或者FutureTask.fork方法)。
五、線程池調(diào)度示例文字描述不太好理解,我們通過示意圖來看下任務(wù)入隊(duì)和“工作竊取”的整個(gè)過程:
假設(shè)現(xiàn)在通過ForkJoinPool的submit方法提交了一個(gè)FuturetTask任務(wù),參考使用示例。初始
初始狀態(tài)下,線程池中的任務(wù)隊(duì)列為空,workQueues == null,也沒有工作線程:
外部提交FutureTask任務(wù)此時(shí)會(huì)初始化任務(wù)隊(duì)列數(shù)組WorkQueue[],大小為2的冪次,然后在某個(gè)槽位(偶數(shù)位)初始化一個(gè)任務(wù)隊(duì)列(WorkQueue),并插入任務(wù):
注意,由于是非工作線程通過外部方法提交的任務(wù),所以這個(gè)任務(wù)隊(duì)列并沒有綁定工作線程。
之所以是2的冪次,是由于ForkJoinPool采用了一種隨機(jī)算法(類似ConcurrentHashMap的隨機(jī)算法),該算法通過線程池隨機(jī)數(shù)(ThreadLocalRandom的probe值)和數(shù)組的大小計(jì)算出工作線程所映射的數(shù)組槽位,這種算法要求數(shù)組大小為2的冪次。創(chuàng)建工作線程
首次提交任務(wù)后,由于沒有工作線程,所以會(huì)創(chuàng)建一個(gè)工作線程,同時(shí)在某個(gè)奇數(shù)槽的位置創(chuàng)建一個(gè)與它綁定的任務(wù)隊(duì)列,如下圖:
竊取任務(wù)ForkJoinWorkThread_1會(huì)隨機(jī)掃描workQueues中的隊(duì)列,直到找到一個(gè)可以竊取的隊(duì)列——workQueues[2],然后從該隊(duì)列的base端獲取任務(wù)并執(zhí)行,并將base加1:
竊取到的任務(wù)是FutureTask,F(xiàn)orkJoinWorkThread_1最終會(huì)調(diào)用它的compute方法(子類繼承ForkJoinTask,覆寫compute,參考本文的使用示例),該方法中會(huì)新建兩個(gè)子任務(wù),并執(zhí)行它們的fork方法:
@Override protected Long compute() { long sum = 0; ? if (end - begin + 1 < THRESHOLD) { // 小于閾值, 直接計(jì)算 for (int i = begin; i <= end; i++) { sum += array[i]; } } else { int middle = (end + begin) / 2; ArraySumTask subtask1 = new ArraySumTask(this.array, begin, middle); ArraySumTask subtask2 = new ArraySumTask(this.array, middle + 1, end); ? subtask1.fork(); subtask2.fork(); ? long sum1 = subtask1.join(); long sum2 = subtask2.join(); ? sum = sum1 + sum2; } return sum; }
之前說過,由于是由工作線程ForkJoinWorkThread_1來調(diào)用FutureTask的fork方法,所以會(huì)將這兩個(gè)子任務(wù)放入ForkJoinWorkThread_1自身隊(duì)列中:
然后,F(xiàn)orkJoinWorkThread_1會(huì)阻塞等待任務(wù)1和任務(wù)2的結(jié)果(先在subtask1.join等待):
long sum1 = subtask1.join(); long sum2 = subtask2.join();
從這里也可以看出,任務(wù)放到哪個(gè)隊(duì)列,其實(shí)是由調(diào)用線程來決定的(根據(jù)線程探針值probe計(jì)算隊(duì)列索引)。如果調(diào)用線程是工作線程,則必然有自己的隊(duì)列(task queue),則任務(wù)都會(huì)放到自己的隊(duì)列中;如果調(diào)用線程是其它線程(如主線程),則創(chuàng)建沒有工作線程綁定的任務(wù)隊(duì)列(submissions queue),然后存入任務(wù)。新的工作線程
ForkJoinWorkThread_1調(diào)用兩個(gè)子任務(wù)1和2的fork方法,除了將它們放入自己的任務(wù)隊(duì)列外,還會(huì)導(dǎo)致新增一個(gè)工作線程ForkJoinWorkThread_2:
ForkJoinWorkThread_2運(yùn)行后會(huì)像ForkJoinWorkThread_1那樣從其它隊(duì)列竊取任務(wù),如下圖,從ForkJoinWorkThread_1隊(duì)列的base端竊取一個(gè)任務(wù)(直接執(zhí)行,并不會(huì)放入自己隊(duì)列):
竊取完成后,F(xiàn)orkJoinWorkThread_2會(huì)直接執(zhí)行任務(wù)1,又回到了FutureTask子類的compute方法,假設(shè)此時(shí)又fork出兩個(gè)任務(wù)——任務(wù)3、任務(wù)4,則ForkJoinWorkThread_2最終會(huì)在任務(wù)3的join方法上等待:
如果此時(shí)還有其它工作線程,則重復(fù)上述步驟:竊取、執(zhí)行、入隊(duì)、join阻塞、返回。ForkJoinTask的join方法內(nèi)部邏輯非常復(fù)雜,上述ForkJoinWorkThread_1和ForkJoinWorkThread_2目前都在等待任務(wù)的完成,但事實(shí)上,F(xiàn)orkJoinTask存在一種互助機(jī)制,即工作線程之間可以互相幫助執(zhí)行任務(wù),這里不詳細(xì)展開,只需要知道,F(xiàn)orkJoinWorkThread_1和ForkJoinWorkThread_2可能會(huì)被其它工作線程喚醒。
我們這里假設(shè)ForkJoinWorkThread_2被其它某個(gè)工作線程喚醒,任務(wù)3和任務(wù)4的join方法依次返回了結(jié)果,那么任務(wù)1的結(jié)果也會(huì)返回,于是ForkJoinWorkThread_1也被喚醒(它在任務(wù)1的join上等待),然后ForkJoinWorkThread_1會(huì)繼續(xù)執(zhí)行任務(wù)2的join方法,如果任務(wù)2不再分解,則最終返回任務(wù)1和任務(wù)2的合并結(jié)果,計(jì)算結(jié)束。
自身隊(duì)列的任務(wù)執(zhí)行ForkJoinWorkThread_1和ForkJoinWorkThread_2喚醒執(zhí)行完竊取到的任務(wù)后,還沒有結(jié)束,它們還會(huì)去看看自身隊(duì)列中有無任務(wù)可以執(zhí)行。
/** * Executes the given task and any remaining local tasks. */ final void runTask(ForkJoinTask> task) { if (task != null) { scanState &= ~SCANNING; // mark as busy (currentSteal = task).doExec(); U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC execLocalTasks(); ForkJoinWorkerThread thread = owner; if (++nsteals < 0) // collect on overflow transferStealCount(pool); scanState |= SCANNING; if (thread != null) thread.afterTopLevelExec(); } }
上述ForkJoinPool.WorkQueue.runTask方法中,doExec()就是執(zhí)行竊取的任務(wù),而execLocalTasks用于執(zhí)行隊(duì)列本身的任務(wù)。
我們假設(shè)此時(shí)的線程池是下面這種狀態(tài):
工作線程ForkJoinWorkThread_1調(diào)用execLocalTasks方法一次性執(zhí)行自己隊(duì)列中的所有任務(wù),這時(shí)分成兩種情況:
1.異步模式(asyncMode==true)
如果構(gòu)造線程池時(shí),asyncMode為true,表示以異步模式執(zhí)行工作線程自身隊(duì)列中的任務(wù),此時(shí)會(huì)從 base -> top遍歷并執(zhí)行所有任務(wù)。
2.同步模式(asyncMode==false)
如果構(gòu)造線程池時(shí),asyncMode為false(默認(rèn)情況),表示以同步模式執(zhí)行工作線程自身隊(duì)列中的任務(wù),此時(shí)會(huì)從 top -> base 遍歷并執(zhí)行所有任務(wù)。
任務(wù)的入隊(duì)總是在top端,所以當(dāng)以同步模式遍歷時(shí),其實(shí)相當(dāng)于棧操作(從棧頂pop元素);
如果是異步模式,相當(dāng)于隊(duì)列的出隊(duì)操作(從base端poll元素)。
異步模式比較適合于那些不需要返回結(jié)果的任務(wù)。其實(shí)如果將隊(duì)列中的任務(wù)看成一棵樹(無環(huán)連通圖)的話,異步模式類似于圖的廣度優(yōu)先遍歷,同步模式類似于圖的深度優(yōu)先遍歷
假設(shè)此處以默認(rèn)的同步模式遍歷,F(xiàn)orkJoinWorkThread_1從棧頂開始執(zhí)行并移除任務(wù),先執(zhí)行任務(wù)2并移除,再執(zhí)行任務(wù)1并:
六、總結(jié)本章簡(jiǎn)要概述了Fork/Join框架的思想、主要組件及基本使用,F(xiàn)ork/Join框架的核心包含四大組件:ForkJoinTask任務(wù)類、ForkJoinPool線程池、ForkJoinWorkerThread工作線程、WorkQueue任務(wù)隊(duì)列。
本章通過示例,描述了各個(gè)組件的關(guān)系以及ForkJoin線程池的整個(gè)調(diào)度流程,F(xiàn)/J框架的核心來自于它的工作竊取及調(diào)度策略,可以總結(jié)為以下幾點(diǎn):
每個(gè)Worker線程利用它自己的任務(wù)隊(duì)列維護(hù)可執(zhí)行任務(wù);
任務(wù)隊(duì)列是一種雙端隊(duì)列,支持LIFO的push和pop操作,也支持FIFO的take操作;
任務(wù)fork的子任務(wù),只會(huì)push到它所在線程(調(diào)用fork方法的線程)的隊(duì)列;
工作線程既可以使用LIFO通過pop處理自己隊(duì)列中的任務(wù),也可以FIFO通過poll處理自己隊(duì)列中的任務(wù),具體取決于構(gòu)造線程池時(shí)的asyncMode參數(shù);
當(dāng)工作線程自己隊(duì)列中沒有待處理任務(wù)時(shí),它嘗試去隨機(jī)讀?。ǜ`取)其它任務(wù)隊(duì)列的base端的任務(wù);
當(dāng)線程進(jìn)入join操作,它也會(huì)去處理其它工作線程的隊(duì)列中的任務(wù)(自己的已經(jīng)處理完了),直到目標(biāo)任務(wù)完成(通過isDone方法);
當(dāng)一個(gè)工作線程沒有任務(wù)了,并且嘗試從其它隊(duì)列竊取也失敗了,它讓出資源(通過使用yields, sleeps或者其它優(yōu)先級(jí)調(diào)整)并且隨后會(huì)再次激活,直到所有工作線程都空閑了——此時(shí),它們都阻塞在等待另一個(gè)頂層線程的調(diào)用。
下一章將通過源碼分析更深入的理解Fork/Join調(diào)度過程。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/71835.html
摘要:并不會(huì)為每個(gè)任務(wù)都創(chuàng)建工作線程,而是根據(jù)實(shí)際情況構(gòu)造線程池時(shí)的參數(shù)確定是喚醒已有空閑工作線程,還是新建工作線程。 showImg(https://segmentfault.com/img/bVbiYSP?w=1071&h=707); 本文首發(fā)于一世流云的專欄:https://segmentfault.com/blog... 一、引言 前一章——Fork/Join框架(1) 原理,我們...
摘要:整個(gè)包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設(shè)計(jì)模式,設(shè)計(jì)了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對(duì)等進(jìn)行補(bǔ)充增強(qiáng)。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:注意線程與本地操作系統(tǒng)的線程是一一映射的。固定線程數(shù)的線程池提供了兩種創(chuàng)建具有固定線程數(shù)的的方法,固定線程池在初始化時(shí)確定其中的線程總數(shù),運(yùn)行過程中會(huì)始終維持線程數(shù)量不變。 showImg(https://segmentfault.com/img/bVbhK58?w=1920&h=1080); 本文首發(fā)于一世流云專欄:https://segmentfault.com/blog... ...
摘要:本文首發(fā)于一世流云的專欄一模式簡(jiǎn)介模式是多線程設(shè)計(jì)模式中的一種常見模式,它的主要作用就是異步地執(zhí)行任務(wù),并在需要的時(shí)候獲取結(jié)果。二中的模式在多線程基礎(chǔ)之模式中,我們?cè)?jīng)給出過模式的通用類關(guān)系圖。 showImg(https://segmentfault.com/img/bVbiwcx?w=1000&h=667); 本文首發(fā)于一世流云的專欄:https://segmentfault.co...
摘要:好了,繼續(xù)向下執(zhí)行,嘗試獲取鎖失敗后,會(huì)調(diào)用首先通過方法,將包裝成共享結(jié)點(diǎn),插入等待隊(duì)列,插入完成后隊(duì)列結(jié)構(gòu)如下然后會(huì)進(jìn)入自旋操作,先嘗試獲取一次鎖,顯然此時(shí)是獲取失敗的主線程還未調(diào)用,同步狀態(tài)還是。 showImg(https://segmentfault.com/img/remote/1460000016012541); 本文首發(fā)于一世流云的專欄:https://segmentfa...
閱讀 3657·2021-11-25 09:43
閱讀 651·2021-09-22 15:59
閱讀 1754·2021-09-06 15:00
閱讀 1777·2021-09-02 09:54
閱讀 696·2019-08-30 15:56
閱讀 1189·2019-08-29 17:14
閱讀 1847·2019-08-29 13:15
閱讀 888·2019-08-28 18:28