摘要:和方法會(huì)一直阻塞調(diào)用線程,直到線程被中斷或隊(duì)列狀態(tài)可用和方法會(huì)限時(shí)阻塞調(diào)用線程,直到超時(shí)或線程被中斷或隊(duì)列狀態(tài)可用。
本文首發(fā)于一世流云專欄:https://segmentfault.com/blog...一、引言
從本節(jié)開始,我們將介紹juc-collections框架中的“阻塞隊(duì)列”部分。阻塞隊(duì)列在實(shí)際應(yīng)用中非常廣泛,許多消息中間件中定義的隊(duì)列,通常就是一種“阻塞隊(duì)列”。
那么“阻塞隊(duì)列”和我們之前討論過的ConcurrentLinkedQueue、ConcurrentLinkedDeque有什么不同呢?
ConcurrentLinkedQueue和ConcurrentLinkedDeque是以非阻塞算法實(shí)現(xiàn)的高性能隊(duì)列,其使用場(chǎng)景一般是在并發(fā)環(huán)境下,需要“隊(duì)列”/“?!边@類數(shù)據(jù)結(jié)構(gòu)時(shí)才會(huì)使用;而“阻塞隊(duì)列”通常利用了“鎖”來實(shí)現(xiàn),也就是會(huì)阻塞調(diào)用線程,其使用場(chǎng)景一般是在“生產(chǎn)者-消費(fèi)者”模式中,用于線程之間的數(shù)據(jù)交換或系統(tǒng)解耦。
在Java多線程基礎(chǔ)(七)——Producer-Consumer模式中,我們?cè)喴恼劦竭^“生產(chǎn)者-消費(fèi)者”這種模式。在這種模式中,“生產(chǎn)者”和“消費(fèi)者”是相互獨(dú)立的,兩者之間的通信需要依靠一個(gè)隊(duì)列。這個(gè)隊(duì)列,其實(shí)就是本文中的“阻塞隊(duì)列”。
引入“阻塞隊(duì)列”的最大好處就是解耦,在軟件工程中,“高內(nèi)聚,低耦合”是進(jìn)行模塊設(shè)計(jì)的準(zhǔn)則之一,這樣“生產(chǎn)者”和“消費(fèi)者”其實(shí)是互不影響的,將來任意一方需要升級(jí)時(shí),可以保證系統(tǒng)的平滑過渡。
二、BlockingQueue簡介BlockingQueue是在JDK1.5時(shí),隨著J.U.C引入的一個(gè)接口:
BlockingQueue繼承了Queue接口,提供了一些阻塞方法,主要作用如下:
當(dāng)線程向隊(duì)列中插入元素時(shí),如果隊(duì)列已滿,則阻塞線程,直到隊(duì)列有空閑位置(非滿);
當(dāng)線程從隊(duì)列中取元素(刪除隊(duì)列元素)時(shí),如果隊(duì)列未空,則阻塞線程,直到隊(duì)列有元素;
既然BlockingQueue是一種隊(duì)列,所以也具備隊(duì)列的三種基本方法:插入、刪除、讀取:
操作類型 | 拋出異常 | 返回特殊值 | 阻塞線程 | 超時(shí) |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
刪除 | remove() | poll() | take() | poll(time, unit) |
讀取 | element() | peek() | / | / |
可以看到,對(duì)于每種基本方法,“拋出異?!焙汀胺祷靥厥庵怠钡姆椒ǘx和Queue是完全一樣的。BlockingQueue只是增加了兩類和阻塞相關(guān)的方法:put(e)、take();offer(e, time, unit)、poll(time, unit)。
put(e)和take()方法會(huì)一直阻塞調(diào)用線程,直到線程被中斷或隊(duì)列狀態(tài)可用;
offer(e, time, unit)和poll(time, unit)方法會(huì)限時(shí)阻塞調(diào)用線程,直到超時(shí)或線程被中斷或隊(duì)列狀態(tài)可用。
public interface BlockingQueueextends Queue { /** * 插入元素e至隊(duì)尾, 如果隊(duì)列已滿, 則阻塞調(diào)用線程直到隊(duì)列有空閑空間. */ void put(E e) throws InterruptedException; /** * 插入元素e至隊(duì)列, 如果隊(duì)列已滿, 則限時(shí)阻塞調(diào)用線程,直到隊(duì)列有空閑空間或超時(shí). */ boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; /** * 從隊(duì)首刪除元素,如果隊(duì)列為空, 則阻塞調(diào)用線程直到隊(duì)列中有元素. */ E take() throws InterruptedException; /** * 從隊(duì)首刪除元素,如果隊(duì)列為空, 則限時(shí)阻塞調(diào)用線程,直到隊(duì)列中有元素或超時(shí). */ E poll(long timeout, TimeUnit unit) throws InterruptedException; // ... }
除此之外,BlockingQueue還具有以下特點(diǎn):
BlockingQueue隊(duì)列中不能包含null元素;
BlockingQueue接口的實(shí)現(xiàn)類都必須是線程安全的,實(shí)現(xiàn)類一般通過“鎖”保證線程安全;
BlockingQueue 可以是限定容量的。remainingCapacity()方法用于返回剩余可用容量,對(duì)于沒有容量限制的BlockingQueue實(shí)現(xiàn),該方法總是返回Integer.MAX_VALUE 。
三、再談“生產(chǎn)者-消費(fèi)者”模式最后,我們來看下如何利用BlockingQueue來實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模式。在生產(chǎn)者-消費(fèi)者模式中,一共有四類角色:生產(chǎn)者、消費(fèi)者、消息隊(duì)列、消息體。我們利用BlockingQueue來實(shí)現(xiàn)消息隊(duì)列,其余部分沒有什么變化。
Producer(生產(chǎn)者)生產(chǎn)者生產(chǎn)消息體(Data),并將消息體(Data)傳遞給通道(Channel)。
/** * 生產(chǎn)者 */ public class Producer implements Runnable { private Channel channel; public Producer(Channel channel) { this.channel = channel; } @Override public void run() { while (true) { String v = String.valueOf(ThreadLocalRandom.current().nextInt()); Data data = new Data(v); try { channel.put(data); System.out.println(Thread.currentThread().getName() + " produce :" + data); } catch (InterruptedException e) { e.printStackTrace(); } Thread.yield(); } } }Consumer(消費(fèi)者)
消費(fèi)者從通道(Channel)中獲取數(shù)據(jù),進(jìn)行處理。
/** * 消費(fèi)者 */ public class Consumer implements Runnable { private final Channel channel; public Consumer(Channel channel) { this.channel = channel; } @Override public void run() { while (true) { try { Object obj = channel.take(); System.out.println(Thread.currentThread().getName() + " consume :" + obj.toString()); } catch (InterruptedException e) { e.printStackTrace(); } Thread.yield(); } } }Channel(通道)
相當(dāng)于消息的隊(duì)列,對(duì)消息進(jìn)行排隊(duì),控制消息的傳輸。
/** * 通道類 */ public class Channel { private final BlockingQueue blockingQueue; public Channel(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; } public Object take() throws InterruptedException { return blockingQueue.take(); } public void put(Object o) throws InterruptedException { blockingQueue.put(o); } }Data(消息體/數(shù)據(jù))
Data代表了實(shí)際生產(chǎn)或消費(fèi)的數(shù)據(jù)。
/** * 數(shù)據(jù)/消息 */ public class Dataimplements Serializable { private T data; public Data(T data) { this.data = data; } public T getData() { return data; } public void setData(T data) { this.data = data; } @Override public String toString() { return "Data{" + "data=" + data + "}"; } }
調(diào)用如下:
public class Main { public static void main(String[] args) { BlockingQueue blockingQueue = new SomeQueueImplementation(); Channel channel = new Channel(blockingQueue); Producer p = new Producer(channel); Consumer c1 = new Consumer(channel); Consumer c2 = new Consumer(channel); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); } }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/77009.html
摘要:接口截止目前為止,我們介紹的阻塞隊(duì)列都是實(shí)現(xiàn)了接口。該類在構(gòu)造時(shí)一般需要指定容量,如果不指定,則最大容量為。另外,由于內(nèi)部通過來保證線程安全,所以的整體實(shí)現(xiàn)時(shí)比較簡單的。另外,雙端隊(duì)列相比普通隊(duì)列,主要是多了隊(duì)尾出隊(duì)元素隊(duì)首入隊(duì)元素的功能。 showImg(https://segmentfault.com/img/bVbgZ7j?w=770&h=514); 本文首發(fā)于一世流云專欄:ht...
摘要:整個(gè)包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設(shè)計(jì)模式,設(shè)計(jì)了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對(duì)等進(jìn)行補(bǔ)充增強(qiáng)。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:在章節(jié)中,我們說過,維護(hù)了一把全局鎖,無論是出隊(duì)還是入隊(duì),都共用這把鎖,這就導(dǎo)致任一時(shí)間點(diǎn)只有一個(gè)線程能夠執(zhí)行。入隊(duì)鎖對(duì)應(yīng)的是條件隊(duì)列,出隊(duì)鎖對(duì)應(yīng)的是條件隊(duì)列,所以每入隊(duì)一個(gè)元素,應(yīng)當(dāng)立即去喚醒可能阻塞的其它入隊(duì)線程。 showImg(https://segmentfault.com/img/bVbgCD9?w=1920&h=1080); 本文首發(fā)于一世流云專欄:https://seg...
摘要:在隊(duì)尾插入指定元素,如果隊(duì)列已滿,則阻塞線程加鎖隊(duì)列已滿。這里必須用,防止虛假喚醒在隊(duì)列上等待之所以這樣做,是防止線程被意外喚醒,不經(jīng)再次判斷就直接調(diào)用方法。 showImg(https://segmentfault.com/img/bVbgCD0?w=768&h=512); 本文首發(fā)于一世流云專欄:https://segmentfault.com/blog... 一、ArrayBl...
摘要:初始狀態(tài)對(duì)應(yīng)二叉樹結(jié)構(gòu)將頂點(diǎn)與最后一個(gè)結(jié)點(diǎn)調(diào)換即將頂點(diǎn)與最后一個(gè)結(jié)點(diǎn)交換,然后將索引為止置。 showImg(https://segmentfault.com/img/bVbgOtL?w=1600&h=800); 本文首發(fā)于一世流云專欄:https://segmentfault.com/blog... 一、PriorityBlockingQueue簡介 PriorityBlockin...
閱讀 3306·2021-11-24 09:39
閱讀 2823·2021-10-12 10:20
閱讀 1922·2019-08-30 15:53
閱讀 3086·2019-08-30 14:14
閱讀 2615·2019-08-29 15:36
閱讀 1131·2019-08-29 14:11
閱讀 1963·2019-08-26 13:51
閱讀 3420·2019-08-26 13:23