成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

線程間的同步與通信(6)——CountDownLatch源碼分析

longmon / 3294人閱讀

摘要:相較于方法,提供了超時等待機制注意,在方法中,我們用到了的返回值,如果該方法因為超時而退出時,則將返回。的這個返回值有助于我們理解該方法究竟是因為獲取到了鎖而返回,還是因為超時時間到了而返回。

前言

系列文章目錄

CountDownLatch是一個很有用的工具,latch是門閂的意思,該工具是為了解決某些操作只能在一組操作全部執(zhí)行完成后才能執(zhí)行的情景。例如,小組早上開會,只有等所有人到齊了才能開;再如,游樂園里的過山車,一次可以坐10個人,為了節(jié)約成本,通常是等夠10個人了才開。CountDown是倒數(shù)計數(shù),所以CountDownLatch的用法通常是設(shè)定一個大于0的值,該值即代表需要等待的總?cè)蝿?wù)數(shù),每完成一個任務(wù)后,將總?cè)蝿?wù)數(shù)減一,直到最后該值為0,說明所有等待的任務(wù)都執(zhí)行完了,“門閂”此時就被打開,后面的任務(wù)可以繼續(xù)執(zhí)行。

CountDownLatch本身是基于共享鎖實現(xiàn)的,如果你還不了解共享鎖,建議先讀一下逐行分析AQS源碼(3)——共享鎖的獲取與釋放,然后再繼續(xù)往下看。

核心屬性

CountDownLatch主要是通過AQS的共享鎖機制實現(xiàn)的,因此它的核心屬性只有一個sync,它繼承自AQS,同時覆寫了tryAcquireSharedtryReleaseShared,以完成具體的實現(xiàn)共享鎖的獲取與釋放的邏輯。

private final Sync sync;
/**
 * Synchronization control For CountDownLatch.
 * Uses AQS state to represent count.
 */
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}
構(gòu)造函數(shù)
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

在構(gòu)造函數(shù)中,我們就是簡單傳入了一個不小于0的任務(wù)數(shù),由上面Sync的構(gòu)造函數(shù)可知,這個任務(wù)數(shù)就是AQS的state的初始值。

核心方法

CountDownLatch最核心的方法只有兩個,一個是countDown方法,每調(diào)用一次,就會將當(dāng)前的count減一,當(dāng)count值為0時,就會喚醒所有等待中的線程;另一個是await方法,它有兩種形式,一種是阻塞式,一種是帶超時機制的形式,該方法用于將當(dāng)前等待“門閂”開啟的線程掛起,直到count值為0,這一點很類似于條件隊列,相當(dāng)于等待的條件就是count值為0,然而其底層的實現(xiàn)并不是用條件隊列,而是共享鎖。

countDown()
public void countDown() {
    sync.releaseShared(1);
}

前面說過,countDown()方法的目的就是將count值減一,并且在count值為0時,喚醒所有等待的線程,它內(nèi)部調(diào)用的其實是釋放共享鎖的操作:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

該方法由AQS實現(xiàn),但是tryReleaseShared方法由Sync類自己實現(xiàn):

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

該方法的實現(xiàn)很簡單,就是獲取當(dāng)前的state值,如果已經(jīng)為0了,直接返回false;否則通過CAS操作將state值減一,之后返回的是nextc == 0,由此可見,該方法只有在count值原來不為0,但是調(diào)用后變?yōu)?時,才會返回true,否則返回false,并且也可以看出,該方法在返回true之后,后面如果再次調(diào)用,還是會返回false。也就是說,調(diào)用該方法只有一種情況會返回true,那就是state值從大于0變?yōu)?值時,這時也是所有在門閂前的任務(wù)都完成了。

tryReleaseShared返回true以后,將調(diào)用doReleaseShared方法喚醒所有等待中的線程,該方法我們在前面的文章中已經(jīng)詳細分析過了,這里就不再贅述了。

值得一提的是,我們其實并不關(guān)心releaseShared的返回值,而只關(guān)心tryReleaseShared的返回值,或者只關(guān)心count到0了沒有,這里更像是借了共享鎖的“殼”,來完成我們的目的,事實上我們完全可以自己設(shè)一個全局變量count來實現(xiàn)相同的效果,只不過對這個全局變量的操作也必須使用CAS。

await()

與Condition的await()方法的語義相同,該方法是阻塞式地等待,并且是響應(yīng)中斷的,只不過它不是在等待signal操作,而是在等待count值為0:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

可見,await方法內(nèi)部調(diào)用的是acquireSharedInterruptibly方法,相當(dāng)于借用了獲取共享鎖的“殼”:

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

我們來回憶一下獨占模式下對應(yīng)的方法:

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

可見,兩者用的是同一個框架,只是這里:

tryAcquire(arg) 換成了 tryAcquireShared(arg) (子類實現(xiàn))

doAcquireInterruptibly(arg) 換成了 doAcquireSharedInterruptibly(arg) (AQS提供)

我們先來看看Sync子類對于tryAcquireShared的實現(xiàn):

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

該方法似乎有點掛羊頭賣狗肉的感覺——所謂的獲取共享鎖,事實上并不是什么搶鎖的行為,沒有任何CAS操作,它就是判斷當(dāng)前的state值是不是0,是就返回1,不是就返回-1。

值得注意的是,在逐行分析AQS源碼(3)——共享鎖的獲取與釋放中我們特別提到過tryAcquireShared返回值的含義:

如果該值小于0,則代表當(dāng)前線程獲取共享鎖失敗

如果該值大于0,則代表當(dāng)前線程獲取共享鎖成功,并且接下來其他線程嘗試獲取共享鎖的行為很可能成功

如果該值等于0,則代表當(dāng)前線程獲取共享鎖成功,但是接下來其他線程嘗試獲取共享鎖的行為會失敗

所以,當(dāng)該方法的返回值不小于0時,就說明搶鎖成功,可以直接退出了,所對應(yīng)的就是count值已經(jīng)為0,所有等待的事件都滿足了。否則,我們調(diào)用doAcquireSharedInterruptibly(arg)將當(dāng)前線程封裝成Node,丟到sync queue中去阻塞等待:

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在前面我們介紹共享鎖的獲取時,已經(jīng)分析過了doAcquireShared方法,只是它是不拋出InterruptedException的,doAcquireSharedInterruptibly(arg)是它的可中斷版本,我們可以直接對比一下:

可見,它們僅僅是在對待中斷的處理方式上有所不同,其他部分都是一樣的,由于doAcquireShared在前面的文章中我們已經(jīng)詳細分析過了,這里就不再贅述了。

await(long timeout, TimeUnit unit)

相較于await()方法,await(long timeout, TimeUnit unit)提供了超時等待機制:

public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout);
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

注意,在tryAcquireSharedNanos方法中,我們用到了doAcquireSharedNanos的返回值,如果該方法因為超時而退出時,則將返回false。由于await()方法是阻塞式的,也就是說沒有獲取到鎖是不會退出的,因此它沒有返回值,換句話說,如果它正常返回了,則一定是因為獲取到了鎖而返回; 而await(long timeout, TimeUnit unit)由于有了超時機制,它是有返回值的,返回值為true則表示獲取鎖成功,為false則表示獲取鎖失敗。doAcquireSharedNanos的這個返回值有助于我們理解該方法究竟是因為獲取到了鎖而返回,還是因為超時時間到了而返回。

至于doAcquireSharedNanos的實現(xiàn)細節(jié),由于他和doAcquireSharedInterruptibly相比只是多了一個超時機制:

代碼本身很簡單,就不贅述了。

實戰(zhàn)

接下來我們來學(xué)習(xí)一個使用CountDownLatch的實際例子,Java的官方源碼已經(jīng)為我們提供了一個使用的示例代碼:

class Driver { // ...
    void main() throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);

        for (int i = 0; i < N; ++i) // create and start threads
            new Thread(new Worker(startSignal, doneSignal)).start();

        doSomethingElse();            // don"t let run yet
        startSignal.countDown();      // let all threads proceed
        doSomethingElse();
        doneSignal.await();           // wait for all to finish
    }
}

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }

    public void run() {
        try {
            startSignal.await();
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException ex) {
        } // return;
    }

    void doWork() { ...}
}

在這個例子中,有兩個“閘門”,一個是CountDownLatch startSignal = new CountDownLatch(1),它開啟后,等待在這個“閘門”上的任務(wù)才能開始運行;另一個“閘門”是CountDownLatch doneSignal = new CountDownLatch(N), 它表示等待N個任務(wù)都執(zhí)行完成后,才能繼續(xù)往下。

Worker實現(xiàn)了Runnable接口,代表了要執(zhí)行的任務(wù),在它的run方法中,我們先調(diào)用了startSignal.await(),等待startSignal這一“閘門”開啟,閘門開啟后,我們就執(zhí)行自己的任務(wù),任務(wù)完成后再執(zhí)行doneSignal.countDown(),將等待的總?cè)蝿?wù)數(shù)減一。

代碼本身的邏輯非常簡單好懂,這里不贅述了。

總結(jié)

CountDownLatch相當(dāng)于一個“門栓”,一個“閘門”,只有它開啟了,代碼才能繼續(xù)往下執(zhí)行。通常情況下,如果當(dāng)前線程需要等其他線程執(zhí)行完成后才能執(zhí)行,我們就可以使用CountDownLatch。

使用CountDownLatch#await方法阻塞等待一個“閘門”的開啟。

使用CountDownLatch#countDown方法減少閘門所等待的任務(wù)數(shù)。

CountDownLatch基于共享鎖實現(xiàn)。

CountDownLatch是一次性的,“閘門”開啟后,無法再重復(fù)使用,如果想重復(fù)使用,應(yīng)該用[CyclicBarrier]()

(完)

系列文章目錄

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/77262.html

相關(guān)文章

  • 線程間的同步通信(7)——CyclicBarrier源碼分析

    摘要:例如,線程需要互相等待,保證所有線程都執(zhí)行完了之后才能一起通過。獲取正在等待中的線程數(shù)注意,這里加了鎖,因為方法可能會被多個線程同時修改。只要有一行沒有處理完,所有的線程都會在處等待,最后一個執(zhí)行完的線程將會負責(zé)喚醒所有等待的線程 前言 系列文章目錄 上一篇 我們學(xué)習(xí)了基于AQS共享鎖實現(xiàn)的CountDownLatch,本篇我們來看看另一個和它比較像的并發(fā)工具CyclicBarrier...

    freewolf 評論0 收藏0
  • 系列文章目錄

    摘要:為了避免一篇文章的篇幅過長,于是一些比較大的主題就都分成幾篇來講了,這篇文章是筆者所有文章的目錄,將會持續(xù)更新,以給大家一個查看系列文章的入口。 前言 大家好,筆者是今年才開始寫博客的,寫作的初衷主要是想記錄和分享自己的學(xué)習(xí)經(jīng)歷。因為寫作的時候發(fā)現(xiàn),為了弄懂一個知識,不得不先去了解另外一些知識,這樣以來,為了說明一個問題,就要把一系列知識都了解一遍,寫出來的文章就特別長。 為了避免一篇...

    lijy91 評論0 收藏0
  • 系列文章目錄

    摘要:為了避免一篇文章的篇幅過長,于是一些比較大的主題就都分成幾篇來講了,這篇文章是筆者所有文章的目錄,將會持續(xù)更新,以給大家一個查看系列文章的入口。 前言 大家好,筆者是今年才開始寫博客的,寫作的初衷主要是想記錄和分享自己的學(xué)習(xí)經(jīng)歷。因為寫作的時候發(fā)現(xiàn),為了弄懂一個知識,不得不先去了解另外一些知識,這樣以來,為了說明一個問題,就要把一系列知識都了解一遍,寫出來的文章就特別長。 為了避免一篇...

    Yumenokanata 評論0 收藏0
  • Java 多線程并發(fā)編程面試筆錄一覽

    摘要:創(chuàng)建線程的方式方式一將類聲明為的子類。將該線程標(biāo)記為守護線程或用戶線程。其中方法隱含的線程為父線程?;謴?fù)線程,已過時。等待該線程銷毀終止。更多的使當(dāng)前線程在鎖存器倒計數(shù)至零之前一直等待,除非線 知識體系圖: showImg(https://segmentfault.com/img/bVbef6v?w=1280&h=960); 1、線程是什么? 線程是進程中獨立運行的子任務(wù)。 2、創(chuàng)建線...

    bitkylin 評論0 收藏0
  • Java 線程同步組件 CountDownLatch CyclicBarrier 原理分析

    摘要:在創(chuàng)建對象時,需要轉(zhuǎn)入一個值,用于初始化的成員變量,該成員變量表示屏障攔截的線程數(shù)。當(dāng)?shù)竭_屏障的線程數(shù)小于時,這些線程都會被阻塞住。當(dāng)所有線程到達屏障后,將會被更新,表示進入新一輪的運行輪次中。 1.簡介 在分析完AbstractQueuedSynchronizer(以下簡稱 AQS)和ReentrantLock的原理后,本文將分析 java.util.concurrent 包下的兩個...

    Anonymous1 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<