成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

基于AQS構(gòu)建CountDownLatch、CyclicBarrier和Semaphore

shixinzhang / 3470人閱讀

摘要:對(duì)于,我們僅僅需要關(guān)心兩個(gè)方法,一個(gè)是方法,另一個(gè)是方法。首先,我們來看方法,它代表線程阻塞,等待的值減為。首先,的源碼實(shí)現(xiàn)和大相徑庭,基于的共享模式的使用,而基于來實(shí)現(xiàn)。

前言

本文先用 CountDownLatch 將共享模式說清楚,然后順著把其他 AQS 相關(guān)的類 CyclicBarrier、Semaphore 的源碼一起過一下。

CountDownLatch

CountDownLatch 這個(gè)類是比較典型的 AQS 的共享模式的使用,這是一個(gè)高頻使用的類。latch 的中文意思是門栓、柵欄,具體怎么解釋我就不廢話了,大家隨意,看兩個(gè)例子就知道在哪里用、怎么用了。

使用例子

我們看下 Doug Lea 在 java doc 中給出的例子,這個(gè)例子非常實(shí)用,我們經(jīng)常會(huì)寫這個(gè)代碼。

假設(shè)我們有 N ( N > 0 ) 個(gè)任務(wù),那么我們會(huì)用 N 來初始化一個(gè) CountDownLatch,然后將這個(gè) latch 的引用傳遞到各個(gè)線程中,在每個(gè)線程完成了任務(wù)后,調(diào)用 latch.countDown() 代表完成了一個(gè)任務(wù)。

調(diào)用 latch.await() 的方法的線程會(huì)阻塞,直到所有的任務(wù)完成。

class Driver2 { // ...
    void main() throws InterruptedException {
        CountDownLatch doneSignal = new CountDownLatch(N);
        Executor e = Executors.newFixedThreadPool(8);

        // 創(chuàng)建 N 個(gè)任務(wù),提交給線程池來執(zhí)行
        for (int i = 0; i < N; ++i) // create and start threads
            e.execute(new WorkerRunnable(doneSignal, i));

        // 等待所有的任務(wù)完成,這個(gè)方法才會(huì)返回
        doneSignal.await();           // wait for all to finish
    }
}

class WorkerRunnable implements Runnable {
    private final CountDownLatch doneSignal;
    private final int i;

    WorkerRunnable(CountDownLatch doneSignal, int i) {
        this.doneSignal = doneSignal;
        this.i = i;
    }

    public void run() {
        try {
            doWork(i);
            // 這個(gè)線程的任務(wù)完成了,調(diào)用 countDown 方法
            doneSignal.countDown();
        } catch (InterruptedException ex) {
        } // return;
    }

    void doWork() { ...}
}

所以說 CountDownLatch 非常實(shí)用,我們常常會(huì)將一個(gè)比較大的任務(wù)進(jìn)行拆分,然后開啟多個(gè)線程來執(zhí)行,等所有線程都執(zhí)行完了以后,再往下執(zhí)行其他操作。這里例子中,只有 main 線程調(diào)用了 await 方法。

我們?cè)賮砜戳硪粋€(gè)例子,這個(gè)例子很典型,用了兩個(gè) CountDownLatch:

class Driver { // ...
    void main() throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);

        for (int i = 0; i < N; ++i) // create and start threads
            new Thread(new Worker(startSignal, doneSignal)).start();

        // 這邊插入一些代碼,確保上面的每個(gè)線程先啟動(dòng)起來,才執(zhí)行下面的代碼。
        doSomethingElse();            // don"t let run yet
        // 因?yàn)檫@里 N == 1,所以,只要調(diào)用一次,那么所有的 await 方法都可以通過
        startSignal.countDown();      // let all threads proceed
        doSomethingElse();
        // 等待所有任務(wù)結(jié)束
        doneSignal.await();           // wait for all to finish
    }
}

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }

    public void run() {
        try {
            // 為了讓所有線程同時(shí)開始任務(wù),我們讓所有線程先阻塞在這里
            // 等大家都準(zhǔn)備好了,再打開這個(gè)門栓
            startSignal.await();
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException ex) {
        } // return;
    }

    void doWork() { ...}
}

這個(gè)例子中,doneSignal 同第一個(gè)例子的使用,我們說說這里的 startSignal。N 個(gè)新開啟的線程都調(diào)用了startSignal.await() 進(jìn)行阻塞等待,它們阻塞在柵欄上,只有當(dāng)條件滿足的時(shí)候(startSignal.countDown()),它們才能同時(shí)通過這個(gè)柵欄。如果始終只有一個(gè)線程調(diào)用 await 方法等待任務(wù)完成,那么 CountDownLatch 就會(huì)簡(jiǎn)單很多,所以之后的源碼分析讀者一定要在腦海中構(gòu)建出這么一個(gè)場(chǎng)景:有 m 個(gè)線程是做任務(wù)的,有 n 個(gè)線程在某個(gè)柵欄上等待這 m 個(gè)線程做完任務(wù),直到所有 m 個(gè)任務(wù)完成后,n 個(gè)線程同時(shí)通過柵欄。

源碼分析

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
// 老套路了,內(nèi)部封裝一個(gè) Sync 類繼承自 AQS
private static final class Sync extends AbstractQueuedSynchronizer {
    Sync(int count) {
        // 這樣就 state == count 了
        setState(count);
    }
    ...
}

代碼都是套路,先分析套路:AQS 里面的 state 是一個(gè)整數(shù)值,這邊用一個(gè) int count 參數(shù)其實(shí)初始化就是設(shè)置了這個(gè)值,所有調(diào)用了 await 方法的等待線程會(huì)掛起,然后有其他一些線程會(huì)做 state = state - 1 操作,當(dāng) state 減到 0 的同時(shí),那個(gè)線程會(huì)負(fù)責(zé)喚醒調(diào)用了 await 方法的所有線程。都是套路啊,只是 Doug Lea 的套路很深,代碼很巧妙,不然我們也沒有要分析源碼的必要。

對(duì)于 CountDownLatch,我們僅僅需要關(guān)心兩個(gè)方法,一個(gè)是 countDown() 方法,另一個(gè)是 await() 方法。countDown() 方法每次調(diào)用都會(huì)將 state 減 1,直到 state 的值為 0;而 await 是一個(gè)阻塞方法,當(dāng) state 減為 0 的時(shí)候,await 方法才會(huì)返回。await 可以被多個(gè)線程調(diào)用,讀者這個(gè)時(shí)候腦子里要有個(gè)圖:所有調(diào)用了 await 方法的線程阻塞在 AQS 的阻塞隊(duì)列中,等待條件滿足(state == 0),將線程從隊(duì)列中一個(gè)個(gè)喚醒過來。

我們用以下程序來分析源碼,t1 和 t2 負(fù)責(zé)調(diào)用 countDown() 方法,t3 和 t4 調(diào)用 await 方法阻塞:

public class CountDownLatchDemo {

    public static void main(String[] args) {

        CountDownLatch latch = new CountDownLatch(2);

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException ignore) {
                }
                // 休息 5 秒后(模擬線程工作了 5 秒),調(diào)用 countDown()
                latch.countDown();
            }
        }, "t1");

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException ignore) {
                }
                // 休息 10 秒后(模擬線程工作了 10 秒),調(diào)用 countDown()
                latch.countDown();
            }
        }, "t2");

        t1.start();
        t2.start();

        Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // 阻塞,等待 state 減為 0
                    latch.await();
                    System.out.println("線程 t3 從 await 中返回了");
                } catch (InterruptedException e) {
                    System.out.println("線程 t3 await 被中斷");
                    Thread.currentThread().interrupt();
                }
            }
        }, "t3");
        Thread t4 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // 阻塞,等待 state 減為 0
                    latch.await();
                    System.out.println("線程 t4 從 await 中返回了");
                } catch (InterruptedException e) {
                    System.out.println("線程 t4 await 被中斷");
                    Thread.currentThread().interrupt();
                }
            }
        }, "t4");

        t3.start();
        t4.start();
    }
}

上述程序,大概在過了 10 秒左右的時(shí)候,會(huì)輸出:

線程 t3 從 await 中返回了
線程 t4 從 await 中返回了
// 這兩條輸出,順序不是絕對(duì)的
// 后面的分析,我們假設(shè) t3 先進(jìn)入阻塞隊(duì)列

接下來,我們按照流程一步一步走:先 await 等待,然后被喚醒,await 方法返回。

首先,我們來看 await() 方法,它代表線程阻塞,等待 state 的值減為 0。

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 這也是老套路了,我在第二篇的中斷那一節(jié)說過了
    if (Thread.interrupted())
        throw new InterruptedException();
    // t3 和 t4 調(diào)用 await 的時(shí)候,state 都大于 0。
    // 也就是說,這個(gè) if 返回 true,然后往里看
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
// 只有當(dāng) state == 0 的時(shí)候,這個(gè)方法才會(huì)返回 1
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

從方法名我們就可以看出,這個(gè)方法是獲取共享鎖,并且此方法是可中斷的(中斷的時(shí)候拋出 InterruptedException 退出這個(gè)方法)。

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 1. 入隊(duì)
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 同上,只要 state 不等于 0,那么這個(gè)方法返回 -1
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 2
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

我們?cè)僖徊讲娇淳唧w的流程。首先,我們看 countDown() 方法:

public void countDown() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    // 只有當(dāng) state 減為 0 的時(shí)候,tryReleaseShared 才返回 true
    // 否則只是簡(jiǎn)單的 state = state - 1 那么 countDown 方法就結(jié)束了
    if (tryReleaseShared(arg)) {
        // 喚醒 await 的線程
        doReleaseShared();
        return true;
    }
    return false;
}
// 這個(gè)方法很簡(jiǎn)單,用自旋的方法實(shí)現(xiàn) state 減 1
protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

countDown 方法就是每次調(diào)用都將 state 值減 1,如果 state 減到 0 了,那么就調(diào)用下面的方法進(jìn)行喚醒阻塞隊(duì)列中的線程:

// 調(diào)用這個(gè)方法的時(shí)候,state == 0
// 這個(gè)方法先不要看所有的代碼,按照思路往下到我寫注釋的地方,其他的之后還會(huì)仔細(xì)分析
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // t3 入隊(duì)的時(shí)候,已經(jīng)將頭節(jié)點(diǎn)的 waitStatus 設(shè)置為 Node.SIGNAL(-1) 了
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 就是這里,喚醒 head 的后繼節(jié)點(diǎn),也就是阻塞隊(duì)列中的第一個(gè)節(jié)點(diǎn)
                // 在這里,也就是喚醒 t3
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // todo
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

一旦 t3 被喚醒后,我們繼續(xù)回到 await 的這段代碼,parkAndCheckInterrupt 返回,我們先不考慮中斷的情況:

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r); // 2. 這里是下一步
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 1. 喚醒后這個(gè)方法返回
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

接下來,t3 會(huì)進(jìn)到 setHeadAndPropagate(node, r) 這個(gè)方法,先把 head 給占了,然后喚醒隊(duì)列中其他的線程:

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);

    // 下面說的是,喚醒當(dāng)前 node 之后的節(jié)點(diǎn),即 t3 已經(jīng)醒了,馬上喚醒 t4
    // 類似的,如果 t4 后面還有 t5,那么 t4 醒了以后,馬上將 t5 給喚醒了
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // 又是這個(gè)方法,只是現(xiàn)在的 head 已經(jīng)不是原來的空節(jié)點(diǎn)了,是 t3 的節(jié)點(diǎn)了
            doReleaseShared();
    }
}

又回到這個(gè)方法了,那么接下來,我們好好分析 doReleaseShared 這個(gè)方法,我們根據(jù)流程,頭節(jié)點(diǎn) head 此時(shí)是 t3 節(jié)點(diǎn)了:

// 調(diào)用這個(gè)方法的時(shí)候,state == 0
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 1. h == null: 說明阻塞隊(duì)列為空
        // 2. h == tail: 說明頭結(jié)點(diǎn)可能是剛剛初始化的頭節(jié)點(diǎn),
        //   或者是普通線程節(jié)點(diǎn),但是此節(jié)點(diǎn)既然是頭節(jié)點(diǎn)了,那么代表已經(jīng)被喚醒了,阻塞隊(duì)列沒有其他節(jié)點(diǎn)了
        // 所以這兩種情況不需要進(jìn)行喚醒后繼節(jié)點(diǎn)
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // t4 將頭節(jié)點(diǎn)(此時(shí)是 t3)的 waitStatus 設(shè)置為 Node.SIGNAL(-1) 了
            if (ws == Node.SIGNAL) {
                // 這里 CAS 失敗的場(chǎng)景請(qǐng)看下面的解讀
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 就是這里,喚醒 head 的后繼節(jié)點(diǎn),也就是阻塞隊(duì)列中的第一個(gè)節(jié)點(diǎn)
                // 在這里,也就是喚醒 t4
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     // 這個(gè) CAS 失敗的場(chǎng)景是:執(zhí)行到這里的時(shí)候,剛好有一個(gè)節(jié)點(diǎn)入隊(duì),入隊(duì)會(huì)將這個(gè) ws 設(shè)置為 -1
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 如果到這里的時(shí)候,前面喚醒的線程已經(jīng)占領(lǐng)了 head,那么再循環(huán)
        // 否則,就是 head 沒變,那么退出循環(huán),
        // 退出循環(huán)是不是意味著阻塞隊(duì)列中的其他節(jié)點(diǎn)就不喚醒了?當(dāng)然不是,喚醒的線程之后還是會(huì)調(diào)用這個(gè)方法的
        if (h == head)                   // loop if head changed
            break;
    }
}

我們分析下最后一個(gè) if 語(yǔ)句,然后才能解釋第一個(gè) CAS 為什么可能會(huì)失?。?/p>

h == head:說明頭節(jié)點(diǎn)還沒有被剛剛用 unparkSuccessor 喚醒的線程(這里可以理解為 t4)占有,此時(shí) break 退出循環(huán)。
h != head:頭節(jié)點(diǎn)被剛剛喚醒的線程(這里可以理解為 t4)占有,那么這里重新進(jìn)入下一輪循環(huán),喚醒下一個(gè)節(jié)點(diǎn)(這里是 t4 )。我們知道,等到 t4 被喚醒后,其實(shí)是會(huì)主動(dòng)喚醒 t5、t6、t7...,那為什么這里要進(jìn)行下一個(gè)循環(huán)來喚醒 t5 呢?我覺得是出于吞吐量的考慮。
滿足上面的 2 的場(chǎng)景,那么我們就能知道為什么上面的 CAS 操作 compareAndSetWaitStatus(h, Node.SIGNAL, 0) 會(huì)失敗了?

因?yàn)楫?dāng)前進(jìn)行 for 循環(huán)的線程到這里的時(shí)候,可能剛剛喚醒的線程 t4 也剛剛好到這里了,那么就有可能 CAS 失敗了。

for 循環(huán)第一輪的時(shí)候會(huì)喚醒 t4,t4 醒后會(huì)將自己設(shè)置為頭節(jié)點(diǎn),如果在 t4 設(shè)置頭節(jié)點(diǎn)后,for 循環(huán)才跑到 if (h == head),那么此時(shí)會(huì)返回 false,for 循環(huán)會(huì)進(jìn)入下一輪。t4 喚醒后也會(huì)進(jìn)入到這個(gè)方法里面,那么 for 循環(huán)第二輪和 t4 就有可能在這個(gè) CAS 相遇,那么就只會(huì)有一個(gè)成功了。

CyclicBarrier

字面意思是“可重復(fù)使用的柵欄”,CyclicBarrier 相比 CountDownLatch 來說,要簡(jiǎn)單很多,其源碼沒有什么高深的地方,它是 ReentrantLock 和 Condition 的組合使用。看如下示意圖,CyclicBarrier 和 CountDownLatch 是不是很像,只是 CyclicBarrier 可以有不止一個(gè)柵欄,因?yàn)樗臇艡冢˙arrier)可以重復(fù)使用(Cyclic)。
首先,CyclicBarrier 的源碼實(shí)現(xiàn)和 CountDownLatch 大相徑庭,CountDownLatch 基于 AQS 的共享模式的使用,而 CyclicBarrier 基于 Condition 來實(shí)現(xiàn)。

因?yàn)?CyclicBarrier 的源碼相對(duì)來說簡(jiǎn)單許多,讀者只要熟悉了前面關(guān)于 Condition 的分析,那么這里的源碼是毫無壓力的,就是幾個(gè)特殊概念罷了。

廢話結(jié)束,先上基本屬性和構(gòu)造方法:

public class CyclicBarrier {
    // 我們說了,CyclicBarrier 是可以重復(fù)使用的,我們把每次從開始使用到穿過柵欄當(dāng)做"一代"
    private static class Generation {
        boolean broken = false;
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    // CyclicBarrier 是基于 Condition 的
    // Condition 是“條件”的意思,CyclicBarrier 的等待線程通過 barrier 的“條件”是大家都到了柵欄上
    private final Condition trip = lock.newCondition();

    // 參與的線程數(shù)
    private final int parties;

    // 如果設(shè)置了這個(gè),代表越過柵欄之前,要執(zhí)行相應(yīng)的操作
    private final Runnable barrierCommand;

    // 當(dāng)前所處的“代”
    private Generation generation = new Generation();

    // 還沒有到柵欄的線程數(shù),這個(gè)值初始為 parties,然后遞減
    // 還沒有到柵欄的線程數(shù) = parties - 已經(jīng)到柵欄的數(shù)量
    private int count;

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

我用一圖來描繪下 CyclicBarrier 里面的一些概念:

看圖我們也知道了,CyclicBarrier 的源碼最重要的就是 await() 方法了。

首先,先看怎么開啟新的一代:

// 開啟新的一代,當(dāng)最后一個(gè)線程到達(dá)柵欄上的時(shí)候,調(diào)用這個(gè)方法來喚醒其他線程,同時(shí)初始化“下一代”
private void nextGeneration() {
    // 首先,需要喚醒所有的在柵欄上等待的線程
    trip.signalAll();
    // 更新 count 的值
    count = parties;
    // 重新生成“新一代”
    generation = new Generation();
}

看看怎么打破一個(gè)柵欄:

private void breakBarrier() {
    // 設(shè)置狀態(tài) broken 為 true
    generation.broken = true;
    // 重置 count 為初始值 parties
    count = parties;
    // 喚醒所有已經(jīng)在等待的線程
    trip.signalAll();
}

這兩個(gè)方法之后用得到,現(xiàn)在開始分析最重要的等待通過柵欄方法 await 方法:

// 不帶超時(shí)機(jī)制
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
// 帶超時(shí)機(jī)制,如果超時(shí)拋出 TimeoutException 異常
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

繼續(xù)往里看:

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
    final ReentrantLock lock = this.lock;
    // 先要獲取到鎖,然后在 finally 中要記得釋放鎖
    // 如果記得 Condition 部分的話,我們知道 condition 的 await 會(huì)釋放鎖,signal 的時(shí)候需要重新獲取鎖
    lock.lock();
    try {
        final Generation g = generation;
        // 檢查柵欄是否被打破,如果被打破,拋出 BrokenBarrierException 異常
        if (g.broken)
            throw new BrokenBarrierException();
        // 檢查中斷狀態(tài),如果中斷了,拋出 InterruptedException 異常
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        // index 是這個(gè) await 方法的返回值
        // 注意到這里,這個(gè)是從 count 遞減后得到的值
        int index = --count;

        // 如果等于 0,說明所有的線程都到柵欄上了,準(zhǔn)備通過
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                // 如果在初始化的時(shí)候,指定了通過柵欄前需要執(zhí)行的操作,在這里會(huì)得到執(zhí)行
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                // 如果 ranAction 為 true,說明執(zhí)行 command.run() 的時(shí)候,沒有發(fā)生異常退出的情況
                ranAction = true;
                // 喚醒等待的線程,然后開啟新的一代
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    // 進(jìn)到這里,說明執(zhí)行指定操作的時(shí)候,發(fā)生了異常,那么需要打破柵欄
                    // 之前我們說了,打破柵欄意味著喚醒所有等待的線程,設(shè)置 broken 為 true,重置 count 為 parties
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        // 如果是最后一個(gè)線程調(diào)用 await,那么上面就返回了
        // 下面的操作是給那些不是最后一個(gè)到達(dá)柵欄的線程執(zhí)行的
        for (;;) {
            try {
                // 如果帶有超時(shí)機(jī)制,調(diào)用帶超時(shí)的 Condition 的 await 方法等待,直到最后一個(gè)線程調(diào)用 await
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 如果到這里,說明等待的線程在 await(是 Condition 的 await)的時(shí)候被中斷
                if (g == generation && ! g.broken) {
                    // 打破柵欄
                    breakBarrier();
                    // 打破柵欄后,重新拋出這個(gè) InterruptedException 異常給外層調(diào)用的方法
                    throw ie;
                } else {
                    // 到這里,說明 g != generation, 說明新的一代已經(jīng)產(chǎn)生,即最后一個(gè)線程 await 執(zhí)行完成,
                    // 那么此時(shí)沒有必要再拋出 InterruptedException 異常,記錄下來這個(gè)中斷信息即可
                    // 或者是柵欄已經(jīng)被打破了,那么也不應(yīng)該拋出 InterruptedException 異常,
                    // 而是之后拋出 BrokenBarrierException 異常
                    Thread.currentThread().interrupt();
                }
            }

              // 喚醒后,檢查柵欄是否是“破的”
            if (g.broken)
                throw new BrokenBarrierException();

            // 這個(gè) for 循環(huán)除了異常,就是要從這里退出了
            // 我們要清楚,最后一個(gè)線程在執(zhí)行完指定任務(wù)(如果有的話),會(huì)調(diào)用 nextGeneration 來開啟一個(gè)新的代
            // 然后釋放掉鎖,其他線程從 Condition 的 await 方法中得到鎖并返回,然后到這里的時(shí)候,其實(shí)就會(huì)滿足 g != generation 的
            // 那什么時(shí)候不滿足呢?barrierCommand 執(zhí)行過程中拋出了異常,那么會(huì)執(zhí)行打破柵欄操作,
            // 設(shè)置 broken 為true,然后喚醒這些線程。這些線程會(huì)從上面的 if (g.broken) 這個(gè)分支拋 BrokenBarrierException 異常返回
            // 當(dāng)然,還有最后一種可能,那就是 await 超時(shí),此種情況不會(huì)從上面的 if 分支異常返回,也不會(huì)從這里返回,會(huì)執(zhí)行后面的代碼
            if (g != generation)
                return index;

            // 如果醒來發(fā)現(xiàn)超時(shí)了,打破柵欄,拋出異常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

好了,我想我應(yīng)該講清楚了吧,我好像幾乎沒有漏掉任何一行代碼吧?

下面開始收尾工作。

首先,我們看看怎么得到有多少個(gè)線程到了柵欄上,處于等待狀態(tài):

public int getNumberWaiting() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return parties - count;
    } finally {
        lock.unlock();
    }
}

判斷一個(gè)柵欄是否被打破了,這個(gè)很簡(jiǎn)單,直接看 broken 的值即可:

public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally {
        lock.unlock();
    }
}

前面我們?cè)谡f await 的時(shí)候也幾乎說清楚了,什么時(shí)候柵欄會(huì)被打破,總結(jié)如下:

1.中斷,我們說了,如果某個(gè)等待的線程發(fā)生了中斷,那么會(huì)打破柵欄,同時(shí)拋出 InterruptedException 異常;
2.超時(shí),打破柵欄,同時(shí)拋出 TimeoutException 異常;
3.指定執(zhí)行的操作拋出了異常,這個(gè)我們前面也說過。
最后,我們來看看怎么重置一個(gè)柵欄:

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

我們?cè)O(shè)想一下,如果初始化時(shí),指定了線程 parties = 4,前面有 3 個(gè)線程調(diào)用了 await 等待,在第 4 個(gè)線程調(diào)用 await 之前,我們調(diào)用 reset 方法,那么會(huì)發(fā)生什么?

首先,打破柵欄,那意味著所有等待的線程(3個(gè)等待的線程)會(huì)喚醒,await 方法會(huì)通過拋出 BrokenBarrierException 異常返回。然后開啟新的一代,重置了 count 和 generation,相當(dāng)于一切歸零了。

怎么樣,CyclicBarrier 源碼很簡(jiǎn)單吧。

Semaphore

有了 CountDownLatch 的基礎(chǔ)后,分析 Semaphore 會(huì)簡(jiǎn)單很多。Semaphore 是什么呢?它類似一個(gè)資源池(讀者可以類比線程池),每個(gè)線程需要調(diào)用 acquire() 方法獲取資源,然后才能執(zhí)行,執(zhí)行完后,需要 release 資源,讓給其他的線程用。

大概大家也可以猜到,Semaphore 其實(shí)也是 AQS 中共享鎖的使用,因?yàn)槊總€(gè)線程共享一個(gè)池嘛。

套路解讀:創(chuàng)建 Semaphore 實(shí)例的時(shí)候,需要一個(gè)參數(shù) permits,這個(gè)基本上可以確定是設(shè)置給 AQS 的 state 的,然后每個(gè)線程調(diào)用 acquire 的時(shí)候,執(zhí)行 state = state - 1,release 的時(shí)候執(zhí)行 state = state + 1,當(dāng)然,acquire 的時(shí)候,如果 state = 0,說明沒有資源了,需要等待其他線程 release。

構(gòu)造方法:

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

這里和 ReentrantLock 類似,用了公平策略和非公平策略。

看 acquire 方法:

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

這幾個(gè)方法也是老套路了,大家基本都懂了吧,這邊多了兩個(gè)可以傳參的 acquire 方法,不過大家也都懂的吧,如果我們需要一次獲取超過一個(gè)的資源,會(huì)用得著這個(gè)的。

我們接下來看不拋出 InterruptedException 異常的 acquireUninterruptibly() 方法吧:

public void acquireUninterruptibly() {
    sync.acquireShared(1);
}
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

前面說了,Semaphore 分公平策略和非公平策略,我們對(duì)比一下兩個(gè) tryAcquireShared 方法:

// 公平策略:
protected int tryAcquireShared(int acquires) {
    for (;;) {
        // 區(qū)別就在于是不是會(huì)先判斷是否有線程在排隊(duì),然后才進(jìn)行 CAS 減操作
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
// 非公平策略:
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

也是老套路了,所以從源碼分析角度的話,我們其實(shí)不太需要關(guān)心是不是公平策略還是非公平策略,它們的區(qū)別往往就那么一兩行。

我們?cè)倩氐?acquireShared 方法,

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

由于 tryAcquireShared(arg) 返回小于 0 的時(shí)候,說明 state 已經(jīng)小于 0 了(沒資源了),此時(shí) acquire 不能立馬拿到資源,需要進(jìn)入到阻塞隊(duì)列等待,雖然貼了很多代碼,不在乎多這點(diǎn)了:

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

這個(gè)方法我就不介紹了,線程掛起后等待有資源被 release 出來。接下來,我們就要看 release 的方法了:

// 任務(wù)介紹,釋放一個(gè)資源
public void release() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        // 溢出,當(dāng)然,我們一般也不會(huì)用這么大的數(shù)
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

tryReleaseShared 方法總是會(huì)返回 true,然后是 doReleaseShared,這個(gè)也是我們熟悉的方法了,我就貼下代碼,不分析了,這個(gè)方法用于喚醒所有的等待線程:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

Semphore 的源碼確實(shí)很簡(jiǎn)單,基本上都是分析過的老代碼的組合使用了。

總結(jié)

寫到這里,終于把 AbstractQueuedSynchronizer 基本上說完了,對(duì)于 Java 并發(fā),Doug Lea 真的是神一樣的存在。日后我們還會(huì)接觸到很多 Doug Lea 的代碼,希望我們大家都可以朝著大神的方向不斷打磨自己的技術(shù),少一些高大上的架構(gòu),多一些實(shí)實(shí)在在的優(yōu)秀代碼吧。

(全文完)

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/72339.html

相關(guān)文章

  • 長(zhǎng)文慎入-探索Java并發(fā)編程與高并發(fā)解決方案

    摘要:所有示例代碼請(qǐng)見下載于基本概念并發(fā)同時(shí)擁有兩個(gè)或者多個(gè)線程,如果程序在單核處理器上運(yùn)行多個(gè)線程將交替地?fù)Q入或者換出內(nèi)存這些線程是同時(shí)存在的,每個(gè)線程都處于執(zhí)行過程中的某個(gè)狀態(tài),如果運(yùn)行在多核處理器上此時(shí),程序中的每個(gè)線程都 所有示例代碼,請(qǐng)見/下載于 https://github.com/Wasabi1234... showImg(https://upload-images.jians...

    SimpleTriangle 評(píng)論0 收藏0
  • Java并發(fā)多線程 - 并發(fā)工具類JUC

    摘要:將屏障重置為其初始狀態(tài)。注意,在由于其他原因造成損壞之后,實(shí)行重置可能會(huì)變得很復(fù)雜此時(shí)需要使用其他方式重新同步線程,并選擇其中一個(gè)線程來執(zhí)行重置。 安全共享對(duì)象策略 1.線程限制 : 一個(gè)被線程限制的對(duì)象,由線程獨(dú)占,并且只能被占有它的線程修改2.共享只讀 : 一個(gè)共享只讀的對(duì)象,在沒有額外同步的情況下,可以被多個(gè)線程并發(fā)訪問,但是任何線程都不能修改它3.線程安全對(duì)象 : 一個(gè)線程安全...

    wuyumin 評(píng)論0 收藏0
  • Java多線程&高并發(fā)

    摘要:線程啟動(dòng)規(guī)則對(duì)象的方法先行發(fā)生于此線程的每一個(gè)動(dòng)作。所以局部變量是不被多個(gè)線程所共享的,也就不會(huì)出現(xiàn)并發(fā)問題。通過獲取到數(shù)據(jù),放入當(dāng)前線程處理完之后將當(dāng)前線程中的信息移除。主線程必須在啟動(dòng)其他線程后立即調(diào)用方法。 一、線程安全性 定義:當(dāng)多個(gè)線程訪問某個(gè)類時(shí),不管運(yùn)行時(shí)環(huán)境采用何種調(diào)度方式,或者這些線程將如何交替執(zhí)行,并且在主調(diào)代碼中不需要任何額外的同步或協(xié)同,這個(gè)類都能表現(xiàn)出正確的行...

    SQC 評(píng)論0 收藏0
  • BATJ都愛問的多線程面試題

    摘要:今天給大家總結(jié)一下,面試中出鏡率很高的幾個(gè)多線程面試題,希望對(duì)大家學(xué)習(xí)和面試都能有所幫助。指令重排在單線程環(huán)境下不會(huì)出先問題,但是在多線程環(huán)境下會(huì)導(dǎo)致一個(gè)線程獲得還沒有初始化的實(shí)例。使用可以禁止的指令重排,保證在多線程環(huán)境下也能正常運(yùn)行。 下面最近發(fā)的一些并發(fā)編程的文章匯總,通過閱讀這些文章大家再看大廠面試中的并發(fā)編程問題就沒有那么頭疼了。今天給大家總結(jié)一下,面試中出鏡率很高的幾個(gè)多線...

    高勝山 評(píng)論0 收藏0
  • Java 線程同步組件 CountDownLatchCyclicBarrier 原理分析

    摘要:在創(chuàng)建對(duì)象時(shí),需要轉(zhuǎn)入一個(gè)值,用于初始化的成員變量,該成員變量表示屏障攔截的線程數(shù)。當(dāng)?shù)竭_(dá)屏障的線程數(shù)小于時(shí),這些線程都會(huì)被阻塞住。當(dāng)所有線程到達(dá)屏障后,將會(huì)被更新,表示進(jìn)入新一輪的運(yùn)行輪次中。 1.簡(jiǎn)介 在分析完AbstractQueuedSynchronizer(以下簡(jiǎn)稱 AQS)和ReentrantLock的原理后,本文將分析 java.util.concurrent 包下的兩個(gè)...

    Anonymous1 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<