成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Java多線程進階(二二)—— J.U.C之synchronizer框架:Phaser

Mr_zhang / 1867人閱讀

摘要:分層支持分層一種樹形結(jié)構(gòu),通過構(gòu)造函數(shù)可以指定當前待構(gòu)造的對象的父結(jié)點。當一個的參與者數(shù)量變成時,如果有該有父結(jié)點,就會將它從父結(jié)點中溢移除。當首次將某個結(jié)點鏈接到樹中時,會同時向該結(jié)點的父結(jié)點注冊一個參與者。

本文首發(fā)于一世流云專欄:https://segmentfault.com/blog...
一、Phaser簡介

PhaserJDK1.7開始引入的一個同步工具類,適用于一些需要分階段的任務的處理。它的功能與 CyclicBarrierCountDownLatch有些類似,類似于一個多階段的柵欄,并且功能更強大,我們來比較下這三者的功能:

同步器 作用
CountDownLatch 倒數(shù)計數(shù)器,初始時設(shè)定計數(shù)器值,線程可以在計數(shù)器上等待,當計數(shù)器值歸0后,所有等待的線程繼續(xù)執(zhí)行
CyclicBarrier 循環(huán)柵欄,初始時設(shè)定參與線程數(shù),當線程到達柵欄后,會等待其它線程的到達,當?shù)竭_柵欄的總數(shù)滿足指定數(shù)后,所有等待的線程繼續(xù)執(zhí)行
Phaser 多階段柵欄,可以在初始時設(shè)定參與線程數(shù),也可以中途注冊/注銷參與者,當?shù)竭_的參與者數(shù)量滿足柵欄設(shè)定的數(shù)量后,會進行階段升級(advance)

Phaser中有一些比較重要的概念,理解了這些概念才能理解Phaser的功能。

phase(階段)

我們知道,在CyclicBarrier中,只有一個柵欄,線程在到達柵欄后會等待其它線程的到達。

Phaser也有柵欄,在Phaser中,柵欄的名稱叫做phase(階段),在任意時間點,Phaser只處于某一個phase(階段),初始階段為0,最大達到Integerr.MAX_VALUE,然后再次歸零。當所有parties參與者都到達后,phase值會遞增。

如果看過之前關(guān)于CyclicBarrier的文章,就會知道,Phaser中的phase(階段)這個概念其實和CyclicBarrier中的Generation很相似,只不過Generation沒有計數(shù)。

parties(參與者)

parties(參與者)其實就是CyclicBarrier中的參與線程的概念。

CyclicBarrier中的參與者在初始構(gòu)造指定后就不能變更,而Phaser既可以在初始構(gòu)造時指定參與者的數(shù)量,也可以中途通過register、bulkRegister、arriveAndDeregister等方法注冊/注銷參與者。

arrive(到達) / advance(進階)

Phaser注冊完parties(參與者)之后,參與者的初始狀態(tài)是unarrived的,當參與者到達(arrive)當前階段(phase)后,狀態(tài)就會變成arrived。當階段的到達參與者數(shù)滿足條件后(注冊的數(shù)量等于到達的數(shù)量),階段就會發(fā)生進階(advance)——也就是phase值+1。

Termination(終止)

代表當前Phaser對象達到終止狀態(tài),有點類似于CyclicBarrier中的柵欄被破壞的概念。

Tiering(分層)

Phaser支持分層(Tiering) —— 一種樹形結(jié)構(gòu),通過構(gòu)造函數(shù)可以指定當前待構(gòu)造的Phaser對象的父結(jié)點。之所以引入Tiering,是因為當一個Phaser有大量參與者(parties)的時候,內(nèi)部的同步操作會使性能急劇下降,而分層可以降低競爭,從而減小因同步導致的額外開銷。

在一個分層Phasers的樹結(jié)構(gòu)中,注冊和撤銷子Phaser或父Phaser是自動被管理的。當一個Phaser的參與者(parties)數(shù)量變成0時,如果有該Phaser有父結(jié)點,就會將它從父結(jié)點中溢移除。

關(guān)于Phaser的分層,后續(xù)我們在講Phaser原理時會進一步討論。

二、Phaser示例

為了更好的理解Phaser的功能,我們來看幾個示例:

示例一
通過Phaser控制多個線程的執(zhí)行時機:有時候我們希望所有線程到達指定點后再同時開始執(zhí)行,我們可以利用CyclicBarrierCountDownLatch來實現(xiàn),這里給出使用Phaser的版本。
public class PhaserTest1 {
    public static void main(String[] args) {
        Phaser phaser = new Phaser();
        for (int i = 0; i < 10; i++) {
            phaser.register();                  // 注冊各個參與者線程
       new Thread(new Task(phaser), "Thread-" + i).start();
        }
    }
}

class Task implements Runnable {
    private final Phaser phaser;

    Task(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        int i = phaser.arriveAndAwaitAdvance();     // 等待其它參與者線程到達
     // do something
        System.out.println(Thread.currentThread().getName() + ": 執(zhí)行完任務,當前phase =" + i + "");
    }
}

輸出:

Thread-8: 執(zhí)行完任務,當前phase =1
Thread-4: 執(zhí)行完任務,當前phase =1
Thread-3: 執(zhí)行完任務,當前phase =1
Thread-0: 執(zhí)行完任務,當前phase =1
Thread-5: 執(zhí)行完任務,當前phase =1
Thread-6: 執(zhí)行完任務,當前phase =1
Thread-7: 執(zhí)行完任務,當前phase =1
Thread-9: 執(zhí)行完任務,當前phase =1
Thread-1: 執(zhí)行完任務,當前phase =1
Thread-2: 執(zhí)行完任務,當前phase =1

以上示例中,創(chuàng)建了10個線程,并通過register方法注冊Phaser的參與者數(shù)量為10。當某個線程調(diào)用arriveAndAwaitAdvance方法后,arrive數(shù)量會加1,如果數(shù)量沒有滿足總數(shù)(參與者數(shù)量10),當前線程就是一直等待,當最后一個線程到達后,所有線程都會繼續(xù)往下執(zhí)行。

注意:arriveAndAwaitAdvance方法是不響應中斷的,也就是說即使當前線程被中斷,arriveAndAwaitAdvance方法也不會返回或拋出異常,而是繼續(xù)等待。如果希望能夠響應中斷,可以參考awaitAdvanceInterruptibly方法。
示例二
通過Phaser實現(xiàn)開關(guān)。在以前講CountDownLatch時,我們給出過以CountDownLatch實現(xiàn)開關(guān)的示例,也就是說,我們希望一些外部條件得到滿足后,然后打開開關(guān),線程才能繼續(xù)執(zhí)行,我們看下如何用Phaser來實現(xiàn)此功能。
public class PhaserTest2 {

    public static void main(String[] args) throws IOException {
        Phaser phaser = new Phaser(1);       // 注冊主線程,當外部條件滿足時,由主線程打開開關(guān)
        for (int i = 0; i < 10; i++) {
            phaser.register();                      // 注冊各個參與者線程
            new Thread(new Task2(phaser), "Thread-" + i).start();
        }

        // 外部條件:等待用戶輸入命令
        System.out.println("Press ENTER to continue");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        reader.readLine();

        // 打開開關(guān)
        phaser.arriveAndDeregister();
        System.out.println("主線程打開了開關(guān)");
    }
}

class Task2 implements Runnable {
    private final Phaser phaser;

    Task2(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        int i = phaser.arriveAndAwaitAdvance();     // 等待其它參與者線程到達

        // do something
        System.out.println(Thread.currentThread().getName() + ": 執(zhí)行完任務,當前phase =" + i + "");
    }
}

輸出:

主線程打開了開關(guān)
Thread-7: 執(zhí)行完任務,當前phase =1
Thread-4: 執(zhí)行完任務,當前phase =1
Thread-3: 執(zhí)行完任務,當前phase =1
Thread-1: 執(zhí)行完任務,當前phase =1
Thread-0: 執(zhí)行完任務,當前phase =1
Thread-9: 執(zhí)行完任務,當前phase =1
Thread-8: 執(zhí)行完任務,當前phase =1
Thread-2: 執(zhí)行完任務,當前phase =1
Thread-5: 執(zhí)行完任務,當前phase =1
Thread-6: 執(zhí)行完任務,當前phase =1

以上示例中,只有當用戶按下回車之后,任務才真正開始執(zhí)行。這里主線程Main相當于一個協(xié)調(diào)者,用來控制開關(guān)打開的時機,arriveAndDeregister方法不會阻塞,該方法會將到達數(shù)加1,同時減少一個參與者數(shù)量,最終返回線程到達時的phase值。

示例三
通過Phaser控制任務的執(zhí)行輪數(shù)
public class PhaserTest3 {
    public static void main(String[] args) throws IOException {

        int repeats = 3;    // 指定任務最多執(zhí)行的次數(shù)

        Phaser phaser = new Phaser() {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("---------------PHASE[" + phase + "],Parties[" + registeredParties + "] ---------------");
                return phase + 1 >= repeats  || registeredParties == 0;
            }
        };

        for (int i = 0; i < 10; i++) {
            phaser.register();                      // 注冊各個參與者線程
       new Thread(new Task3(phaser), "Thread-" + i).start();
        }
    }
}

class Task3 implements Runnable {
    private final Phaser phaser;

    Task3(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        while (!phaser.isTerminated()) {   //只要Phaser沒有終止, 各個線程的任務就會一直執(zhí)行
            int i = phaser.arriveAndAwaitAdvance();     // 等待其它參與者線程到達
            // do something
            System.out.println(Thread.currentThread().getName() + ": 執(zhí)行完任務");
        }
    }
}

輸出:

---------------PHASE[0],Parties[5] ---------------
Thread-4: 執(zhí)行完任務
Thread-1: 執(zhí)行完任務
Thread-2: 執(zhí)行完任務
Thread-3: 執(zhí)行完任務
Thread-0: 執(zhí)行完任務
---------------PHASE[1],Parties[5] ---------------
Thread-0: 執(zhí)行完任務
Thread-3: 執(zhí)行完任務
Thread-1: 執(zhí)行完任務
Thread-4: 執(zhí)行完任務
Thread-2: 執(zhí)行完任務
---------------PHASE[2],Parties[5] ---------------
Thread-2: 執(zhí)行完任務
Thread-4: 執(zhí)行完任務
Thread-1: 執(zhí)行完任務
Thread-0: 執(zhí)行完任務
Thread-3: 執(zhí)行完任務

以上示例中,我們在創(chuàng)建Phaser對象時,覆寫了onAdvance方法,這個方法類似于CyclicBarrier中的barrierAction任務。
也就是說,當最后一個參與者到達時,會觸發(fā)onAdvance方法,入?yún)?strong>phase表示到達時的phase值,registeredParties表示到達時的參與者數(shù)量,返回true表示需要終止Phaser。

我們通過phase + 1 >= repeats ,來控制階段(phase)數(shù)的上限為2(從0開始計),最終控制了每個線程的執(zhí)行任務次數(shù)為repeats次。

示例四
Phaser支持分層功能,我們先來考慮下如何用利用Phaser的分層來實現(xiàn)高并發(fā)時的優(yōu)化,在示例三中,我們其實創(chuàng)建了10個任務,然后10個線程共用一個Phaser對象,如下圖:

如果任務數(shù)繼續(xù)增大,那么同步產(chǎn)生的開銷會非常大,利用Phaser分層的功能,我們可以限定每個Phaser對象的最大使用線程(任務數(shù)),如下圖:

可以看到,上述Phasers其實構(gòu)成了一顆多叉樹,如果任務數(shù)繼續(xù)增多,還可以將Phaser的葉子結(jié)點繼續(xù)分裂,然后將分裂出的子結(jié)點供工作線程使用。

public class PhaserTest4 {
    private static final int TASKS_PER_PHASER = 4;      // 每個Phaser對象對應的工作線程(任務)數(shù)

    public static void main(String[] args) throws IOException {

        int repeats = 3;    // 指定任務最多執(zhí)行的次數(shù)
        Phaser phaser = new Phaser() {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("---------------PHASE[" + phase + "],Parties[" + registeredParties + "] ---------------");
                return phase + 1 >= repeats || registeredParties == 0;
            }
        };

        Tasker[] taskers = new Tasker[10];
        build(taskers, 0, taskers.length, phaser);       // 根據(jù)任務數(shù),為每個任務分配Phaser對象

        for (int i = 0; i < taskers.length; i++) {          // 執(zhí)行任務
            Thread thread = new Thread(taskers[i]);
            thread.start();
        }
    }

    private static void build(Tasker[] taskers, int lo, int hi, Phaser phaser) {
        if (hi - lo > TASKS_PER_PHASER) {
            for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
                int j = Math.min(i + TASKS_PER_PHASER, hi);
                build(taskers, i, j, new Phaser(phaser));
            }
        } else {
            for (int i = lo; i < hi; ++i)
                taskers[i] = new Tasker(i, phaser);
        }

    }
}

class Task4 implements Runnable {
    private final Phaser phaser;

    Task4(Phaser phaser) {
        this.phaser = phaser;
        this.phaser.register();
    }

    @Override
    public void run() {
        while (!phaser.isTerminated()) {   //只要Phaser沒有終止, 各個線程的任務就會一直執(zhí)行
            int i = phaser.arriveAndAwaitAdvance();     // 等待其它參與者線程到達
            // do something
            System.out.println(Thread.currentThread().getName() + ": 執(zhí)行完任務");
        }
    }
}
輸出:
---------------PHASE[0],Parties[3] ---------------
Thread-9: 執(zhí)行完任務
Thread-6: 執(zhí)行完任務
Thread-5: 執(zhí)行完任務
Thread-4: 執(zhí)行完任務
Thread-1: 執(zhí)行完任務
Thread-0: 執(zhí)行完任務
Thread-7: 執(zhí)行完任務
Thread-8: 執(zhí)行完任務
Thread-2: 執(zhí)行完任務
Thread-3: 執(zhí)行完任務
---------------PHASE[1],Parties[3] ---------------
Thread-3: 執(zhí)行完任務
Thread-7: 執(zhí)行完任務
Thread-0: 執(zhí)行完任務
Thread-1: 執(zhí)行完任務
Thread-5: 執(zhí)行完任務
Thread-8: 執(zhí)行完任務
Thread-2: 執(zhí)行完任務
Thread-9: 執(zhí)行完任務
Thread-6: 執(zhí)行完任務
Thread-4: 執(zhí)行完任務
---------------PHASE[2],Parties[3] ---------------
Thread-4: 執(zhí)行完任務
Thread-2: 執(zhí)行完任務
Thread-8: 執(zhí)行完任務
Thread-0: 執(zhí)行完任務
Thread-3: 執(zhí)行完任務
Thread-9: 執(zhí)行完任務
Thread-6: 執(zhí)行完任務
Thread-7: 執(zhí)行完任務
Thread-1: 執(zhí)行完任務
Thread-5: 執(zhí)行完任務
三、Phaser原理

Phaser是本系列至今為止,內(nèi)部結(jié)構(gòu)最為復雜的同步器之一。在開始深入Phaser原理之前,我們有必要先來講講Phaser的內(nèi)部組織結(jié)構(gòu)和它的設(shè)計思想。

Phaser的內(nèi)部結(jié)構(gòu)

之前我們說過,Phaser支持樹形結(jié)構(gòu),在示例四中,也給出了一個通過分層提高并發(fā)性和程序執(zhí)行效率的例子。一個復雜分層結(jié)構(gòu)的Phaser樹的內(nèi)部結(jié)構(gòu)如下圖所示:

上面圖中的幾點關(guān)鍵點:

樹的根結(jié)點root鏈接著兩個“無鎖?!薄猅reiber Stack,用于保存等待線程(比如當線程等待Phaser進入下一階段時,會根據(jù)當前階段的奇偶性,把自己掛到某個棧中),所有Phaser對象都共享這兩個棧。

當首次將某個Phaser結(jié)點鏈接到樹中時,會同時向該結(jié)點的父結(jié)點注冊一個參與者。

為什么需要向父結(jié)點注冊參與者?

首先我們要明白對于Phaser來說,什么時候會發(fā)生躍遷(advance)進入下一階段?

廢話,當然是等它所有參與者都到達的時候。那么它所等待的參與者都包含那幾類呢?

①對于一個孤立的Phaser結(jié)點(也可以看成是只有一個根結(jié)點的樹)
其等待的參與者,就是顯式注冊的參與者,這也是最常見的情況。
比如下圖,如果有10個Task共用這個Phaser,那等待的參與者數(shù)就是10,當10個線程都到達后,Phaser就會躍遷至下一階段。

對于一個非孤立的Phaser葉子結(jié)點,比如下圖中標綠的葉子結(jié)點
這種情況和①一樣,子Phaser1和子Phaser2等待的參與者數(shù)是4,子Phaser3等待的參與者數(shù)是2。

對于一個非孤立非葉子的Phaser結(jié)點,比如上圖中標藍色的結(jié)點
這是最特殊的一種情況,這也是Phaser同步器關(guān)于分層的主要設(shè)計思路。
這種情況,結(jié)點所等待的參與者數(shù)目包含兩部分:

直接顯式注冊的參與者(通過構(gòu)造器或register方法)。——等于0

子結(jié)點的數(shù)目?!扔?

也就是說在上圖中,當左一的子Phaser1的4個參與者都到達后,它會通知父結(jié)點Phaser,自己的狀態(tài)已經(jīng)OK了,這時Phaser會認為子Phaser1已經(jīng)準備就緒,會將自己的到達者數(shù)量加1,同理,當子Phaser2子Phaser3的所有參與者分別到達后,它們也會依次通知Phaser,只有當Phaser(根結(jié)點)的到達者數(shù)量為3時,才會釋放“無鎖?!敝械却木€程,并將階段數(shù)phase增加1。

這是一種層層遞歸的設(shè)計,只要當根結(jié)點的所有參與者都到達后(也就是到達參數(shù)者數(shù)等于其子結(jié)點數(shù)),所有等待線程才會放行,柵欄才會進入下一階段。

了解了上面這些,我們再來看Phaser的源碼。

同步狀態(tài)定義

Phaser使用一個long類型來保存同步狀態(tài)值State,并按位劃分不同區(qū)域的含義,通過掩碼和位運算進行賦值和操作:

棧結(jié)點定義

“無鎖棧”——Treiber Stack,保存在Phaser樹的根結(jié)點中,其余所有Phaser子結(jié)點共享這兩個棧:

結(jié)點的定義非常簡單,內(nèi)部保存了線程信息和Phsaer對象信息:

注意:ForkJoinPool.ManagedBlocker是當棧包含F(xiàn)orkJoinWorkerThread類型的QNode阻塞的時候,F(xiàn)orkJoinPool內(nèi)部會增加一個工作線程來保證并行度,后續(xù)講ForkJoin框架時我們會進行分析。

Phaser的構(gòu)造器

Phaser一共有4個構(gòu)造器,可以看到,最終其實都是調(diào)用了Phaser(Phaser parent, int parties)這個構(gòu)造器。

Phaser(Phaser parent, int parties)的內(nèi)部實現(xiàn)如下,關(guān)鍵就是給當前的Phaser對象指定父結(jié)點時,如果當前Phaser的參與者不為0,需要向父Phaser注冊一個參與者(代表當前結(jié)點本身):

注冊參與者

Phaser提供了兩個注冊參與者的方法:

register:注冊單個參與者

bulkRegister:批量注冊參與者

這兩個方法都很簡單,內(nèi)部調(diào)用了doRegister方法:

/**
 * 注冊指定數(shù)目{#registrations}的參與者
 */
private int doRegister(int registrations) {
    // 首先計算注冊后當前State要調(diào)整的值adjust
    long adjust = ((long) registrations << PARTIES_SHIFT) | registrations;
    final Phaser parent = this.parent;
    int phase;
    for (; ; ) {
        long s = (parent == null) ? state : reconcileState();   // reconcileState()調(diào)整當前Phaser的State與root一致
        int counts = (int) s;
        int parties = counts >>> PARTIES_SHIFT;                 // 參與者數(shù)目
        int unarrived = counts & UNARRIVED_MASK;                // 未到達的數(shù)目
        if (registrations > MAX_PARTIES - parties)
            throw new IllegalStateException(badRegister(s));
        phase = (int) (s >>> PHASE_SHIFT);                      // 當前Phaser所處的階段phase
        if (phase < 0)
            break;
        if (counts != EMPTY) {                                  // CASE1: 當前Phaser已經(jīng)注冊過參與者
            if (parent == null || reconcileState() == s) {
                if (unarrived == 0)    // 參與者已全部到達柵欄, 當前Phaser正在Advance, 需要阻塞等待這一過程完成
                    root.internalAwaitAdvance(phase, null);
                else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adjust))    // 否則,直接更新State
                    break;
            }
        } else if (parent == null) {                            // CASE2: 當前Phaser未注冊過參與者(第一次注冊),且沒有父結(jié)點
            long next = ((long) phase << PHASE_SHIFT) | adjust;
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))  // CAS更新當前Phaser的State值
                break;
        } else {                                                // CASE3: 當前Phaser未注冊過參與者(第一次注冊),且有父結(jié)點
            synchronized (this) {
                if (state == s) {
                    phase = parent.doRegister(1);   // 向父結(jié)點注冊一個參與者
                    if (phase < 0)
                        break;
                    while (!UNSAFE.compareAndSwapLong(this, stateOffset, s,
                            ((long) phase << PHASE_SHIFT) | adjust)) {
                        s = state;
                        phase = (int) (root.state >>> PHASE_SHIFT);
                    }
                    break;
                }
            }
        }
    }
    return phase;
}

doRegister方法用來給當前Phaser對象注冊參與者,主要有三個分支:
①當前Phaser已經(jīng)注冊過參與者
如果參與者已經(jīng)全部到達柵欄,則當前線程需要阻塞等待(因為此時phase正在變化,增加1到下一個phase),否則直接更新State。

②當前Phaser未注冊過參與者(第一次注冊),且沒有父結(jié)點
這種情況最簡單,直接更新當前Phaser的State值。

③當前Phaser未注冊過參與者(第一次注冊),且有父結(jié)點
說明當前Phaser是新加入的葉子結(jié)點,需要向父結(jié)點注冊自身,同時更新自身的State值。

注意: reconcileState方法比較特殊,因為當出現(xiàn)樹形結(jié)構(gòu)時,根結(jié)點首先進行phase的更新,所以需要顯式同步,使當前結(jié)點和根結(jié)點保持一致。

另外,阻塞等待調(diào)用的是internalAwaitAdvance方法,其實就是根據(jù)當前階段phase,將線程包裝成結(jié)點加入到root結(jié)點所指向的某個“無鎖?!敝校?/p>

/**
 * internalAwaitAdvance的主要邏輯就是:當前參與者線程等待Phaser進入下一個階段(就是phase值變化).
 * @return 返回新的階段
 */
private int internalAwaitAdvance(int phase, QNode node) {
    // assert root == this;
    releaseWaiters(phase - 1);       // 清空不用的Treiber Stack(奇偶Stack交替使用)
    boolean queued = false;                 // 入隊標識
    int lastUnarrived = 0;
    int spins = SPINS_PER_ARRIVAL;
    long s;
    int p;
    while ((p = (int) ((s = state) >>> PHASE_SHIFT)) == phase) {
        if (node == null) {           // spinning in noninterruptible mode
            int unarrived = (int) s & UNARRIVED_MASK;
            if (unarrived != lastUnarrived &&
                    (lastUnarrived = unarrived) < NCPU)
                spins += SPINS_PER_ARRIVAL;
            boolean interrupted = Thread.interrupted();
            if (interrupted || --spins < 0) { // need node to record intr
                node = new QNode(this, phase, false, false, 0L);
                node.wasInterrupted = interrupted;
            }
        } else if (node.isReleasable()) // done or aborted
            break;
        else if (!queued) {           // 將結(jié)點壓入棧頂
            AtomicReference head = (phase & 1) == 0 ? evenQ : oddQ;
            QNode q = node.next = head.get();
            if ((q == null || q.phase == phase) &&
                    (int) (state >>> PHASE_SHIFT) == phase) // avoid stale enq
                queued = head.compareAndSet(q, node);
        } else {
            try {
                // 阻塞等待
                ForkJoinPool.managedBlock(node);
            } catch (InterruptedException ie) {
                node.wasInterrupted = true;
            }
        }
    }
?
    if (node != null) {
        if (node.thread != null)
            node.thread = null;       // avoid need for unpark()
        if (node.wasInterrupted && !node.interruptible)
            Thread.currentThread().interrupt();
        if (p == phase && (p = (int) (state >>> PHASE_SHIFT)) == phase)
            return abortWait(phase); // possibly clean up on abort
    }
    releaseWaiters(phase);
    return p;
}
參與者到達并等待

arriveAndAwaitAdvance的主要邏輯如下:
首先將同步狀態(tài)值State中的未到達參與者數(shù)量減1,然后判斷未到達參與者數(shù)量是否為0?

如果不為0,則阻塞當前線程,以等待其他參與者到來;

如果為0,說明當前線程是最后一個參與者,如果有父結(jié)點則對父結(jié)點遞歸調(diào)用該方法。(因為只有根結(jié)點的未到達參與者數(shù)目為0時),才會進階phase。

四、Phaser類/接口聲明 類聲明

構(gòu)造器聲明

接口聲明


文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/76698.html

相關(guān)文章

  • Java線程進階(一)—— J.U.C并發(fā)包概述

    摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設(shè)計模式,設(shè)計了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...

    anonymoussf 評論0 收藏0
  • Java線程進階(十八)—— J.U.Csynchronizer框架:CountDownLatc

    摘要:線程可以調(diào)用的方法進入阻塞,當計數(shù)值降到時,所有之前調(diào)用阻塞的線程都會釋放。注意的初始計數(shù)值一旦降到,無法重置。 showImg(https://segmentfault.com/img/remote/1460000016012041); 本文首發(fā)于一世流云的專欄:https://segmentfault.com/blog... 一、CountDownLatch簡介 CountDow...

    Elle 評論0 收藏0
  • Java線程進階(二)—— J.U.Clocks框架:接口

    摘要:二接口簡介可以看做是類的方法的替代品,與配合使用。當線程執(zhí)行對象的方法時,當前線程會立即釋放鎖,并進入對象的等待區(qū),等待其它線程喚醒或中斷。 showImg(https://segmentfault.com/img/remote/1460000016012601); 本文首發(fā)于一世流云的專欄:https://segmentfault.com/blog... 本系列文章中所說的juc-...

    dkzwm 評論0 收藏0
  • Java線程進階(三九)—— J.U.Cexecutors框架:executors框架概述

    摘要:注意線程與本地操作系統(tǒng)的線程是一一映射的。固定線程數(shù)的線程池提供了兩種創(chuàng)建具有固定線程數(shù)的的方法,固定線程池在初始化時確定其中的線程總數(shù),運行過程中會始終維持線程數(shù)量不變。 showImg(https://segmentfault.com/img/bVbhK58?w=1920&h=1080); 本文首發(fā)于一世流云專欄:https://segmentfault.com/blog... ...

    wdzgege 評論0 收藏0
  • Java線程進階(三)—— J.U.Clocks框架:ReentrantLock

    摘要:公平策略在多個線程爭用鎖的情況下,公平策略傾向于將訪問權(quán)授予等待時間最長的線程。使用方式的典型調(diào)用方式如下二類原理的源碼非常簡單,它通過內(nèi)部類實現(xiàn)了框架,接口的實現(xiàn)僅僅是對的的簡單封裝,參見原理多線程進階七鎖框架獨占功能剖析 showImg(https://segmentfault.com/img/remote/1460000016012582); 本文首發(fā)于一世流云的專欄:https...

    jasperyang 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<