摘要:一導(dǎo)語(yǔ)最近在學(xué)習(xí)并發(fā)編程原理,所以準(zhǔn)備整理一下自己學(xué)到的知識(shí),先寫一篇的源碼分析,之后希望可以慢慢寫完整個(gè)并發(fā)編程。了解了的構(gòu)造函數(shù)之后,我們?cè)賮?lái)看它的核心代碼,首先是。
一、導(dǎo)語(yǔ)
最近在學(xué)習(xí)并發(fā)編程原理,所以準(zhǔn)備整理一下自己學(xué)到的知識(shí),先寫一篇CountDownLatch的源碼分析,之后希望可以慢慢寫完整個(gè)并發(fā)編程。
二、什么是CountDownLatchCountDownLatch是java的JUC并發(fā)包里的一個(gè)工具類,可以理解為一個(gè)倒計(jì)時(shí)器,主要是用來(lái)控制多個(gè)線程之間的通信。
比如有一個(gè)主線程A,它要等待其他4個(gè)子線程執(zhí)行完畢之后才能執(zhí)行,此時(shí)就可以利用CountDownLatch來(lái)實(shí)現(xiàn)這種功能了。
public static void main(String[] args){ System.out.println("主線程和他的兩個(gè)小兄弟約好去吃火鍋"); System.out.println("主線程進(jìn)入了飯店"); System.out.println("主線程想要開始動(dòng)筷子吃飯"); //new一個(gè)計(jì)數(shù)器,初始值為2,當(dāng)計(jì)數(shù)器為0時(shí),主線程開始執(zhí)行 CountDownLatch latch = new CountDownLatch(2); new Thread(){ public void run() { try { System.out.println("子線程1——小兄弟A 正在到飯店的路上"); Thread.sleep(3000); System.out.println("子線程1——小兄弟A 到飯店了"); //一個(gè)小兄弟到了,計(jì)數(shù)器-1 latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }; }.start(); new Thread(){ public void run() { try { System.out.println("子線程2——小兄弟B 正在到飯店的路上"); Thread.sleep(3000); System.out.println("子線程2——小兄弟B 到飯店了"); //另一個(gè)小兄弟到了,計(jì)數(shù)器-1 latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }; }.start(); //主線程等待,直到其他兩個(gè)小兄弟也進(jìn)入飯店(計(jì)數(shù)器==0),主線程才能吃飯 latch.await(); System.out.println("主線程終于可以開始吃飯了~"); }四、源碼分析
核心代碼:
CountDownLatch latch = new CountDownLatch(1); latch.await(); latch.countDown();
其中構(gòu)造函數(shù)的參數(shù)是計(jì)數(shù)器的值;
await()方法是用來(lái)阻塞線程,直到計(jì)數(shù)器的值為0
countDown()方法是執(zhí)行計(jì)數(shù)器-1操作
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
這段代碼很簡(jiǎn)單,首先if判斷傳入的count是否<0,如果小于0直接拋異常。
然后new一個(gè)類Sync,這個(gè)Sync是什么呢?我們一起來(lái)看下
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } //嘗試獲取共享鎖 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //嘗試釋放共享鎖 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
可以看到Sync是一個(gè)內(nèi)部類,繼承了AQS,AQS是一個(gè)同步器,之后我們會(huì)詳細(xì)講。
其中有幾個(gè)核心點(diǎn):
變量 state是父類AQS里面的變量,在這里的語(yǔ)義是計(jì)數(shù)器的值
getState()方法也是父類AQS里的方法,很簡(jiǎn)單,就是獲取state的值
tryAcquireShared和tryReleaseShared也是父類AQS里面的方法,在這里CountDownLatch對(duì)他們進(jìn)行了重寫,先有個(gè)印象,之后詳講。
2、了解了CountDownLatch的構(gòu)造函數(shù)之后,我們?cè)賮?lái)看它的核心代碼,首先是await()。public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
可以看到,其實(shí)是通過(guò)內(nèi)部類Sync調(diào)用了父類AQS的acquireSharedInterruptibly()方法。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //判斷線程是否是中斷狀態(tài) if (Thread.interrupted()) throw new InterruptedException(); //嘗試獲取state的值 if (tryAcquireShared(arg) < 0)//step1 doAcquireSharedInterruptibly(arg);//step2 }
tryAcquireShared(arg)這個(gè)方法就是我們剛才在Sync內(nèi)看到的重寫父類AQS的方法,意思就是判斷是否getState() == 0,如果state為0,返回1,則step1處不進(jìn)入if體內(nèi)acquireSharedInterruptibly(int arg)方法執(zhí)行完畢。若state!=0,則返回-1,進(jìn)入if體內(nèi)step2處。
下面我們來(lái)看acquireSharedInterruptibly(int arg)方法:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //step1、把當(dāng)前線程封裝為共享類型的Node,加入隊(duì)列尾部 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //step2、獲取當(dāng)前node的前一個(gè)元素 final Node p = node.predecessor(); //step3、如果前一個(gè)元素是隊(duì)首 if (p == head) { //step4、再次調(diào)用tryAcquireShared()方法,判斷state的值是否為0 int r = tryAcquireShared(arg); //step5、如果state的值==0 if (r >= 0) { //step6、設(shè)置當(dāng)前node為隊(duì)首,并嘗試釋放共享鎖 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //step7、是否可以安心掛起當(dāng)前線程,是就掛起;并且判斷當(dāng)前線程是否中斷 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { //step8、如果出現(xiàn)異常,failed沒有更新為false,則把當(dāng)前node從隊(duì)列中取消 if (failed) cancelAcquire(node); } }
按照代碼中的注釋,我們可以大概了解該方法的內(nèi)容,下面我們來(lái)仔細(xì)看下其中調(diào)用的一些方法是干什么的。
1、首先看addWaiter()
//step1 private Node addWaiter(Node mode) { //把當(dāng)前線程封裝為node Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure //獲取當(dāng)前隊(duì)列的隊(duì)尾tail,并賦值給pred Node pred = tail; //如果pred!=null,即當(dāng)前隊(duì)尾不為null if (pred != null) { //把當(dāng)前隊(duì)尾tail,變成當(dāng)前node的前繼節(jié)點(diǎn) node.prev = pred; //cas更新當(dāng)前node為新的隊(duì)尾 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //如果隊(duì)尾為空,走enq方法 enq(node);//step1.1 return node; } ----------------------------------------------------------------- //step1.1 private Node enq(final Node node) { for (;;) { Node t = tail; //如果隊(duì)尾tail為null,初始化隊(duì)列 if (t == null) { // Must initialize //cas設(shè)置一個(gè)新的空node為隊(duì)首 if (compareAndSetHead(new Node())) tail = head; } else { //cas把當(dāng)前node設(shè)置為新隊(duì)尾,把前隊(duì)尾設(shè)置成當(dāng)前node的前繼節(jié)點(diǎn) node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
2、接下來(lái)我們?cè)趤?lái)看setHeadAndPropagate()方法,看其內(nèi)部實(shí)現(xiàn)
//step6 private void setHeadAndPropagate(Node node, int propagate) { //獲取隊(duì)首head Node h = head; // Record old head for check below //設(shè)置當(dāng)前node為隊(duì)首,并取消node所關(guān)聯(lián)的線程 setHead(node); // if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; //如果當(dāng)前node的后繼節(jié)點(diǎn)為null或者是shared類型的 if (s == null || s.isShared()) //釋放鎖,喚醒下一個(gè)線程 doReleaseShared();//step6.1 } } -------------------------------------------------------------------- //step6.1 private void doReleaseShared() { for (;;) { //找到頭節(jié)點(diǎn) Node h = head; if (h != null && h != tail) { //獲取頭節(jié)點(diǎn)狀態(tài) int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //喚醒head節(jié)點(diǎn)的next節(jié)點(diǎn) unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }3、接下來(lái)我們來(lái)看countDown()方法。
public void countDown() { sync.releaseShared(1); }
可以看到調(diào)用的是父類AQS的releaseShared 方法
public final boolean releaseShared(int arg) { //state-1 if (tryReleaseShared(arg)) {//step1 //喚醒等待線程,內(nèi)部調(diào)用的是LockSupport.unpark方法 doReleaseShared();//step2 return true; } return false; } ------------------------------------------------------------------ //step1 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { //獲取當(dāng)前state的值 int c = getState(); if (c == 0) return false; int nextc = c-1; //cas操作來(lái)進(jìn)行原子減1 if (compareAndSetState(c, nextc)) return nextc == 0; } }五、總結(jié)
CountDownLatch主要是通過(guò)計(jì)數(shù)器state來(lái)控制是否可以執(zhí)行其他操作,如果不能就通過(guò)LockSupport.park()方法掛起線程,直到其他線程執(zhí)行完畢后喚醒它。
下面我們通過(guò)一個(gè)簡(jiǎn)單的圖來(lái)幫助我們理解一下:
PS:本人也是還在學(xué)習(xí)的路上,理解的也不是特別透徹,如有錯(cuò)誤,愿傾聽教誨。^_^
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/74165.html
摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢娝氖褂茫陂_始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購(gòu),是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來(lái),我們一步一步來(lái)?yè)羝魄皟蓚€(gè)名詞,今天我們首先來(lái)說(shuō)說(shuō)分布式。 探究...
摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢娝氖褂?,在開始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購(gòu),是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來(lái),我們一步一步來(lái)?yè)羝魄皟蓚€(gè)名詞,今天我們首先來(lái)說(shuō)說(shuō)分布式。 探究...
摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢娝氖褂?,在開始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購(gòu),是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來(lái),我們一步一步來(lái)?yè)羝魄皟蓚€(gè)名詞,今天我們首先來(lái)說(shuō)說(shuō)分布式。 探究...
摘要:今天我們來(lái)聊一聊的使用場(chǎng)景。使用場(chǎng)景在某些業(yè)務(wù)情況下,要求我們等某個(gè)條件或者任務(wù)完成后才可以繼續(xù)處理后續(xù)任務(wù)。同時(shí)在線程完成時(shí)也會(huì)觸發(fā)一定事件。方便業(yè)務(wù)繼續(xù)向下執(zhí)行。第個(gè)毒販如果當(dāng)前已經(jīng)沒有可以毒販,立刻返回被干掉了干掉一個(gè)。 作者 : 畢來(lái)生微信: 878799579 前言 ? 在 java.util.concurrent 包中提供了多種并發(fā)容器類來(lái)改進(jìn)同步容器 的性能。今天...
摘要:基礎(chǔ)問題的的性能及原理之區(qū)別詳解備忘筆記深入理解流水線抽象關(guān)鍵字修飾符知識(shí)點(diǎn)總結(jié)必看篇中的關(guān)鍵字解析回調(diào)機(jī)制解讀抽象類與三大特征時(shí)間和時(shí)間戳的相互轉(zhuǎn)換為什么要使用內(nèi)部類對(duì)象鎖和類鎖的區(qū)別,,優(yōu)缺點(diǎn)及比較提高篇八詳解內(nèi)部類單例模式和 Java基礎(chǔ)問題 String的+的性能及原理 java之yield(),sleep(),wait()區(qū)別詳解-備忘筆記 深入理解Java Stream流水...
閱讀 678·2023-04-26 02:03
閱讀 1045·2021-11-23 09:51
閱讀 1159·2021-10-14 09:42
閱讀 1750·2021-09-13 10:23
閱讀 975·2021-08-27 13:12
閱讀 851·2019-08-30 11:21
閱讀 1010·2019-08-30 11:14
閱讀 1055·2019-08-30 11:09