摘要:在章節(jié)中,我們說過,維護(hù)了一把全局鎖,無論是出隊還是入隊,都共用這把鎖,這就導(dǎo)致任一時間點只有一個線程能夠執(zhí)行。入隊鎖對應(yīng)的是條件隊列,出隊鎖對應(yīng)的是條件隊列,所以每入隊一個元素,應(yīng)當(dāng)立即去喚醒可能阻塞的其它入隊線程。
本文首發(fā)于一世流云專欄:https://segmentfault.com/blog...一、LinkedBlockingQueue簡介
LinkedBlockingQueue是在JDK1.5時,隨著J.U.C包引入的一種阻塞隊列,它實現(xiàn)了BlockingQueue接口,底層基于單鏈表實現(xiàn):
LinkedBlockingQueue是一種近似有界阻塞隊列,為什么說近似?因為LinkedBlockingQueue既可以在初始構(gòu)造時就指定隊列的容量,也可以不指定,如果不指定,那么它的容量大小默認(rèn)為Integer.MAX_VALUE。
LinkedBlockingQueue除了底層數(shù)據(jù)結(jié)構(gòu)(單鏈表)與ArrayBlockingQueue不同外,另外一個特點就是:
它維護(hù)了兩把鎖——takeLock和putLock。
takeLock用于控制出隊的并發(fā),putLock用于入隊的并發(fā)。這也就意味著,同一時刻,只能只有一個線程能執(zhí)行入隊/出隊操作,其余入隊/出隊線程會被阻塞;但是,入隊和出隊之間可以并發(fā)執(zhí)行,即同一時刻,可以同時有一個線程進(jìn)行入隊,另一個線程進(jìn)行出隊,這樣就可以提升吞吐量。
在ArrayBlockingQueue章節(jié)中,我們說過,ArrayBlockingQueue維護(hù)了一把全局鎖,無論是出隊還是入隊,都共用這把鎖,這就導(dǎo)致任一時間點只有一個線程能夠執(zhí)行。那么對于“生產(chǎn)者-消費者”模式來說,意味著生產(chǎn)者和消費者不能并發(fā)執(zhí)行。二、LinkedBlockingQueue原理 構(gòu)造
LinkedBlockingQueue提供了三種構(gòu)造器:
/** * 默認(rèn)構(gòu)造器. * 隊列容量為Integer.MAX_VALUE. */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
/** * 顯示指定隊列容量的構(gòu)造器 */ public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node(null); }
/** * 從已有集合構(gòu)造隊列. * 隊列容量為Integer.MAX_VALUE */ public LinkedBlockingQueue(Collection extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // 這里加鎖僅僅是為了保證可見性 try { int n = 0; for (E e : c) { if (e == null) // 隊列不能包含null元素 throw new NullPointerException(); if (n == capacity) // 隊列已滿 throw new IllegalStateException("Queue full"); enqueue(new Node(e)); // 隊尾插入元素 ++n; } count.set(n); // 設(shè)置元素個數(shù) } finally { putLock.unlock(); } }
可以看到,如果不指定容量,那么它的容量大小默認(rèn)為Integer.MAX_VALUE。另外,LinkedBlockingQueue使用了一個原子變量AtomicInteger記錄隊列中元素的個數(shù),以保證入隊/出隊并發(fā)修改元素時的數(shù)據(jù)一致性。
public class LinkedBlockingQueueextends AbstractQueue implements BlockingQueue , java.io.Serializable { /** * 隊列容量. * 如果不指定, 則為Integer.MAX_VALUE */ private final int capacity; /** * 隊列中的元素個數(shù) */ private final AtomicInteger count = new AtomicInteger(); /** * 隊首指針. * head.item == null */ transient Node head; /** * 隊尾指針. * last.next == null */ private transient Node last; /** * 出隊鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** * 隊列空時,出隊線程在該條件隊列等待 */ private final Condition notEmpty = takeLock.newCondition(); /** * 入隊鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** * 隊列滿時,入隊線程在該條件隊列等待 */ private final Condition notFull = putLock.newCondition(); /** * 鏈表結(jié)點定義 */ static class Node { E item; Node next; // 后驅(qū)指針 Node(E x) { item = x; } } //... }
構(gòu)造完成后,LinkedBlockingQueue的初始結(jié)構(gòu)如下:
插入部分元素后的LinkedBlockingQueue結(jié)構(gòu):
核心方法由于接口和ArrayBlockingQueue完全一樣,所以LinkedBlockingQueue會阻塞線程的方法也一共有4個:put(E e)、offer(e, time, unit)和take()、poll(time, unit),我們先來看插入元素的方法。
插入元素——put(E e)
/** * 在隊尾插入指定的元素. * 如果隊列已滿,則阻塞線程. */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Nodenode = new Node (e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); // 獲取“入隊鎖” try { while (count.get() == capacity) { // 隊列已滿, 則線程在notFull上等待 notFull.await(); } enqueue(node); // 將新結(jié)點鏈接到“隊尾” /** * c+1 表示的元素個數(shù). * 如果,則喚醒一個“入隊線程” */ c = count.getAndIncrement(); // c表示入隊前的隊列元素個數(shù) if (c + 1 < capacity) // 入隊后隊列未滿, 則喚醒一個“入隊線程” notFull.signal(); } finally { putLock.unlock(); } if (c == 0) // 隊列初始為空, 則喚醒一個“出隊線程” signalNotEmpty(); }
插入元素時,首先需要獲得“入隊鎖”,如果隊列滿了,則當(dāng)前線程需要在notFull條件隊列等待;否則,將新元素鏈接到隊列尾部。
這里需要注意的是兩個地方:
①每入隊一個元素后,如果隊列還沒滿,則需要喚醒其它可能正在等待的“入隊線程”:
/** * c+1 表示的元素個數(shù). * 如果,則喚醒一個“入隊線程” */ c = count.getAndIncrement(); // c表示入隊前的隊列元素個數(shù) if (c + 1 < capacity) // 入隊后隊列未滿, 則喚醒一個“入隊線程” notFull.signal();
② 每入隊一個元素,都要判斷下隊列是否空了,如果空了,說明可能存在正在等待的“出隊線程”,需要喚醒它:
if (c == 0) // 隊列為空, 則喚醒一個“出隊線程” signalNotEmpty();
這里為什么不像ArrayBlockingQueue那樣,入隊完成后,直接喚醒一個在notEmpty上等待的出隊線程?
因為ArrayBlockingQueue中,入隊/出隊用的是同一把鎖,兩者不會并發(fā)執(zhí)行,所以每入隊一個元素(拿到鎖),就可以通知可能正在等待的“出隊線程”。(同一個鎖的兩個條件隊列:notEmpty、notFull)
ArrayBlockingQueue中的enqueue方法:
private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) // 隊列已滿,則重置索引為0 putIndex = 0; count++; // 元素個數(shù)+1 notEmpty.signal(); // 喚醒一個notEmpty上的等待線程(可以來隊列取元素了) }
而LinkedBlockingQueue中,入隊/出隊用的是兩把鎖,入隊/出隊是會并發(fā)執(zhí)行的。入隊鎖對應(yīng)的是notFull條件隊列,出隊鎖對應(yīng)的是notEmpty條件隊列,所以每入隊一個元素,應(yīng)當(dāng)立即去喚醒可能阻塞的其它入隊線程。當(dāng)隊列為空時,說明后面再來“出隊線程”,一定都會阻塞,所以此時可以去喚醒一個出隊線程,以提升性能。
試想以下,如果去掉上面的①和②,當(dāng)入隊線程拿到“入隊鎖”,入隊元素后,直接嘗試喚醒出隊線程,會要求去拿出隊鎖,這樣持有鎖A的同時,再去嘗試獲取鎖B,很可能引起死鎖,就算通過打破死鎖的條件避免死鎖,每次操作同時獲取兩把鎖也會降低性能。
刪除元素——table()
刪除元素的邏輯和插入元素類似。刪除元素時,首先需要獲得“出隊鎖”,如果隊列為空,則當(dāng)前線程需要在notEmpty條件隊列等待;否則,從隊首出隊一個元素:
/** * 從隊首出隊一個元素 */ public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 獲取“出隊鎖” takeLock.lockInterruptibly(); try { while (count.get() == 0) { // 隊列為空, 則阻塞線程 notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); // c表示出隊前的元素個數(shù) if (c > 1) // 出隊前隊列非空, 則喚醒一個出隊線程 notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) // 隊列初始為滿,則喚醒一個入隊線程 signalNotFull(); return x; }
/** * 隊首出隊一個元素. */ private E dequeue() { Nodeh = head; Node first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
上面需要的注意點和插入元素一樣:
①每出隊一個元素前,如果隊列非空,則需要喚醒其它可能正在等待的“出隊線程”:
c = count.getAndDecrement(); // c表示出隊前的元素個數(shù) if (c > 1) // 出隊前隊列非空, 則喚醒一個出隊線程 notEmpty.signal();
② 每入隊一個元素,都要判斷下隊列是否滿,如果是滿的,說明可能存在正在等待的“入隊線程”,需要喚醒它:
if (c == capacity) // 隊列初始為滿,則喚醒一個入隊線程 signalNotFull();三、總結(jié)
歸納一下,LinkedBlockingQueue和ArrayBlockingQueue比較主要有以下區(qū)別:
隊列大小不同。ArrayBlockingQueue初始構(gòu)造時必須指定大小,而LinkedBlockingQueue構(gòu)造時既可以指定大小,也可以不指定(默認(rèn)為Integer.MAX_VALUE,近似于無界);
底層數(shù)據(jù)結(jié)構(gòu)不同。ArrayBlockingQueue底層采用數(shù)組作為數(shù)據(jù)存儲容器,而LinkedBlockingQueue底層采用單鏈表作為數(shù)據(jù)存儲容器;
兩者的加鎖機制不同。ArrayBlockingQueue使用一把全局鎖,即入隊和出隊使用同一個ReentrantLock鎖;而LinkedBlockingQueue進(jìn)行了鎖分離,入隊使用一個ReentrantLock鎖(putLock),出隊使用另一個ReentrantLock鎖(takeLock);
LinkedBlockingQueue不能指定公平/非公平策略(默認(rèn)都是非公平),而ArrayBlockingQueue可以指定策略。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/77045.html
摘要:接口截止目前為止,我們介紹的阻塞隊列都是實現(xiàn)了接口。該類在構(gòu)造時一般需要指定容量,如果不指定,則最大容量為。另外,由于內(nèi)部通過來保證線程安全,所以的整體實現(xiàn)時比較簡單的。另外,雙端隊列相比普通隊列,主要是多了隊尾出隊元素隊首入隊元素的功能。 showImg(https://segmentfault.com/img/bVbgZ7j?w=770&h=514); 本文首發(fā)于一世流云專欄:ht...
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設(shè)計模式,設(shè)計了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對等進(jìn)行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:我們來看下的類繼承圖可以看到,實現(xiàn)了接口,在多線程進(jìn)階二五之框架中,我們提到過實現(xiàn)了接口,以提供和排序相關(guān)的功能,維持元素的有序性,所以就是一種為并發(fā)環(huán)境設(shè)計的有序工具類。唯一的區(qū)別是針對的僅僅是鍵值,針對鍵值對進(jìn)行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發(fā)于一世流云專欄:https://seg...
摘要:僅僅當(dāng)有多個線程同時進(jìn)行寫操作時,才會進(jìn)行同步??梢钥吹剑鲜龇椒ǚ祷匾粋€迭代器對象,的迭代是在舊數(shù)組上進(jìn)行的,當(dāng)創(chuàng)建迭代器的那一刻就確定了,所以迭代過程中不會拋出并發(fā)修改異常。另外,迭代器對象也不支持修改方法,全部會拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發(fā)于一世流云專欄:https://...
摘要:我們之前已經(jīng)介紹過了,底層基于跳表實現(xiàn),其操作平均時間復(fù)雜度均為。事實上,內(nèi)部引用了一個對象,以組合方式,委托對象實現(xiàn)了所有功能。線程安全內(nèi)存的使用較多迭代是對快照進(jìn)行的,不會拋出,且迭代過程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發(fā)于一世流云專欄:https://segmentfa...
閱讀 3368·2021-11-19 11:36
閱讀 2967·2021-09-27 13:34
閱讀 2022·2021-09-22 15:17
閱讀 2431·2019-08-30 13:49
閱讀 780·2019-08-26 13:58
閱讀 1384·2019-08-26 10:47
閱讀 2566·2019-08-23 18:05
閱讀 635·2019-08-23 14:25