摘要:前言在前面的三篇文章中先后介紹了框架的任務(wù)組件體系體系源碼并簡單介紹了目前的并行流應(yīng)用場景框架本質(zhì)上是對的擴(kuò)展它依舊支持經(jīng)典的使用方式即任務(wù)池的配合向池中提交任務(wù)并異步地等待結(jié)果毫無疑問前面的文章已經(jīng)解釋了框架的新穎性初步了解了工作竊取
前言
在前面的三篇文章中先后介紹了ForkJoin框架的任務(wù)組件(ForkJoinTask體系,CountedCompleter體系)源碼,并簡單介紹了目前的并行流應(yīng)用場景.ForkJoin框架本質(zhì)上是對Executor-Runnable/Callable-Future/FutureTask的擴(kuò)展,它依舊支持經(jīng)典的Executor使用方式,即任務(wù)+池的配合,向池中提交任務(wù),并異步地等待結(jié)果.
毫無疑問,前面的文章已經(jīng)解釋了ForkJoin框架的新穎性,初步了解了工作竊取依托的數(shù)據(jù)結(jié)構(gòu),ForkJoinTask/CountedCompleter在執(zhí)行期的行為,也提到它們一定要在ForkJoinPool中進(jìn)行運(yùn)行和調(diào)度,這也是本文力求解決的問題.
ForkJoinPool源碼ForkJoinPool源碼是ForkJoin框架中最復(fù)雜,最難理解的部分,且因?yàn)榻徊嬉蕾嘑orkJoinTask,CountedCompleter,ForkJoinWorkerThread,作者在前面多帶帶用兩篇文章分析了它們,以前兩篇文章為基礎(chǔ),重復(fù)部分本文不再詳述.
首先看類簽名.
//禁止偽共享 @sun.misc.Contended //繼承自AbstractExecutorService public class ForkJoinPool extends AbstractExecutorService
前面的幾篇文章不止一次強(qiáng)調(diào)過ForkJoin框架的"輕量線程,輕量任務(wù)"等概念,也提到少量線程-多數(shù)計(jì)算,資源空閑時(shí)竊取任務(wù).并介紹了基于status狀態(tài)的調(diào)度(ForkJoinTask系列),不基于status而由子任務(wù)觸發(fā)完成的調(diào)度(CountedCompleter系列),顯然它們的共性就是讓線程在正常調(diào)度的前提下盡量少的空閑,最大幅度利用cpu資源,偽共享/緩存行的問題在ForkJoin框架中顯然會是一個(gè)更大的性能大殺器.在1.8之前,一般通過補(bǔ)位的方式解決偽共享問題,1.8之后,官方使用@Contended注解,令虛擬機(jī)盡量注解標(biāo)注的字段(字段的情況)或成員字段放置在不同的緩存行,從而規(guī)避了偽共享問題.
建立ForkJoinPool可以直接new,也可以使用Executors的入口方法.
//Executors方法,顯然ForkJoinPool被稱作工作竊取線程池.參數(shù)指定了并行度. public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, //默認(rèn)線程工廠,前文中已提過默認(rèn)的ForkJoinWorkerThread ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } //不提供并行度. public static ExecutorService newWorkStealingPool() { return new ForkJoinPool //使用所有可用的處理器 (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } //對應(yīng)的,ForkJoinPool的構(gòu)造器們. //不指定任何參數(shù). public ForkJoinPool() { //并行度取MAX_CAP和可用處理器數(shù)的最小值. this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), //默認(rèn)的線程工廠.無異常處理器,非異步模式. defaultForkJoinWorkerThreadFactory, null, false); } //同上,只是使用參數(shù)中的并行度. public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); } public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { //并行度需要校驗(yàn) this(checkParallelism(parallelism), //校驗(yàn)線程工廠 checkFactory(factory), //參數(shù)指定的未捕獲異常處理器. handler, //前面的幾處代碼asyncMode都是false,會選用LIFO隊(duì)列,是true是會選用FIFO隊(duì)列,后面詳述. asyncMode ? FIFO_QUEUE : LIFO_QUEUE, //線程名前綴 "ForkJoinPool-" + nextPoolId() + "-worker-"); //檢查許可,不關(guān)心. checkPermission(); } //檢查方法很簡單. //并行度不能大于MAX_CAP不能不大于0. private static int checkParallelism(int parallelism) { if (parallelism <= 0 || parallelism > MAX_CAP) throw new IllegalArgumentException(); return parallelism; } //線程工廠非空即可. private static ForkJoinWorkerThreadFactory checkFactory (ForkJoinWorkerThreadFactory factory) { if (factory == null) throw new NullPointerException(); return factory; } //最終構(gòu)造器,私有.待介紹完一些基礎(chǔ)字段后再述. private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; //config初始化值,用并行度與mode取或,顯然mode是FIFO時(shí),將有一個(gè)第17位的1. this.config = (parallelism & SMASK) | mode; //np保存并行度(正數(shù))的相反數(shù)(補(bǔ)碼). long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); }
了解其他線程池源碼的朋友可以去回憶其他線程池的構(gòu)建,不論是調(diào)度線程池還是普通的線程池或者緩存池,他們其實(shí)都設(shè)置了核心線程數(shù)和最大線程數(shù).當(dāng)然這要看定義"線程池分類"的視角,以Executors入口的api分類,或許可以分類成固定線程池,緩沖池,單線程池,調(diào)度池,工作竊取池;但以真正的實(shí)現(xiàn)分類,其實(shí)只有ThreadPoolExecutor系列(固定線程池,單線程池都直接是ThreadPoolExecutor,調(diào)度池是它的子類,緩沖池也是ThreadPoolExecutor,只是阻塞隊(duì)列限定為SynchronizedQueue)和ForkJoinPool系列(工作竊取池).
作者更傾向于用實(shí)現(xiàn)的方式區(qū)分,也間接參照Executors的api使用用途的區(qū)分方式.如果不使用Executors的入口api,不論哪種ThreadPoolExecutor系列,我們都可以提供線程池的大小配置,阻塞隊(duì)列,線程空閑存活時(shí)間及單位,池滿拒絕策略,線程工廠等,而所謂的緩存池和固定池的區(qū)別只是隊(duì)列的區(qū)別.
調(diào)度池的構(gòu)造參數(shù)與ThreadPoolExecutor無異,只是內(nèi)限了阻塞隊(duì)列的類型,它雖然是ThreadPoolExecutor的擴(kuò)展,卻不僅沒有拓充參數(shù),反而減少了兩個(gè)參數(shù):阻塞隊(duì)列和最大線程數(shù).阻塞隊(duì)列被默認(rèn)設(shè)置為內(nèi)部類DelayQueue,它實(shí)現(xiàn)了BlockingQueue,最大線程數(shù)則為整數(shù)上限,同時(shí)新增的對任務(wù)的延時(shí)或重試等屬性則是依托于內(nèi)部維護(hù)的一個(gè)FutureTask的擴(kuò)展,并未增加到構(gòu)造參數(shù).
而到了ForkJoinPool,我們看到的是截然不同于ThreadPoolExecutor系列的構(gòu)建方式.首先根本沒有提供核心線程和最大線程數(shù),線程空閑存活時(shí)間的參數(shù)和阻塞隊(duì)列以及池滿拒絕策略;線程工廠也僅能提供生產(chǎn)ForkJoinWorkerThread的工廠bean;還具備一些ThreadPoolExecutor沒有的參數(shù),如未捕獲異常處理器,同步異步模式,工作線程前綴(其實(shí)別的類型的線程工廠也可以提供線程前綴,默認(rèn)就是常見的pool-前綴)等.
顯然從參數(shù)看便可猜測出若干不同于其他線程池的功能.但我們更關(guān)心其中的一些參數(shù)設(shè)置.
一般的參數(shù)都能見名知義,僅有config和ctl難以理解,此處也不詳細(xì)介紹,只說他們的初值的初始化.
config是并行度與SMASK取與運(yùn)算再與mode取或,這里并行度最大是15位整數(shù)(MAX_CAP=0x7FFF),而SMASK作用于整數(shù)后16位,mode在FIFO為1<<16,LIFO是0.很好計(jì)算.
ctl其實(shí)是一個(gè)控制信號,我們后面會在具體源碼就地解釋,它的計(jì)算先通過了一個(gè)局部變量np.
np的計(jì)算方法是將并行度的相反數(shù)(補(bǔ)碼)轉(zhuǎn)換為長整型.前面簡單分析,并行度不會大于MAX_CAP,因此np至少前49位全部是1.
計(jì)算ctl時(shí),將np左移AC_SHIFT即為取后16位,將np左移TC_SHIFT即取它的后32位,分別與AC_MASK和TC_SHIFT,表示取np的后16位分別放置于ctl的前16位和33至48位.而ctl的后32位初值為0.
因?yàn)樯傻腸tl前16位和后16位相等,如果仔細(xì)用數(shù)學(xué)驗(yàn)證,可以發(fā)現(xiàn),對前16位和后16位的末位同時(shí)加1,當(dāng)添加了parallel次后,ctl將歸0.這也是添加worker限制的重要數(shù)理依據(jù).
前面列舉了獲取ForkJoinPool實(shí)例的幾種方法,初步展示了構(gòu)造一個(gè)ForkJoinPool的屬性,也暴露了一些實(shí)現(xiàn)細(xì)節(jié),而這些細(xì)節(jié)依賴于一些字段和成員函數(shù),我們先從它們開始.
//ForkJoinWorkerThread的線程工廠. public static interface ForkJoinWorkerThreadFactory { //創(chuàng)建新線程要實(shí)現(xiàn)的方法. public ForkJoinWorkerThread newThread(ForkJoinPool pool); } //前面看到的默認(rèn)線程工廠. static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new ForkJoinWorkerThread(pool); } } //創(chuàng)建InnocuousForkJoinWorkerThread的線程工廠,上一文已經(jīng)介紹過. static final class InnocuousForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { private static final AccessControlContext innocuousAcc; static { Permissions innocuousPerms = new Permissions(); innocuousPerms.add(modifyThreadPermission); innocuousPerms.add(new RuntimePermission( "enableContextClassLoaderOverride")); innocuousPerms.add(new RuntimePermission( "modifyThreadGroup")); innocuousAcc = new AccessControlContext(new ProtectionDomain[] { new ProtectionDomain(null, innocuousPerms) }); } public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return (ForkJoinWorkerThread.InnocuousForkJoinWorkerThread) java.security.AccessController.doPrivileged( new java.security.PrivilegedAction() { public ForkJoinWorkerThread run() { return new ForkJoinWorkerThread. InnocuousForkJoinWorkerThread(pool); }}, innocuousAcc); } } //空任務(wù) static final class EmptyTask extends ForkJoinTask { private static final long serialVersionUID = -7721805057305804111L; EmptyTask() { status = ForkJoinTask.NORMAL; } //狀態(tài)直接是已正常完成. public final Void getRawResult() { return null; } public final void setRawResult(Void x) {} public final boolean exec() { return true; } }
以上是線程工廠和一個(gè)默認(rèn)的EmptyTask.接下來看一些跨池和工作隊(duì)列的公用常量.
// 與邊界有關(guān)的常量 static final int SMASK = 0xffff; // 后16位. static final int MAX_CAP = 0x7fff; // 前面在定并行度時(shí)參考的最大容量. static final int EVENMASK = 0xfffe; // 后16位驗(yàn)偶數(shù) static final int SQMASK = 0x007e; // 最大64個(gè)偶數(shù)槽,從第2位至7位共6位,2的6次方. // 與WorkQueue有關(guān) static final int SCANNING = 1; // 對WorkQueue正在運(yùn)行任務(wù)的標(biāo)記 static final int INACTIVE = 1 << 31; // 標(biāo)記負(fù)數(shù) static final int SS_SEQ = 1 << 16; // 版本號使用,第17位1 // ForkJoinPool和WorkQueue的config有關(guān)常量. static final int MODE_MASK = 0xffff << 16; // 能濾取前16位. static final int LIFO_QUEUE = 0;//前面提到過的,非async模式(false),值取0. static final int FIFO_QUEUE = 1 << 16;//async模式(true),值取1. static final int SHARED_QUEUE = 1 << 31; // 共享隊(duì)列標(biāo)識,符號位表示負(fù).
以上的字段含義只是粗略的描述,先有一個(gè)印象,后面看到時(shí)自然理解其含義.
接下來看核心的WorkQueue內(nèi)部類.
//前面的文章說過,它是一個(gè)支持工作竊取和外部提交任務(wù)的隊(duì)列.顯然,它的實(shí)例對內(nèi)存部局十分敏感, //WorkQueue本身的實(shí)例,或者內(nèi)部數(shù)組元素均應(yīng)避免共享同一緩存行. @sun.misc.Contended static final class WorkQueue { //隊(duì)列內(nèi)部數(shù)組的初始容量,默認(rèn)是2的12次方,它必須是2的幾次方,且不能小于4. //但它應(yīng)該設(shè)置一個(gè)較大的值來減少隊(duì)列間的緩存行共享. //在前面的java運(yùn)行時(shí)和54篇java官方文檔術(shù)語中曾提到,jvm通常會將 //數(shù)組放在能夠共享gc標(biāo)記(如卡片標(biāo)記)的位置,這樣每一次寫都會造成嚴(yán)重內(nèi)存競態(tài). static final int INITIAL_QUEUE_CAPACITY = 1 << 13; //最大內(nèi)部數(shù)組容量,默認(rèn)64M,也必須是2的平方,但不大于1<<(31-數(shù)組元素項(xiàng)寬度), //根據(jù)官方注釋,這可以確保無需計(jì)算索引概括,但定義一個(gè)略小于此的值有助于用戶在 //系統(tǒng)飽合前捕獲失控的程序. static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M // unsafe機(jī)制有關(guān)的字段. private static final sun.misc.Unsafe U; private static final int ABASE; private static final int ASHIFT; private static final long QTOP; private static final long QLOCK; private static final long QCURRENTSTEAL; static { try { U = sun.misc.Unsafe.getUnsafe(); Class> wk = WorkQueue.class; Class> ak = ForkJoinTask[].class; //top字段的句柄. QTOP = U.objectFieldOffset (wk.getDeclaredField("top")); //qlock字段的句柄. QLOCK = U.objectFieldOffset (wk.getDeclaredField("qlock")); //currentSteal的句柄 QCURRENTSTEAL = U.objectFieldOffset (wk.getDeclaredField("currentSteal")); //ABASE是ForkJoinTask數(shù)組的首地址. ABASE = U.arrayBaseOffset(ak); //scale代表數(shù)組元素的索引大小.它必須是2的平方. int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); //計(jì)算ASHIFT,它是31與scale的高位0位數(shù)量的差值.因?yàn)樯弦徊郊s定了scale一定是一個(gè)正的2的幾次方, //ASHIFT的結(jié)果一定會大于1.可以理解ASHIFT是數(shù)組索引大小的有效位數(shù). ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } } //插曲,在Integer類的numberOfLeadingZeros方法,果然一流的程序是數(shù)學(xué). public static int numberOfLeadingZeros(int i) { // HD, Figure 5-6 if (i == 0) //i本身已是0,毫無疑問地返回32.本例中i是2起,所以不會. return 32; //先將n初始化1.最后會減掉首位1. int n = 1; //i的前16位不存在非零值,則將n加上16并移除i的前16位.將i轉(zhuǎn)換為一個(gè)以原i后16位開頭的新值. if (i >>> 16 == 0) { n += 16; i <<= 16; } //不論前一步結(jié)果如何,若此時(shí)i的前8位不存在非零值,則n加上8,i移除前8位.將i轉(zhuǎn)換為原i的后24位開頭的新值. if (i >>> 24 == 0) { n += 8; i <<= 8; } //不論前一步結(jié)果如何,若此時(shí)i的前4位不存在非零值,則n加上4,i移除前4位.將i轉(zhuǎn)換為原i的后28位開頭的新值. if (i >>> 28 == 0) { n += 4; i <<= 4; } //不論前一步結(jié)果如何,若此時(shí)i的前2位不存在非零值,則n加上2,i移除前2位.將i轉(zhuǎn)換為原i的后30位開頭的新值. if (i >>> 30 == 0) { n += 2; i <<= 2; } //經(jīng)過前面的運(yùn)算,i的前30位的非零值數(shù)量已經(jīng)記入n, //在前一步的基礎(chǔ)上,此時(shí)i的前1位若存在非零值,則n-1,否則n保留原值. n -= i >>> 31; return n; } //回到WorkQueue // 實(shí)例字段 volatile int scanState; // 版本號,小于0代表不活躍,注釋解釋奇數(shù)代表正在掃描,但從代碼語義上看正好相反. int stackPred; // 前一個(gè)池棧控制信號(ctl),它保有前一個(gè)棧頂記錄. int nsteals; // 偷盜的任務(wù)數(shù) int hint; // 一個(gè)隨機(jī)數(shù),用于決定偷取任務(wù)的索引. int config; // 配置,表示池的索引和模式 volatile int qlock; // 隊(duì)列鎖,1表示鎖了,小于0表示終止,其他情況是0. volatile int base; // 底,表示下一個(gè)poll操作的插槽索引 int top; // 頂,表示下一個(gè)push操作的插槽索引 ForkJoinTask>[] array; // 存放任務(wù)元素的數(shù)組,初始不分配,首擴(kuò)容會分配. final ForkJoinPool pool; // 包含該隊(duì)列的池,可能在某些時(shí)刻是null. final ForkJoinWorkerThread owner; // 持有該隊(duì)列的線程,如果隊(duì)列是共享的,owner是null. volatile Thread parker; // 在調(diào)用park阻塞的owner,非阻塞時(shí)為null volatile ForkJoinTask> currentJoin; // 被在awaitJoin中join的task. volatile ForkJoinTask> currentSteal; // 字面意思當(dāng)前偷的任務(wù),主要用來helpStealer方法使用. //工作隊(duì)列構(gòu)造器,只初始化線程池,owner等字段. WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) { this.pool = pool; this.owner = owner; // Place indices in the center of array (that is not yet allocated) //base和top初始均為INITIAL_QUEUE_CAPACITY的一半,也就是2的11次方. base = top = INITIAL_QUEUE_CAPACITY >>> 1; } //返回本隊(duì)列在池中的索引,使用config的2至4位表示.因?yàn)閏onfig的最后一位是奇偶位,忽略. final int getPoolIndex() { return (config & 0xffff) >>> 1; } //返回隊(duì)列中的任務(wù)數(shù). final int queueSize() { //非owner的調(diào)用者必須先讀base,用base-top,得到的結(jié)果小于0則取相反數(shù),否則取0. //忽略即時(shí)的負(fù)數(shù),它并不嚴(yán)格準(zhǔn)確. int n = base - top; return (n >= 0) ? 0 : -n; } //判斷隊(duì)列是否為空隊(duì).本方法較為精確,對于近空隊(duì)列,要檢查是否有至少一個(gè)未被占有的任務(wù). final boolean isEmpty() { ForkJoinTask>[] a; int n, m, s; //base大于等于top,說明空了. return ((n = base - (s = top)) >= 0 || //有容量,且恰好計(jì)算為1,可能只有一個(gè)任務(wù). (n == -1 && //計(jì)算為1,再驗(yàn)數(shù)組是不是空的. ((a = array) == null || (m = a.length - 1) < 0 || //取該位置元素的值判空,空則說明isEmpty. //取值的方式是取ForkJoinTask.class首地址加上偏移量(數(shù)組長度減一(最后一個(gè)元素位置,經(jīng)典案例32-1)與運(yùn)算top減一左移ASHIFT(索引大小有效位數(shù))位)的值. U.getObject (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null))); } //將一個(gè)任務(wù)壓入隊(duì)列,前文提過的fork最終就會壓隊(duì).但此方法只能由非共享隊(duì)列的持有者調(diào)用. //當(dāng)使用線程池的"外部壓入"externalPush方法時(shí),壓入共享隊(duì)列. final void push(ForkJoinTask> task) { ForkJoinTask>[] a; ForkJoinPool p; //保存當(dāng)時(shí)的base top. int b = base, s = top, n; //如果數(shù)組被移除則忽略. if ((a = array) != null) { //數(shù)組最后一個(gè)下標(biāo).如長度32,則m取31這個(gè)質(zhì)數(shù).此時(shí)保存一個(gè)m,對于保存后其他push操作相當(dāng)于打了屏障. int m = a.length - 1; //向數(shù)組中的指定位置壓入該任務(wù).位置包含上面的m和s進(jìn)行與運(yùn)算(數(shù)組中的位置),結(jié)果左移索引有效長度位(索引長度),再加上數(shù)組首索引偏移量(起始地址). U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); //將top加1. U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { //計(jì)算舊的任務(wù)數(shù)量,發(fā)現(xiàn)不大于1個(gè),說明原來很可能工作線程正在阻塞等待新的任務(wù).需要喚醒它. if ((p = pool) != null) //signalWork會根據(jù)情況,添加新的工作線程或喚醒等待任務(wù)的線程. p.signalWork(p.workQueues, this); } else if (n >= m)//2. //任務(wù)數(shù)量超出了,對數(shù)組擴(kuò)容. growArray(); } } //添加任務(wù)過程主流程無鎖,包括可能出現(xiàn)的growArray.當(dāng)原隊(duì)列為空時(shí),它會初始化一個(gè)數(shù)組,否則擴(kuò)容一倍. //持有者調(diào)用時(shí),不需要加鎖,但當(dāng)其他線程調(diào)用時(shí),需要持有鎖.在resize過程中,base可以移動(dòng),但top不然. final ForkJoinTask>[] growArray() { //記錄老數(shù)組. ForkJoinTask>[] oldA = array; //根據(jù)老數(shù)組決定新容量,老數(shù)組空則INITIAL_QUEUE_CAPACITY否則國倍. int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY; if (size > MAXIMUM_QUEUE_CAPACITY) //新大小大于最大數(shù)組大小則拒絕. throw new RejectedExecutionException("Queue capacity exceeded"); int oldMask, t, b; //直接將原來的數(shù)組引用替換成新的. ForkJoinTask>[] a = array = new ForkJoinTask>[size]; //如果是初次分配,就此打住返回a,是擴(kuò)容,且老數(shù)組非空則進(jìn)入下面的循環(huán)拷貝. if (oldA != null && (oldMask = oldA.length - 1) >= 0 && (t = top) - (b = base) > 0) { //根據(jù)前面的運(yùn)算,size一定是2的冪,減一用來哈希,這是經(jīng)典處理辦法. int mask = size - 1; do { ForkJoinTask> x; //老數(shù)組base自增過若干次的得到b,它代表的元素對應(yīng)的索引. int oldj = ((b & oldMask) << ASHIFT) + ABASE; //用b在新數(shù)組中找出索引. int j = ((b & mask) << ASHIFT) + ABASE; //老數(shù)組中用索引取出元素. x = (ForkJoinTask>)U.getObjectVolatile(oldA, oldj); if (x != null && //老數(shù)組置空,放入新數(shù)組. U.compareAndSwapObject(oldA, oldj, x, null)) U.putObjectVolatile(a, j, x); //每處理完一個(gè)task,就將base自增1,直到top為止. } while (++b != t); } //返回新數(shù)組. return a; } //存在下一個(gè)任務(wù),彈出,順序是后進(jìn)先出.此方法僅限非共享隊(duì)列的owner調(diào)用. final ForkJoinTask> pop() { ForkJoinTask>[] a; ForkJoinTask> t; int m; //還有元素. if ((a = array) != null && (m = a.length - 1) >= 0) { //1.top至少比base大一.注意,每次循環(huán)都會讀出新的top,它是volatile修飾的. for (int s; (s = top - 1) - base >= 0;) { //top對應(yīng)的索引. long j = ((m & s) << ASHIFT) + ABASE; //2.該索引沒有元素,break,返回null.而且就代表這個(gè)位置的確是null,與競態(tài)無關(guān). //因?yàn)榇朔椒▋Howner線程使用,不會出現(xiàn)另一個(gè)線程計(jì)算了同樣的j,且先執(zhí)行了3的情況. //出現(xiàn)這種情況,則是此位置的任務(wù)當(dāng)先被執(zhí)行并出棧,或者就從未設(shè)置過任務(wù),后續(xù)分析這種極端情況. //故如果出現(xiàn)某個(gè)任務(wù)在數(shù)組的中間,提前被執(zhí)行并置空(非pop或poll方式),那么再對WorkQueue進(jìn)行pop時(shí)將會中斷, //留下一部分null之后的任務(wù)不能出棧,所以可以允許任務(wù)非pop或poll方式查出并執(zhí)行,但為了能pop出所有任務(wù),不能中間置null. if ((t = (ForkJoinTask>)U.getObject(a, j)) == null) break; //3.有元素,將該索引位置置null.若cas失敗,說明元素被取出了, //但下次循環(huán)即使在2處break并返回null,也不是因?yàn)楦倯B(tài),因?yàn)槊看窝h(huán)到1都會讀取新的top, //也就有新的j. if (U.compareAndSwapObject(a, j, t, null)) { //數(shù)組位置置null的同時(shí)top減1. U.putOrderedInt(this, QTOP, s); return t; } } } //循環(huán)退出,說明top位置沒有元素,也相當(dāng)于說明數(shù)組為空.顯然此方法的另一個(gè)作用是將隊(duì)列壓縮,空隊(duì)列會將top先降到base+1,再循環(huán)最后一次將top降到base. return null; } //如果b是base,使用FIFO的次序嘗試無競態(tài)取底部的任務(wù).它會在ForkJoinPool的scan和helpStealer中使用. final ForkJoinTask> pollAt(int b) { ForkJoinTask> t; ForkJoinTask>[] a; if ((a = array) != null) { //和前面一樣的的方式計(jì)算b對應(yīng)的索引j int j = (((a.length - 1) & b) << ASHIFT) + ABASE; if ((t = (ForkJoinTask>)U.getObjectVolatile(a, j)) != null && //j對應(yīng)位置有task且當(dāng)前base==b,嘗試將task出隊(duì). base == b && U.compareAndSwapObject(a, j, t, null)) { //出隊(duì)成功base增1.不需要額外的同步,因?yàn)閮蓚€(gè)線程不可能同時(shí)在上面的cas成功. //當(dāng)一切條件匹配(b就是base且j位置有元素),pollAt同一個(gè)b只會有一個(gè)返回非空的t. //如果多個(gè)線程傳入的b不相等,在同一時(shí)刻只有一個(gè)會等于base. base = b + 1; return t; } } return null; } //用FIFO的次序取下一個(gè)任務(wù). final ForkJoinTask> poll() { ForkJoinTask>[] a; int b; ForkJoinTask> t; //1.循環(huán)從base取任務(wù),當(dāng)base增長到top或其他操作重置array為null則終止循環(huán). while ((b = base) - top < 0 && (a = array) != null) { //前面已敘述過取索引的邏輯,使用一個(gè)top到base間的數(shù)與數(shù)組長度-1與運(yùn)算并左移索引長度位再加上數(shù)組基準(zhǔn)偏移量.后面不再綴述. int j = (((a.length - 1) & b) << ASHIFT) + ABASE; //取出task t = (ForkJoinTask>)U.getObjectVolatile(a, j); //2.如果發(fā)生競態(tài),base已經(jīng)不是b,直接開啟下一輪循環(huán)把新的base讀給b. if (base == b) { if (t != null) { //3.當(dāng)前t是base任務(wù),用cas置空,base+1,返回t. //如果此處發(fā)生競態(tài),則只有一個(gè)線程可以成功返回t并重置base(4). //不成功的線程會開啟下一輪循環(huán),此時(shí)成功線程可能未來的及執(zhí)行4更新base, //也可能已經(jīng)更新base,則導(dǎo)致先前失敗的線程在2處通過,經(jīng)5種或判隊(duì)列空返回,或非空再次循環(huán),而 //在當(dāng)前成功線程執(zhí)行4成功后,所有前面失敗的線程可以在1處讀到新的base,這些線程 //在下一次循環(huán)中依舊只會有一個(gè)成功彈出t并重置base,直到所有線程執(zhí)行完畢. if (U.compareAndSwapObject(a, j, t, null)) { //4重置加返回 base = b + 1; return t; } } //5.t取出的是空,發(fā)現(xiàn)此時(shí)臨時(shí)變量b(其他成功線程在此輪循環(huán)前置的base)已增至top-1,且當(dāng)前線程又沒能成功的彈出t,說明一定會有一個(gè)線程 //將t彈出并更新base到top的值,當(dāng)前線程沒必要再開下一個(gè)循環(huán)了,直接break并返回null. //t取出的是空,但是沒到top,說明只是被提前執(zhí)行并置空了,那么繼續(xù)讀取新的base并循環(huán),且若沒有其他線程去更改base,array的長度,或者把top降到 //base,則當(dāng)前線程就永遠(yuǎn)死循環(huán)下去了,因?yàn)槊看窝h(huán)都是125且每個(gè)變量都不變.因此為避免循環(huán),每個(gè)任務(wù)可以提前執(zhí)行,但一定不能提前離隊(duì)(置null). //也就是說:只能用poll或pop方式彈出任務(wù),其他方式獲得任務(wù)并執(zhí)行是允許的,但不能在執(zhí)行后置null,留待后續(xù)源碼驗(yàn)證一下. else if (b + 1 == top) // now empty break; } } //從循環(huán)退出來有兩種情況,可能是在5處滿足退出條件,或者在2處發(fā)現(xiàn)b已經(jīng)是臟數(shù)據(jù),下輪循環(huán)不滿足循環(huán)條件所致.兩種都應(yīng)該返回null. return null; } //根據(jù)mode來取下一個(gè)本隊(duì)列元素.根據(jù)模式. final ForkJoinTask> nextLocalTask() { //當(dāng)前WorkQueue的配置了FIFO,則poll,否則pop. //盡管還未看到注冊worker的源碼,在此提前透露下,ForkJoinPool也有一個(gè)config(前面講構(gòu)造函數(shù)提過) //該config保存了mode信息,并原樣賦給了WorkQueue的mode.注意,相應(yīng)的任務(wù)會出隊(duì). return (config & FIFO_QUEUE) == 0 ? pop() : poll(); } //根據(jù)模式取出下一個(gè)任務(wù),但是不出隊(duì). final ForkJoinTask> peek() { ForkJoinTask>[] a = array; int m; //空隊(duì),返回null. if (a == null || (m = a.length - 1) < 0) return null; //根據(jù)mode定位要取的索引j. int i = (config & FIFO_QUEUE) == 0 ? top - 1 : base; int j = ((i & m) << ASHIFT) + ABASE; //返回讀出的值,不出隊(duì). return (ForkJoinTask>)U.getObjectVolatile(a, j); } //如果參數(shù)t是當(dāng)前隊(duì)的top,則彈出. final boolean tryUnpush(ForkJoinTask> t) { ForkJoinTask>[] a; int s; if ((a = array) != null && (s = top) != base && //1.滿足非空條件.嘗試用t去當(dāng)當(dāng)作計(jì)算出的索引位置的原任務(wù)的值并cas為null來出隊(duì). U.compareAndSwapObject (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) { //cas成功,說明t確實(shí)是top,將top減一返回true. U.putOrderedInt(this, QTOP, s); return true; } //2.cas失敗或不滿足1的條件,返回false. return false; } //移除并取消隊(duì)列中所有已知的任務(wù),忽略異常. final void cancelAll() { ForkJoinTask> t; if ((t = currentJoin) != null) { //有currentJoin,引用置空,取消并忽略異常. currentJoin = null; ForkJoinTask.cancelIgnoringExceptions(t); } if ((t = currentSteal) != null) { //有currentSteal,引用置空,取消并忽略異常. currentSteal = null; ForkJoinTask.cancelIgnoringExceptions(t); } //除了上面兩個(gè),就只剩下數(shù)組中的任務(wù)了.按LILO的順序彈出并依次取消,忽略所有異常. while ((t = poll()) != null) ForkJoinTask.cancelIgnoringExceptions(t); } // 以下是執(zhí)行方法. //按FIFO順序從隊(duì)首彈出任務(wù)并執(zhí)行所有非空任務(wù). final void pollAndExecAll() { for (ForkJoinTask> t; (t = poll()) != null;) //很明顯,如果未按嚴(yán)格順序執(zhí)行,先執(zhí)行中間的一個(gè)任務(wù), //再調(diào)用本方法,則會半路中止. t.doExec(); } //移除并執(zhí)行完所有本隊(duì)列的任務(wù),如果是先進(jìn)先出,則執(zhí)行前面的pollAndExecAll方法. //否則pop循環(huán)執(zhí)行到空為止.按前面的分析,只要堅(jiān)持只能pop或poll彈出,其他方式執(zhí)行任務(wù)但不能置空的原則, //可以保證pop或poll出現(xiàn)空的情況只能是競態(tài)發(fā)生的情況. final void execLocalTasks() { int b = base, m, s; ForkJoinTask>[] a = array; //初始滿足條件,top至少比base大1.隊(duì)列非空. if (b - (s = top - 1) <= 0 && a != null && (m = a.length - 1) >= 0) { //不是FIFO模式. if ((config & FIFO_QUEUE) == 0) { for (ForkJoinTask> t;;) { //原子getAndSet,查出并彈出原本的task if ((t = (ForkJoinTask>)U.getAndSetObject (a, ((m & s) << ASHIFT) + ABASE, null)) == null) //彈出的task是空,break.說明整個(gè)工作流程中,如果未保證嚴(yán)格有序, //如先從中間的某個(gè)任務(wù)開始執(zhí)行并且出隊(duì)了,再調(diào)用execLocalTasks,會導(dǎo)致中間停頓. //只執(zhí)行不出隊(duì),則至少不會中斷.出現(xiàn)t是null的情況只能是競態(tài)或末尾. break; //top減一,執(zhí)行任務(wù). U.putOrderedInt(this, QTOP, s); t.doExec(); //如果base大于等于top,則中止. if (base - (s = top - 1) > 0) break; } } //是FIFO模式,pollAndExecAll. else pollAndExecAll(); } } //重點(diǎn)入口方法來了,前面留下諸多關(guān)于執(zhí)行任務(wù)是否出隊(duì)的討論,下面來分析入口方法. //該方法的入口是每個(gè)工作線程的run方法,因此只有一個(gè)線程. final void runTask(ForkJoinTask> task) { //傳入task是空直接不理會. if (task != null) { //標(biāo)記成忙.scanState是WorkQueue的成員變量,每個(gè)WorkQueue只有一個(gè)值, //前面說過,一般情況下,每個(gè)線程會有一個(gè)WorkQueue,所以某種情況來講也可以標(biāo)記為 //當(dāng)前ForkJoinWorkerThread繁忙. //SCANNING常量值是1,這個(gè)操作實(shí)質(zhì)上就是將scanState變量的個(gè)位置0,也就是變成了偶數(shù)并標(biāo)記它要忙了. //顯然偶數(shù)才表示忙碌,這也是為什么前面覺得官方注釋scanState是奇數(shù)表示"正在掃描"很奇怪. scanState &= ~SCANNING; //將currentSteal設(shè)置為傳入的任務(wù),并運(yùn)行該任務(wù),若該任務(wù)內(nèi)部進(jìn)行了分叉,則進(jìn)入相應(yīng)的入隊(duì)邏輯. (currentSteal = task).doExec(); //執(zhí)行完該任務(wù)后,將currentSteal置空.將該task釋放掉,幫助gc. U.putOrderedObject(this, QCURRENTSTEAL, null); //調(diào)用前面提到的,根據(jù)mode選擇依次pop或poll的方式將自己的工作隊(duì)列內(nèi)的任務(wù)出隊(duì)并執(zhí)行的方法. execLocalTasks(); //到此,自己隊(duì)列中的所有任務(wù)都已經(jīng)完成.包含偷來的任務(wù)fork后又入隊(duì)到自己隊(duì)列的子任務(wù). //取出owner線程.處理偷取任務(wù)有關(guān)的一些信息. ForkJoinWorkerThread thread = owner; if (++nsteals < 0) //發(fā)現(xiàn)當(dāng)前WorkQueue偷來的任務(wù)數(shù)即將溢出了,將它轉(zhuǎn)到線程池. transferStealCount(pool); //取消忙碌標(biāo)記. scanState |= SCANNING; if (thread != null) //執(zhí)行afterTopLevelExec勾子方法,上一節(jié)中介紹ForkJoinWorkerThread時(shí)已介紹. thread.afterTopLevelExec(); } //方法結(jié)束,注意,沒有任何操作將task從所在的數(shù)組中移除,不論這個(gè)task是哪個(gè)WorkQueue中的元素. //同時(shí),此方法原則上講可以多次調(diào)用(盡管事實(shí)上就一次調(diào)用),入口處和出口處分別用忙碌標(biāo)記來標(biāo)記scanState,但重復(fù)標(biāo)記顯然不影響執(zhí)行. } //如果線程池中已經(jīng)初始化了用于記錄的stealCounter,則用它加上當(dāng)前WorkQueue的nsteals/或最大整數(shù)(發(fā)生溢出時(shí)). //并初始化當(dāng)前WorkQueue的nsteals. final void transferStealCount(ForkJoinPool p) { AtomicLong sc; if (p != null && (sc = p.stealCounter) != null) { //線程池中存放了stealCounter,它是一個(gè)原子整數(shù). int s = nsteals; nsteals = 0; //恢復(fù)0. //若nsteals是負(fù),增加最大整數(shù),否則增加nsteal sc.getAndAdd((long)(s < 0 ? Integer.MAX_VALUE : s)); } } //如果task存在,則將它從隊(duì)列中移除并執(zhí)行,發(fā)現(xiàn)有位于頂部的取消任務(wù),則移除之,只用于awaitJoin. //如果隊(duì)列空并且任務(wù)不知道完成了,則返回true. final boolean tryRemoveAndExec(ForkJoinTask> task) { ForkJoinTask>[] a; int m, s, b, n; //進(jìn)入if的條件,存在非空任務(wù)數(shù)組,參數(shù)task非空. if ((a = array) != null && (m = a.length - 1) >= 0 && task != null) { //循環(huán)條件,隊(duì)列非空.從s開始遍歷到b,也就是從頂?shù)降?后進(jìn)先出. while ((n = (s = top) - (b = base)) > 0) { //1.內(nèi)層循環(huán). for (ForkJoinTask> t;;) { //2.從頂開始的索引j,每次向下找一個(gè). long j = ((--s & m) << ASHIFT) + ABASE; if ((t = (ForkJoinTask>)U.getObject(a, j)) == null) //3.取出的是空,返回值取決于top是不是內(nèi)層循環(huán)是第一次運(yùn)行,外循環(huán)每次會將s更新為新top, //內(nèi)循環(huán)則會每次將s減一.內(nèi)循環(huán)只跑了一次的情況,顯然會返回true. //顯然這種情況下top也沒有被其他線程更新,內(nèi)循環(huán)又是第一次跑,那么將足以說明當(dāng)前隊(duì)列為空,該為false. //true的情況,向下遍歷了幾個(gè)元素打到了底,未進(jìn)入46 10這三種要重開啟一輪外循環(huán)的情況,也沒找到task. //不管怎樣,發(fā)現(xiàn)空任務(wù)就返回. return s + 1 == top;// 比預(yù)期短,第一個(gè)或第n個(gè)出現(xiàn)了空值,但循環(huán)條件未false else if (t == task) { //找到的任務(wù)t不是空,且是目標(biāo)任務(wù). boolean removed = false; if (s + 1 == top) { //4.發(fā)現(xiàn)是首輪內(nèi)循環(huán),s+1==top成立,進(jìn)行pop操作,將task彈出并將top減一. //顯然,task是最頂任務(wù),可以用pop方式,將它置空. if (U.compareAndSwapObject(a, j, task, null)) { U.putOrderedInt(this, QTOP, s); //5.置removed為true. removed = true; } } //6.不是首輪循環(huán),而且base沒有在處理期間發(fā)生改變. else if (base == b) //7.嘗試將task替換成一個(gè)EmptyTask實(shí)例.成功則removed是true, //這樣雖然該任務(wù)出了隊(duì),但在隊(duì)上還有一個(gè)空的任務(wù),而不會出現(xiàn)前面擔(dān)心的中間null //的情況,也不改變top或base的值. removed = U.compareAndSwapObject( a, j, task, new EmptyTask()); if (removed) //8.只要任務(wù)成功出隊(duì)(不論是4還是7,則執(zhí)行. task.doExec(); //9.只要找到任務(wù),退出內(nèi)循環(huán),回到外循環(huán)重置相應(yīng)的條件. break; } //10.本輪內(nèi)循環(huán)沒找到匹配task的任務(wù). else if (t.status < 0 && s + 1 == top) {//官方注釋是取消. //11.若t是完成的任務(wù)且是首輪內(nèi)循環(huán)且top未變動(dòng),將該任務(wù)出隊(duì)并令top減一. if (U.compareAndSwapObject(a, j, t, null)) U.putOrderedInt(this, QTOP, s); //12.只要進(jìn)入此分支就退出內(nèi)循環(huán). break; } if (--n == 0) //13.內(nèi)循環(huán)每執(zhí)行到此一次,就說明有一次沒找到目標(biāo)任務(wù),減少n(開始時(shí)的base top差值).達(dá)0時(shí)返回false停止循環(huán). //即每個(gè)內(nèi)循環(huán)都只能執(zhí)行n次,進(jìn)入外循環(huán)時(shí)重置n. return false; } //14.結(jié)束了任何一輪內(nèi)循環(huán)時(shí),發(fā)現(xiàn)目標(biāo)task已經(jīng)完成,則停止外循環(huán)返回false. if (task.status < 0) return false; } } //15.task參數(shù)傳空,或者當(dāng)前WorkQueue沒有任務(wù),直接返回true. return true; } //簡單梳理一下tryRemoveAndExec的執(zhí)行流程和生命周期. //a.顯然,一上來就判隊(duì)列的空和參數(shù)的空,如果第一個(gè)if都進(jìn)不去,按約定返回true. //b.經(jīng)過1初始化一個(gè)內(nèi)層循環(huán),并初始化了n,它決定內(nèi)循環(huán)最多跑n次,如果內(nèi)循環(huán)一直不break(9找到任務(wù)或12發(fā)現(xiàn)頂部任務(wù)是完成態(tài)),也假定一般碰不到14(發(fā)現(xiàn)目標(biāo)任務(wù)完成了) //也沒有出現(xiàn)幾種return(3查出null,14某輪內(nèi)循環(huán)目標(biāo)task發(fā)現(xiàn)被完成了),那么最終只會耗盡次數(shù),遍歷到底,在13處return false(確定此輪循環(huán)task不在隊(duì)列) //c.如果出現(xiàn)了幾種break(9,12),9其實(shí)代表查到任務(wù),12代表頂部任務(wù)已完成(官方說取消),那就會停止內(nèi)循環(huán),重新開啟一輪外循環(huán),初始化n,繼續(xù)從新的top到base遍歷(b). //但此時(shí),可能找不到task了(它已經(jīng)在上一輪內(nèi)循環(huán)出隊(duì)或被替換成代理),但也可能實(shí)際上未出隊(duì)(該task不是top,即4,base也發(fā)生了改變造成7未執(zhí)行),那么可能在本輪循環(huán) //找到任務(wù),在b中進(jìn)入相應(yīng)的break,并且成功移除并會進(jìn)入d,也可能沒進(jìn)入break而是再重復(fù)一次b. //d.如果某一次break成功刪除了任務(wù),那么外循環(huán)更新了n,base,top,重啟了一次內(nèi)循環(huán),但是所有找到task的分支不會再有了,如果接下來不再碰到被完成(取消)的頂部任務(wù)11-12, //同樣也沒發(fā)現(xiàn)目標(biāo)task完成了(不進(jìn)14),那么最終的結(jié)果就是n次內(nèi)循環(huán)后n降低到0,直接return false. //e.從b-d任何一次內(nèi)循環(huán)在最后發(fā)現(xiàn)了task結(jié)束,立即返回false.否則,它可能在某一次內(nèi)循環(huán)中彈出并執(zhí)行了該任務(wù),卻可能一直在等待它完成,因此這個(gè)機(jī)制可以讓等待task完成前, //幫助當(dāng)前WorkQueue清理頂部無效任務(wù)等操作. //此方法適用于不論共享或者獨(dú)有的模式,只在helpComplete時(shí)使用. //它會彈出和task相同的CountedCompleter,在前一節(jié)講解CountedCompleter時(shí)已介紹過此方法. //父Completer僅能在棧鏈上找到它的父和祖先completer并幫助減掛起任務(wù)數(shù)或完成root,但在此處 //它可以幫助棧鏈上的前置(子任務(wù)),前提是要popCC彈出. final CountedCompleter> popCC(CountedCompleter> task, int mode) { int s; ForkJoinTask>[] a; Object o; //當(dāng)前隊(duì)列有元素. if (base - (s = top) < 0 && (a = array) != null) { //老邏輯從頂部確定j. long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; if ((o = U.getObjectVolatile(a, j)) != null && (o instanceof CountedCompleter)) { //當(dāng)前隊(duì)列中存在類型為CountedCompleter的元素.對該completer棧鏈開啟一個(gè)循環(huán). CountedCompleter> t = (CountedCompleter>)o; for (CountedCompleter> r = t;;) { //對該CountedCompleter及它的completer棧元素進(jìn)行遍歷,每一個(gè)遍歷到的臨時(shí)存放r. //找到r==task,說明有一個(gè)completer位于task的執(zhí)行路徑. if (r == task) { //mode小于0,這個(gè)mode其實(shí)有誤解性,它的調(diào)用者其實(shí)是將一個(gè)WorkQueue的config傳給了這個(gè)mode. //而config只有兩處初始化,一是將線程注冊到池的時(shí)候,初始化WorkQueue, //二是外部提交的任務(wù),使用externalSubmit時(shí)新建的WorkQueue,config會是負(fù)值且沒有owner. //它也說明是共享隊(duì)列,需要有鎖定機(jī)制.. if (mode < 0) { //另一個(gè)字段qlock派上了用場,將它置為1表示加鎖. if (U.compareAndSwapInt(this, QLOCK, 0, 1)) { //加鎖成功,在top和array這過程中未發(fā)生變動(dòng)的情況下,嘗試 //將t出隊(duì),此時(shí)t是棧頂上的元素,它的completer棧鏈前方有task. if (top == s && array == a && U.compareAndSwapObject(a, j, t, null)) { U.putOrderedInt(this, QTOP, s - 1); U.putOrderedInt(this, QLOCK, 0); return t; } //不論出隊(duì)成功還是失敗,解鎖. U.compareAndSwapInt(this, QLOCK, 1, 0); } } //非共享隊(duì)列,直接將t出列. else if (U.compareAndSwapObject(a, j, t, null)) { U.putOrderedInt(this, QTOP, s - 1); return t; } //只要找到,哪怕兩處cas出現(xiàn)不成功的情況,也是競態(tài)失敗,break終止循環(huán). break; } //r不等于task,找出r的父并開始下輪循環(huán),直到root或找到task為止. else if ((r = r.completer) == null) // try parent break; } } } //空隊(duì)列,頂部不是Completer或者不是task的子任務(wù),返回null. return null; } //嘗試在無競態(tài)下偷取此WorkQueue中與給定task處于同一個(gè)completer棧鏈上的任務(wù)并運(yùn)行它, //若不成功,返回一個(gè)校驗(yàn)合/控制信號給調(diào)用它的helpComplete方法. //返回規(guī)則,成功偷取則返回1;返回2代表可重試(被其他小偷擊敗),如果隊(duì)列非空但未找到匹配task,返回-1, //其他情況返回一個(gè)強(qiáng)制負(fù)的基準(zhǔn)索引. final int pollAndExecCC(CountedCompleter> task) { int b, h; ForkJoinTask>[] a; Object o; if ((b = base) - top >= 0 || (a = array) == null) //空隊(duì)列,與最小整數(shù)(負(fù)值)取或作為信號h h = b | Integer.MIN_VALUE; // to sense movement on re-poll else { //從底部取索引j long j = (((a.length - 1) & b) << ASHIFT) + ABASE; //用該索引取task取出null,說明被捷足先登了,信號置為可重試. if ((o = U.getObjectVolatile(a, j)) == null) h = 2; // retryable //取出的非空任務(wù)類型不是CountedCompleter.說明不匹配,信號-1 else if (!(o instanceof CountedCompleter)) h = -1; // unmatchable else { //是CountedCompleter類型 CountedCompleter> t = (CountedCompleter>)o; for (CountedCompleter> r = t;;) { //基本同上個(gè)方法的邏輯,只是上個(gè)方法t取的是top,這里取base. //r從t開始找它的父,直到它本身或它的父等于task.將它從底端出隊(duì). if (r == task) { if (base == b && U.compareAndSwapObject(a, j, t, null)) { //出隊(duì)成功,因?yàn)槲覀冋业氖莃ase,且競態(tài)成功,直接更新base即可. base = b + 1; //出隊(duì)后執(zhí)行該出隊(duì)的任務(wù).返回1代表成功. t.doExec(); h = 1; // success } //base被其他線程修改了,或者cas競態(tài)失敗.(其實(shí)是一個(gè)情況),信號2,可以從新的base開始重試. else h = 2; // lost CAS //只要找到task的子任務(wù)就break,返回競態(tài)成功或可重試的信號. break; } //迭代函數(shù),當(dāng)前r不是task,將r指向它的父,直到某一個(gè)r的父是task或者是null進(jìn)入else if. else if ((r = r.completer) == null) { //能夠進(jìn)來,說明r已經(jīng)指向了root,卻沒有找到整條鏈上有這個(gè)task,返回信號為未匹配到. h = -1; // unmatched break; } } } } return h; } //如果當(dāng)前線程擁有此隊(duì)列且明顯未被鎖定,返回true. final boolean isApparentlyUnblocked() { Thread wt; Thread.State s; //前面提過的scanState會在一上來runTask時(shí)和1的反碼取與運(yùn)算,直到運(yùn)行完任務(wù)才會反向運(yùn)算. //這個(gè)過程,scanState的最后一位會置0,但這與此判斷條件關(guān)系不大. //前面對scanState有所注釋,小于0代表不活躍. return (scanState >= 0 && //隊(duì)列處于活躍態(tài)且當(dāng)前線程的狀態(tài)不是阻塞,不是等待,不是定時(shí)等待,則返回true. (wt = owner) != null && (s = wt.getState()) != Thread.State.BLOCKED && s != Thread.State.WAITING && s != Thread.State.TIMED_WAITING); } }
到此終于WorkQueue內(nèi)部類的代碼告一段落.
這一段介紹了WorkQueue的內(nèi)部實(shí)現(xiàn)機(jī)制,以及與上一節(jié)有關(guān)的提到的CountedCompleter在幫助子任務(wù)時(shí)處于WorkQueue的實(shí)現(xiàn)細(xì)節(jié)(似乎默認(rèn)情況下即asnycMode傳true時(shí)只會從當(dāng)前工作線程隊(duì)列取頂部元素,從其他隨機(jī)隊(duì)列的底部開取,有可能可以重復(fù)取,具體細(xì)節(jié)到ForkJoinPool的helpComplete相關(guān)源碼再說),以及構(gòu)建好的WorkQueue會有哪些可能的狀態(tài)和相應(yīng)的字段,以及若干模式(同步異步或者LIFO,FIFO等),出隊(duì)入隊(duì)的操作,還提出了隊(duì)列中元素為什么中間不能為空,如果出現(xiàn)要將中間元素出隊(duì)怎么辦?別忘了答案是換成一個(gè)EmptyTask.
不妨小結(jié)一下WorkQueue的大致結(jié)構(gòu).
1.它規(guī)避了偽共享.
2.它用scanState表示運(yùn)行狀態(tài),版本號,小于0代表不活躍維護(hù)了忙碌標(biāo)記,也用scanState在runTask入口開始運(yùn)行任務(wù)時(shí)標(biāo)記為忙碌(偶數(shù)),結(jié)束后再取消忙碌狀態(tài)(奇數(shù)).注釋解釋奇數(shù)代表正在掃描,但從代碼語義上看正好相反
3.它維護(hù)了一個(gè)可以擴(kuò)容的數(shù)組,也維護(hù)了足夠大的top和base,[base,top)或許可以形象地表示它的集合,pop是從top-1開始,poll從base開始,當(dāng)任務(wù)壓入隊(duì)成功后,檢查若top-base達(dá)到了數(shù)組長度,也就是集合[base,top)的元素?cái)?shù)達(dá)到或者超過了隊(duì)列數(shù)組長度,將對數(shù)組進(jìn)行擴(kuò)容,因使用數(shù)組長度-1與哈希值的方式,擴(kuò)容前后原數(shù)組元素索引不變,新壓入隊(duì)列的元素將在此基礎(chǔ)上無縫添加,因此擴(kuò)容也規(guī)避了出現(xiàn)中間任務(wù)null的情況.初始容量在runWorker時(shí)分配.
4.它維護(hù)了偷取任務(wù)的記錄和個(gè)數(shù),并在溢出等情況及時(shí)累加給池.它也維護(hù)了阻塞者線程和主人線程.
5.它可能沒有主人線程(共享隊(duì)列),或有主人線程(非共享,注冊入池時(shí)生成)
6.它維護(hù)了隊(duì)列鎖qlock,但目前僅在popCC且當(dāng)前為共享隊(duì)列情況下使用,保證爭搶的同步.
7.其他一些字段如config,currentJoin,hint,parker等,需要在后續(xù)的線程池自身代碼中結(jié)合前面的源碼繼續(xù)了解,包含stackPred據(jù)說保持前一個(gè)池棧的運(yùn)行信號.
WorkQueue本質(zhì)也是一個(gè)內(nèi)部類,它雖然定義了一系列實(shí)現(xiàn),但這些實(shí)現(xiàn)方法的調(diào)度還是由ForkJoinPool來實(shí)現(xiàn),所以我們還是要回歸到ForkJoinPool自身的方法和公有api上,遇到使用上面WorkQueue定義好的工具方法時(shí),我們再來回顧.
前面已經(jīng)看了一些影響WorkQueue的位于ForkJoinPool的常量,再來繼續(xù)看其他的ForkJoinPool中的一些常量.
//默認(rèn)線程工廠.前面提過兩個(gè)實(shí)現(xiàn) public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; //是否允許啟動(dòng)者在方法中殺死線程的許可,我們忽略這方面的內(nèi)容. private static final RuntimePermission modifyThreadPermission; //靜態(tài)的common池 static final ForkJoinPool common; common池的并行度. static final int commonParallelism; //tryComensate方法中對構(gòu)造備用線程的創(chuàng)造. private static int commonMaxSpares; //池順序號,創(chuàng)建工作線程會拼接在名稱上. private static int poolNumberSequence; //同步方法同,遞增的池id. private static final synchronized int nextPoolId() { return ++poolNumberSequence; } // 以下為一些靜態(tài)配置常量. //IDLE_TIMEOUT代表了一個(gè)初始的納秒單位的超時(shí)時(shí)間,默認(rèn)為2s,它用于線程觸發(fā)靜止停頓以等待新的任務(wù). //一旦超過了這個(gè) 時(shí)長,線程將會嘗試收縮worker數(shù)量.為了避免某些如長gc等停頓的影響,這個(gè)值應(yīng)該足夠大 private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec //為應(yīng)對定時(shí)器下沖設(shè)置的空閑超時(shí)容忍度. private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L; // 20ms //它是commonMaxSpares靜態(tài)初始化時(shí)的初值,這個(gè)值遠(yuǎn)超普通的需要,但距離 //MAX_CAP和一般的操作系統(tǒng)線程限制要差很遠(yuǎn),這也使得jvm能夠在資源耗盡前 //捕獲資源的濫用. private static final int DEFAULT_COMMON_MAX_SPARES = 256; //在block之前自旋等待的次數(shù),它在awaitRunStateLock方法和awaitWork方法中使用, //但它事實(shí)上是0,因此這兩個(gè)方法其實(shí)在用隨機(jī)的自旋次數(shù),設(shè)置為0也減少了cpu的使用. //如果將它的值改為大于0的值,那么必須設(shè)置為2的冪,至少4.這個(gè)值設(shè)置達(dá)到2048已經(jīng)可以 //耗費(fèi)一般上下文切換時(shí)間的一小部分. private static final int SPINS = 0; //種子生成器的默認(rèn)增量.注冊新worker時(shí)詳述. private static final int SEED_INCREMENT = 0x9e3779b9;
上面都是一些常量的聲明定義,下面看一些與線程池config和ctl有關(guān)的常量,以及前面構(gòu)造器提過的變量.
// 高低位 private static final long SP_MASK = 0xffffffffL;//long型低32位. private static final long UC_MASK = ~SP_MASK;//long型高32位. // 活躍數(shù). private static final int AC_SHIFT = 48;//移位偏移量,如左移到49位開始. private static final long AC_UNIT = 0x0001L << AC_SHIFT;//1<<48代表一個(gè)活躍數(shù)單位. private static final long AC_MASK = 0xffffL << AC_SHIFT;//long型高16位(49-64) // 總數(shù)量 private static final int TC_SHIFT = 32;//移位偏移量,33位開始. private static final long TC_UNIT = 0x0001L << TC_SHIFT;//1<<32代表一個(gè)總數(shù)量 private static final long TC_MASK = 0xffffL << TC_SHIFT;//33-48位 private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); //第48位 //與運(yùn)行狀態(tài)有關(guān)的位,顯然后面的runState是個(gè)int型,這些移位數(shù)也明顯是int的范圍. //SHUTDOWN顯然一定是負(fù)值,其他值也都是2的冪. private static final int RSLOCK = 1;//run state鎖,簡單來說就是奇偶位. private static final int RSIGNAL = 1 << 1;//2 運(yùn)行狀態(tài)的喚醒. private static final int STARTED = 1 << 2;//4,啟動(dòng) private static final int STOP = 1 << 29;//30位,代表停. private static final int TERMINATED = 1 << 30;//31位代表終止. private static final int SHUTDOWN = 1 << 31;//32位代表關(guān)閉. //實(shí)例字段. volatile long ctl; // 代表池的主要控制信號,long型 volatile int runState; // 可以鎖的運(yùn)行狀態(tài) final int config; // 同時(shí)保存了并行度和模式(開篇的構(gòu)造函數(shù)) int indexSeed; // 索引種子,生成worker的索引 volatile WorkQueue[] workQueues; // 工作隊(duì)列的注冊數(shù)組. final ForkJoinWorkerThreadFactory factory;//線程工廠 final UncaughtExceptionHandler ueh; // 每一個(gè)worker線程的未捕獲異常處理器. final String workerNamePrefix; // 工作線程名稱前綴. volatile AtomicLong stealCounter; // 代表偷取任務(wù)數(shù)量,前面提過,官方注釋說也用作同步監(jiān)視器
僅僅看這些字段的簡單描述是無法徹底搞清楚它們的含義的,還是要到應(yīng)用的代碼來看,我們繼續(xù)向下看ForkJoinPool中的一些方法.
//嘗試對當(dāng)前的runState加鎖標(biāo)志位,并返回一個(gè)runState,這個(gè)runState可能是原值(無競態(tài))或新值(競態(tài)且成功). //不太準(zhǔn)確的語言可以說是"鎖住"runState這個(gè)字段,其實(shí)不是,從代碼上下文看, //該標(biāo)志位被設(shè)置為1的期間,嘗試去lock的線程可以去更改runState的其他位,比如信號位. //而lockRunState成功的線程則是緊接著去更改ctl控制信號,工作隊(duì)列等運(yùn)行時(shí)數(shù)據(jù),故可以稱runState在鎖標(biāo)志這一塊 //可以理解為運(yùn)行狀態(tài)鎖. private int lockRunState() { int rs; //runState已經(jīng)是奇數(shù),表示已經(jīng)鎖上了,awaitRunState return ((((rs = runState) & RSLOCK) != 0 || //發(fā)現(xiàn)原本沒鎖,嘗試將原rs置為rs+1,即變?yōu)槠鏀?shù). !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ? //原來鎖了或者嘗試競態(tài)加鎖不成功,等待加鎖成功,否則直接返回rs. awaitRunStateLock() : rs); } //自旋或阻塞等待runstate鎖可用,這與上面的runState字段有關(guān).也是上一個(gè)方法的自旋+阻塞實(shí)現(xiàn). private int awaitRunStateLock() { Object lock; boolean wasInterrupted = false; for (int spins = SPINS, r = 0, rs, ns;;) { //每輪循環(huán)重讀rs. if (((rs = runState) & RSLOCK) == 0) { //1.發(fā)現(xiàn)rs還是偶數(shù),嘗試將它置為奇數(shù).(鎖) if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) { //2,鎖成功后發(fā)現(xiàn)擾動(dòng)了,則擾動(dòng)當(dāng)前線程,catch住不符合安全策略的情況. if (wasInterrupted) { try { //2.1擾動(dòng).它將影響到后面awaitWork方法的使用. Thread.currentThread().interrupt(); } catch (SecurityException ignore) { } } //2.2返回的是新的runStatus,相當(dāng)于原+1,是個(gè)奇數(shù). //注意,此方法中只有此處一個(gè)出口,也就是說必須要鎖到結(jié)果. return ns; } } //在1中發(fā)現(xiàn)被鎖了或者2處爭鎖競態(tài)失敗. else if (r == 0) //3.所有循環(huán)中只會執(zhí)行一次,如果簡單去看,nextSecondarySeed是一個(gè)生成 //偽隨機(jī)數(shù)的代碼,它不會返回0值.r的初值是0. r = ThreadLocalRandom.nextSecondarySeed(); else if (spins > 0) { //4.有自旋次數(shù),則將r的值進(jìn)行一些轉(zhuǎn)換并開啟下輪循環(huán).默認(rèn)spins是0,不會有自旋次數(shù). //從源碼來看,自旋的唯一作用就是改變r(jià)的值,使之可能重新進(jìn)入3,也會根據(jù)r的結(jié)果決定是否減 //少一次自旋. //r的算法,將當(dāng)前r的后6位保留,用r的后26位與前26位異或被保存為r的前26位(a). //再將(a)的結(jié)果處理,r的前21位保持不變,后11位與前11位異或并保存為r的后11位(b). //再將(b)的結(jié)果處理,r的后7位保持不變,用前25位與后25位異或并保存為r的前25位(c) //個(gè)中數(shù)學(xué)原理,有興趣的研究一下吧. //顯然,自旋次數(shù)并不是循環(huán)次數(shù),它只能決定進(jìn)入6中鎖代碼塊前要運(yùn)行至少幾輪循環(huán). r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift if (r >= 0) //經(jīng)過上面的折騰r還不小于0的,減少一個(gè)自旋次數(shù). //自旋次數(shù)不是每次循環(huán)都減一,但減到0之后不代表方法停止循環(huán),而是進(jìn)入2(成功)或者6(阻塞). --spins; } //某一次循環(huán),r不為0,不能進(jìn)入3,自旋次數(shù)也不剩余,不能進(jìn)入4.則到此. else if ((rs & STARTED) == 0 || (lock = stealCounter) == null) //5.線程池的runState表示還未開啟,或者還未初始化偷鎖(stealCounter),說明 //還沒完成初始化,此處是初始化時(shí)的競態(tài),直接讓出當(dāng)前線程的執(zhí)行權(quán).等到重新獲取執(zhí)行權(quán)時(shí), //重新循環(huán),讀取新的runState并進(jìn)行. Thread.yield(); // initialization race else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {//可重入 //6.沒能對runState加鎖,也不是5中的初始化時(shí)競態(tài)的情況,嘗試加上信號位,以stealCounter進(jìn)行加鎖. //顯然,這種加信號位的加法不會因?yàn)樾盘栁欢?而會因?yàn)閞unState的其他字段比如鎖標(biāo)識位失敗,這時(shí) //重新開始循環(huán)即可. synchronized (lock) { //明顯的double check if ((runState & RSIGNAL) != 0) { //6.1當(dāng)前pool的runState有信號位的值,說明沒有線程去釋放信號位. try { //6.2runState期間沒有被去除信號位,等待. lock.wait(); } catch (InterruptedException ie) { //6.3等待過程中發(fā)生異常,且不是記錄一個(gè)標(biāo)記,在2處會因它中斷當(dāng)前線程. if (!(Thread.currentThread() instanceof ForkJoinWorkerThread)) wasInterrupted = true; } } else //6.4當(dāng)前runState沒有信號位的值,說明被釋放了,順便喚醒等待同步塊的線程.讓他們繼續(xù)轉(zhuǎn)圈. lock.notifyAll(); } } } } //解鎖runState,前面解釋過,這個(gè)鎖可以理解為對runState的鎖標(biāo)志位進(jìn)行設(shè)定,而設(shè)定成功的結(jié)果就是可以改信號量ctl. //它會解鎖runState,并會用新的runState替換. private void unlockRunState(int oldRunState, int newRunState) { //首先嘗試cas.cas成功可能會導(dǎo)致上一個(gè)方法中進(jìn)入同步塊的線程改走6.4喚醒阻塞線程. if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) { //cas不成功,直接強(qiáng)制更改. Object lock = stealCounter; runState = newRunState;// 這一步可能清除掉信號位.使上一個(gè)方法中已進(jìn)入同步塊的線程改走6.4 if (lock != null) //強(qiáng)制更換為新的運(yùn)行狀態(tài)后,喚醒所有等待lock的線程. synchronized (lock) { lock.notifyAll(); } } }
上面的幾個(gè)方法是對runState字段進(jìn)行操作的,并利用了信號位,鎖標(biāo)識位,運(yùn)行狀態(tài)位.
顯然,雖然可以不精確地說加鎖解鎖是對runState的鎖標(biāo)識位進(jìn)行設(shè)置,嚴(yán)格來說,這卻是為ctl/工作隊(duì)列等運(yùn)行時(shí)數(shù)據(jù)服務(wù)的(后面再述),顯然精確說是對運(yùn)行時(shí)數(shù)據(jù)的修改權(quán)限加鎖.
同樣的,加鎖過程采用自旋+阻塞的方式,整個(gè)循環(huán)中同時(shí)兼容了線程池還在初始化(處理方式讓出執(zhí)行權(quán)),設(shè)定了自旋次數(shù)(處理方式,隨機(jī)數(shù)判斷要不要減少自旋次數(shù),自旋次數(shù)降0前不會阻塞)這兩種情況,也順便在阻塞被擾動(dòng)的情況下暫時(shí)忽略擾動(dòng),只在成功設(shè)置鎖標(biāo)識位后順手負(fù)責(zé)擾動(dòng)當(dāng)前線程.
簡單剝離這三種情況,加鎖過程是一輪輪的循環(huán),會嘗試設(shè)置鎖標(biāo)識位,成功則返回新標(biāo)識,不成功則去設(shè)置信號位(可能已經(jīng)有其他線程設(shè)
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/75118.html
摘要:前言在前面的文章框架之中梳理了框架的簡要運(yùn)行格架和異常處理流程顯然要理解框架的調(diào)度包含工作竊取等思想需要去中了解而對于的拓展和使用則需要了解它的一些子類前文中偶爾會提到的一個(gè)子類直譯為計(jì)數(shù)的完成器前文也說過的并行流其實(shí)就是基于了框架實(shí)現(xiàn)因此 前言 在前面的文章ForkJoin框架之ForkJoinTask中梳理了ForkJoin框架的簡要運(yùn)行格架和異常處理流程,顯然要理解ForkJoi...
摘要:這減輕了手動(dòng)重復(fù)執(zhí)行相同基準(zhǔn)測試的痛苦,并簡化了獲取結(jié)果的流程。處理項(xiàng)目的代碼并從標(biāo)有注釋的方法處生成基準(zhǔn)測試程序。用和運(yùn)行該基準(zhǔn)測試得到以下結(jié)果。同時(shí),和的基線測試結(jié)果也有略微的不同。 Java 8 已經(jīng)發(fā)布一段時(shí)間了,許多開發(fā)者已經(jīng)開始使用 Java 8。本文也將討論最新發(fā)布在 JDK 中的并發(fā)功能更新。事實(shí)上,JDK 中已經(jīng)有多處java.util.concurrent 改動(dòng),但...
摘要:前言在前面的文章和響應(yīng)式編程中提到了和后者毫無疑問是一個(gè)線程池前者則是一個(gè)類似經(jīng)典定義的概念官方有一個(gè)非常無語的解釋就是運(yùn)行在的一個(gè)任務(wù)抽象就是運(yùn)行的線程池框架包含和若干的子類它的核心在于分治和工作竅取最大程度利用線程池中的工作線程避免忙的 前言 在前面的文章CompletableFuture和響應(yīng)式編程中提到了ForkJoinTask和ForkJoinPool,后者毫無疑問是一個(gè)線程...
摘要:使用方式要把任務(wù)提交到線程池,必須創(chuàng)建的一個(gè)子類,其中是并行化任務(wù)產(chǎn)生的結(jié)果如果沒有結(jié)果使用類型。對一個(gè)子任務(wù)調(diào)用的話,可以使一個(gè)子任務(wù)重用當(dāng)前線程,避免線程池中多分配一個(gè)任務(wù)帶來的開銷。 【概念 分支和并框架的目的是以遞歸的方式將可以并行的任務(wù)拆分成更小的任務(wù),然后將每個(gè)子任務(wù)的結(jié)果合并起來生成整體的結(jié)果,它是ExecutorService的一個(gè)實(shí)現(xiàn),它把子任務(wù)分配給線程池(Fork...
摘要:對于任務(wù)的分割,要求各個(gè)子任務(wù)之間相互獨(dú)立,能夠并行獨(dú)立地執(zhí)行任務(wù),互相之間不影響。是叉子分叉的意思,即將大任務(wù)分解成并行的小任務(wù),是連接結(jié)合的意思,即將所有并行的小任務(wù)的執(zhí)行結(jié)果匯總起來。使用方法會阻塞并等待子任務(wù)執(zhí)行完并得到其結(jié)果。 Fork/Join是什么? Fork/Join框架是Java7提供的并行執(zhí)行任務(wù)框架,思想是將大任務(wù)分解成小任務(wù),然后小任務(wù)又可以繼續(xù)分解,然后每個(gè)小...
閱讀 2593·2021-10-25 09:45
閱讀 1254·2021-10-14 09:43
閱讀 2310·2021-09-22 15:23
閱讀 1538·2021-09-22 14:58
閱讀 1944·2019-08-30 15:54
閱讀 3554·2019-08-30 13:00
閱讀 1367·2019-08-29 18:44
閱讀 1580·2019-08-29 16:59