摘要:叫做回環(huán)是因為當所有等待線程都被釋放以后,可以被重用。我們暫且把這個狀態(tài)就叫做,當調(diào)用方法之后,線程就處于了。
CountDownLatch
CountDownLatch 類位于 java.util.concurrent 包下,利用它可以實現(xiàn)類似計數(shù)器的功能。比如有一個任務A,它要等待其他4個任務執(zhí)行完畢之后才能執(zhí)行,此時就可以利用CountDownLatch來實現(xiàn)這種功能了。
CountDownLatch類只提供了一個構(gòu)造器:
public CountDownLatch(int count) { }; //參數(shù)count為計數(shù)值
然后下面這3個方法是CountDownLatch類中最重要的方法:
public void await() throws InterruptedException { }; //調(diào)用await()方法的線程會被掛起,它會等待直到count值為0才繼續(xù)執(zhí)行 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //和await()類似,只不過等待一定的時間后count值還沒變?yōu)?的話就會繼續(xù)執(zhí)行 public void countDown() { }; //將count值減1
代碼實現(xiàn)
package sychronized; import static net.mindview.util.Print.*; import java.util.concurrent.*; class Task implements Runnable{ private static int count = 0; private final int id = count++; final CountDownLatch latch ; public Task(CountDownLatch latch){ this.latch = latch; } @Override public void run(){ try { print(this+"正在執(zhí)行"); TimeUnit.MILLISECONDS.sleep(3000); print(this+"執(zhí)行完畢"); latch.countDown(); } catch (InterruptedException e) { print(this + " 被中斷"); } } @Override public String toString() { return "Task-"+id; } } public class Test { public static void main(String[] args) { final CountDownLatch latch = new CountDownLatch(2); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new Task(latch)); exec.execute(new Task(latch)); try { print("等待2個子線程執(zhí)行完畢..."); long start = System.currentTimeMillis(); latch.await(); long end = System.currentTimeMillis(); print("2個子線程已經(jīng)執(zhí)行完畢 "+(end - start)); print("繼續(xù)執(zhí)行主線程"); }catch (InterruptedException e){ print("主線程被中斷"); } exec.shutdown(); } } #輸出結(jié)果: 等待2個子線程執(zhí)行完畢... Task-0正在執(zhí)行 Task-1正在執(zhí)行 Task-0執(zhí)行完畢 Task-1執(zhí)行完畢 2個子線程已經(jīng)執(zhí)行完畢 3049 繼續(xù)執(zhí)行主線程CyclicBarrier
字面意思回環(huán)柵欄,通過它可以實現(xiàn)讓一組線程等待至某個狀態(tài)之后再全部同時執(zhí)行。叫做回環(huán)是因為當所有等待線程都被釋放以后,CyclicBarrier可以被重用。我們暫且把這個狀態(tài)就叫做barrier,當調(diào)用await()方法之后,線程就處于barrier了。
CyclicBarrier類位于java.util.concurrent包下,CyclicBarrier提供2個構(gòu)造器:
參數(shù)parties指讓多少個線程或者任務等待至barrier狀態(tài)
參數(shù)barrierAction為當這些線程都達到barrier狀態(tài)時會執(zhí)行的內(nèi)容
public CyclicBarrier(int parties, Runnable barrierAction) {} public CyclicBarrier(int parties) {}
然后CyclicBarrier中最重要的方法就是 await 方法,它有2個重載版本:
第一個版本比較常用,用來掛起當前線程,直至所有線程都到達barrier狀態(tài)再同時執(zhí)行后續(xù)任務;
第二個版本是讓這些線程等待至一定的時間,如果還有線程沒有到達barrier狀態(tài)就直接讓到達barrier的線程執(zhí)行后續(xù)任務。
public int await() throws InterruptedException, BrokenBarrierException { }; public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };
代碼展示
package sychronized; import java.util.Random; import java.util.concurrent.*; import static net.mindview.util.Print.*; class WriteTask implements Runnable{ private static int count = 0; private final int id = count++; private CyclicBarrier barrier ; private static Random random = new Random(47); public WriteTask(CyclicBarrier cyclicBarrier) { this.barrier = cyclicBarrier; } @Override public void run() { print(this+"開始寫入數(shù)據(jù)..."); try { TimeUnit.MILLISECONDS.sleep(random.nextInt(5000)); //以睡眠來模擬寫入數(shù)據(jù)操作 print(this+"寫入數(shù)據(jù)完畢,等待其他線程寫入完畢"+" "+System.currentTimeMillis()); barrier.await(); } catch (InterruptedException e) { print(this + "is interrupted!"); }catch(BrokenBarrierException e){ throw new RuntimeException(e); } print("所有任務寫入完畢,繼續(xù)處理其他任務... "+System.currentTimeMillis()); } @Override public String toString() { return getClass().getSimpleName()+"-"+id; } } public class CyclicBarrierTest { public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N); ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < N; ++i){ exec.execute(new WriteTask(barrier)); } exec.shutdown(); } } #輸出結(jié)果: WriteTask-3 開始寫入數(shù)據(jù)... WriteTask-2 開始寫入數(shù)據(jù)... WriteTask-1 開始寫入數(shù)據(jù)... WriteTask-0 開始寫入數(shù)據(jù)... WriteTask-2 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢 1512048648904 WriteTask-1 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢 1512048650042 WriteTask-0 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢 1512048650209 WriteTask-3 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢 1512048652606 所有任務寫入完畢,繼續(xù)處理其他任務... 1512048652607 所有任務寫入完畢,繼續(xù)處理其他任務... 1512048652607 所有任務寫入完畢,繼續(xù)處理其他任務... 1512048652607 所有任務寫入完畢,繼續(xù)處理其他任務... 1512048652607
**
如果說想在所有線程寫入操作完之后,進行額外的其他操作可以為CyclicBarrier提供Runnable參數(shù):
**
package sychronized; import java.util.Random; import java.util.concurrent.*; import static net.mindview.util.Print.*; class WriteTask implements Runnable{ private static int count = 0; private final int id = count++; private CyclicBarrier barrier ; private static Random random = new Random(47); public WriteTask(CyclicBarrier cyclicBarrier) { this.barrier = cyclicBarrier; } @Override public void run() { print(this+" 開始寫入數(shù)據(jù)..."); try { TimeUnit.MILLISECONDS.sleep(random.nextInt(5000)); //以睡眠來模擬寫入數(shù)據(jù)操作 print(this+" 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢"+" "+System.currentTimeMillis()); barrier.await(); TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { print(this + "is interrupted!"); }catch(BrokenBarrierException e){ throw new RuntimeException(e); } print("所有任務寫入完畢,繼續(xù)處理其他任務... "+System.currentTimeMillis()+Thread.currentThread()); } @Override public String toString() { return getClass().getSimpleName()+"-"+id; } } public class CyclicBarrierTest { public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N, new Runnable() { @Override public void run() { print(Thread.currentThread()); } }); ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < N; ++i){ exec.execute(new WriteTask(barrier)); } exec.shutdown(); } } #輸出結(jié)果為: WriteTask-3 開始寫入數(shù)據(jù)... WriteTask-1 開始寫入數(shù)據(jù)... WriteTask-2 開始寫入數(shù)據(jù)... WriteTask-0 開始寫入數(shù)據(jù)... WriteTask-1 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢 1512049061954 WriteTask-2 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢 1512049063092 WriteTask-0 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢 1512049063261 WriteTask-3 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢 1512049065657 Thread[pool-1-thread-4,5,main] 所有任務寫入完畢,繼續(xù)處理其他任務... 1512049065668Thread[pool-1-thread-2,5,main] 所有任務寫入完畢,繼續(xù)處理其他任務... 1512049065668Thread[pool-1-thread-1,5,main] 所有任務寫入完畢,繼續(xù)處理其他任務... 1512049065668Thread[pool-1-thread-4,5,main] 所有任務寫入完畢,繼續(xù)處理其他任務... 1512049065668Thread[pool-1-thread-3,5,main]
從結(jié)果可以看出,當四個線程都到達barrier狀態(tài)后,會從四個線程中選擇一個線程去執(zhí)行Runnable。
另外CyclicBarrier是可以重用的,看下面這個例子:
package sychronized; import java.util.Random; import java.util.concurrent.*; import static net.mindview.util.Print.*; class WriteTask implements Runnable{ private static int count = 0; private final int id = count++; private CyclicBarrier barrier ; private static Random random = new Random(47); public WriteTask(CyclicBarrier cyclicBarrier) { this.barrier = cyclicBarrier; } @Override public void run() { while (!Thread.interrupted()){ print(this+" 開始寫入數(shù)據(jù)..."); try { TimeUnit.MILLISECONDS.sleep(random.nextInt(5000)); //以睡眠來模擬寫入數(shù)據(jù)操作 print(this+" 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢"+" "+System.currentTimeMillis()); barrier.await(); TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { print(this + "is interrupted!"); }catch(BrokenBarrierException e){ throw new RuntimeException(e); } print("所有任務寫入完畢,繼續(xù)處理其他任務... "+System.currentTimeMillis()); } } @Override public String toString() { return getClass().getSimpleName()+"-"+id; } } class CyclicBarrierManager implements Runnable{ private CyclicBarrier barrier ; private ExecutorService exec; public CyclicBarrierManager(CyclicBarrier barrier, ExecutorService exec,int N){ this.barrier = barrier ; this.exec = exec; for (int i = 0; i < N-1; ++i){ exec.execute(new WriteTask(barrier)); } } @Override public void run(){ while (!Thread.interrupted()){ try { barrier.await(); }catch (InterruptedException e){ print(getClass().getSimpleName()+" 被中斷了!"); }catch (BrokenBarrierException e){ throw new RuntimeException(e); } } } } public class CyclicBarrierTest { public static void main(String[] args) throws Exception{ int N = 4; CyclicBarrier barrier = new CyclicBarrier(N); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new CyclicBarrierManager(barrier,exec,N)); exec.shutdown(); } } #輸出結(jié)果: WriteTask-1 開始寫入數(shù)據(jù)... WriteTask-2 開始寫入數(shù)據(jù)... WriteTask-0 開始寫入數(shù)據(jù)... WriteTask-2 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢 1512051484365 WriteTask-0 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢 1512051485503 WriteTask-1 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢 1512051488068 所有任務寫入完畢,繼續(xù)處理其他任務... 1512051488078 所有任務寫入完畢,繼續(xù)處理其他任務... 1512051488078 WriteTask-2 開始寫入數(shù)據(jù)... 所有任務寫入完畢,繼續(xù)處理其他任務... 1512051488078 WriteTask-1 開始寫入數(shù)據(jù)... WriteTask-0 開始寫入數(shù)據(jù)... WriteTask-0 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢 1512051488513 WriteTask-1 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢 1512051489045 WriteTask-2 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢 1512051489945 所有任務寫入完畢,繼續(xù)處理其他任務... 1512051489955 WriteTask-0 開始寫入數(shù)據(jù)... 所有任務寫入完畢,繼續(xù)處理其他任務... 1512051489955 所有任務寫入完畢,繼續(xù)處理其他任務... 1512051489955 WriteTask-2 開始寫入數(shù)據(jù)... WriteTask-1 開始寫入數(shù)據(jù)... WriteTask-2 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢 1512051490155 WriteTask-1 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢 1512051494477 WriteTask-0 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢 1512051494823 所有任務寫入完畢,繼續(xù)處理其他任務... 1512051494833 所有任務寫入完畢,繼續(xù)處理其他任務... 1512051494833 WriteTask-0 開始寫入數(shù)據(jù)... 所有任務寫入完畢,繼續(xù)處理其他任務... 1512051494833 WriteTask-1 開始寫入數(shù)據(jù)... WriteTask-2 開始寫入數(shù)據(jù)... WriteTask-2 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢 1512051494961 WriteTask-0 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢 1512051496040 WriteTask-1 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢 1512051498121 所有任務寫入完畢,繼續(xù)處理其他任務... 1512051498132 所有任務寫入完畢,繼續(xù)處理其他任務... 1512051498132 WriteTask-1 開始寫入數(shù)據(jù)... 所有任務寫入完畢,繼續(xù)處理其他任務... 1512051498132Semaphore
Semaphore翻譯成字面意思為 信號量,Semaphore 可以同時讓多個線程同時訪問共享資源,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。
Semaphore類位于java.util.concurrent包下,它提供了2個構(gòu)造器:
public Semaphore(int permits) { //參數(shù)permits表示許可數(shù)目,即同時可以允許多少線程進行訪問 sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { //這個多了一個參數(shù)fair表示是否是公平的,即等待時間越久的越先獲取許可 sync = (fair)? new FairSync(permits) : new NonfairSync(permits); }
下面說一下Semaphore類中比較重要的幾個方法,首先是acquire()、release()方法:
public void acquire() throws InterruptedException { } //獲取一個許可 public void acquire(int permits) throws InterruptedException { } //獲取permits個許可 public void release() {} //釋放一個許可 public void release(int permits) {} //釋放permits個許可
acquire()用來獲取一個許可,若無許可能夠獲得,則會一直等待,直到獲得許可。
release()用來釋放許可。
注意,在釋放許可之前,必須先獲獲得許可。
這4個方法都會被阻塞,如果想立即得到執(zhí)行結(jié)果,可以使用下面幾個方法:
public boolean tryAcquire() { }; //嘗試獲取一個許可,若獲取成功,則立即返回true,若獲取失敗,則立即返回false public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { }; //嘗試獲取一個許可,若在指定的時間內(nèi)獲取成功,則立即返回true,否則則立即返回false public boolean tryAcquire(int permits) { }; //嘗試獲取permits個許可,若獲取成功,則立即返回true,若獲取失敗,則立即返回false public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //嘗試獲取permits個許可,若在指定的時間內(nèi)獲取成功,則立即返回true,否則則立即返回false
package sychronized; import java.util.Random; import java.util.concurrent.*; import static net.mindview.util.Print.*; class Worker implements Runnable{ private static int count = 0; private final int id = count++; private int finished = 0; private Random random = new Random(47); private Semaphore semaphore; public Worker(Semaphore semaphore){ this.semaphore = semaphore; } @Override public void run(){ try { while (!Thread.interrupted()){ semaphore.acquire(); print(this+" 占用一個機器在生產(chǎn)... "); TimeUnit.MILLISECONDS.sleep(random.nextInt(2000)); synchronized (this){ print(" 已經(jīng)生產(chǎn)了"+(++finished)+"個產(chǎn)品,"+"釋放出機器"); } semaphore.release(); } } catch (InterruptedException e) { e.printStackTrace(); } } @Override public String toString() { return getClass().getSimpleName()+"-"+id; } } public class SemaphoreTest { public static void main(String[] args) { int N = 8; //工人數(shù) Semaphore semaphore = new Semaphore(5); //機器數(shù)目 ExecutorService exec = Executors.newCachedThreadPool(); for (int i = 0; i < N; ++i){ exec.execute(new Worker(semaphore)); } exec.shutdown(); } }總結(jié)
CountDownLatch和CyclicBarrier都能夠?qū)崿F(xiàn)線程之間的等待,只不過它們側(cè)重點不同:
CountDownLatch 一般用于某個線程A等待若干個其他線程執(zhí)行完任務之后,它才執(zhí)行;
CyclicBarrier 一般用于一組線程互相等待至某個狀態(tài),然后這一組線程再同時執(zhí)行;
CountDownLatch 是不能夠重用的,而 CyclicBarrier 是可以重用的。
Semaphore 其實和鎖有點類似,它一般用于控制對 某組 資源的訪問權(quán)限,而鎖是控制對 某個 資源的訪問權(quán)限。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/70668.html
摘要:前言之前學多線程的時候沒有學習線程的同步工具類輔助類。而其它線程完成自己的操作后,調(diào)用使計數(shù)器減。信號量控制一組線程同時執(zhí)行。 前言 之前學多線程的時候沒有學習線程的同步工具類(輔助類)。ps:當時覺得暫時用不上,認為是挺高深的知識點就沒去管了.. 在前幾天,朋友發(fā)了一篇比較好的Semaphore文章過來,然后在瀏覽博客的時候又發(fā)現(xiàn)面試還會考,那還是挺重要的知識點。于是花了點時間去了解...
摘要:所有示例代碼請見下載于基本概念并發(fā)同時擁有兩個或者多個線程,如果程序在單核處理器上運行多個線程將交替地換入或者換出內(nèi)存這些線程是同時存在的,每個線程都處于執(zhí)行過程中的某個狀態(tài),如果運行在多核處理器上此時,程序中的每個線程都 所有示例代碼,請見/下載于 https://github.com/Wasabi1234... showImg(https://upload-images.jians...
摘要:倒計時鎖,線程中調(diào)用使進程進入阻塞狀態(tài),當達成指定次數(shù)后通過繼續(xù)執(zhí)行每個線程中剩余的內(nèi)容。實現(xiàn)分階段的的功能測試代碼拿客網(wǎng)站群三產(chǎn)創(chuàng)建于年月日。 同步器 為每種特定的同步問題提供了解決方案 Semaphore Semaphore【信號標;旗語】,通過計數(shù)器控制對共享資源的訪問。 測試類: package concurrent; import concurrent.th...
摘要:對于,我們僅僅需要關心兩個方法,一個是方法,另一個是方法。首先,我們來看方法,它代表線程阻塞,等待的值減為。首先,的源碼實現(xiàn)和大相徑庭,基于的共享模式的使用,而基于來實現(xiàn)。 前言 本文先用 CountDownLatch 將共享模式說清楚,然后順著把其他 AQS 相關的類 CyclicBarrier、Semaphore 的源碼一起過一下。 CountDownLatch CountDown...
閱讀 3473·2021-09-08 09:36
閱讀 2575·2019-08-30 15:54
閱讀 2364·2019-08-30 15:54
閱讀 1774·2019-08-30 15:44
閱讀 2396·2019-08-26 14:04
閱讀 2448·2019-08-26 14:01
閱讀 2884·2019-08-26 13:58
閱讀 1341·2019-08-26 13:47