摘要:當(dāng)?shù)竭_柵欄后,由于沒有滿足總數(shù)的要求,所以會一直等待,當(dāng)線程到達后,柵欄才會放行。任務(wù)其實就是當(dāng)最后一個線程到達柵欄時,后續(xù)立即要執(zhí)行的任務(wù)。
本文首發(fā)于一世流云專欄:https://segmentfault.com/blog...一、CyclicBarrier簡介
CyclicBarrier是一個輔助同步器類,在JDK1.5時隨著J.U.C一起引入。
這個類的功能和我們之前介紹的CountDownLatch有些類似。我們知道,CountDownLatch是一個倒數(shù)計數(shù)器,在計數(shù)器不為0時,所有調(diào)用await的線程都會等待,當(dāng)計數(shù)器降為0,線程才會繼續(xù)執(zhí)行,且計數(shù)器一旦變?yōu)?,就不能再重置了。
CyclicBarrier可以認(rèn)為是一個柵欄,柵欄的作用是什么?就是阻擋前行。
顧名思義,CyclicBarrier是一個可以循環(huán)使用的柵欄,它做的事情就是:
讓線程到達柵欄時被阻塞(調(diào)用await方法),直到到達柵欄的線程數(shù)滿足指定數(shù)量要求時,柵欄才會打開放行。
這其實有點像軍訓(xùn)報數(shù),報數(shù)總?cè)藬?shù)滿足教官認(rèn)為的總數(shù)時,教官才會安排后面的訓(xùn)練。
可以看下面這個圖來理解下:
一共4個線程A、B、C、D,它們到達柵欄的順序可能各不相同。當(dāng)A、B、C到達柵欄后,由于沒有滿足總數(shù)【4】的要求,所以會一直等待,當(dāng)線程D到達后,柵欄才會放行。
從CyclicBarrier的構(gòu)造器,我們也可以看出關(guān)于這個類的一些端倪,CyclicBarrier有兩個構(gòu)造器:
構(gòu)造器一:
這個構(gòu)造器的參數(shù)parties就是之前說的需要滿足的計數(shù)總數(shù)。
構(gòu)造器二:
這個構(gòu)造器稍微特殊一些,除了指定了計數(shù)總數(shù)外,傳入了一個Runnable任務(wù)。
Runnable任務(wù)其實就是當(dāng)最后一個線程到達柵欄時,后續(xù)立即要執(zhí)行的任務(wù)。
比如,軍訓(xùn)報數(shù)完畢后,總?cè)藬?shù)滿足了要求,教官就會開始命令大家執(zhí)行下一個任務(wù),這個【下一個任務(wù)】就是這里的Runnable。二、CyclicBarrier示例
我們來看一個CyclicBarrier的示例,來理解下它的功能。
假設(shè)現(xiàn)在有這樣一個場景:
5個運動員準(zhǔn)備跑步比賽,運動員在賽跑前會準(zhǔn)備一段時間,當(dāng)裁判發(fā)現(xiàn)所有運動員準(zhǔn)備完畢后,就舉起發(fā)令槍,比賽開始。
這里的起跑線就是屏障,運動員必須在起跑線等待其他運動員準(zhǔn)備完畢。
public class CyclicBarrierTest { public static void main(String[] args) { int N = 5; // 運動員數(shù) CyclicBarrier cb = new CyclicBarrier(N, new Runnable() { @Override public void run() { System.out.println("****** 所有運動員已準(zhǔn)備完畢,發(fā)令槍:跑!******"); } }); for (int i = 0; i < N; i++) { Thread t = new Thread(new PrepareWork(cb), "運動員[" + i + "]"); t.start(); } } private static class PrepareWork implements Runnable { private CyclicBarrier cb; PrepareWork(CyclicBarrier cb) { this.cb = cb; } @Override public void run() { try { Thread.sleep(500); System.out.println(Thread.currentThread().getName() + ": 準(zhǔn)備完成"); cb.await(); // 在柵欄等待 } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } }
執(zhí)行上面的程序,可能的輸出結(jié)果如下:
運動員[3]: 準(zhǔn)備完成 運動員[1]: 準(zhǔn)備完成 運動員[0]: 準(zhǔn)備完成 運動員[2]: 準(zhǔn)備完成 運動員[4]: 準(zhǔn)備完成 ****** 所有運動員已準(zhǔn)備完畢,發(fā)令槍:跑!******
從輸出可以看到,線程到達柵欄時會被阻塞(調(diào)用await方法),直到到達柵欄的線程數(shù)滿足指定數(shù)量要求時,柵欄才會打開放行。
CyclicBarrier對異常的處理我們知道,線程在阻塞過程中,可能被中斷,那么既然CyclicBarrier放行的條件是等待的線程數(shù)達到指定數(shù)目,萬一線程被中斷導(dǎo)致最終的等待線程數(shù)達不到柵欄的要求怎么辦?
CyclicBarrier一定有考慮到這種異常情況,不然其它所有等待線程都會無限制地等待下去。
那么CyclicBarrier是如何處理的呢?
我們看下CyclicBarrier的await()方法:
public int await() throws InterruptedException, BrokenBarrierException { //... }
可以看到,這個方法除了拋出InterruptedException異常外,還會拋出BrokenBarrierException。
BrokenBarrierException表示當(dāng)前的CyclicBarrier已經(jīng)損壞了,可能等不到所有線程都到達柵欄了,所以已經(jīng)在等待的線程也沒必要再等了,可以散伙了。
出現(xiàn)以下幾種情況之一時,當(dāng)前等待線程會拋出BrokenBarrierException異常:
其它某個正在await等待的線程被中斷了
其它某個正在await等待的線程超時了
某個線程重置了CyclicBarrier(調(diào)用了reset方法,后面會講到)
另外,只要正在Barrier上等待的任一線程拋出了異常,那么Barrier就會認(rèn)為肯定是湊不齊所有線程了,就會將柵欄置為損壞(Broken)狀態(tài),并傳播BrokenBarrierException給其它所有正在等待(await)的線程。
我們來對上面的例子做個改造,模擬下異常情況:
public class CyclicBarrierTest { public static void main(String[] args) throws InterruptedException { int N = 5; // 運動員數(shù) CyclicBarrier cb = new CyclicBarrier(N, new Runnable() { @Override public void run() { System.out.println("****** 所有運動員已準(zhǔn)備完畢,發(fā)令槍:跑!******"); } }); Listlist = new ArrayList<>(); for (int i = 0; i < N; i++) { Thread t = new Thread(new PrepareWork(cb), "運動員[" + i + "]"); list.add(t); t.start(); if (i == 3) { t.interrupt(); // 運動員[3]置中斷標(biāo)志位 } } Thread.sleep(2000); System.out.println("Barrier是否損壞:" + cb.isBroken()); } private static class PrepareWork implements Runnable { private CyclicBarrier cb; PrepareWork(CyclicBarrier cb) { this.cb = cb; } @Override public void run() { try { System.out.println(Thread.currentThread().getName() + ": 準(zhǔn)備完成"); cb.await(); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + ": 被中斷"); } catch (BrokenBarrierException e) { System.out.println(Thread.currentThread().getName() + ": 拋出BrokenBarrierException"); } } } }
可能的輸出結(jié)果:
運動員[0]: 準(zhǔn)備完成 運動員[2]: 準(zhǔn)備完成 運動員[1]: 準(zhǔn)備完成 運動員[3]: 準(zhǔn)備完成 運動員[3]: 被中斷 運動員[4]: 準(zhǔn)備完成 運動員[4]: 拋出BrokenBarrierException 運動員[0]: 拋出BrokenBarrierException 運動員[1]: 拋出BrokenBarrierException 運動員[2]: 拋出BrokenBarrierException Barrier是否損壞:true
這段代碼,模擬了中斷線程3的情況,從輸出可以看到,線程0、1、2首先到達Brrier等待。
然后線程3到達,由于之前設(shè)置了中斷標(biāo)志位,所以線程3拋出中斷異常,導(dǎo)致Barrier損壞,此時所有已經(jīng)在柵欄等待的線程(0、1、2)都會拋出BrokenBarrierException異常。
此時,即使再有其它線程到達柵欄(線程4),都會拋出BrokenBarrierException異常。
注意:使用CyclicBarrier時,對異常的處理一定要小心,比如線程在到達柵欄前就拋出異常,此時如果沒有重試機制,其它已經(jīng)到達柵欄的線程會一直等待(因為沒有還沒有滿足總數(shù)),最終導(dǎo)致程序無法繼續(xù)向下執(zhí)行。三、CyclicBarrier原理 CyclicBarrier的構(gòu)造
CyclicBarrier有兩個構(gòu)造器:
CyclicBarrier cb = new CyclicBarrier(10);
構(gòu)造器內(nèi)部的各個字段含義如下:
字段名 | 作用 |
---|---|
parties | 柵欄開啟需要的到達線程總數(shù) |
count | 剩余未到達的線程總數(shù) |
barrierCommand | 最后一個線程到達后執(zhí)行的任務(wù) |
CyclicBarrier 并沒有自己去實現(xiàn)AQS框架的API,而是利用了ReentrantLock和Condition。
public class CyclicBarrier { private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition(); // 柵欄開啟需要的到達線程總數(shù) private final int parties; // 最后一個線程到達后執(zhí)行的任務(wù) private final Runnable barrierCommand; // 剩余未到達的線程總數(shù) private int count; // 當(dāng)前輪次的運行狀態(tài) private Generation generation = new Generation(); // ... }
需要注意的是generation這個字段:
我們知道,CyclicBarrier 是可以循環(huán)復(fù)用的,所以CyclicBarrier 的每一輪任務(wù)都需要對應(yīng)一個generation 對象。
generation 對象內(nèi)部有個broken字段,用來標(biāo)識當(dāng)前輪次的CyclicBarrier 是否已經(jīng)損壞。
nextGeneration方法用來創(chuàng)建一個新的generation 對象,并喚醒所有等待線程,重置內(nèi)部參數(shù)。
我們先來看下await方法:
可以看到,無論有沒有超時功能,內(nèi)部都是調(diào)了dowait這個方法:
dowait方法并不復(fù)雜,一共有3部分:
判斷柵欄是否已經(jīng)損壞或當(dāng)前線程已經(jīng)被中斷,如果是會分別拋出異常;
如果當(dāng)前線程是最后一個到達的線程,會嘗試執(zhí)行最終任務(wù)(如果構(gòu)造CyclicBarrier對象時有傳入Runnable的話),執(zhí)行成功即返回,失敗會破壞柵欄;
對于不是最后一個到達的線程,會在Condition隊列上等待,為了防止被意外喚醒,這里用了一個自旋操作。
破壞柵欄用的是breakBarrier方法:
再來看下CyclicBarrier的reset方法:
該方法先破壞柵欄,然后開始下一輪(新建一個generation對象)。
四、CyclicBarrier接口/類聲明類聲明:
構(gòu)造器聲明:
接口聲明:
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/76618.html
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設(shè)計模式,設(shè)計了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:線程可以調(diào)用的方法進入阻塞,當(dāng)計數(shù)值降到時,所有之前調(diào)用阻塞的線程都會釋放。注意的初始計數(shù)值一旦降到,無法重置。 showImg(https://segmentfault.com/img/remote/1460000016012041); 本文首發(fā)于一世流云的專欄:https://segmentfault.com/blog... 一、CountDownLatch簡介 CountDow...
摘要:分層支持分層一種樹形結(jié)構(gòu),通過構(gòu)造函數(shù)可以指定當(dāng)前待構(gòu)造的對象的父結(jié)點。當(dāng)一個的參與者數(shù)量變成時,如果有該有父結(jié)點,就會將它從父結(jié)點中溢移除。當(dāng)首次將某個結(jié)點鏈接到樹中時,會同時向該結(jié)點的父結(jié)點注冊一個參與者。 showImg(https://segmentfault.com/img/remote/1460000016010947); 本文首發(fā)于一世流云專欄:https://segme...
摘要:在時,引入了包,該包中的大多數(shù)同步器都是基于來構(gòu)建的??蚣芴峁┝艘惶淄ㄓ玫臋C制來管理同步狀態(tài)阻塞喚醒線程管理等待隊列。指針用于在結(jié)點線程被取消時,讓當(dāng)前結(jié)點的前驅(qū)直接指向當(dāng)前結(jié)點的后驅(qū)完成出隊動作。 showImg(https://segmentfault.com/img/remote/1460000016012438); 本文首發(fā)于一世流云的專欄:https://segmentfau...
摘要:無限期等待另一個線程執(zhí)行特定操作。線程安全基本版請說明以及的區(qū)別值都不能為空數(shù)組結(jié)構(gòu)上,通過數(shù)組和鏈表實現(xiàn)。優(yōu)先考慮響應(yīng)中斷,而不是響應(yīng)鎖的普通獲取或重入獲取。只是在最后獲取鎖成功后再把當(dāng)前線程置為狀態(tài)然后再中斷線程。 前段時間在慕課網(wǎng)直播上聽小馬哥面試勸退(面試虐我千百遍,Java 并發(fā)真討厭),發(fā)現(xiàn)講得東西比自己拿到offer還要高興,于是自己在線下做了一點小筆記,供各位參考。 課...
閱讀 1622·2021-09-02 15:41
閱讀 1014·2021-09-02 15:11
閱讀 1311·2021-07-28 00:15
閱讀 2333·2019-08-30 15:55
閱讀 1169·2019-08-30 15:54
閱讀 1712·2019-08-30 15:54
閱讀 2994·2019-08-30 14:02
閱讀 2542·2019-08-29 16:57