摘要:相較于方法,提供了超時等待機制注意,在方法中,我們用到了的返回值,如果該方法因為超時而退出時,則將返回。的這個返回值有助于我們理解該方法究竟是因為獲取到了鎖而返回,還是因為超時時間到了而返回。
前言
系列文章目錄
CountDownLatch是一個很有用的工具,latch是門閂的意思,該工具是為了解決某些操作只能在一組操作全部執(zhí)行完成后才能執(zhí)行的情景。例如,小組早上開會,只有等所有人到齊了才能開;再如,游樂園里的過山車,一次可以坐10個人,為了節(jié)約成本,通常是等夠10個人了才開。CountDown是倒數(shù)計數(shù),所以CountDownLatch的用法通常是設(shè)定一個大于0的值,該值即代表需要等待的總?cè)蝿?wù)數(shù),每完成一個任務(wù)后,將總?cè)蝿?wù)數(shù)減一,直到最后該值為0,說明所有等待的任務(wù)都執(zhí)行完了,“門閂”此時就被打開,后面的任務(wù)可以繼續(xù)執(zhí)行。
CountDownLatch本身是基于共享鎖實現(xiàn)的,如果你還不了解共享鎖,建議先讀一下逐行分析AQS源碼(3)——共享鎖的獲取與釋放,然后再繼續(xù)往下看。
核心屬性CountDownLatch主要是通過AQS的共享鎖機制實現(xiàn)的,因此它的核心屬性只有一個sync,它繼承自AQS,同時覆寫了tryAcquireShared和tryReleaseShared,以完成具體的實現(xiàn)共享鎖的獲取與釋放的邏輯。
private final Sync sync;
/** * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }構(gòu)造函數(shù)
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
在構(gòu)造函數(shù)中,我們就是簡單傳入了一個不小于0的任務(wù)數(shù),由上面Sync的構(gòu)造函數(shù)可知,這個任務(wù)數(shù)就是AQS的state的初始值。
核心方法CountDownLatch最核心的方法只有兩個,一個是countDown方法,每調(diào)用一次,就會將當(dāng)前的count減一,當(dāng)count值為0時,就會喚醒所有等待中的線程;另一個是await方法,它有兩種形式,一種是阻塞式,一種是帶超時機制的形式,該方法用于將當(dāng)前等待“門閂”開啟的線程掛起,直到count值為0,這一點很類似于條件隊列,相當(dāng)于等待的條件就是count值為0,然而其底層的實現(xiàn)并不是用條件隊列,而是共享鎖。
countDown()public void countDown() { sync.releaseShared(1); }
前面說過,countDown()方法的目的就是將count值減一,并且在count值為0時,喚醒所有等待的線程,它內(nèi)部調(diào)用的其實是釋放共享鎖的操作:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
該方法由AQS實現(xiàn),但是tryReleaseShared方法由Sync類自己實現(xiàn):
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
該方法的實現(xiàn)很簡單,就是獲取當(dāng)前的state值,如果已經(jīng)為0了,直接返回false;否則通過CAS操作將state值減一,之后返回的是nextc == 0,由此可見,該方法只有在count值原來不為0,但是調(diào)用后變?yōu)?時,才會返回true,否則返回false,并且也可以看出,該方法在返回true之后,后面如果再次調(diào)用,還是會返回false。也就是說,調(diào)用該方法只有一種情況會返回true,那就是state值從大于0變?yōu)?值時,這時也是所有在門閂前的任務(wù)都完成了。
在tryReleaseShared返回true以后,將調(diào)用doReleaseShared方法喚醒所有等待中的線程,該方法我們在前面的文章中已經(jīng)詳細分析過了,這里就不再贅述了。
值得一提的是,我們其實并不關(guān)心releaseShared的返回值,而只關(guān)心tryReleaseShared的返回值,或者只關(guān)心count到0了沒有,這里更像是借了共享鎖的“殼”,來完成我們的目的,事實上我們完全可以自己設(shè)一個全局變量count來實現(xiàn)相同的效果,只不過對這個全局變量的操作也必須使用CAS。
await()與Condition的await()方法的語義相同,該方法是阻塞式地等待,并且是響應(yīng)中斷的,只不過它不是在等待signal操作,而是在等待count值為0:
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
可見,await方法內(nèi)部調(diào)用的是acquireSharedInterruptibly方法,相當(dāng)于借用了獲取共享鎖的“殼”:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
我們來回憶一下獨占模式下對應(yīng)的方法:
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
可見,兩者用的是同一個框架,只是這里:
tryAcquire(arg) 換成了 tryAcquireShared(arg) (子類實現(xiàn))
doAcquireInterruptibly(arg) 換成了 doAcquireSharedInterruptibly(arg) (AQS提供)
我們先來看看Sync子類對于tryAcquireShared的實現(xiàn):
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
該方法似乎有點掛羊頭賣狗肉的感覺——所謂的獲取共享鎖,事實上并不是什么搶鎖的行為,沒有任何CAS操作,它就是判斷當(dāng)前的state值是不是0,是就返回1,不是就返回-1。
值得注意的是,在逐行分析AQS源碼(3)——共享鎖的獲取與釋放中我們特別提到過tryAcquireShared返回值的含義:
如果該值小于0,則代表當(dāng)前線程獲取共享鎖失敗
如果該值大于0,則代表當(dāng)前線程獲取共享鎖成功,并且接下來其他線程嘗試獲取共享鎖的行為很可能成功
如果該值等于0,則代表當(dāng)前線程獲取共享鎖成功,但是接下來其他線程嘗試獲取共享鎖的行為會失敗
所以,當(dāng)該方法的返回值不小于0時,就說明搶鎖成功,可以直接退出了,所對應(yīng)的就是count值已經(jīng)為0,所有等待的事件都滿足了。否則,我們調(diào)用doAcquireSharedInterruptibly(arg)將當(dāng)前線程封裝成Node,丟到sync queue中去阻塞等待:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
在前面我們介紹共享鎖的獲取時,已經(jīng)分析過了doAcquireShared方法,只是它是不拋出InterruptedException的,doAcquireSharedInterruptibly(arg)是它的可中斷版本,我們可以直接對比一下:
可見,它們僅僅是在對待中斷的處理方式上有所不同,其他部分都是一樣的,由于doAcquireShared在前面的文章中我們已經(jīng)詳細分析過了,這里就不再贅述了。
await(long timeout, TimeUnit unit)相較于await()方法,await(long timeout, TimeUnit unit)提供了超時等待機制:
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
注意,在tryAcquireSharedNanos方法中,我們用到了doAcquireSharedNanos的返回值,如果該方法因為超時而退出時,則將返回false。由于await()方法是阻塞式的,也就是說沒有獲取到鎖是不會退出的,因此它沒有返回值,換句話說,如果它正常返回了,則一定是因為獲取到了鎖而返回; 而await(long timeout, TimeUnit unit)由于有了超時機制,它是有返回值的,返回值為true則表示獲取鎖成功,為false則表示獲取鎖失敗。doAcquireSharedNanos的這個返回值有助于我們理解該方法究竟是因為獲取到了鎖而返回,還是因為超時時間到了而返回。
至于doAcquireSharedNanos的實現(xiàn)細節(jié),由于他和doAcquireSharedInterruptibly相比只是多了一個超時機制:
代碼本身很簡單,就不贅述了。
實戰(zhàn)接下來我們來學(xué)習(xí)一個使用CountDownLatch的實際例子,Java的官方源碼已經(jīng)為我們提供了一個使用的示例代碼:
class Driver { // ... void main() throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(N); for (int i = 0; i < N; ++i) // create and start threads new Thread(new Worker(startSignal, doneSignal)).start(); doSomethingElse(); // don"t let run yet startSignal.countDown(); // let all threads proceed doSomethingElse(); doneSignal.await(); // wait for all to finish } } class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() { try { startSignal.await(); doWork(); doneSignal.countDown(); } catch (InterruptedException ex) { } // return; } void doWork() { ...} }
在這個例子中,有兩個“閘門”,一個是CountDownLatch startSignal = new CountDownLatch(1),它開啟后,等待在這個“閘門”上的任務(wù)才能開始運行;另一個“閘門”是CountDownLatch doneSignal = new CountDownLatch(N), 它表示等待N個任務(wù)都執(zhí)行完成后,才能繼續(xù)往下。
Worker實現(xiàn)了Runnable接口,代表了要執(zhí)行的任務(wù),在它的run方法中,我們先調(diào)用了startSignal.await(),等待startSignal這一“閘門”開啟,閘門開啟后,我們就執(zhí)行自己的任務(wù),任務(wù)完成后再執(zhí)行doneSignal.countDown(),將等待的總?cè)蝿?wù)數(shù)減一。
代碼本身的邏輯非常簡單好懂,這里不贅述了。
總結(jié)CountDownLatch相當(dāng)于一個“門栓”,一個“閘門”,只有它開啟了,代碼才能繼續(xù)往下執(zhí)行。通常情況下,如果當(dāng)前線程需要等其他線程執(zhí)行完成后才能執(zhí)行,我們就可以使用CountDownLatch。
使用CountDownLatch#await方法阻塞等待一個“閘門”的開啟。
使用CountDownLatch#countDown方法減少閘門所等待的任務(wù)數(shù)。
CountDownLatch基于共享鎖實現(xiàn)。
CountDownLatch是一次性的,“閘門”開啟后,無法再重復(fù)使用,如果想重復(fù)使用,應(yīng)該用[CyclicBarrier]()
(完)
系列文章目錄
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/77262.html
摘要:例如,線程需要互相等待,保證所有線程都執(zhí)行完了之后才能一起通過。獲取正在等待中的線程數(shù)注意,這里加了鎖,因為方法可能會被多個線程同時修改。只要有一行沒有處理完,所有的線程都會在處等待,最后一個執(zhí)行完的線程將會負責(zé)喚醒所有等待的線程 前言 系列文章目錄 上一篇 我們學(xué)習(xí)了基于AQS共享鎖實現(xiàn)的CountDownLatch,本篇我們來看看另一個和它比較像的并發(fā)工具CyclicBarrier...
摘要:為了避免一篇文章的篇幅過長,于是一些比較大的主題就都分成幾篇來講了,這篇文章是筆者所有文章的目錄,將會持續(xù)更新,以給大家一個查看系列文章的入口。 前言 大家好,筆者是今年才開始寫博客的,寫作的初衷主要是想記錄和分享自己的學(xué)習(xí)經(jīng)歷。因為寫作的時候發(fā)現(xiàn),為了弄懂一個知識,不得不先去了解另外一些知識,這樣以來,為了說明一個問題,就要把一系列知識都了解一遍,寫出來的文章就特別長。 為了避免一篇...
摘要:創(chuàng)建線程的方式方式一將類聲明為的子類。將該線程標(biāo)記為守護線程或用戶線程。其中方法隱含的線程為父線程?;謴?fù)線程,已過時。等待該線程銷毀終止。更多的使當(dāng)前線程在鎖存器倒計數(shù)至零之前一直等待,除非線 知識體系圖: showImg(https://segmentfault.com/img/bVbef6v?w=1280&h=960); 1、線程是什么? 線程是進程中獨立運行的子任務(wù)。 2、創(chuàng)建線...
摘要:在創(chuàng)建對象時,需要轉(zhuǎn)入一個值,用于初始化的成員變量,該成員變量表示屏障攔截的線程數(shù)。當(dāng)?shù)竭_屏障的線程數(shù)小于時,這些線程都會被阻塞住。當(dāng)所有線程到達屏障后,將會被更新,表示進入新一輪的運行輪次中。 1.簡介 在分析完AbstractQueuedSynchronizer(以下簡稱 AQS)和ReentrantLock的原理后,本文將分析 java.util.concurrent 包下的兩個...
閱讀 3662·2021-11-15 11:37
閱讀 2990·2021-11-12 10:36
閱讀 4450·2021-09-22 15:51
閱讀 2394·2021-08-27 16:18
閱讀 898·2019-08-30 15:44
閱讀 2176·2019-08-30 10:58
閱讀 1792·2019-08-29 17:18
閱讀 3288·2019-08-28 18:25