成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

ForkJoin框架之CountedCompleter,工作線程及并行流

msup / 3486人閱讀

摘要:前言在前面的文章框架之中梳理了框架的簡(jiǎn)要運(yùn)行格架和異常處理流程顯然要理解框架的調(diào)度包含工作竊取等思想需要去中了解而對(duì)于的拓展和使用則需要了解它的一些子類前文中偶爾會(huì)提到的一個(gè)子類直譯為計(jì)數(shù)的完成器前文也說(shuō)過(guò)的并行流其實(shí)就是基于了框架實(shí)現(xiàn)因此

前言

在前面的文章"ForkJoin框架之ForkJoinTask"中梳理了ForkJoin框架的簡(jiǎn)要運(yùn)行格架和異常處理流程,顯然要理解ForkJoin框架的調(diào)度,包含工作竊取等思想,需要去ForkJoinPool中了解,而對(duì)于ForkJoinTask的拓展和使用則需要了解它的一些子類,前文中偶爾會(huì)提到ForkJoinTask的一個(gè)子類:CountedCompleter,直譯為計(jì)數(shù)的完成器.

前文也說(shuō)過(guò),JAVA8的并行流其實(shí)就是基于了ForkJoin框架實(shí)現(xiàn),因此并行流其實(shí)就在使用我們前面提到的工作竊取和分治思想.為了方便對(duì)于ForkJoinTask的理解,本文將詳述CountedCompleter(同時(shí)在ForkJoinPool中也需要了解它),以及前文提到的工作線程ForkJoinWorkerThread,并簡(jiǎn)單看一看并行流.

CountedCompleter源碼

根據(jù)doug的注釋,CoutedCompleter是一個(gè)特殊的ForkJoinTask,它會(huì)在觸發(fā)完成動(dòng)作時(shí),檢查有沒有掛起action,若沒有則執(zhí)行一個(gè)完成動(dòng)作.這個(gè)概念有些抽象,必須結(jié)合源碼和源碼作者給出的示例加以理解,同樣的,理解了它,也就理解了CountedCompleter的擴(kuò)展類的實(shí)現(xiàn)方式,從而能閱讀懂有關(guān)的源碼(如并行流中涉及到運(yùn)行集拆分,結(jié)果合并,運(yùn)算調(diào)度等源碼).

它也是一個(gè)抽象類,基于ForkJoinTask的exec函數(shù)進(jìn)行了若干擴(kuò)展.

public abstract class CountedCompleter extends ForkJoinTask 

//任務(wù)的完成者,很明顯這是一個(gè)全局的棧結(jié)構(gòu)(暫時(shí)這么理解吧,其實(shí)也不太嚴(yán)格).
final CountedCompleter completer;
//重要字段,代表完成前掛起的任務(wù)數(shù)量,用volatile修飾.
volatile int pending;
//帶有completer的構(gòu)造器.
protected CountedCompleter(CountedCompleter completer) {
    this.completer = completer;
}
//不帶completer的構(gòu)造器
protected CountedCompleter() {
    this.completer = null;
}
//抽象的compute方法,它是類似ForkJoinTask的擴(kuò)展方式.
public abstract void compute();
//重寫的exec方法
protected final boolean exec() {
    //直接調(diào)用compute方法并返回false.回到ForkJoinTask類中的doExec方法,可以看到
    //調(diào)用了exec后若得到true值,將會(huì)執(zhí)行setCompletion(NORMAL)動(dòng)作.且該動(dòng)作將在首次喚醒等待結(jié)果的線程.
    //此處return了false,將不去執(zhí)行上述操作.詳情參考上篇文章.
    compute();
    return false;
}

以上是CountedCompleter的簽名,字段,構(gòu)造器和核心的抽象方法compute,其實(shí)整個(gè)CountedCompleter就是在圍著這點(diǎn)東西轉(zhuǎn),首先看一看與ForkJoinTask的結(jié)合.

顯然,CountedCompleter簡(jiǎn)單重寫了ForkJoinTask的exec方法簡(jiǎn)單調(diào)用抽象的compute方法并返回false,當(dāng)出現(xiàn)異常時(shí),流程不變,但當(dāng)compute方式正常完成的情況,將不可能進(jìn)行父類后續(xù)的設(shè)置完成和喚醒操作.因此它必須由CountedCompleter自定義的完成.

而CountedCompleter也確實(shí)暴露了一些公有函數(shù),但是調(diào)用的時(shí)機(jī)卻要用戶繼承它之后決定.我們先來(lái)繼續(xù)一些輔助源碼并理解Completer的設(shè)計(jì)理念,稍后再來(lái)看它的完成方法.

//onCompletion勾子方法,默認(rèn)空實(shí)現(xiàn).
//CountedCompleter在tryComplete方法中會(huì)在符合完成的第一個(gè)條件(無(wú)掛起任務(wù))的情況下執(zhí)行它.
//complete方法也會(huì)對(duì)它有無(wú)條件地調(diào)用.
//關(guān)于這兩個(gè)方法稍后詳述.
//它的實(shí)現(xiàn)取決于要實(shí)現(xiàn)的操作,并行流中的一些ops會(huì)在此處進(jìn)行一些中間結(jié)果處理,比如結(jié)果集的合并(reduce操作).
public void onCompletion(CountedCompleter caller) {
}

//重寫ForkJoinTask中的方法.上篇源碼分享文章中提過(guò),在ForkJoinTask的setExceptionalCompletion會(huì)調(diào)用internalPropagateException
//傳遞異常,而且是個(gè)空實(shí)現(xiàn),而在CountedCompleter中實(shí)現(xiàn)了該方法,并在內(nèi)部調(diào)用onExceptionalCompletion
void internalPropagateException(Throwable ex) {
    CountedCompleter a = this, s = a;
    //循環(huán)判斷每一個(gè)task是否要傳遞異常給它的completer
    //無(wú)方法體的while循環(huán).道格大神的代碼神跡.
    while (a.onExceptionalCompletion(ex, s) &&
            //要傳遞給completer且具備completer且completer還不是完成態(tài)(正?;蚍钦?
           (a = (s = a).completer) != null && a.status >= 0 &&
            //則令completer去記錄異常完成,若記錄成功則進(jìn)入下一輪循環(huán).
           a.recordExceptionalCompletion(ex) == EXCEPTIONAL)
        ;
    //因?yàn)閛nExceptionalCompletion固定返回true,若沒有中間完成的任務(wù),直到最后一個(gè)completer,也就是root,
    //root不具備completer,將中斷循環(huán).
}

//異常完成勾子方法.
//按上一節(jié)的概念,當(dāng)ForkJoinTask執(zhí)行出錯(cuò),即exec->compute出錯(cuò)時(shí),最終會(huì)調(diào)到此勾子.或當(dāng)手動(dòng)completeExceptionally或cancel時(shí).
public boolean onExceptionalCompletion(Throwable ex, CountedCompleter caller) {
    //直接返回true,顯然也是一個(gè)供擴(kuò)展的方法.返回true代表異常應(yīng)該傳遞給this的completer.
    return true;
}

//返回completer
public final CountedCompleter getCompleter() {
    return completer;
}

//返回掛起任務(wù)數(shù)量.
public final int getPendingCount() {
    return pending;
}

//設(shè)置掛起任務(wù)數(shù)量
public final void setPendingCount(int count) {
    pending = count;
}

//原子地為掛起任務(wù)數(shù)量添加delta
public final void addToPendingCount(int delta) {
    U.getAndAddInt(this, PENDING, delta);
}

   //原子地將當(dāng)前掛起任務(wù)數(shù)量從expected更改到count
public final boolean compareAndSetPendingCount(int expected, int count) {
    return U.compareAndSwapInt(this, PENDING, expected, count);
}

//將當(dāng)前任務(wù)的掛起數(shù)量原子減至0.
public final int decrementPendingCountUnlessZero() {
    int c;
    do {} while ((c = pending) != 0 &&
                 !U.compareAndSwapInt(this, PENDING, c, c - 1));
    return c;
}

//返回root completer.邏輯很簡(jiǎn)單.
public final CountedCompleter getRoot() {
    CountedCompleter a = this, p;
    while ((p = a.completer) != null)
        a = p;
    return a;
}

以上是幾個(gè)工具函數(shù),邏輯也很簡(jiǎn)單,僅有一處可能留有疑問:完成態(tài)/異常態(tài)是如何傳遞的.

現(xiàn)在大家應(yīng)該理解為什么ForkJoinTask要將internalPropagateException置為空實(shí)現(xiàn)了,顯然,對(duì)于不同方式的實(shí)現(xiàn),確實(shí)需要不同的傳遞行為.CountedCompleter保存了一個(gè)類似"棧結(jié)構(gòu)"的任務(wù)鏈,雖然提前講到棧底即為root任務(wù)(當(dāng)然root在底部還是頂部本身不重要),顯然任何一個(gè)子任務(wù)出現(xiàn)了問題,與它關(guān)聯(lián)的父任務(wù)的行為顯然要有一個(gè)明確的由子類定義的規(guī)則.

我們看到在重寫的internalPropagateException方法中,不停地判斷當(dāng)前任務(wù)是否要將異常信號(hào)傳遞給鏈上的下一個(gè)任務(wù)(on方法始終返回true,沒關(guān)系我們可以在子類中重寫),然后讓未完成的completer去記錄同一個(gè)異常ex.

那么問題來(lái)了,只要completer已完成過(guò)(正常完成過(guò)異常完成或取消),顯然while循環(huán)中斷,completer和它的后續(xù)completer將不會(huì)被處理(1).同樣,若傳遞異常的任務(wù)本身就是另一個(gè)或幾個(gè)任務(wù)的completer,它的異常信息顯然不會(huì)反向傳遞(2).

對(duì)于問題(1),顯然如果后續(xù)的completer已出現(xiàn)過(guò)異常,必然也會(huì)走一遍同樣的邏輯,傳遞給后面的completer,如果它正常完成,也必然要有相應(yīng)向后傳遞的行為,否則無(wú)法解決(1),我們接下來(lái)即論述相關(guān)方法.

對(duì)于問題(2),顯然問題(1)中描述的情況與此有所交集,如果我們建立了一個(gè)CountedCompleter任務(wù),并在compute方法中大肆fork子任務(wù)入隊(duì),fork之后不等子任務(wù)完成,也不獲取子任務(wù)的執(zhí)行結(jié)果,直接將父任務(wù)setCompletion或者setExceptionalCompletion,子任務(wù)還是會(huì)繼續(xù)執(zhí)行的.

為了便于理解,我們繼續(xù)來(lái)看與任務(wù)的完成有關(guān)的方法.

//嘗試完成根任務(wù)或減少棧鏈下游的某一個(gè)completer的掛起數(shù)(包含它自身).
public final void tryComplete() {
    //1.初始用a保存this,后續(xù)為當(dāng)前操作任務(wù),用s保存a.
    CountedCompleter a = this, s = a;
    for (int c;;) {
        //2.第一次進(jìn)入或在6造成競(jìng)態(tài)的某一次循環(huán)中,a(this或this的completer鏈中的某一個(gè))的的掛起任務(wù)數(shù)為0,代表它掛起的任務(wù)都完成了.
        if ((c = a.pending) == 0) {
            //3.a的勾子方法,若已經(jīng)運(yùn)行過(guò)4,且判斷條件為假未能到5并在下一次循環(huán)重新回到3的情況,a!=s且a是s的completer,
            //在對(duì)onCompletion重寫時(shí),可以根據(jù)this與參數(shù)是否相等進(jìn)行判斷,如并行流聚合時(shí)可以根據(jù)這個(gè)條件進(jìn)行結(jié)果集的合并.
            a.onCompletion(s);
            //4.將a指向自己的completer,s指向原來(lái)的a.
            if ((a = (s = a).completer) == null) {
                //5.原來(lái)a的completer不存在,即a不是root,不需要再傳遞了,讓root進(jìn)行quietlyComplete并返回.
                //此時(shí)說(shuō)明整條鏈上的competer掛起任務(wù)全部是0.
                s.quietlyComplete();
                return;
            }
            //隱藏的7.當(dāng)原a的completer存在(a不是root)的情況,繼續(xù)對(duì)該complter判斷掛起任務(wù)數(shù)或嘗試減1,對(duì)下一個(gè)元素開啟下一輪循環(huán).
        }
        //6.對(duì)this的completer棧的某一次循環(huán)時(shí)發(fā)現(xiàn)了掛起任務(wù)數(shù)不為0的,則對(duì)該completer的掛起數(shù)減1,
        //表示它掛起的任務(wù)完成了一個(gè),并返回.若在此時(shí)恰好出現(xiàn)了競(jìng)態(tài),另一條鏈上的任務(wù)搶先減一,則當(dāng)前
        //的a要進(jìn)入下一循環(huán),它可能會(huì)在2處判斷通過(guò),進(jìn)入到鏈上的下一個(gè)completer的傳播邏輯.
        else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
            return;
    }
}

//基本等效于tryComplete,只是不執(zhí)行onCompletion,tryComplete會(huì)在判斷鏈上某個(gè)completer的掛起任務(wù)數(shù)是0立即執(zhí)行onCompletion.
public final void propagateCompletion() {
    CountedCompleter a = this, s = a;
    for (int c;;) {
        if ((c = a.pending) == 0) {
            if ((a = (s = a).completer) == null) {
                s.quietlyComplete();
                return;
            }
        }
        else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
            return;
    }
}


//complete方法,邏輯簡(jiǎn)單,絲毫不考慮掛起數(shù),直接執(zhí)行當(dāng)前task的幾個(gè)完成方法,并嘗試對(duì)completer進(jìn)行tryComplete.
//它不改變自己的掛起任務(wù)數(shù),但會(huì)讓completer對(duì)棧上的其他completer或自身嘗試減少掛起數(shù)或完成root.
public void complete(T rawResult) {
    CountedCompleter p;
    setRawResult(rawResult);//使用參數(shù)設(shè)置為當(dāng)前任務(wù)的結(jié)果,盡管它為空方法.
    onCompletion(this);//直接調(diào)用onCompletion勾子.
    quietlyComplete();//安靜地將status置為NORMAL.
    if ((p = completer) != null)
        //自己不改變自身掛起數(shù),也不嘗試完成root,但讓completer嘗試去向下執(zhí)行這些操作.
        p.tryComplete();
}

//沒辦法多帶帶理解這個(gè)方法名.官方注釋是和nextComplete放置在循環(huán)中使用.
public final CountedCompleter firstComplete() {
    for (int c;;) {
        if ((c = pending) == 0)
            //1.當(dāng)前task沒有掛起任務(wù)數(shù),則返回它.
            return this;
        else if (U.compareAndSwapInt(this, PENDING, c, c - 1))
            //2.否則嘗試減少一個(gè)掛起任務(wù)數(shù)并返回null.但當(dāng)出現(xiàn)競(jìng)態(tài)時(shí),可能導(dǎo)致未能進(jìn)入2而在下一次循環(huán)進(jìn)入1.
            return null;
    }
}

//結(jié)合前面的firstComplete互相理解,它會(huì)對(duì)當(dāng)前任務(wù)判斷是否有completer,有則對(duì)該completer進(jìn)行firstComplete,
//否則將當(dāng)前任務(wù)安靜完成并返回null.
//故結(jié)果只能返回null或completer
public final CountedCompleter nextComplete() {
    CountedCompleter p;
    if ((p = completer) != null)
        //有completer且completer已無(wú)掛起任務(wù)數(shù),則返回completer,
        //有completer且completer有掛起任務(wù)數(shù),則嘗試對(duì)該任務(wù)數(shù)減一并返回null.出現(xiàn)競(jìng)態(tài)則可能返回該completer.
        return p.firstComplete();
    else {
        //無(wú)completer,安靜完成當(dāng)前任務(wù)并返回null.
        quietlyComplete();
        return null;
    }
}

//等同于getRoot().quietlyComplete()
public final void quietlyCompleteRoot() {
    for (CountedCompleter a = this, p;;) {
        if ((p = a.completer) == null) {
            a.quietlyComplete();
            return;
        }
        a = p;
    }
}


//如果當(dāng)前任務(wù)未完成,嘗試去出棧執(zhí)行,并處理至多給定數(shù)量的其他未處理任務(wù),且對(duì)這些未處理任務(wù)
//來(lái)說(shuō),當(dāng)前任務(wù)處于它們的完成路徑上(即這些任務(wù)是completer棧鏈的前置任務(wù)),實(shí)現(xiàn)特殊的工作竊取.
public final void helpComplete(int maxTasks) {
    Thread t; ForkJoinWorkerThread wt;
    if (maxTasks > 0 && status >= 0) {
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            //當(dāng)前線程是ForkJoinWorkerThread,嘗試執(zhí)行當(dāng)前任務(wù)并嘗試從線程的工作隊(duì)列中嘗試幫助前置任務(wù)執(zhí)行.
            (wt = (ForkJoinWorkerThread)t).pool.
                helpComplete(wt.workQueue, this, maxTasks);
        else
            //使用common池的externalHelpComplete方法.
            ForkJoinPool.common.externalHelpComplete(this, maxTasks);
    }
}

上一段代碼總體邏輯不難,有以下幾點(diǎn)總結(jié):

1.顯然tryComplete方法在調(diào)用后的最終結(jié)果只有兩個(gè):自己或completer鏈前方的某一個(gè)completer的掛起任務(wù)數(shù)減1(1),自己或completer鏈前方某一個(gè)completer(root)的quietlyComplete被執(zhí)行(2).簡(jiǎn)單來(lái)說(shuō),就是讓root進(jìn)行quietlyComplete(鏈上每一個(gè)掛起任務(wù)數(shù)都是0)或讓鏈上的某一個(gè)completer減少一個(gè)掛起任務(wù).

2.tryComplete方法只會(huì)對(duì)root進(jìn)行quietlyComplete,進(jìn)而setComplete(NORMAL),對(duì)于鏈上的其他任務(wù),最多會(huì)幫助掛起數(shù)減一,而不會(huì)把它們置為完成態(tài),但是線程池在執(zhí)行任務(wù)時(shí),或者直接對(duì)一個(gè)鏈上的completer進(jìn)行invoke,doExec甚至get等操作時(shí),這些方法會(huì)將該中間completer進(jìn)行setComplete.

3.每一個(gè)CountedCompleter都可能有自己的completer棧鏈,每一個(gè)CountedCompleter也可以位于其他CountedCompleter的棧鏈上且上游不唯一而下游唯一一(倒樹形),任何一條棧鏈只能有一個(gè)root,root的completer為null.

4.從tryComplete方法來(lái)看正常運(yùn)行情況下的規(guī)則,每一個(gè)CountedCompleter的tryComplete只能向前影響到鏈上的另一個(gè)completer,因?yàn)閷?shí)現(xiàn)數(shù)量的增加方法有好幾處,用戶在實(shí)現(xiàn)時(shí),隨時(shí)可能將一些completer的數(shù)量設(shè)置成任意的數(shù),故可以出現(xiàn)前面tryComplete注釋中隱藏的7的情況,即存在一個(gè)completer,它的下一個(gè)completer的掛起數(shù)是0,它卻能將下下個(gè)completer安靜完成或?qū)⑵鋻炱饠?shù)減一,即跨無(wú)掛起數(shù)節(jié)點(diǎn)傳遞.

5.前面列出的helpComplete方法是CountedCompleter的特殊工作竊取方法(或者也不能叫作竊取,因?yàn)榉莄ommon池情況竊取的是自己線程的任務(wù),common池則依賴于一個(gè)探測(cè)值),具體的竊取細(xì)節(jié)在ForkJoinPool中,將在后面的文章中論述,但簡(jiǎn)單的邏輯已經(jīng)在注釋中描述清楚,把它歸到這一塊,也是因?yàn)樗c前面描述的邏輯有所糾葛.124提到了tryComplete的向前影響結(jié)果,而在實(shí)際的應(yīng)用中,我們可能會(huì)有各種各樣的情景,ForkJoin框架無(wú)法阻止我們對(duì)ForkJoinTask的exec函數(shù)進(jìn)行任意式的擴(kuò)展,也無(wú)法阻止我們對(duì)CountedCompleter的compute任意擴(kuò)展,那么如何在我們?nèi)我馔卣沟那榫跋卤3中屎徒?比如下面這個(gè)使用場(chǎng)景:

a.建立一種ForkJoinTask,直接繼承CountedCompleter并重寫compute方法,則它可以運(yùn)行在ForkJoinPool中.

b.我們接下來(lái)在compute方法中多次根據(jù)計(jì)算結(jié)果集的大小進(jìn)行拆分并遞歸fork子任務(wù)入池,父任務(wù)成為子任務(wù)的completer,同時(shí)compute方法自身也負(fù)責(zé)不可拆分的計(jì)算邏輯,并在自身這一塊計(jì)算結(jié)束后,可能等待所有fork入池的子任務(wù)結(jié)束,也可能不等待子任務(wù),直接結(jié)束父任務(wù),讓線程空出來(lái)做其他的事.

c.所有子任務(wù)結(jié)束后,使用一個(gè)合并函數(shù)合并子任務(wù)的結(jié)果集和自身的結(jié)果,并作為最終的結(jié)果.然后tryComplete(如果b中使用了join,或者判斷當(dāng)前任務(wù)是root).

顯然,b中fork出的子任務(wù),也同樣要執(zhí)行bc的邏輯.那么可能出現(xiàn)這樣的情況:

不同的父任務(wù)子任務(wù)在ForkJoinPool最初始?jí)喝氘?dāng)前工作線程的隊(duì)列中,但隨時(shí)可能被其他工作線程甚至外部線程偷去執(zhí)行.

父任務(wù)搶先搶得運(yùn)行資源,運(yùn)行完自己計(jì)算的部分,而入池的子任務(wù)及子孫任務(wù)有大量未完成.

難道父任務(wù)的執(zhí)行線程就這樣干等?在前一篇文章中說(shuō)過(guò),ForkJoin框架適宜多計(jì)算,輕io,輕阻塞的情況,且本身就是為了避免線程忙的忙死餓的餓死,因此每個(gè)任務(wù)等待子任務(wù)執(zhí)行結(jié)束是不可取的,這或許也是為什么有了ForkJoinTask,卻還要有CountedCompleter的原因之一吧.

若我們?cè)谌魏蚊恳粋€(gè)任務(wù)中只是單純地將該分出去的子任務(wù)fork入池并執(zhí)行自己那一部分,并不讓當(dāng)前線程join子任務(wù)呢?(事實(shí)上不join子任務(wù)恰好可以將當(dāng)前線程的資源騰出來(lái)做其他的事)

所以,除了前面5中提到的若干種(124)向前影響completer棧鏈的掛起數(shù)或root的完成態(tài),還需要一個(gè)能向棧鏈后方有所影響的操作,比如幫助子任務(wù)的完成,畢竟子任務(wù)也是b中fork出來(lái)且由自己入隊(duì)的.

helpComplete方法就可以做到這一點(diǎn),它在ForkJoinPool中,它僅應(yīng)在當(dāng)前任務(wù)未完成時(shí)使用,首先它會(huì)嘗試將當(dāng)前任務(wù)從出隊(duì)列并執(zhí)行(ForkJoinPool::popCC及成功后續(xù)doExec,LIFO),出隊(duì)失敗則表示正在被執(zhí)行甚至被偷去執(zhí)行.出隊(duì)這一步之后,再嘗試自己的線程工作隊(duì)列中找出自己的子孫任務(wù)(FIFO)并進(jìn)行執(zhí)行(ForkJoinPool::pollAndExecCC).

而若執(zhí)行完某個(gè)父任務(wù)的工作線程必然會(huì)調(diào)用tryComplete等有關(guān)方法,將自身或棧鏈后方的某一個(gè)completer的掛起數(shù)減一,甚至因?yàn)橐恍┎缓侠淼腶pi使用(如直接更改了后方某個(gè)任務(wù)的掛起數(shù)量)而直接終止了root,將root任務(wù)標(biāo)記成完成態(tài).(注意前面強(qiáng)調(diào)的"運(yùn)行完自己計(jì)算的部分",這就是否定本句話的關(guān)鍵了,前面也說(shuō)明"helpComplete僅在當(dāng)前任務(wù)未完成時(shí)使用",顯然,完成了自己負(fù)責(zé)的計(jì)算內(nèi)容并不代表當(dāng)前任務(wù)完成了,因?yàn)樗淖尤蝿?wù)還沒有完成,因此它不會(huì)調(diào)用tryComplete,并且可以去幫助子任務(wù))

同時(shí),執(zhí)行完父任務(wù)負(fù)責(zé)的計(jì)算內(nèi)容的任務(wù)線程也會(huì)去找它棧鏈后方的其他任務(wù),按照b的邏輯,這將是它的子任務(wù),幫助它們完成,每完成一個(gè)子任務(wù)(子任務(wù)無(wú)子任務(wù),不再help的情況),會(huì)進(jìn)行tryComplete傳遞一次.

余下的方法很簡(jiǎn)單.

//重寫自ForkJoinTask的結(jié)果,前文也說(shuō)過(guò)CountedCompleter也不維護(hù)result,返回null.
//但并行流或者一些其他并行操作可以實(shí)現(xiàn)此結(jié)果,比如ConcurrentHashMap中支持的map reduce操作.
public T getRawResult() { return null; }

//同上,默認(rèn)空,一些子類會(huì)有特別的實(shí)現(xiàn).
protected void setRawResult(T t) { }

顯然,completer棧鏈上的所有任務(wù)是可以并行執(zhí)行的,且每一個(gè)完成都可以向后tryComplete一次,并在其后可以幫助前面的任務(wù)完成,而我們?nèi)魧?shí)現(xiàn)上述兩個(gè)方法,完全可以將自身運(yùn)算的結(jié)果設(shè)置進(jìn)去,在root被安靜完成后,ForkJoinTask將可以get到結(jié)果(或join也將返回結(jié)果),可在此時(shí)合并計(jì)算結(jié)果,有些結(jié)果顯然是可以并行的.

一些操作,比如find類型,任何一個(gè)子任務(wù)完成了find,就可以直接讓root結(jié)束,然后直接讓整條棧鏈上的任務(wù)cancelIgnoringExceptions.

一些需要聚合每一個(gè)任務(wù)結(jié)果的操作,比如reduce類型,需要每個(gè)父任務(wù)根據(jù)子任務(wù)的結(jié)果去reduce,它的父任務(wù)再根據(jù)他和兄弟任務(wù)的結(jié)果reduce,最終合并到root.顯然,mapper由子任務(wù)實(shí)現(xiàn),reducer由父任務(wù)實(shí)現(xiàn).

一些接近find或reduce類型(或者說(shuō)find的變種),比如filter,每一個(gè)任務(wù)都會(huì)有結(jié)果,這個(gè)結(jié)果可能是自己負(fù)責(zé)的原集中的一部分子集,也可能就是個(gè)空集,父任務(wù)合并每個(gè)子任務(wù)的結(jié)果集,直到root.

排序類型的操作,如使用歸并排序,顯然每個(gè)父任務(wù)即是divider也是merger,分解出的每個(gè)子集交給子任務(wù)去計(jì)算,父任務(wù)再去負(fù)責(zé)merge.

......

以上是ForkJoinTask的抽象子類CountedCompleter的源碼分析,接下來(lái)我們繼續(xù)分析工作線程.

ForkJoinWorkerThread源碼

只要對(duì)java的線程結(jié)構(gòu)稍有了解,ForkJoinWorkerThread的源碼十分簡(jiǎn)單,且前面提過(guò),ForkJoinTask被聲稱是一個(gè)輕量于普通線程和Future的實(shí)體,而它在ForkJoinPool中的運(yùn)行載體便是ForkJoinWorkerThread,這個(gè)輕量究竟體現(xiàn)在何處?

//類簽名,直接繼承自Thread
public class ForkJoinWorkerThread extends Thread {
//每個(gè)ForkJoinWorkerThread都只能屬于一個(gè)線程池,且保存該池的引用.
final ForkJoinPool pool; 
//每個(gè)ForkJoinWorkerThread都有一個(gè)工作隊(duì)列, 顯然隊(duì)列中的任務(wù)就是該線程干活的最小單位了.它也是工作竊取機(jī)制的核心.             
final ForkJoinPool.WorkQueue workQueue; 

//構(gòu)造函數(shù),創(chuàng)建時(shí)指定線程池.
protected ForkJoinWorkerThread(ForkJoinPool pool) {
    // 線程名稱
    super("aForkJoinWorkerThread");
    this.pool = pool;
    //將工作線程注冊(cè)到ForkJoinPool后會(huì)返回一個(gè)工作隊(duì)列,供當(dāng)前線程使用和供其他線程偷取.
    this.workQueue = pool.registerWorker(this);
}

//帶線程組的構(gòu)造器
ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup,
                     AccessControlContext acc) {
    super(threadGroup, null, "aForkJoinWorkerThread");
    //inheritedAccessControlContext是從Thread繼承下來(lái)的,字面意思是繼承的訪問控制上下文,設(shè)置為acc.
    U.putOrderedObject(this, INHERITEDACCESSCONTROLCONTEXT, acc);
    //注冊(cè)入池之前,清除掉本地化信息
    eraseThreadLocals(); 
    this.pool = pool;
    this.workQueue = pool.registerWorker(this);
}

//返回注冊(cè)的池.

public ForkJoinPool getPool() {
    return pool;
}

//返回當(dāng)前線程工作隊(duì)列在池中的索引,每個(gè)隊(duì)列都會(huì)維護(hù)一個(gè)在池中的索引.
public int getPoolIndex() {
    return workQueue.getPoolIndex();
}


//空函數(shù),可交給子類實(shí)現(xiàn),按照官方注釋,它的作用是在構(gòu)造之后(這個(gè)構(gòu)造不是指new出線程對(duì)象,
//而是在run方法已進(jìn)入的時(shí)候,說(shuō)明"構(gòu)造"是指線程已經(jīng)完成了創(chuàng)建能夠正常運(yùn)行),處理任務(wù)之前.
protected void onStart() {
}


//工作線程終止時(shí)的勾子方法,負(fù)責(zé)執(zhí)行一些有關(guān)的清理操作.但是若要重寫它,必須在方法的
//最后調(diào)用super.onTermination.參數(shù)exception是造成該線程終止的異常.若是正常結(jié)束,
//則它是null.
protected void onTermination(Throwable exception) {
}

//核心方法.
public void run() {
    //doug在這一塊標(biāo)注"只運(yùn)行一次",查看ForkJoinPool的源碼,
    //ForkJoinPool中會(huì)有一個(gè)WorkQueue的數(shù)組,在取消線程的注冊(cè)后,
    //本線程關(guān)聯(lián)的WorkQueue會(huì)從該數(shù)組移除,但WorkQueue中的array不會(huì)置空.
    if (workQueue.array == null) {
        Throwable exception = null;
        try {
            //前面說(shuō)過(guò)的預(yù)先操作
            onStart();
            //用線程池的runWorker方法執(zhí)行,傳入隊(duì)列.
            pool.runWorker(workQueue);
        } catch (Throwable ex) {
            //發(fā)生異常,中斷前記錄下來(lái)
            exception = ex;
        } finally {
            try {
                //將記錄下來(lái)的異常調(diào)用勾子方法.
                onTermination(exception);
            } catch (Throwable ex) {
                if (exception == null)
                    //執(zhí)行勾子方法本身出現(xiàn)了異常,記錄下來(lái)
                    exception = ex;
            } finally {
                //調(diào)用線程池的解除注冊(cè)方法,會(huì)將本線程的WorkQueue從數(shù)組中移除,同時(shí)使用上述異常.
                pool.deregisterWorker(this, exception);
            }
        }
    }
}

//擦除本地變量.把當(dāng)前線程的兩個(gè)ThreadLocalMap全部置空
final void eraseThreadLocals() {
    U.putObject(this, THREADLOCALS, null);
    U.putObject(this, INHERITABLETHREADLOCALS, null);
}

//每正常運(yùn)行完一次頂級(jí)task,就調(diào)用一次它.這個(gè)頂級(jí)任務(wù)自帶易誤解天性,其實(shí)可以理解為每一次從隊(duì)列取出的任務(wù).
void afterTopLevelExec() {
}




//自帶子類.它不具備任何特殊權(quán)限,也不是用戶定義的任何線程組的成員,每次運(yùn)行完一個(gè)頂級(jí)任務(wù),
//則擦除本地化變量.
static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
   //自已創(chuàng)建默認(rèn)線程組.
    private static final ThreadGroup innocuousThreadGroup =
        createThreadGroup();
    //訪問控制上下文支持權(quán)限.
    private static final AccessControlContext INNOCUOUS_ACC =
        new AccessControlContext(
            new ProtectionDomain[] {
                new ProtectionDomain(null, null)
            });
    //構(gòu)造函數(shù).
    InnocuousForkJoinWorkerThread(ForkJoinPool pool) {
        super(pool, innocuousThreadGroup, INNOCUOUS_ACC);
    }

    @Override 
    void afterTopLevelExec() {
        //在每一次從隊(duì)列取出的"頂級(jí)"任務(wù)運(yùn)行后即擦除本地化變量.
        eraseThreadLocals();
    }

    @Override 
    public ClassLoader getContextClassLoader() {
        //如果獲取線程上下文類加載器,永遠(yuǎn)直接返回系統(tǒng)類加載器.
        return ClassLoader.getSystemClassLoader();
    }

    //嘗試對(duì)未捕獲異常處理器的設(shè)置,忽略.
    @Override 
    public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }

    //禁止直接設(shè)置線程的上下文類加載器.
    @Override 
    public void setContextClassLoader(ClassLoader cl) {
        throw new SecurityException("setContextClassLoader");
    }

    
    //創(chuàng)建一個(gè)以頂級(jí)線程組為父的線程組.
    private static ThreadGroup createThreadGroup() {
        try {
            sun.misc.Unsafe u = sun.misc.Unsafe.getUnsafe();
            Class tk = Thread.class;
            Class gk = ThreadGroup.class;
            long tg = u.objectFieldOffset(tk.getDeclaredField("group"));
            long gp = u.objectFieldOffset(gk.getDeclaredField("parent"));
            //當(dāng)前線程的所屬組.
            ThreadGroup group = (ThreadGroup)
                u.getObject(Thread.currentThread(), tg);
            //循環(huán)條件,當(dāng)前線程的所屬組不是null
            while (group != null) {
                //不停地循環(huán)向上取parent
                ThreadGroup parent = (ThreadGroup)u.getObject(group, gp);
                if (parent == null)
                    //發(fā)現(xiàn)無(wú)parent的線程組,說(shuō)明是系統(tǒng)頂級(jí)線程組,用它當(dāng)parent創(chuàng)建一個(gè)"無(wú)害"線程組返回.
                    return new ThreadGroup(group,
                                           "InnocuousForkJoinWorkerThreadGroup");
                //有parent,把它賦給group開啟下一輪循環(huán).
                group = parent;
            }
        } catch (Exception e) {
            //有異常用Error包裝拋出.
            throw new Error(e);
        }
        //不能return就拋出Error.
        throw new Error("Cannot create ThreadGroup");
    }
}

以上是工作線程的代碼,粗略總結(jié)一下它和普通線程的區(qū)別.

首先,它內(nèi)部會(huì)維護(hù)一個(gè)工作隊(duì)列,用它來(lái)實(shí)現(xiàn)任務(wù)調(diào)度和竊取.

其次,它提供了一些擴(kuò)展,如每次頂層任務(wù)運(yùn)行結(jié)束,清理ThreadLocal,這也是一種保護(hù)機(jī)制,避免同線程的本地化數(shù)據(jù)隨之污染.但粗略去看ForkJoinPool的代碼,發(fā)現(xiàn)它只是在每次從隊(duì)列取出并運(yùn)行完一個(gè)任務(wù)后清除,并稱這個(gè)為"頂級(jí)循環(huán)",這倒也沒錯(cuò),但這個(gè)任務(wù)并不能稱之為頂級(jí)任務(wù),因?yàn)檫@里的任務(wù)類型是ForkJoinTask,不一定是CountedCompleter等明顯標(biāo)識(shí)了依賴關(guān)系的子類,所以父任務(wù)和子任務(wù)被塞進(jìn)一個(gè)隊(duì)列,即使未被竊取,只由當(dāng)前線程執(zhí)行,兩次的本地化數(shù)據(jù)也是不同的.

不過(guò)如果我們?cè)贔orkJoinTask的exec方法中加入本地化,或在CountedCompleter中加入本地化,顯然每一個(gè)在此生成的子任務(wù)都會(huì)在相應(yīng)的線程執(zhí)行doExec時(shí)設(shè)置這些屬性,并在執(zhí)行結(jié)束后清除.

最后官方提供的默認(rèn)子類,以及一些線程組,優(yōu)先級(jí),權(quán)限等作者也未深入研究,但是我們構(gòu)建線程池的時(shí)候有一個(gè)參數(shù)就是"線程工廠",了解下它或許能對(duì)后續(xù)的ForkJoinPool源碼閱讀有所幫助.

接下來(lái)簡(jiǎn)述一個(gè)官方提供的案例,并以此聊一聊并行流.

官方案例

第一節(jié)論述了CountedCompleter,顯然它作為一個(gè)抽象類,只是定義了某一些環(huán)節(jié),以及一些環(huán)節(jié)的子環(huán)節(jié)的組合過(guò)程,而具體的實(shí)現(xiàn)與使用它定義的api則由用戶實(shí)現(xiàn),它的源碼中并無(wú)使用(當(dāng)然也可以看一些子類,但比較復(fù)雜),在CountedCompleter的源碼注釋中,道格大神提供了若干案例,這里舉出兩個(gè)來(lái)簡(jiǎn)要說(shuō)明一下前面論述過(guò)的使用方式,也可以為下一節(jié)論述官方提供的子類(并行流api中)提供閱讀基礎(chǔ).

第一個(gè)是并行的可竊取的分治查找算法.

@Test
public void testDivideSearch(){
    Integer[] array = new Integer[10000000];
    for(int i = 0; i < array.length; i++){
        array[i] = i+1;
    }
    AtomicReference result = new AtomicReference<>();
    Integer find = new Searcher<>(null, array, result, 0,
            array.length - 1,this::match).invoke();
    LOGGER.info("查找結(jié)束,任務(wù)返回:{},result:{}",find,result.get());

}

static class Searcher extends CountedCompleter {

    final E[] array; final AtomicReference result; final int lo, hi;
    final Function matcher;

    Searcher(CountedCompleter p, E[] array, AtomicReference result,
             int lo, int hi,Function matcher){
        super(p);
        this.array = array;
        this.result = result;
        this.lo = lo;
        this.hi = hi;
        this.matcher = matcher;
    }
    @Override
    public void compute() {
        int l = this.lo;int h = this.hi;
        while(result.get() == null && h >= l){

            if(h - l >=2){
                int mid = (l + h)>>>1;
                //添加掛起任務(wù)數(shù)量,這樣當(dāng)出現(xiàn)tryComplete時(shí)可以觸發(fā)root的結(jié)束(未查到)
                addToPendingCount(1);
                new Searcher(this,array,result,mid,h,matcher).fork();
                h = mid;
            }else{
                E x = array[l];
                if(matcher.apply(x) &&  result.compareAndSet(null,x)){
                    super.quietlyCompleteRoot();
                }
                break;
            }
        }
        //當(dāng)前未有任何一個(gè)線程查到結(jié)果,當(dāng)前任務(wù)也完成了子集查找,減少一個(gè)掛起數(shù)量,若掛起數(shù)已減至0則終止.
        if(null == result.get())
            tryComplete();
    }

}

private boolean match(Integer x) {
    return x > 2000000 &&  x%2 ==0 && x%3 == 0 && x%5 ==0 && x %7 ==0;
}

該案例的邏輯很簡(jiǎn)單,給定一個(gè)非常大的數(shù)組,充分利用本機(jī)的資源去查找滿足一個(gè)條件的元素.為了方便,在具體的查找數(shù)據(jù)上選定了整型,查找的條件也非常簡(jiǎn)單.

在該案例中,會(huì)對(duì)結(jié)果進(jìn)行分治,首先分治出足夠多的子任務(wù),剩下的不需再分的父任務(wù)由當(dāng)前線程完成,子任務(wù)則壓入工作隊(duì)列,其他空閑的線程就會(huì)來(lái)偷取子任務(wù)并執(zhí)行.當(dāng)有任務(wù)一個(gè)子任務(wù)查找到相應(yīng)的數(shù)字后,即將它存放到result,并安靜地完成根任務(wù).

此時(shí)整個(gè)任務(wù)鏈處在一個(gè)非常尷尬的情況:查找到結(jié)果的子任務(wù)將root設(shè)置為完成,而整條鏈上的非root任務(wù)均未完成.但因循環(huán)條件不滿足,退出了循環(huán).此時(shí)查到result已有值,并不執(zhí)行最后的tryComplete,執(zhí)行結(jié)束,任務(wù)的status依舊為未完成,是否有重復(fù)執(zhí)行的問題?

答案是沒有問題,因?yàn)镕orkJoinTask絕對(duì)會(huì)在ForkJoinPool中調(diào)度(哪怕是common池),在common池中,任務(wù)執(zhí)行前必須出隊(duì),盡管compute方法在本例中沒有將這些任務(wù)設(shè)置為完成,但任務(wù)不會(huì)被二次執(zhí)行.可見,上一章中費(fèi)大力介紹的status字段也有無(wú)用的時(shí)候.

但是除了root任務(wù)需要使用到獲取結(jié)果的功能,需要保證status是負(fù)數(shù),它產(chǎn)生的子孫任務(wù)還有什么用呢?所有compute方法會(huì)因?yàn)檠h(huán)中止而結(jié)束,此后的這些任務(wù)不存在任何外部引用,會(huì)被gc清理,即使存在外部引用,用它去獲取子孫任務(wù)的執(zhí)行情況或result也沒有任何意義.

顯然這個(gè)案例解決了至少兩個(gè)疑問,一是怎么實(shí)現(xiàn)一個(gè)保存result的ForkJoinTask,二是ForkJoin框架如何在查找方面大幅提升性能,很明顯,相比單線程遍歷的辦法,此例多線程查詢,且任何一個(gè)子任務(wù)在并行條件下完成了查詢,整個(gè)大任務(wù)均可以終止.

第二個(gè)是傳說(shuō)中的map?reduce.大數(shù)據(jù)中常使用此概念(跨節(jié)點(diǎn)).

在并行流中,map可以代表非阻斷操作,reduce可以代表阻斷操作,但是reduce同樣可以并行地執(zhí)行.

道格在注釋上給出了兩個(gè)map?reduce案例,我們只看第一個(gè),它也是后續(xù)并行流一節(jié)我們要看的例子比較相近的解法.方法二有些繞,較難理解,但也優(yōu)雅.

@Test
public void testMapReduce() {
    Integer[] array = {1, 2, 3};
    //方法一.
    Integer result = new MapRed<>(null, array, (a)->a+2, (a,b)->a+b,  0,array.length).invoke();
    LOGGER.info("方法一result:{}",result);
    //方法二我就不抄了,就在官方注釋上.
    result = new MapReducer<>(null, array, (a) -> a + 1
            , (a, b) -> a + b, 0, array.length, null).invoke();
    LOGGER.info("方法二result:{}", result);

}


/**
 * 第一種map reduce方式,很好理解.
 * @param 
 */
private class MapRed extends CountedCompleter {
    final E[] array;
    final MyMapper mapper;
    final MyReducer reducer;
    final int lo, hi;
    MapRed sibling;//兄弟節(jié)點(diǎn)的引用
    E result;

    MapRed(CountedCompleter p, E[] array, MyMapper mapper,
               MyReducer reducer, int lo, int hi) {
        super(p);
        this.array = array;
        this.mapper = mapper;
        this.reducer = reducer;
        this.lo = lo;
        this.hi = hi;
    }

    public void compute() {
        if (hi - lo >= 2) {
            int mid = (lo + hi) >>> 1;
            MapRed left = new MapRed(this, array, mapper, reducer, lo, mid);
            MapRed right = new MapRed(this, array, mapper, reducer, mid, hi);
            left.sibling = right;
            right.sibling = left;
            //只掛起右任務(wù)
            setPendingCount(1);
            right.fork();
            //直接運(yùn)算左任務(wù).
            left.compute();     
        } else {
            if (hi > lo)
                result = mapper.apply(array[lo]);
            //它會(huì)依次調(diào)用onCompletion.并且是自己調(diào)自己或completer調(diào)子,
            //且只有左右兩個(gè)子后完成的能調(diào)成功(父任務(wù)的掛起數(shù)達(dá)到0).
            tryComplete();
        }
    }

    public void onCompletion(CountedCompleter caller) {
        //忽略自己調(diào)自己.
        if (caller != this) {
            //參數(shù)是子任務(wù).
            MapRed child = (MapRed) caller;
            MapRed sib = child.sibling;
            //設(shè)置父的result.
            if (sib == null || sib.result == null)
                result = child.result;
            else
                result = reducer.apply(child.result, sib.result);
        }
    }

    public E getRawResult() {
        return result;
    }
}
//mapper和reducer簡(jiǎn)單的不能再簡(jiǎn)單.
@FunctionalInterface
private static interface MyMapper {
    E apply(E e);
}
@FunctionalInterface
private static interface MyReducer {
    E apply(E a, E b);
}

上面的邏輯也很簡(jiǎn)單,首先就是對(duì)任務(wù)的分解,簡(jiǎn)單的將任務(wù)分為左和右,左直接由父任務(wù)執(zhí)行(可能再分),右則入池,所有子任務(wù)直到不能再分(葉子任務(wù))以map為result,每個(gè)葉子任務(wù)完成后會(huì)調(diào)用tryComplete.

這個(gè)動(dòng)作會(huì)觸發(fā)一系列的completer棧元素的掛起數(shù)下降或完成,顯然,如果把completer理解為一個(gè)普通樹(這是作者很少見到的非二叉樹的情況,盡管這個(gè)例子寫成了二叉樹,我們完全可以在compute中將父任務(wù)一分為多,而不是限2個(gè)),從葉子節(jié)點(diǎn)開始,每個(gè)葉子節(jié)點(diǎn)完成(result是mapper的結(jié)果)會(huì)嘗試onCompletion并減少父節(jié)點(diǎn)的掛起任務(wù)數(shù),但只有同父節(jié)點(diǎn)的最后一個(gè)兄弟節(jié)點(diǎn)可以進(jìn)入onCompletion設(shè)置父節(jié)點(diǎn)的結(jié)果,并且由于這個(gè)設(shè)置過(guò)程的前提是父節(jié)點(diǎn)符合掛起任務(wù)數(shù)為0,因此符合循環(huán)繼續(xù)的條件,葉子節(jié)點(diǎn)的動(dòng)作會(huì)繼續(xù)向上判斷父節(jié)點(diǎn)的父節(jié)點(diǎn),直到root為止.假設(shè)線程數(shù)量足夠,保證每個(gè)子任務(wù)都有一個(gè)線程處理,那么深度每上一層,就會(huì)有一半(非二叉樹的情況每個(gè)父節(jié)點(diǎn)只能有一個(gè)通過(guò))的執(zhí)行葉子節(jié)點(diǎn)任務(wù)的線程因不符合某個(gè)任務(wù)的掛起數(shù)量為0的條件而退出,這樣逐級(jí)傳導(dǎo),最后到root調(diào)用它最后一個(gè)子節(jié)點(diǎn)的onCompletion,使用reducer進(jìn)行合并.

本例中進(jìn)行結(jié)果合并的寫法(onCompletion)只適合二叉樹,有興趣的讀者可以看看道格在注釋中給出的第二種寫法,幾叉都可以.而且該實(shí)現(xiàn)很優(yōu)雅,并未寫onCompletion函數(shù),但是寫法真心夠繞的.

并行流簡(jiǎn)述

在JAVA8中支持了lamda表達(dá)式的同時(shí),也支持了函數(shù)式編程,由此出現(xiàn)了一種新型的計(jì)算方式:流式計(jì)算,也出現(xiàn)了一種讓包括作者在內(nèi)很多人興奮不已的編程方式:響應(yīng)式編程.

流式計(jì)算的核心在于Stream?api,流有很多分類,比如并行流和串行流,這點(diǎn)可以顧名思義,同樣的,流中的每一個(gè)操作都可以劃分類型,比如阻斷操作和非阻斷操作.

java中實(shí)現(xiàn)并行流就是基于這些操作,CountedCompleter的一些子類就是這些操作的類型,顯然,如在前一篇文章所說(shuō),使用了并行流,就是使用了ForkJoin框架.

當(dāng)我們使用下面的代碼,會(huì)發(fā)生什么操作?

Stream.of(1,2,3,4,5).parallel().map(x -> x + 1).reduce((a, b) -> a + b).get();

//map只是將動(dòng)作簡(jiǎn)單地記了下來(lái),包裝起來(lái),等到阻斷操作時(shí)才會(huì)真正執(zhí)行. 位于ReferencePipeline
public final  Stream map(Function mapper) {
    Objects.requireNonNull(mapper);//非空檢查
    //返回一個(gè)無(wú)狀態(tài)操作.
    return new StatelessOp(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink opWrapSink(int flags, Sink sink) {
            //典型的適配器模式.將action一律封裝為Sink.
            return new Sink.ChainedReference(sink) {
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}
//阻斷操作reduce位于 ReferencePipeline
public final Optional reduce(BinaryOperator accumulator) {
    return evaluate(ReduceOps.makeRef(accumulator));
}
//AbstractPipeline
final  R evaluate(TerminalOp terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
//TerminalOp阻斷操作接口的默認(rèn)方法
default  R evaluateParallel(PipelineHelper helper,
                                  Spliterator spliterator) {
    if (Tripwire.ENABLED)
        Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default");
    return evaluateSequential(helper, spliterator);
}
//看ReduceOps 它返回了一內(nèi)部類ReduceTask
public  R evaluateParallel(PipelineHelper helper,
                                     Spliterator spliterator) {
        return new ReduceTask<>(this, helper, spliterator).invoke().get();
    }
//內(nèi)部類ReduceTask間接繼承自CountedCompleter
private static final class ReduceTask>
        extends AbstractTask> {
    private final ReduceOp op;

    ReduceTask(ReduceOp op,
               PipelineHelper helper,
               Spliterator spliterator) {
        super(helper, spliterator);
        this.op = op;
    }

    ReduceTask(ReduceTask parent,
               Spliterator spliterator) {
        super(parent, spliterator);
        this.op = parent.op;
    }
    //老外起的名子,造小孩. 
    @Override
    protected ReduceTask makeChild(Spliterator spliterator) {
        //和上面的例子非常相似的代碼,只是封裝更好.
        return new ReduceTask<>(this, spliterator);
    }

    @Override
    protected S doLeaf() {
        //葉子節(jié)點(diǎn)做這個(gè).
        return helper.wrapAndCopyInto(op.makeSink(), spliterator);
    }

    //重寫了前面提過(guò)的onCompletion函數(shù)
    @Override
    public void onCompletion(CountedCompleter caller) {
        if (!isLeaf()) {
            //不是葉子節(jié)點(diǎn).這條件,和前面咱們分析的多么匹配.
            //計(jì)算左結(jié)果
            S leftResult = leftChild.getLocalResult();
            //聯(lián)合右結(jié)果.
            leftResult.combine(rightChild.getLocalResult());
            //聯(lián)合完的結(jié)果就是當(dāng)前completer的結(jié)果.
            setLocalResult(leftResult);
        }
        // 直接父類是AbstractTask,它會(huì)對(duì)父,左右子幫助gc.
        super.onCompletion(caller);
    }
}
//AbstractTask幫助gc
public void onCompletion(CountedCompleter caller) {
    spliterator = null;
    leftChild = rightChild = null;
}
//更多實(shí)現(xiàn)細(xì)節(jié)自閱...

顯然,并行流(至少我舉的這個(gè)例子)是基于ForkJoin框架的.分治的思想與前面道格的例子相似,只是更加優(yōu)雅和封裝更好.有了前面的基礎(chǔ),若要詳細(xì)熟悉并行流原理,需要進(jìn)一步了解的只有他們的繼承樹,分割聚合組件等邊角料,核心的調(diào)度思想已經(jīng)不再是困難.

回到問題,當(dāng)我們使用并行流時(shí)發(fā)生了什么?首先是非阻斷操作時(shí),與串行流情況同樣,也是先將action封裝成適配器,僅在阻斷操作發(fā)生時(shí)的調(diào)度不同,并行流在阻斷操作下使用ForkJoin框架進(jìn)行調(diào)度,任務(wù)的分割則使用它的Splitor,結(jié)果的合并也有它的Combiner.其他的流程與上面的案例無(wú)異.

后語(yǔ)

1.CountedCompleter使用普通樹的結(jié)構(gòu)存放動(dòng)作,但是它又是另類的樹,因?yàn)樽庸?jié)點(diǎn)能找到父節(jié)點(diǎn),父節(jié)點(diǎn)卻找不到子節(jié)點(diǎn),而只知道子節(jié)點(diǎn)代表的動(dòng)作未執(zhí)行的數(shù)量,因此或許從訪問方式的角度來(lái)看還是用棧來(lái)理解更好.在這里樹既是數(shù)據(jù)結(jié)構(gòu),也是一個(gè)另類的操作棧.只從一個(gè)completer往下看,它是個(gè)棧,但從父節(jié)點(diǎn)的角度來(lái)講,它是一個(gè)訪問不到子節(jié)點(diǎn)的普通樹(或許我們不應(yīng)該強(qiáng)行為它套上一個(gè)數(shù)據(jù)結(jié)構(gòu),不然總覺得不倫不類,但是用樹這個(gè)形狀便于理解).每個(gè)節(jié)點(diǎn)會(huì)存放掛起任務(wù)數(shù)量,每個(gè)節(jié)點(diǎn)的任務(wù)完成未必會(huì)設(shè)置它自己的完成態(tài),但會(huì)嘗試將completer父元素棧(或者樹的一條線)上的每個(gè)任務(wù)掛起數(shù)量減一或?qū)⒏?jié)點(diǎn)安靜置為完成態(tài).關(guān)于具體的理解和代碼實(shí)現(xiàn),以及如何保存一個(gè)任務(wù)的運(yùn)行結(jié)果,可以參考前面案例的章節(jié),也可以以此為基礎(chǔ)去看并行流的源碼,但也要相應(yīng)的理解并行流為了便捷實(shí)現(xiàn)而提供的各種分割合并組件.

2.ForkJoinWorkerThread是運(yùn)行在ForkJoinPool中的主要線程,它內(nèi)部維護(hù)了一個(gè)工作任務(wù)隊(duì)列,并存放了該隊(duì)列在線程池中的間接索引.借此實(shí)現(xiàn)任務(wù)的竊取,避免過(guò)于空閑等待,任務(wù)fork會(huì)直接push到該隊(duì)列,第一次擴(kuò)容時(shí),才給該隊(duì)列初始化任務(wù)數(shù)組,當(dāng)線程從池中卸載時(shí),不會(huì)清除掉該數(shù)組,這樣線程無(wú)法再次啟動(dòng).線程的啟動(dòng)有一些勾子,官方提供的線程工廠有兩個(gè),一個(gè)直接創(chuàng)建ForkJoinWorkerThread,另一個(gè)創(chuàng)建它的子類

InnocuousForkJoinWorkerThread,它除了一些安全策略外,最大的區(qū)別在于ForkJoinWorkerThread在注冊(cè)入池前進(jìn)行本地化數(shù)據(jù)的清理,而它則每次完成一個(gè)主任務(wù)處理就清理一次.

3.并行流是ForkJoin框架的一個(gè)典型應(yīng)用,JAVA8?Stream?api中的并行流定義了大量的以CountedCompleter為基礎(chǔ)的操作.利用分割/合并和周邊組件實(shí)現(xiàn)了基于ForkJoin框架的并行計(jì)算調(diào)度.

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/77888.html

相關(guān)文章

  • ForkJoin框架ForkJoinPool

    摘要:前言在前面的三篇文章中先后介紹了框架的任務(wù)組件體系體系源碼并簡(jiǎn)單介紹了目前的并行流應(yīng)用場(chǎng)景框架本質(zhì)上是對(duì)的擴(kuò)展它依舊支持經(jīng)典的使用方式即任務(wù)池的配合向池中提交任務(wù)并異步地等待結(jié)果毫無(wú)疑問前面的文章已經(jīng)解釋了框架的新穎性初步了解了工作竊取 前言 在前面的三篇文章中先后介紹了ForkJoin框架的任務(wù)組件(ForkJoinTask體系,CountedCompleter體系)源碼,并簡(jiǎn)單介紹...

    mayaohua 評(píng)論0 收藏0
  • ForkJoin框架ForkJoinTask

    摘要:前言在前面的文章和響應(yīng)式編程中提到了和后者毫無(wú)疑問是一個(gè)線程池前者則是一個(gè)類似經(jīng)典定義的概念官方有一個(gè)非常無(wú)語(yǔ)的解釋就是運(yùn)行在的一個(gè)任務(wù)抽象就是運(yùn)行的線程池框架包含和若干的子類它的核心在于分治和工作竅取最大程度利用線程池中的工作線程避免忙的 前言 在前面的文章CompletableFuture和響應(yīng)式編程中提到了ForkJoinTask和ForkJoinPool,后者毫無(wú)疑問是一個(gè)線程...

    crossoverJie 評(píng)論0 收藏0
  • 《java 8 實(shí)戰(zhàn)》讀書筆記 -第六章 用收集數(shù)據(jù)

    摘要:分區(qū)函數(shù)返回一個(gè)布爾值,這意味著得到的分組的鍵類型是,于是它最多可以分為兩組是一組,是一組。當(dāng)遍歷到流中第個(gè)元素時(shí),這個(gè)函數(shù)執(zhí)行時(shí)會(huì)有兩個(gè)參數(shù)保存歸約結(jié)果的累加器已收集了流中的前個(gè)項(xiàng)目,還有第個(gè)元素本身。 一、收集器簡(jiǎn)介 把列表中的交易按貨幣分組: Map transactionsByCurrencies = transactions.stream().collect(groupi...

    Airy 評(píng)論0 收藏0
  • java并發(fā)編程學(xué)習(xí)6--并行

    摘要:類似的你可以用將并行流變?yōu)轫樞蛄?。中的使用順序求和并行求和將流轉(zhuǎn)為并行流配置并行流線程池并行流內(nèi)部使用了默認(rèn)的,默認(rèn)的線程數(shù)量就是處理器的數(shù)量包括虛擬內(nèi)核通過(guò)得到。 【概念 并行流就是一個(gè)把內(nèi)容分成多個(gè)數(shù)據(jù)塊,并用不同的線程分別處理每一個(gè)數(shù)據(jù)塊的流。在java7之前,并行處理數(shù)據(jù)很麻煩,第一,需要明確的把包含數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu)分成若干子部分。第二,給每一個(gè)子部分分配一個(gè)獨(dú)立的線程。第三,適...

    mgckid 評(píng)論0 收藏0
  • Java 8 的 JVM 有多快?Fork-Join 性能基準(zhǔn)測(cè)試

    摘要:這減輕了手動(dòng)重復(fù)執(zhí)行相同基準(zhǔn)測(cè)試的痛苦,并簡(jiǎn)化了獲取結(jié)果的流程。處理項(xiàng)目的代碼并從標(biāo)有注釋的方法處生成基準(zhǔn)測(cè)試程序。用和運(yùn)行該基準(zhǔn)測(cè)試得到以下結(jié)果。同時(shí),和的基線測(cè)試結(jié)果也有略微的不同。 Java 8 已經(jīng)發(fā)布一段時(shí)間了,許多開發(fā)者已經(jīng)開始使用 Java 8。本文也將討論最新發(fā)布在 JDK 中的并發(fā)功能更新。事實(shí)上,JDK 中已經(jīng)有多處java.util.concurrent 改動(dòng),但...

    Euphoria 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<