摘要:在隊(duì)尾插入指定元素,如果隊(duì)列已滿,則阻塞線程加鎖隊(duì)列已滿。這里必須用,防止虛假喚醒在隊(duì)列上等待之所以這樣做,是防止線程被意外喚醒,不經(jīng)再次判斷就直接調(diào)用方法。
本文首發(fā)于一世流云專欄:https://segmentfault.com/blog...一、ArrayBlockingQueue簡介
ArrayBlockingQueue是在JDK1.5時,隨著J.U.C包引入的一種阻塞隊(duì)列,它實(shí)現(xiàn)了BlockingQueue接口,底層基于數(shù)組實(shí)現(xiàn):
ArrayBlockingQueue是一種有界阻塞隊(duì)列,在初始構(gòu)造的時候需要指定隊(duì)列的容量。具有如下特點(diǎn):
隊(duì)列的容量一旦在構(gòu)造時指定,后續(xù)不能改變;
插入元素時,在隊(duì)尾進(jìn)行;刪除元素時,在隊(duì)首進(jìn)行;
隊(duì)列滿時,調(diào)用特定方法插入元素會阻塞線程;隊(duì)列空時,刪除元素也會阻塞線程;
支持公平/非公平策略,默認(rèn)為非公平策略。
這里的公平策略,是指當(dāng)線程從阻塞到喚醒后,以最初請求的順序(FIFO)來添加或刪除元素;非公平策略指線程被喚醒后,誰先搶占到鎖,誰就能往隊(duì)列中添加/刪除順序,是隨機(jī)的。二、ArrayBlockingQueue原理 構(gòu)造
ArrayBlockingQueue提供了三種構(gòu)造器:
/** * 指定隊(duì)列初始容量的構(gòu)造器. */ public ArrayBlockingQueue(int capacity) { this(capacity, false); }
/** * 指定隊(duì)列初始容量和公平/非公平策略的構(gòu)造器. */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); // 利用獨(dú)占鎖的策略 notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
/** * 根據(jù)已有集合構(gòu)造隊(duì)列 */ public ArrayBlockingQueue(int capacity, boolean fair, Collection extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // 這里加鎖是用于保證items數(shù)組的可見性 try { int i = 0; try { for (E e : c) { checkNotNull(e); // 不能有null元素 items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; // 如果隊(duì)列已滿,則重置puIndex索引為0 } finally { lock.unlock(); } }
核心就是第二種構(gòu)造器,從構(gòu)造器也可以看出,ArrayBlockingQueue在構(gòu)造時就指定了內(nèi)部數(shù)組的大小,并通過ReentrantLock來保證并發(fā)環(huán)境下的線程安全。
ArrayBlockingQueue的公平/非公平策略其實(shí)就是內(nèi)部ReentrantLock對象的策略,此外構(gòu)造時還創(chuàng)建了兩個Condition對象。在隊(duì)列滿時,插入線程需要在notFull上等待;當(dāng)隊(duì)列空時,刪除線程會在notEmpty上等待:
public class ArrayBlockingQueue核心方法extends AbstractQueue implements BlockingQueue , java.io.Serializable { /** * 內(nèi)部數(shù)組 */ final Object[] items; /** * 下一個待刪除位置的索引: take, poll, peek, remove方法使用 */ int takeIndex; /** * 下一個待插入位置的索引: put, offer, add方法使用 */ int putIndex; /** * 隊(duì)列中的元素個數(shù) */ int count; /** * 全局鎖 */ final ReentrantLock lock; /** * 非空條件隊(duì)列:當(dāng)隊(duì)列空時,線程在該隊(duì)列等待獲取 */ private final Condition notEmpty; /** * 非滿條件隊(duì)列:當(dāng)隊(duì)列滿時,線程在該隊(duì)列等待插入 */ private final Condition notFull; //... }
ArrayBlockingQueue會阻塞線程的方法一共4個:put(E e)、offer(e, time, unit)和take()、poll(time, unit),我們先來看插入元素的方法。
插入元素——put(E e)
插入元素的邏輯很簡單,用ReentrantLock來保證線程安全,當(dāng)隊(duì)列滿時,則調(diào)用線程會在notFull條件隊(duì)列上等待,否則就調(diào)用enqueue方法入隊(duì)。
/** * 在隊(duì)尾插入指定元素,如果隊(duì)列已滿,則阻塞線程. */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 加鎖 try { while (count == items.length) // 隊(duì)列已滿。這里必須用while,防止虛假喚醒 notFull.await(); // 在notFull隊(duì)列上等待 enqueue(e); // 隊(duì)列未滿, 直接入隊(duì) } finally { lock.unlock(); } }
這里需要注意一點(diǎn),隊(duì)列已滿的時候,是通過while循環(huán)判斷的,這其實(shí)是多線程設(shè)計模式中的Guarded Suspension模式:
while (count == items.length) // 隊(duì)列已滿。這里必須用while,防止虛假喚醒 notFull.await(); // 在notFull隊(duì)列上等待
之所以這樣做,是防止線程被意外喚醒,不經(jīng)再次判斷就直接調(diào)用enqueue方法。
enqueue方法:
private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) // 隊(duì)列已滿,則重置索引為0 putIndex = 0; count++; // 元素個數(shù)+1 notEmpty.signal(); // 喚醒一個notEmpty上的等待線程(可以來隊(duì)列取元素了) }
刪除元素——take()
刪除元素的邏輯和插入元素類似,區(qū)別就是:刪除元素時,如果隊(duì)列空了,則線程需要在notEmpty條件隊(duì)列上等待。
/** * 從隊(duì)首刪除一個元素, 如果隊(duì)列為空, 則阻塞線程 */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 隊(duì)列為空, 則線程在notEmpty條件隊(duì)列等待 notEmpty.await(); return dequeue(); // 隊(duì)列非空,則出隊(duì)一個元素 } finally { lock.unlock(); } }
隊(duì)列非空時,調(diào)用dequeue方法出隊(duì)一個元素:
private E dequeue() { final Object[] items = this.items; E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) // 如果隊(duì)列已空 takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); // 喚醒一個notFull上的等待線程(可以插入元素到隊(duì)列了) return x; }環(huán)形隊(duì)列
從上面的入隊(duì)/出隊(duì)操作,可以看出,ArrayBlockingQueue的內(nèi)部數(shù)組其實(shí)是一種環(huán)形結(jié)構(gòu)。
假設(shè)ArrayBlockingQueue的容量大小為6,我們來看下整個入隊(duì)過程:
①初始時
②插入元素“9”
③插入元素“2”、“10”、“25”、“93”
④插入元素“90”
注意,此時再插入一個元素“90”,則putIndex變成6,等于隊(duì)列容量6,由于是循環(huán)隊(duì)列,所以會將tableIndex重置為0:
這是隊(duì)列已經(jīng)滿了(count==6),如果再有線程嘗試插入元素,并不會覆蓋原有值,而是被阻塞。
我們再來看下出隊(duì)過程:
①出隊(duì)元素“9”
②出隊(duì)元素“2”、“10”、“25”、“93”
③出隊(duì)元素“90”
注意,此時再出隊(duì)一個元素“90”,則tabeIndex變成6,等于隊(duì)列容量6,由于是循環(huán)隊(duì)列,所以會將tableIndex重置為0:
這是隊(duì)列已經(jīng)空了(count==0),如果再有線程嘗試出隊(duì)元素,則會被阻塞。
三、總結(jié)ArrayBlockingQueue利用了ReentrantLock來保證線程的安全性,針對隊(duì)列的修改都需要加全局鎖。在一般的應(yīng)用場景下已經(jīng)足夠。對于超高并發(fā)的環(huán)境,由于生產(chǎn)者-消息者共用一把鎖,可能出現(xiàn)性能瓶頸。
另外,由于ArrayBlockingQueue是有界的,且在初始時指定隊(duì)列大小,所以如果初始時需要限定消息隊(duì)列的大小,則ArrayBlockingQueue 比較合適。后續(xù),我們會介紹另一種基于單鏈表實(shí)現(xiàn)的阻塞隊(duì)列——LinkedBlockingQueue,該隊(duì)列的最大特點(diǎn)是使用了“兩把鎖”,以提升吞吐量。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/77023.html
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設(shè)計模式,設(shè)計了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對等進(jìn)行補(bǔ)充增強(qiáng)。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:我們來看下的類繼承圖可以看到,實(shí)現(xiàn)了接口,在多線程進(jìn)階二五之框架中,我們提到過實(shí)現(xiàn)了接口,以提供和排序相關(guān)的功能,維持元素的有序性,所以就是一種為并發(fā)環(huán)境設(shè)計的有序工具類。唯一的區(qū)別是針對的僅僅是鍵值,針對鍵值對進(jìn)行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發(fā)于一世流云專欄:https://seg...
摘要:僅僅當(dāng)有多個線程同時進(jìn)行寫操作時,才會進(jìn)行同步??梢钥吹?,上述方法返回一個迭代器對象,的迭代是在舊數(shù)組上進(jìn)行的,當(dāng)創(chuàng)建迭代器的那一刻就確定了,所以迭代過程中不會拋出并發(fā)修改異常。另外,迭代器對象也不支持修改方法,全部會拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發(fā)于一世流云專欄:https://...
摘要:我們之前已經(jīng)介紹過了,底層基于跳表實(shí)現(xiàn),其操作平均時間復(fù)雜度均為。事實(shí)上,內(nèi)部引用了一個對象,以組合方式,委托對象實(shí)現(xiàn)了所有功能。線程安全內(nèi)存的使用較多迭代是對快照進(jìn)行的,不會拋出,且迭代過程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發(fā)于一世流云專欄:https://segmentfa...
摘要:接口截止目前為止,我們介紹的阻塞隊(duì)列都是實(shí)現(xiàn)了接口。該類在構(gòu)造時一般需要指定容量,如果不指定,則最大容量為。另外,由于內(nèi)部通過來保證線程安全,所以的整體實(shí)現(xiàn)時比較簡單的。另外,雙端隊(duì)列相比普通隊(duì)列,主要是多了隊(duì)尾出隊(duì)元素隊(duì)首入隊(duì)元素的功能。 showImg(https://segmentfault.com/img/bVbgZ7j?w=770&h=514); 本文首發(fā)于一世流云專欄:ht...
閱讀 3306·2021-11-24 09:39
閱讀 2823·2021-10-12 10:20
閱讀 1922·2019-08-30 15:53
閱讀 3086·2019-08-30 14:14
閱讀 2615·2019-08-29 15:36
閱讀 1131·2019-08-29 14:11
閱讀 1963·2019-08-26 13:51
閱讀 3421·2019-08-26 13:23