摘要:前言在前面的幾篇文章中詳述了框架的若干組分在相應(yīng)的官方文檔中總會不時地提起同樣的也提到可以用于幫助運行在中的運行時保持有效的執(zhí)行并行度其實特指其他都在等待一個的前進時熟悉的朋友都知道它的大概組成部分包含支持并發(fā)的容器同步器線程池阻塞隊列原子
前言
在前面的幾篇文章中詳述了ForkJoin框架的若干組分,在相應(yīng)的官方文檔中總會不時地提起"Phaser",同樣的,也提到Phaser可以用于幫助運行在ForkJoinPool中的ForkJoinTask運行時保持有效的執(zhí)行并行度(其實特指其他task都在等待一個phase的前進時).
熟悉JUC的朋友都知道它的大概組成部分包含:Containers(支持并發(fā)的容器),Synchronizers(同步器),Executors(線程池),BlockingQueue(阻塞隊列),Atomic(原子類),Lock?and?Condition(鎖).而Phaser和CyclicBarrier,Semaphore等一樣是一個同步器.
本文主要介紹Phaser的內(nèi)部實現(xiàn),粗略介紹使用,它的源碼相比于線程池較為簡單,但最好能對比其他同步器來了解,讀者最好擁有juc其他同步器,原子類,部分ForkJoin框架的基礎(chǔ).
同時,本文也會再次提到ForkJoinPool::managedBlock(blocker),之前在ForkJoinPool一文提到了實現(xiàn)和接口,而在CompletableFuture中見到了一個blocker的實現(xiàn).
Phaser源碼首先來看一些與Phaser狀態(tài)有關(guān)的簡單的常量.
//64位整數(shù)表示Phaser的狀態(tài). private volatile long state; private static final int MAX_PARTIES = 0xffff;//最大parties,后16位表示. private static final int MAX_PHASE = Integer.MAX_VALUE;//最大phase,最大整數(shù)值. private static final int PARTIES_SHIFT = 16;//取parties使用的移位數(shù),16 private static final int PHASE_SHIFT = 32;//取phase的移位數(shù),32 private static final int UNARRIVED_MASK = 0xffff; //未到的,取后16位. private static final long PARTIES_MASK = 0xffff0000L; //參加者,17-32位. private static final long COUNTS_MASK = 0xffffffffL; //數(shù)量,后32位. private static final long TERMINATION_BIT = 1L << 63;//終止態(tài),首位. // 特殊值. private static final int ONE_ARRIVAL = 1; private static final int ONE_PARTY = 1 << PARTIES_SHIFT; private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY;//第1位和17位.顯然,它表示了一個ONE_ARRIVAL信息和PARTY信息. private static final int EMPTY = 1; //對一個state s計算unarrived的count, private static int unarrivedOf(long s) { //直接取整數(shù)位,如果等于EMPTY(1)則返回0,否則取后16位. int counts = (int)s; return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); } //對一個state,取出parties信息,直接取state的17至32位. private static int partiesOf(long s) { return (int)s >>> PARTIES_SHIFT; } //對于一個state,取出phase信息,直接取前32位. private static int phaseOf(long s) { return (int)(s >>> PHASE_SHIFT); } //對于一個state,取出arrived信息 private static int arrivedOf(long s) { int counts = (int)s; //state的后32位等于1(EMPTY)返回0,否則返回parties(state的17至32位,參考上面的partiesOf方法)和UNARRIVED(state的后16位)的差. return (counts == EMPTY) ? 0 : (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK); }
上面都是一些常量,沒什么可分析的,簡單來個總結(jié).
Phaser用一個long型的state保存狀態(tài)信息.
state的前32位表示phase,后16位表示unarrivied,17至32位表示parties,parties減去unarrived即arrived.
下面我們看一些成員變量和有關(guān)函數(shù).
//this的父,可以是null表示none private final Phaser parent; //phaser顯然是個樹的結(jié)果,root代表根,如果當前phaser不在樹內(nèi),則root==this private final Phaser root; //偶數(shù)隊列和奇數(shù)隊列.它們存放等待線程棧的頭,為了減少當添加線程與釋放線程的競態(tài), //這里使用了兩個隊列并互相切換,子phaser共享root的隊列以加快釋放. private final AtomicReferenceevenQ; private final AtomicReference oddQ; //決定某個phase的等待線程隊列. private AtomicReference queueFor(int phase) { //選擇隊列的方法,如果參數(shù)phase是偶數(shù),使用evenQ,否則oddQ. return ((phase & 1) == 0) ? evenQ : oddQ; } //出現(xiàn)arrive事件時的邊界異常信息. private String badArrive(long s) { return "Attempted arrival of unregistered party for " + stateToString(s); } //注冊時的邊界異常信息. private String badRegister(long s) { return "Attempt to register more than " + MAX_PARTIES + " parties for " + stateToString(s); } //他們都用到的stateToString(s),計算參數(shù)s對應(yīng)的phase,parties,arrived. private String stateToString(long s) { return super.toString() + "[phase = " + phaseOf(s) + " parties = " + partiesOf(s) + " arrived = " + arrivedOf(s) + "]"; }
為了便于理解,先來看隊列的實現(xiàn).
//表示等待隊列的QNode,實現(xiàn)了ManagedBlocker static final class QNode implements ForkJoinPool.ManagedBlocker { //存放所屬phaser final Phaser phaser; //所屬phase final int phase; //是否可擾動 final boolean interruptible; //是否定時 final boolean timed; //是否已擾動 boolean wasInterrupted; //計時相關(guān) long nanos; final long deadline; //關(guān)聯(lián)線程,當是null時,取消等待. volatile Thread thread; //下一個QNode QNode next; QNode(Phaser phaser, int phase, boolean interruptible, boolean timed, long nanos) { this.phaser = phaser; this.phase = phase; this.interruptible = interruptible; this.nanos = nanos; this.timed = timed; this.deadline = timed ? System.nanoTime() + nanos : 0L; //取當前線程. thread = Thread.currentThread(); } //isReleasable方法 public boolean isReleasable() { if (thread == null) //1.線程已置空(如2),返回true釋放. return true; if (phaser.getPhase() != phase) { //2.發(fā)現(xiàn)phaser所處的phase不是構(gòu)建QNode時的phase了,就置線程為空,返回true. thread = null; return true; } if (Thread.interrupted()) //3.如果當前線程擾動了. wasInterrupted = true; if (wasInterrupted && interruptible) { //4.發(fā)現(xiàn)擾動標記,并且QNode配置為可擾動,則置線程null并返回true thread = null; return true; } if (timed) { //5.定時邏輯,還有nanos,計算新的時長. if (nanos > 0L) { nanos = deadline - System.nanoTime(); } if (nanos <= 0L) { //已經(jīng)到時間,返回true,線程置空. thread = null; return true; } } return false; } //block邏輯 public boolean block() { if (isReleasable()) return true; else if (!timed) //不定時的park LockSupport.park(this); else if (nanos > 0L) //定時的情況. LockSupport.parkNanos(this, nanos); //老規(guī)矩 return isReleasable(); } }
前面介紹過CompletableFuture的Singnaller,以及ForkJoinPool中的managedBlock,這一塊的邏輯顯然駕輕就熟.
很明顯,如果我們在ForkJoinPool中使用它作為blocker,并在相應(yīng)的ForkJoinTask的exec或CountedCompleter的compute方法中使用ForkJoinPool::managedBlock(blocker),將每個ForkJoinWorkerThread在阻塞前構(gòu)建一個QNode進入Phaser的等待隊列(雖然還沒有講到相關(guān)內(nèi)容,但是Phaser顯然不用我們直接操作內(nèi)部類QNode),那么它將依照上述邏輯進行補償,保障有效的并行度.
前面完成了承前啟后,預熱到此結(jié)束,開始看Phaser的核心方法.
//doArrive方法 //它是arrive和arriveAndDeregister方法的主要實現(xiàn).手動調(diào)用這些方法可以加速通過和最小化競態(tài)窗口期. //參數(shù)代表要從當前state中減去的調(diào)整數(shù)值,它的單位依托于業(yè)務(wù),當為arrive時減去的單位為ONE_ARRIVAL, //當為arriveAndDeregister時減去的單位為ONE_DEREGISTER. private int doArrive(int adjust) { final Phaser root = this.root; for (;;) { //1.變量s初始化,取決于是否當前Phaser是root.不是root將試圖從root同步滯后的state. long s = (root == this) ? state : reconcileState(); //計算phase,前32位. int phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) //2.負數(shù)直接返回.說明原來的state首位就是1,前面的TERMINATE_BIT就是64位置1. return phase; //取count,后32位. int counts = (int)s; //計算unarrived,和前面一樣的邏輯. int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); if (unarrived <= 0)//2.1 //沒有unarrived了,說明不應(yīng)該調(diào)用此方法,拋出異常,信息就是前面介紹過的badArrive throw new IllegalStateException(badArrive(s)); //3.嘗試將state減去adjust數(shù). if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) { //3.1cas成功后,unarrived余1,則前進一個phase if (unarrived == 1) { //3.1.1取出parties作為下一個state的基礎(chǔ). long n = s & PARTIES_MASK; //3.1.2 下一個unarrived,數(shù)值上等于parties. int nextUnarrived = (int)n >>> PARTIES_SHIFT; if (root == this) { //3.1.3當前Phaser是root,onAdvance返回true,則加上終止信號. if (onAdvance(phase, nextUnarrived)) n |= TERMINATION_BIT; else if (nextUnarrived == 0) //3.1.4 onAdvance返回false,而計算得出的nextUnarrived是0,即沒有parties,n加上一個empty(1) n |= EMPTY; else //3.1.5nextUnArrived不是0,加到n上. n |= nextUnarrived; //3.1.6前面的流程完成了state的后32位(parties和unarrived),接下來處理前32位. //限定在MAX_PHASE之內(nèi),對當前phase加1. int nextPhase = (phase + 1) & MAX_PHASE; //將nextPhase的值加到n的前32位.并用n去cas掉原來的state,因為有3處入口的cas,此處一定能成功 n |= (long)nextPhase << PHASE_SHIFT; UNSAFE.compareAndSwapLong(this, stateOffset, s, n); //更新到新的phase,喚醒等待的waiter. releaseWaiters(phase); } //3.1.7當前Phaser不是root,當nextUnarrived計算得0時,像父傳遞解除注冊,參數(shù)ONE_DEREGISTER //會同時減去一個unarrived和一個parties.下輪循環(huán)正常應(yīng)進入3.1.8 else if (nextUnarrived == 0) { phase = parent.doArrive(ONE_DEREGISTER); //完成傳遞后,將自己的state置empty. UNSAFE.compareAndSwapLong(this, stateOffset, s, s | EMPTY); } else //3.1.8,當前Phaser不是root,計算的nextUnarrived非0,像父傳遞一個arrive事件,減去一個unarrived. phase = parent.doArrive(ONE_ARRIVAL); } //3.2返回當前phase,可能是已進入3.1遞增的.僅有此處可退出循環(huán). return phase; } } }
關(guān)于該方法的執(zhí)行流程,我們結(jié)合幾個周邊方法一并分析,先來看注冊方法和onAdvance勾子.
//注冊和批量注冊.參數(shù)代表parties和unarrived字段的增加數(shù),它必須大于0. private int doRegister(int registrations) { // 1.用參數(shù)計算一個adjust,同時包含parties和arrive. long adjust = ((long)registrations << PARTIES_SHIFT) | registrations; final Phaser parent = this.parent; int phase; //循環(huán)嘗試更改. for (;;) { //2.存在parent,則用root的phase調(diào)整this的state. long s = (parent == null) ? state : reconcileState(); //取出當前state中保存的counts,parties,unarrived信息. int counts = (int)s; int parties = counts >>> PARTIES_SHIFT; int unarrived = counts & UNARRIVED_MASK; if (registrations > MAX_PARTIES - parties) //要注冊的數(shù)量大于了余量,拋出異常. throw new IllegalStateException(badRegister(s)); //3.計算出phase phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) //phase為負說明state為負,即終止態(tài),終止. break; //4.當前state表示的參與數(shù)非空的邏輯,當前注冊非首次注冊. if (counts != EMPTY) { if (parent == null || reconcileState() == s) { //this是root或者從root同步的state不變,繼續(xù)執(zhí)行,否則重新循環(huán). if (unarrived == 0) //4.1本輪循環(huán)通過原state計算的unarrived為0,說明應(yīng)等待下一phase,使用root等待 root.internalAwaitAdvance(phase, null); else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adjust)) //4.2本輪循環(huán)未發(fā)現(xiàn)應(yīng)等待下一phase,嘗試原子更新,增加adjust到state上. break; } } //5.當前不存在counts,且自身就是root,代表root的首次注冊. else if (parent == null) { //5.1計算下一個state,因為沒有參與數(shù),使用phase初始化前32位,并使用adjust做后32位. long next = ((long)phase << PHASE_SHIFT) | adjust; if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) //5.2 cas成功,退出,不成功,下輪循環(huán). break; } //6.是首次注冊,但也不是root的邏輯.代表非root的Phaser的首次注冊. else { //6.1對當前Phaser加鎖并double check,避免同時調(diào)用.加鎖失敗的線程將在后續(xù)進入2的邏輯. synchronized (this) { //double check state未發(fā)生改變. if (state == s) { //6.2首先向父Phaser注冊1. phase = parent.doRegister(1); if (phase < 0) //發(fā)現(xiàn)進入終止態(tài),直接停止. break; //6.3向父Phaser注冊成功,循環(huán)嘗試cas掉老的state,新state的算法同上,phase加adjust. //在整個while循環(huán)中,不再考慮phase進入終止態(tài)的情況,因為這些操作處于同一個"事務(wù)"中, //且因競態(tài)等原因,若某次cas時計入了負數(shù)的phase,方法返回后也可以及時發(fā)現(xiàn). while (!UNSAFE.compareAndSwapLong (this, stateOffset, s, ((long)phase << PHASE_SHIFT) | adjust)) { //如果cas不成功,則讀取s為新的state,計算新的phase并重新循環(huán). s = state; phase = (int)(root.state >>> PHASE_SHIFT); // assert (int)s == EMPTY; } //6.4cas成功后退出循環(huán). break; } //如果if(state==s)判斷失敗,說明有別的線程有當前線程進入synchronized塊前已經(jīng)加鎖并執(zhí)行了內(nèi)部的邏輯且稍后釋放了鎖, //這樣當前線程加鎖成功,但if判斷失敗,它會立即釋放鎖并返回到2. } } } return phase; } //使用root的phase調(diào)整this的state,更新滯后的結(jié)果.這一般發(fā)生在root前進了phase但是 //子phaser還沒有做到這一步,這種情況下,子phaser必須完成這個前進的步驟,這一過程中,phase將 //被置為root的phase,unarrived則會重置為parties,若parties為0,則置為EMPTY.返回結(jié)果state. private long reconcileState() { final Phaser root = this.root; long s = state; //不是root才進行下一步. if (root != this) { int phase, p; //cas,phase采用root,parties不變,unarrived重置為parties或EMPTY. while ((phase = (int)(root.state >>> PHASE_SHIFT)) != (int)(s >>> PHASE_SHIFT) && //phase滯后于root //嘗試cas. !UNSAFE.compareAndSwapLong (this, stateOffset, s, //確定新state的前32位,使用root的phase. s = (((long)phase << PHASE_SHIFT) | //新phase<0,后32位直接取this的state表示的counts. ((phase < 0) ? (s & COUNTS_MASK) : //phase有效,this的state表示的parties為0,則后32位使用empty (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY : //否則,后32位使用parties. ((s & PARTIES_MASK) | p)))))) s = state; } return s; } //onAdvance勾子方法,參數(shù)為當前phase和注冊的parties數(shù). //默認實現(xiàn)為parties數(shù)為0,方法返回true時,調(diào)用者會嘗試終止Phaser.(參考前面的doArrive).隨后調(diào)用isTerminated方法將返回true. //執(zhí)行此方法時拋出的運行時異常或Error將直接上拋給嘗試advance相應(yīng)的phase的線程,這種情況下不會發(fā)生phase的advance. //方法的入?yún)⒈硎镜氖荘haser當前的state(未advance前),因此若在onAdvance方法中執(zhí)行arrive,regist,waiting這三種操作的行為是不確定的也不可靠的. //如果當前Phaser是一個級聯(lián)的成員,那么onAdvance只會由root在每次advance時調(diào)用. //方法的默認實現(xiàn)返回true的場景目前只能是經(jīng)過數(shù)次arriveAndDeregister調(diào)用造成parties歸零的結(jié)果.我們繼承Phaser可以輕易地重寫此行為, //比如簡單粗暴地返回false,那么將永遠允許新的注冊. protected boolean onAdvance(int phase, int registeredParties) { return registeredParties == 0; }
經(jīng)過前面的代碼分析,已經(jīng)對Phaser的核心函數(shù)doRegister,doArrive有了全面的了解.
兩者都會在一開始同步root的phase,且如果出現(xiàn)落后root的情況,同步了新的phase的同時,也會重新初始化unarrived,并且使用parties的值.
doArrive方法會每次調(diào)整unarrived數(shù)量(也可包含parties數(shù)量,如果使用了解除注冊),當Phaser調(diào)用自身的arrive/arriveAndDeregister時,會做出相應(yīng)的減少,并根據(jù)是否為root而決定向上遞歸.
Phaser減少自身unarrived信號(也可能同時有parties信號)后,若發(fā)現(xiàn)這已經(jīng)是最后一個unarrived信號,則進行接下來的判斷:
1.當前Phaser是root,advance并喚醒waiter.(重要的喚醒操作執(zhí)行點,root一輪完成)
2.當前Phaser不是root,且它已經(jīng)不具備繼續(xù)下一輪的條件(計算nextUnarrived為0,即parties已經(jīng)被arriveAndDeregister置0),則從父Phaser減少一個unarrived和parties.
3.當前Phaser不是root,但它仍具有parties,滿足進行下一輪的條件(計算nextUnarrived不是0),則從父Phaser減少一個unarrived,但不減少parties.
顯然,子Phaser的最后一個unarrived的消失一定會造成父的unarrived減少,子Phaser不能繼續(xù)下一phase的register和arrive時,從父Phaser中卸載.
若不是本Phaser的最后一個unarrived信號,則直接結(jié)束,相當于只進行了上面的減少信號操作.
doRegister方法的邏輯大致相反,不同于doArrive,它的參數(shù)registrations同時作用于parties和unarrived,即兩個位上同時加上registrations參數(shù).它的大致邏輯:
1.當前注冊并非首次注冊,且出現(xiàn)unarrived==0,即本輪已經(jīng)完成了arrive,那么本輪將不能注冊,需要等待root更新到下輪.(這也是我們碰到的第一個阻塞)
2.當前注冊并非首次注冊,unarrived也不是0,則在本phase進行注冊,增加相應(yīng)的parties和unarrived.
3.當前注冊是root的首次注冊,給root的state加上相應(yīng)的parties和unarrived.
4.當前注冊是非root的首次注冊,加鎖(this),對自己的state加上相應(yīng)的parties和unarrived(同上,以registrations為單位),而對parent加上一個parties和unarrived單位.
很明顯,對于單Phaser的情況非常好理解,每次減少unarrived數(shù)量(先不考慮減少parties),則最終導致Phaser自身進入下一個phase,然后重新初始化unarrived到下一輪,unarrived的新值是前一輪剩下的parties數(shù)量.
當我們同時也嘗試減少parties數(shù)量,即解除parties的注冊,最終導致沒有parties,那么Phaser將進入終止態(tài).
整個過程中,只要Phaser沒進入終止態(tài),隨時可以進行新的注冊,并增加parties和unarrived的數(shù)量.每個arrive可以減少unarrived的數(shù)量為任何正整數(shù),不一定是1.
對于多Phaser的情況,有兩個特殊點:
1.對任意Phaser樹中的某一個Phaser調(diào)用注冊操作,會令自身加上相應(yīng)參數(shù)個parties和unarrived單位,僅會在該Phaser第一次注冊時增加父Phaser(極端可能,僅從一個葉子節(jié)點第一個注冊的情況下可一直遞歸到root)的parties數(shù)和unarrived數(shù)各1單位(不論參數(shù)是多少).
2.對任意Phaser樹中的某一個Phaser調(diào)用arrive操作,會令自身減去相應(yīng)的參數(shù)個parties和unarrived單位,同時僅當本Phaser此時是最后一個unarrived時,會減去父Phaser的一個unarrived單位(當前子Phaser仍舊有parties可以構(gòu)建下一phase),或減去父Phaser一個Parties和unarrived單位.(極端情況下,每一級都是最后一個unarrived時,減少葉子節(jié)點的最后一個unarrived會遞歸到root).
每新增一個子Phaser,父Phaser就會增加一個要完成觸發(fā)phase的advance前必須要等到arrive的單位;每一個子Phaser中所有的arrive完成,父Phaser都將減少一個要等待advance所必需觸發(fā)的arrive.
目前沒有看到await方法,但可以提前說明,等待操作完全依賴于root是否完成本輪.也就是所有子Phaser都完成了同一輪(arrive打滿),才能讓父Phaser本身減去一個所有arrive單位,再觸發(fā)父Phaser本輪的完成,此時對任何已完成的Phaser進入注冊,都會進入上述的root.internalAwaitAdvance(phase, null)方法等待root進入下一phase.如果對已經(jīng)完成所有arrive的Phaser繼續(xù)進行arrive操作,因為unarrived已經(jīng)是0,則會拋出異常.
所以對于使用子Phaser的場景,如果發(fā)生很巧妙的情況,Phaser樹上當前子Phaser的arrive結(jié)束條件滿足了,使得新來的注冊只能等待下一輪次,而其他分支的子Phaser又偏偏不能完成本輪次,那么新的phaser.doRegister方法將阻塞在此.
好在我們使用Phaser可能會類似CyclicBarrier的使用方式,可對每一輪(phase)進行注冊并等待(也許只等一輪,那么arrive就要帶上deregister),每一輪最后一個線程arrive了,就會停止所有線程的等待,讓所有線程繼續(xù)執(zhí)行,同時開啟了下一輪次,這些線程此時又可以不經(jīng)注冊直接在新的輪次中進行等待,直到最后一個arrive了,再次喚醒所有線程并繼續(xù)執(zhí)行,同時Phaser再前進一輪,如此往復.中間使用arrive并deregister的線程會從本輪起減少一個unarrive數(shù)量(因為parties也減少了,所以再下一輪初始化unarrive數(shù)量時也會減少一次).我們可以讓這些線程參與任意的輪次,但要注意的是,如果有線程中途不參加了,一定要解除注冊,否則因為每輪初始化時,要等待arrive的數(shù)量都是上一輪剩下的parties數(shù)量,有線程停止了執(zhí)行,卻不減少parties數(shù),那么下輪所有等待的線程將永遠等不到phaser滿足喚醒的條件.
上述的過程中可以明顯的看出,目前已介紹的兩個重要核心函數(shù):注冊和arrive并沒有直接記錄和操作線程的操作,相應(yīng)的操作在等待方法和喚醒方法中(前面提到過release),我們稍后介紹.
現(xiàn)在假設(shè)一個特殊的使用場景,也可以區(qū)別于CyclicBarrie和CountDownLatch的使用.還是上面的例子,但是我們準備的線程數(shù)與Phaser的parties數(shù)/unarrived數(shù)不同(一般前者要多些),會發(fā)生什么事?
首先創(chuàng)建了Phaser,不指定最初parties數(shù),并用每個線程去注冊(我甚至可以用一個線程去重復注冊,每次的參數(shù)registrations還可以不同,注冊的作用并不是將當前線程壓入隊列,而是為本phase設(shè)置一個unarrive數(shù)量,以控制到達下個phase前必須有多少次arrive的發(fā)生),則parties數(shù)和unarrived的初值完全與此有關(guān),是一個依托于我們隨意注冊而產(chǎn)生的隨意值.那么假定我們的線程數(shù)量大于這個parties數(shù)(假定調(diào)用注冊方法的線程和arrive及等待的線程無關(guān)),并令有的線程執(zhí)行arrive(完全可以一次arrive減去多個信號量,甚至一個線程多次arrive),有的線程執(zhí)行await等待信號advance到下一個phase(一個線程在一個周期只能調(diào)用一次),有的線程執(zhí)行了arrive也等待phase前進(這種情況一個線程一周期也只能一次.其實這些分別對應(yīng)了還未介紹的arrive,waitAdvance,arriveAndWaitAdvance等方法),多帶帶進行await操作的線程可以是任意數(shù)量,執(zhí)行arrive方法的線程加上執(zhí)行arrive并wait的操作的線程和必須超過unarrived,這才能喚醒等待線程.
目前這些還比較抽象,等到我們看過相應(yīng)的幾個方法便了然了.
onAdvance的方法默認實現(xiàn)就是判斷本階段注冊的parties數(shù)量,如果已經(jīng)是0則說明沒有parties了,Phaser應(yīng)該結(jié)束.但是我們其實可以重新實現(xiàn),比如參數(shù)中同時傳入了當前的phase,我可以規(guī)定上面的例子中phase最多只有3輪次,那么不論什么時候arrive,發(fā)現(xiàn)了當前phase已進入3輪,Phaser就被終止.當然,這一過程是由root執(zhí)行的,但是子Phaser的phase會在每次注冊和arrive發(fā)生時同步root,因此本例中對于phase數(shù)的判斷可以粗放到所有Phaser,對于parties數(shù)則只能作用于root(事實上調(diào)用onAdvance的一定是root).
接下來看全量構(gòu)造方法和若干和上面有關(guān)的公有函數(shù).
//初始化一個Phaser,指定parent,指定未到來的參與者數(shù)(unarrived parties),但這只是一個初值, //當我們在任何時候調(diào)用注冊方法時,還會相應(yīng)的增加. public Phaser(Phaser parent, int parties) { if (parties >>> PARTIES_SHIFT != 0) //太大了,超過了后16位能表示的整數(shù). throw new IllegalArgumentException("Illegal number of parties"); //初始phase為0. int phase = 0; this.parent = parent; if (parent != null) { //1.有parent的情況,共享parent的root,隊列,并向parent中注冊1個parties和unarrived, //同時同步一次phase(表面上是同步了parent的,實際上前面已經(jīng)看過,會同步root). final Phaser root = parent.root; this.root = root; this.evenQ = root.evenQ; this.oddQ = root.oddQ; if (parties != 0) phase = parent.doRegister(1); } else { //2.無parent的情況,root就是this,并初始化奇偶等待隊列.它使用原子引用的形式存放一個QNode,而QNode我們前面已介紹. this.root = this; this.evenQ = new AtomicReference(); this.oddQ = new AtomicReference (); } //統(tǒng)一初始化state,后32位的決定依托于parties,如果parties是0則給予EMPTY,直接不管高32位. //不為0則給予phase設(shè)置為前32位,parties設(shè)置parties位和unarrived位. this.state = (parties == 0) ? (long)EMPTY : ((long)phase << PHASE_SHIFT) | ((long)parties << PARTIES_SHIFT) | ((long)parties); } //注冊方法,就是調(diào)用doRegister,參數(shù)1. //它會向this添加一個unarrived的party,如果正巧root正在進行advance,它需要等待下個phase. //如果this有parent,且它之前沒有過注冊的parties,則首次注冊會觸發(fā)自身向parent的注冊. //如果this已經(jīng)終止了,那么嘗試注冊將會無效并返回負值.如果注冊的數(shù)量大于了最大支持parties(后16位整數(shù)), //會拋出IllegalStateException public int register() { return doRegister(1); } //批量注冊指定的信號量,并返回最新的phase.規(guī)則基本同上. public int bulkRegister(int parties) { if (parties < 0) throw new IllegalArgumentException(); if (parties == 0) //參數(shù)0直接查詢最新的phase返回 return getPhase(); return doRegister(parties); } //arrive一個信號,不等待其他arrive事件,返回最新phase(終止態(tài)為負). //當前Phaser的arrive事件已滿,則對parent來說也會觸發(fā)一個arrive.(如果有parent) public int arrive() { return doArrive(ONE_ARRIVAL); } //arrive并解除一個注冊parties,也不阻塞等待其他arrive.如果當前Phaser的解除注冊操作 //將parties減至0,且this有parent,這將導致parent也減少一個parties(本phaser解除在parent的注冊). public int arriveAndDeregister() { return doArrive(ONE_DEREGISTER); }
接下來要看上面已經(jīng)做足了鋪墊的等待方法了,并結(jié)合前面的隊列一塊看.
//令當前線程"到達"此phaser并等待其他parties,它等效于awaitAdvance(arrive()). //注意,按照道格的注釋,如果你在一個未進行注冊(調(diào)用register)的線程里調(diào)用此方法其實是一個使用錯誤, //但是從本方法和前面以及后面有關(guān)的方法來看,所有記錄線程的方法均只與arrive和等待有關(guān),與注冊無關(guān). //因此Phaser本身無法規(guī)避這種使用錯誤,我們完全可以使用另一個線程去注冊,而當前線程去arrive,將兩個動作分開. //方法會返回arrive時最新的phase號.終止時會是負值. public int arriveAndAwaitAdvance() { //記錄root,開始循環(huán). final Phaser root = this.root; for (;;) { //1.預計算,首先同步state long s = (root == this) ? state : reconcileState(); //計算phase int phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) //已終結(jié)直接返回最終phase. return phase; //計算counts,unarrived int counts = (int)s; int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); if (unarrived <= 0) //已經(jīng)沒有空余的unarrived信號了,不能再調(diào)用arrive,拋出異常. throw new IllegalStateException(badArrive(s)); //2.減余arrive的有關(guān)邏輯.嘗試cas減去一個arrive if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= ONE_ARRIVAL)) { if (unarrived > 1) //2.1當前要減的信號不是本Phaser的最后一個信號量,調(diào)用root的等待方法.參數(shù)2是node,傳空. return root.internalAwaitAdvance(phase, null); if (root != this) //2.2當前要減的信號量是非root的Phaser的最后一個,遞歸給parent(雖然用了return,但是parent也可能在進入2.1后阻塞). return parent.arriveAndAwaitAdvance(); //2.3當前要減的信號量是root的最后一個. //2.3.1準備計算下一個狀態(tài),先取出state的parties信息. long n = s & PARTIES_MASK; //計算nextUnarrived,它是現(xiàn)在的parties. int nextUnarrived = (int)n >>> PARTIES_SHIFT; //2.3.2前進phase邏輯. if (onAdvance(phase, nextUnarrived)) //需要終止,給新state的計算基石n加上終止標記. n |= TERMINATION_BIT; else if (nextUnarrived == 0) //計算的nextUnarrived是0,即沒有parties,加上空標記位. n |= EMPTY; else //下一輪能正常進行,加上nextUnarrived位. n |= nextUnarrived; //2.3.3給n加上下一個phase. int nextPhase = (phase + 1) & MAX_PHASE; n |= (long)nextPhase << PHASE_SHIFT; if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n)) //用n進行cas不成功,將新的phase返回. //說明一下,因為方法執(zhí)行到此前已經(jīng)執(zhí)行過2的入口cas,減去了最后一個unarrived,因此在2到此的過程中若有新的注冊, //它內(nèi)部會讀到0個unarrived,就會等待下一個phase(參考前面介紹過的注冊方法),因此cas失敗不會是因為2之后有新的注冊. //在整個arrive系列的方法中,最后一次arrive發(fā)生后,本Phaser不可能有其他線程再去執(zhí)行類似2處的減余的情況. //故出現(xiàn)這種情況的原因目前來看有二,一是還未介紹的強制關(guān)閉Phaser的方法,此時也會突兀地改掉state造成cas恰巧失敗,二是 //出現(xiàn)一些用戶做出的奇葩行為,比如重寫了其他公有方法.我們自然忽略第二種情況,doug大神也是簡單注釋了一個"terminated". return (int)(state >>> PHASE_SHIFT); // terminated //cas成功,釋放等待隊列中的線程,返回下一個phase(因為在此過程中的register會等到advance,此時的phase已經(jīng)是nextPhase了). releaseWaiters(phase); return nextPhase; } //3.減余失敗說明出現(xiàn)競態(tài),直接開啟下輪循環(huán)重新減余. } } //等待當前Phaser從給定的phase前進結(jié)束,如果當前phase不等于給定的phase,或者Phaser已終止立即返回. //1.傳入phase為負,返回它本身. //2.傳入的phase不是最新的phase,返回最新的. //3.傳入了最新的phase,等待到advance并返回advance后的phase. public int awaitAdvance(int phase) { final Phaser root = this.root; long s = (root == this) ? state : reconcileState(); int p = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; if (p == phase) //匹配成功,等root前進.參數(shù)node為null return root.internalAwaitAdvance(phase, null); return p; } //參考前面的幾個方法,區(qū)別是可擾動. public int awaitAdvanceInterruptibly(int phase) throws InterruptedException { final Phaser root = this.root; long s = (root == this) ? state : reconcileState(); int p = (int)(s >>> PHASE_SHIFT); if (phase < 0) //1.參數(shù)phase小于0直接返回它本身. return phase; if (p == phase) { //2.參數(shù)phase匹配,回憶一個前面介紹的QNode,匹配當前Phaser和phase,配置為可擾動且不計時. QNode node = new QNode(this, phase, true, false, 0L); //3.放入root的等待隊列阻塞. p = root.internalAwaitAdvance(phase, node); if (node.wasInterrupted) //4.等待結(jié)束,判斷是否是擾動造成的結(jié)束,前面介紹過QNode的相關(guān)邏輯, //它實現(xiàn)了ForkJoinPool.ManagedBlocker,因此在managedBlock方法進行時, //會循環(huán)調(diào)用問詢是否能release,當我們配置了可擾動且擾動了,就會標記這個wasInterrupted,釋放線程引用并返回. //發(fā)現(xiàn)此種情況拋出異常. //同時,當發(fā)現(xiàn)等待成功,也會結(jié)束,釋放線程引用并返回,但不帶有擾動標記. throw new InterruptedException(); } //5.返回1處之前讀取的phase或3處得到的最新phase值. return p; } //同上方法,但帶有計時. public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { long nanos = unit.toNanos(timeout); final Phaser root = this.root; long s = (root == this) ? state : reconcileState(); int p = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; if (p == phase) { //不同于上面方法的地方,建立的QNode帶有計時和等待時長. QNode node = new QNode(this, phase, true, true, nanos); p = root.internalAwaitAdvance(phase, node); if (node.wasInterrupted) //被擾動的情況. throw new InterruptedException(); else if (p == phase) //時間到了phase沒有前進,超時. throw new TimeoutException(); } return p; }
前面的幾個核心方法粗略過完,補充一些重要內(nèi)容.
首先在前面曾分析過有線程阻塞等待下一個phase的情況,并沒有加上定時等待的考慮.在超時的情況下,阻塞的線程可能會收到異常并退出.
建立QNode可以限定是否定時和可擾動,這取決于我們使用哪個方法去await.
除最后一個線程arrive外,所有線程調(diào)用這些方法都會減少一個arrive并加入等待隊列,直到(1)配置了定時且超時,(2)當前是可擾動等待且使用了Thread.interrupt(),(3)最后一個線程使用上述方法或arrive方法,使得Phaser前進了一個輪次,internalWaitAdvance結(jié)束.其中(1)(2)均會遷成arrive線程拋出異常,只有(3)才是正常的情況.
QNode前面已介紹,它是一個blocker,需要調(diào)用ForkJoinPool::managedBlock才會起作用(顯然root的internalAwaitAdvance必然與此方法有關(guān)聯(lián)).當然這個作用與任務(wù)是否運行在ForkJoinPool無關(guān),如果等待phaser前進的線程是運行在ForkJoinPool中的ForkJoinWorkerThread,顯然會在internalAwaitAdvance期間進行補償.這一塊可參考前面的"CompletableFuture與響應(yīng)式編程"和"ForkJoin框架之ForkJoinPool"兩篇文章.
另外,這些代碼也再次說明了root的作用: (1)對一切非root的Phaser進行等待都會用root的internalAwaitAdvance;(2)每次注冊或arrive一定會同步root的最新phase.
其中(1)也間接說明了為什么構(gòu)建Phaser時只有root創(chuàng)建等待隊列,所有子Phaser共享.
上面還保留了一個疑問,提到了"強制關(guān)閉Phaser"造成arriveAndAwaitAdvance出現(xiàn)cas失敗的問題,doug大神直接注釋了一個terminated,我們馬上來看這一塊,以及一些周邊的公共函數(shù),加深理解,然后再來解決關(guān)于等待隊列最后的一些問題.
//強制關(guān)閉Phaser,讓Phaser進入終止態(tài),但是這個過程不影響它已注冊的parties,如果此Phaser是 //一個Phaser樹中的成員,那么所有phaser集中的Phaser都會關(guān)閉,如果它已經(jīng)關(guān)閉,此方法無效.此方法可以 //用于若干任務(wù)出現(xiàn)意料之外異常的情況下的協(xié)調(diào)恢復. public void forceTermination() { // Only need to change root state final Phaser root = this.root; long s; //已是終止態(tài)直接忽略. while ((s = root.state) >= 0) { //直接嘗試給root的state加上終止位.顯然加上了它,子Phaser在注冊和arrive等方法同步回新的phase就是個負數(shù), //因此更改root的phase為負相當于判了所有Phaser的死刑.唯一需要解決的是已經(jīng)阻塞在root.internalAwaitAdvandce的線程. if (UNSAFE.compareAndSwapLong(root, stateOffset, s, s | TERMINATION_BIT)) { // 加上終止位成功,先后喚醒偶數(shù)等待隊列和奇數(shù)等待隊列. releaseWaiters(0); // Waiters on evenQ releaseWaiters(1); // Waiters on oddQ //返回 return; } } } //返回當前phase,直接用root的state去取. public final int getPhase() { return (int)(root.state >>> PHASE_SHIFT); } //查詢注冊的parties數(shù)量.調(diào)用前面介紹過的partiesOf public int getRegisteredParties() { return partiesOf(state); }
//查詢已經(jīng)arrived的parties數(shù)量.調(diào)用介紹過的arriveOf
public int getArrivedParties() { return arrivedOf(reconcileState()); } //查詢未arrive的parties數(shù)量,調(diào)用前面介紹過的unarrivedOf public int getUnarrivedParties() { return unarrivedOf(reconcileState()); } //返回parent public Phaser getParent() { return parent; } //返回root public Phaser getRoot() { return root; } //判斷當前Phaser是否終止,直接取root的state是否為負,可見,終止態(tài)完全取決于root. public boolean isTerminated() { return root.state < 0L; }
這些方法都比較簡單,只有forceTermination需要再強調(diào)一翻,前面介紹arrayAndAwaitAdvance時曾提過在減去最后一個unarrived信號后去cas到下一個phase失敗的情況,doug大神簡單注釋了一句terminated,直接返回了當前的phase(顯然只能是負),在周邊方法重重加鎖的前提下,那一次cas的失敗唯一一處就是強制關(guān)閉,因為它只改關(guān)閉標記位,相當于動了phase,而沒有動unarrived標記位和parties標記位.所以重寫Phaser的方法要謹慎,很可能不小心打破了這個封裝.
從上面的有關(guān)方法可以看出,子Phaser的終止態(tài)嚴重依賴于root,目前可以確定的是root的phase一旦表現(xiàn)出終止態(tài),所有新來的注冊,arrive,arrive并await將會立即返回,唯一需要關(guān)注的就是root被設(shè)置了終止標記后,正陷入等待的線程怎么辦的問題.
我們下面就來看Phaser的等待機制,這里面又能見到道格大神非常有趣的玩法.
//工具方法,移除某個phase的等待者. private void releaseWaiters(int phase) { QNode q; //保存隊列中的隊首 Thread t; // 保存線程引用. //取隊列,用phase的奇偶決定,phase是偶數(shù)就取偶數(shù)隊列,否則取奇數(shù)隊列.而這個phase其實只用來取隊列了,后續(xù)的操作與它無關(guān). AtomicReferencehead = (phase & 1) == 0 ? evenQ : oddQ; //循環(huán),找出所有phase不等于root的phase的(其實root是最大的,所以就是找出非最新phase加入進來的waiter QNode) while ((q = head.get()) != null && q.phase != (int)(root.state >>> PHASE_SHIFT)) { //找出了,利用原子引用將head指向next. if (head.compareAndSet(q, q.next) && (t = q.thread) != null) { //發(fā)現(xiàn)阻塞者,喚醒線程.回憶下前面實現(xiàn)blocker方法中的isReleaseble和block方法都有將線程置空的操作.(三種情況,喚醒擾動超時都會置空) //但是那些方法并沒有將代表該阻塞線程的QNode移除隊列,因此可能會發(fā)現(xiàn)thread已經(jīng)是null(代表無阻塞者)的情況,只需要移除隊列即可. q.thread = null; LockSupport.unpark(t); } } } //上面releaseWaiters方法的一個變種,但它只會處理遍歷過程中位于頭部的元素,出現(xiàn)正常的等待節(jié)點就會立即返回. //此方法在這一塊可以有效的減少內(nèi)存的占用.退出時返回當前的phase. private int abortWait(int phase) { //同樣,參數(shù)phase只是用來選擇要處理的隊列. AtomicReference head = (phase & 1) == 0 ? evenQ : oddQ; for (;;) { Thread t; QNode q = head.get(); //計算最新phase的值p int p = (int)(root.state >>> PHASE_SHIFT); if (q == null || ((t = q.thread) != null && q.phase == p)) //1.出現(xiàn)q為null代表整隊列元素已出隊,直接返回p; //或者在出隊過程中head(q)記錄的線程引用還在,說明未超時或擾動,且是本phase的等待節(jié)點,終止循環(huán)并返回最新phase. return p; if (head.compareAndSet(q, q.next) && t != null) { //進入條件,參考1的條件,因為1會直接返回.故進入2的條件其實是q非空且處于舊的phase.只有這種情況才可以出隊. //2.將q出隊,置空線程引用,釋放線程. q.thread = null; LockSupport.unpark(t); } } } //計算有效cpu,控制自旋. private static final int NCPU = Runtime.getRuntime().availableProcessors(); //常量,每輪arrive等待的字旋數(shù),取決于NCPU,小于2則取1,不小于2取2的8次冪. static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8; //珊珊來遲的內(nèi)部等待方法.它可能會一直阻塞到phase的advance發(fā)生(除非取消了等待). //此方法僅限r(nóng)oot調(diào)用.參數(shù)phase表示當前的phase,參數(shù)node表示等待節(jié)點,用于追蹤節(jié)點的擾動或超時. //如果是null,表示是一次不可擾動的等待.返回值為當前最新的phase. private int internalAwaitAdvance(int phase, QNode node) { // 1.調(diào)用releaseWaiters,傳入?yún)?shù)phase的前一個phase,顯然這只是決定釋放哪一個隊列.參數(shù)絕對實時準確的情況下會先將老的隊列釋放掉. releaseWaiters(phase-1); //節(jié)點入隊標記,入隊了就會變?yōu)閠rue boolean queued = false; //記錄每一輪循環(huán)的unarrived數(shù)量,用于決定是否擴增自旋等待次數(shù). int lastUnarrived = 0; //自旋數(shù),參考上面的計算邏輯. int spins = SPINS_PER_ARRIVAL; long s; int p; //開啟循環(huán),直到phase前進為止或內(nèi)部判斷已取消等待. while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) { //2.傳入node是null,即非可擾動的模式邏輯.只有非可擾動模式才有自旋. if (node == null) { //2.1每輪自讀進入都會嘗試計算新的unarrived,如果發(fā)現(xiàn)出現(xiàn)了變動(變大或者變小), //會將它保存到前面的lastUnarrived. int unarrived = (int)s & UNARRIVED_MASK; if (unarrived != lastUnarrived && (lastUnarrived = unarrived) < NCPU) //發(fā)現(xiàn)新變化的unarrived head = (phase & 1) == 0 ? evenQ : oddQ; QNode q = node.next = head.get(); //這一行不起眼的if條件代碼真的是一個悄無聲息解決了一個大暗坑的地方,后面說. if ((q == null || q.phase == phase) && (int)(state >>> PHASE_SHIFT) == phase) //double check避免臟入隊,入隊條件是(1)無頭,(2)或者頭元素的phase等于參數(shù)phase(因為相鄰的兩個phase絕對不會入同一個隊). //滿足(1)(2)的同時,還要滿足(3),參數(shù)phase就是當前的state表示的phase(因為此方法只能root使用,故為root表示的最新phase). //條件滿足,入隊,取代原來的head,原來head代表的node成為node的next.而條件不滿足進入下一循環(huán),很可能while條件就不滿足了退出循環(huán). queued = head.compareAndSet(q, node); } //5.已經(jīng)在某一輪循環(huán)入隊了,使用ForkJoinPool的managedBlock管理block,其間可能會釋放線程引用. else { try { //5.1它內(nèi)部也有循環(huán),且會調(diào)用前面看到過的isReleasable和block實現(xiàn),顯然它一旦結(jié)束(包含擾動),一定會造成下輪外循環(huán)終止于3處. ForkJoinPool.managedBlock(node); } catch (InterruptedException ie) { //5.2出現(xiàn)擾動異常catch住,并保存.下輪循環(huán)也會終止在3處. node.wasInterrupted = true; } } } //6.走出上面的while循環(huán),可能是root已經(jīng)advance到下一個phase(2前的循環(huán)),也可能是傳入node的情況下出現(xiàn)了擾動或超時(5)造成(3)滿足 if (node != null) { //6.1node存在代表可能已經(jīng)壓入隊列,結(jié)果要么是已出現(xiàn)擾動或超時(方法結(jié)束后會拋出異常),要么是已正常完成. //顯然,代碼執(zhí)行到此處就要返回了,阻塞的線程會拋出異常結(jié)束(超時或擾動)或繼續(xù)執(zhí)行(正常advance), //沒有必要去嘗試喚醒能執(zhí)行出前面while循環(huán)到達6馬上要返回的線程. if (node.thread != null) //6.2取消node中的線程引用,避免外面的線程嘗試喚醒. node.thread = null; // avoid need for unpark() if (node.wasInterrupted && !node.interruptible) //6.3如果node本身設(shè)置了不可被擾動,但5.2處判斷線程本身拋出了擾動異常,卻被catch住了,此處擾動本線程. Thread.currentThread().interrupt(); if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase) //6.4發(fā)現(xiàn)phase并未前進.還是參數(shù)傳入的pahse,說明一定是擾動或超時的結(jié)果,abortWait對本phase使用的隊列進行清理, //而清理的目標前面已論述過,是本隊列頭部開始的早于本phase的元素.(發(fā)現(xiàn)一個不滿足條件的就停止了清理). return abortWait(phase); // possibly clean up on abort } //7.退出上面的while循環(huán)一定會到此幫助釋放早于最新階段的waiter.注意,是早于最新phase的,參數(shù)phase只是決定了選哪個隊列(奇偶). //如果是6.4代表的那種擾動超時情況,此處其實釋放的是舊的結(jié)果.被喚醒的線程其實一般是執(zhí)行在5.1處阻塞的.當前線程能運行到此絕對不需要喚醒. releaseWaiters(phase); return p; }
到此Phaser的代碼解析已完畢,我們來分析關(guān)于隊列,等待和喚醒的問題.
1.Phaser維護了兩個"隊列",不論加入等待隊列還是彈出等待隊列,都是從頭部進行,新加入的成員會成功隊列的新頭,原來的頭會成為它的next,彈出時next成為新頭.所以相當于一個對頭部的"后進先出",考慮官方起名和注釋,我們依舊保持隊列這個稱呼.
2.喚醒時,會從隊列的頭部依次彈出node?的phase早于root的最新phase的node,.
3.等待時,入隊的node成為新的頭.
4.當輪次增加時,會使用和本輪不同的隊列增加元素,同時也會喚醒本輪中等待的node.
因為喚醒和等待同時進行,且各自操作各自的隊列(不同的phase),因此彼此之間沒有競態(tài)(盡管一個是頭入一個是頭出),可以說設(shè)計巧妙,下面我們來腦洞大開,思考一個極端情況.
我們假設(shè)一種極端的phase切換場景,奇數(shù)phase大量等待入隊,偶數(shù)phase則迅速完成.假設(shè)當前phase對應(yīng)的隊列是奇數(shù)對列,輪次提升完成后,它去釋放當前的隊列元素,結(jié)果未等這個釋放操作執(zhí)行完畢,偶數(shù)隊列的輪次很快執(zhí)行完,奇數(shù)隊列中積壓了成千上萬個node未能釋放,輪次卻又切回到了奇數(shù)隊列,會出現(xiàn)什么事?
顯然奇數(shù)隊列如果一直保持這種極端場景,它會越來越龐大,逼近撐爆內(nèi)存的同時,大量線程也會得不到釋放,甚至于老一輪的線程需要等待新一輪的線程去釋放.為什么老一輪的線程會去等待新一輪的線程釋放呢?
releaseWaiter的方法我們已經(jīng)看出,它只會釋放phase早于最新的node,此時最新壓入的元素屬于當前最新的phase,顯然不滿足條件,那么會造成奇數(shù)隊列中兩輪前壓入的元素不能得到清除,兩輪前就在釋放當時積壓node的線程(那一輪最后一個arrive)發(fā)現(xiàn)不符合清理條件,就直接return并終止了,只能等待本輪最后一個arrive出現(xiàn)后繼續(xù)進行釋放.如果本輪最后一個arrive出現(xiàn)很晚,在下一輪依舊保持如此極端,往返數(shù)輪,確實會導致奇數(shù)隊列中積壓大量node,且第一輪就在等待該輪次結(jié)束的線程早就滿足了釋放條件(升到了2輪),事實上可能是第n輪才得到釋放,這還符合Phaser的定義嗎?我們使用它,就是要保證每一輪多帶帶使用,每一輪次達到條件,線程釋放并執(zhí)行,下一輪次是下一輪次.
然而doug的代碼就是這個樣子,想遍各種極端,覺得可能找到了bug,那么就需要仔細思考了.作者來簡述一下這個趟坑的分析過程.
這個問題確實已經(jīng)得到了極大的規(guī)避了,畢竟是個極端情況.
1.線程的喚醒真的很快,盡管此處除了喚醒還包含了原子引用的更新(每次出隊都要cas).
2.如果沒有注冊,顯然就沒有arrive相關(guān)的情況,盡管可以多帶帶調(diào)用,但必須保證在arrive時傳入的數(shù)量此時已經(jīng)注冊了,因此每一輪次(phase)中可能積壓等待喚醒的線程的操作一定是在注冊之后,但是我們回憶一下,注冊方法的第一步就是要等待完成advance,而且傳給internalAwaitAdvance的node會是null,即不能擾動和超時,所以當本輪次阻塞了一定數(shù)量的線程后,如果不去arrive,也不考慮超時和擾動的情況,那么線程將一直阻塞.我們不可能在輪次advance前進行注冊,也就不可能在advance之前進行新一phase的arrive.
3.當本輪次的最后一個arrive線程觸發(fā)了輪次的更新后,才可以開啟注冊以及新輪次的arrive,但是此時使用了另一個等待隊列,而觸發(fā)了輪次更新的上一輪的arrive線程將會立即進行前一個隊列中積壓的線程的喚醒操作.只有該喚醒操作足夠慢,且新的輪次極快就完成了的情況,才可能造成在原arrive線程未能及時釋放奇數(shù)隊列的情況下,新一輪次再次向其中添加元素.
4.最重要的還在上面的internalAwaitAdvance方法,那一段被作者標上了入隊條件的注釋處,要想入隊,必須if ((q == null || q.phase == phase) &&加上后面的條件,而這兩個條件的限定已經(jīng)很明顯,要想入隊,必須滿足該等待隊列沒有元素或者隊首是本輪的元素,而該方法又是下一輪首次注冊時必須等待完成的,下一輪的arrive又必須發(fā)生在下一輪的首次注冊之后,因此根本不會出現(xiàn)本輪wait的線程還要等下一輪甚至下N輪的線程去釋放的極端情況,哪怕真的去做一個極端測試:讓奇數(shù)輪大量積壓線程,讓偶數(shù)輪快速切換,然后測試第一輪壓入的線程到底是不是本輪釋放的.(作者差點就要寫單元測試去做這個極端測試了!)
這一段不經(jīng)意的if,一個小小的條件,如果不注意真的忽略了,小代碼大功效,誰能想到,這么深的暗坑就這樣被規(guī)避了.
總結(jié)前面已經(jīng)詳述了Phaser的源碼以及若干趟坑辛路.其實已經(jīng)沒什么好總結(jié)的了,就在此順便對比常見同步器CyclicBarrier,CountDownLatch,Semaphore的特征和實現(xiàn).
從使用特征上看:
1.CountDownLatch是一次性的,只能初始化決定parties數(shù)量,等待者可以是多個,每次釋放都會減少一個信號量,直到歸0時為止,最后一個釋放者將喚醒其他等待的線程.它也不能繼續(xù)使用.
2.CyclicBarrier是可重用的,分代的,每一代之間彼此獨立,但是每一代的初始parties是相同的,不可在運行期內(nèi)動態(tài)調(diào)整,每一代最后一個線程會去開啟一下代,并可以在此時運行一個用戶指定的action,與此同時喚醒其他線程繼續(xù)執(zhí)行.它可以在運行完一代后繼續(xù)被使用.并且它還支持重置.
3.Semaphore是一個資源量的典型,如果說CountDownLatch和CyclicBarrier或者Phaser都是等到"人夠了"再放行,Semaphore卻是起到限流的作用,它控制了有限的令牌數(shù),這個數(shù)量不可以動態(tài)地更改,在不能acquire到足夠的令牌數(shù)時,線程將阻塞,直到其他線程釋放了足量的令牌數(shù)并喚醒它為止.每一個持有了令牌的線程都可以喚醒阻塞等待獲取的線程.
4.Phaser的功能上不同很明顯,首先它的參與者數(shù)量幾乎時刻可變(除了正在進入下一phase期間),隨時可以增加減少parties數(shù)量,每一phase等待者可以是多個,每一phase中,每個能從internalAwaitAdvance方法中走出循環(huán)的線程都可以幫助喚醒,當然最終能進入喚醒操作還是要歸功于最后一個arrive的線程(盡管它arrive后其他線程醒來后也會幫助喚醒).Phaser的喚醒者不一定是參與者.
從實現(xiàn)來看:
1.CountDownLatch借助了aqs來實現(xiàn)parties的釋放,它使用cas+park的方式,不使用Lock.
2.CyclicBarrier需要借助重入鎖和condition,每一個await的線程都要全局加鎖,阻塞時await在condition上.
3.Semaphore在實現(xiàn)上類似CountDownLatch,也是基于aqs,只不過它允許獲取和釋放,對state有增有減,總量不變.也是cas+park的方式阻塞,也不使用Lock
4.Phaser因為功能的要求,不基于AQS(它不能有構(gòu)建時就固定的state,盡管可以初始化一個state,但它必須支持改變),它依托于原子引用實現(xiàn)了一個內(nèi)部的隊列,相應(yīng)的等待/入隊/喚醒等操作通過cas自旋+park的方式,同樣不使用Lock.并利用雙隊列的方式規(guī)避了前一輪的釋放和后一輪的響醒的阻塞.
此外還有兩點結(jié)合前面的推理和自測驗證的結(jié)論:
1.Phaser中的每一個phase是保證了可見性的,經(jīng)作者自測,在任何使用Phaser的代碼中await前后,不會出現(xiàn)串phase讀出的亂序情況(側(cè)面說明每個phase不會依賴后一個或幾個phase的釋放).
2.Phaser需要對await的線程進行阻塞時,是將它打包成一個node(blocker),利用ForkJoinPool來block的.如果使用Phaser同步的任務(wù)是運行在ForkJoinPool中的,它將會利用到相應(yīng)的補償機制,經(jīng)作者自測,這將保證Phaser中block的每一個任務(wù)必然得到執(zhí)行,每一個阻塞的線程必然得到釋放.
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/75190.html
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設(shè)計模式,設(shè)計了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:本人郵箱歡迎轉(zhuǎn)載轉(zhuǎn)載請注明網(wǎng)址代碼已經(jīng)全部托管有需要的同學自行下載引言講完了和今天講一個跟這兩個類有點類似的移相器中引入了一種新的可重復使用的同步屏障稱為移相器擁有與和類似的功勞但是這個類提供了更加靈活的應(yīng)用和都是只適用于固定數(shù)量的參與者 本人郵箱: 歡迎轉(zhuǎn)載,轉(zhuǎn)載請注明網(wǎng)址 http://blog.csdn.net/tianshi_kcogithub: https://github....
摘要:分層支持分層一種樹形結(jié)構(gòu),通過構(gòu)造函數(shù)可以指定當前待構(gòu)造的對象的父結(jié)點。當一個的參與者數(shù)量變成時,如果有該有父結(jié)點,就會將它從父結(jié)點中溢移除。當首次將某個結(jié)點鏈接到樹中時,會同時向該結(jié)點的父結(jié)點注冊一個參與者。 showImg(https://segmentfault.com/img/remote/1460000016010947); 本文首發(fā)于一世流云專欄:https://segme...
摘要:倒計時鎖,線程中調(diào)用使進程進入阻塞狀態(tài),當達成指定次數(shù)后通過繼續(xù)執(zhí)行每個線程中剩余的內(nèi)容。實現(xiàn)分階段的的功能測試代碼拿客網(wǎng)站群三產(chǎn)創(chuàng)建于年月日。 同步器 為每種特定的同步問題提供了解決方案 Semaphore Semaphore【信號標;旗語】,通過計數(shù)器控制對共享資源的訪問。 測試類: package concurrent; import concurrent.th...
摘要:前言作為一款流行的游戲動畫框架受到很多開發(fā)者的青睞最近筆者在逛意大利開發(fā)者論壇的時候發(fā)現(xiàn)了這款小游戲所以就照著說明做了一下在這里記錄下來開發(fā)準備插件腳本飛刀和靶子的圖像或者這個項目里面有的腳本和需要的圖像文件開始制作搭建基本的項目創(chuàng)建一個 前言 phaser作為一款流行的游戲/動畫框架,受到很多web開發(fā)者的青睞,最近筆者在逛意大利開發(fā)者:emanueleferonato論壇的時候發(fā)現(xiàn)...
閱讀 6213·2021-11-22 15:32
閱讀 828·2021-11-11 16:54
閱讀 3166·2021-10-13 09:40
閱讀 2173·2021-09-03 10:35
閱讀 1843·2021-08-09 13:47
閱讀 1881·2019-08-30 15:55
閱讀 1941·2019-08-30 15:43
閱讀 2463·2019-08-29 17:06