摘要:接口截止目前為止,我們介紹的阻塞隊(duì)列都是實(shí)現(xiàn)了接口。該類在構(gòu)造時(shí)一般需要指定容量,如果不指定,則最大容量為。另外,由于內(nèi)部通過來保證線程安全,所以的整體實(shí)現(xiàn)時(shí)比較簡(jiǎn)單的。另外,雙端隊(duì)列相比普通隊(duì)列,主要是多了隊(duì)尾出隊(duì)元素隊(duì)首入隊(duì)元素的功能。
本文首發(fā)于一世流云專欄:https://segmentfault.com/blog...一、LinkedBlockingDeque簡(jiǎn)介
LinkedBlockingDeque和ConcurrentLinkedDeque類似,都是一種雙端隊(duì)列的結(jié)構(gòu),只不過LinkedBlockingDeque同時(shí)也是一種阻塞隊(duì)列,它是在JDK1.5時(shí)隨著J.U.C包引入的,實(shí)現(xiàn)了BlockingDueue接口,底層基于雙鏈表實(shí)現(xiàn):
注意:LinkedBlockingDeque底層利用ReentrantLock實(shí)現(xiàn)同步,并不像ConcurrentLinkedDeque那樣采用無鎖算法。
另外,LinkedBlockingDeque是一種近似有界阻塞隊(duì)列,為什么說近似?因?yàn)長(zhǎng)inkedBlockingDeque既可以在初始構(gòu)造時(shí)就指定隊(duì)列的容量,也可以不指定,如果不指定,那么它的容量大小默認(rèn)為Integer.MAX_VALUE。
BlockingDeque接口截止目前為止,我們介紹的阻塞隊(duì)列都是實(shí)現(xiàn)了BlockingQueue接口。和普通雙端隊(duì)列接口——Deque一樣,J.U.C中也有一種阻塞的雙端隊(duì)列接口——BlockingDeque。BlockingDeque是JDK1.6時(shí),J.U.C包新增的一個(gè)接口:
BlockingDeque的類繼承關(guān)系圖:
我們知道,BlockingQueue中阻塞方法一共有4個(gè):put(e)、take();offer(e, time, unit)、poll(time, unit),忽略限時(shí)等待的阻塞方法,一共就兩個(gè):
隊(duì)尾入隊(duì):put(e)
隊(duì)首出隊(duì):take()
BlockingDeque相對(duì)于BlockingQueue,最大的特點(diǎn)就是增加了在隊(duì)首入隊(duì)/隊(duì)尾出隊(duì)的阻塞方法。下面是兩個(gè)接口的比較:
阻塞方法 | BlockingQueue | BlockingDeque |
---|---|---|
隊(duì)首入隊(duì) | / | putFirst(e) |
隊(duì)首出隊(duì) | take() | takeFirst() |
隊(duì)尾入隊(duì) | put(e) | putLast(e) |
隊(duì)尾出隊(duì) | / | takeLast() |
LinkedBlockingDeque一共三種構(gòu)造器,不指定容量時(shí),默認(rèn)為Integer.MAX_VALUE:
/** * 默認(rèn)構(gòu)造器. */ public LinkedBlockingDeque() { this(Integer.MAX_VALUE); }
/** * 指定容量的構(gòu)造器. */ public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; }
/** * 從已有集合構(gòu)造隊(duì)列. */ public LinkedBlockingDeque(Collection extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock lock = this.lock; lock.lock(); // Never contended, but necessary for visibility try { for (E e : c) { if (e == null) throw new NullPointerException(); if (!linkLast(new Node內(nèi)部結(jié)構(gòu)(e))) throw new IllegalStateException("Deque full"); } } finally { lock.unlock(); } }
LinkedBlockingDeque內(nèi)部是雙鏈表的結(jié)構(gòu),結(jié)點(diǎn)Node的定義如下:
/** * 雙鏈表結(jié)點(diǎn)定義 */ static final class Node{ /** * 結(jié)點(diǎn)值, null表示該結(jié)點(diǎn)已被移除. */ E item; /** * 前驅(qū)結(jié)點(diǎn)指針. */ Node prev; /** * 后驅(qū)結(jié)點(diǎn)指針. */ Node next; Node(E x) { item = x; } }
字段first指向隊(duì)首結(jié)點(diǎn),字段last指向隊(duì)尾結(jié)點(diǎn)。另外LinkedBlockingDeque利用ReentrantLock來保證線程安全,所有對(duì)隊(duì)列的修改操作都需要先獲取這把全局鎖:
public class LinkedBlockingDeque隊(duì)尾入隊(duì)——putextends AbstractQueue implements BlockingDeque , java.io.Serializable { /** * 隊(duì)首結(jié)點(diǎn)指針. */ transient Node first; /** * 隊(duì)尾結(jié)點(diǎn)指針. */ transient Node last; /** * 隊(duì)列元素個(gè)數(shù). */ private transient int count; /** * 隊(duì)列容量. */ private final int capacity; /** * 全局鎖 */ final ReentrantLock lock = new ReentrantLock(); /** * 出隊(duì)線程條件隊(duì)列(隊(duì)列為空時(shí),出隊(duì)線程在此等待) */ private final Condition notEmpty = lock.newCondition(); /** * 入隊(duì)線程條件隊(duì)列(隊(duì)列為滿時(shí),入隊(duì)線程在此等待) */ private final Condition notFull = lock.newCondition(); //... }
先來看下,LinkedBlockingDeque是如何實(shí)現(xiàn)正常的從隊(duì)尾入隊(duì)的:
/** * 在隊(duì)尾入隊(duì)元素e. * 如果隊(duì)列已滿, 則阻塞線程. */ public void put(E e) throws InterruptedException { putLast(e); } public void putLast(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // 隊(duì)列不能包含null元素 Nodenode = new Node (e); // 創(chuàng)建入隊(duì)結(jié)點(diǎn) final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkLast(node)) // 隊(duì)列已滿, 則阻塞線程 notFull.await(); } finally { lock.unlock(); } }
put方法內(nèi)部調(diào)用了putLast方法,這是Deque接口獨(dú)有的方法。上述入隊(duì)操作的關(guān)鍵是linkLast方法:
/** * 將結(jié)點(diǎn)node鏈接到隊(duì)尾, 如果失敗, 則返回false. */ private boolean linkLast(Nodenode) { // assert lock.isHeldByCurrentThread(); if (count >= capacity) // 隊(duì)列已滿, 直接返回 return false; // 以下是雙鏈表的"尾插"操作 Node l = last; node.prev = l; last = node; if (first == null) first = node; else l.next = node; ++count; // 隊(duì)列元素加1 notEmpty.signal(); // 喚醒一個(gè)等待的出隊(duì)線程 return true; }
linkLast方法在隊(duì)尾插入一個(gè)結(jié)點(diǎn),插入失敗(隊(duì)列已滿的情況)則返回false。插入成功,則喚醒一個(gè)正在等待的出隊(duì)線程:
初始:
隊(duì)尾插入結(jié)點(diǎn)node:
隊(duì)首入隊(duì)就是雙鏈表的“頭插法”插入一個(gè)結(jié)點(diǎn),如果隊(duì)列已滿,則阻塞調(diào)用線程:
/** * 在隊(duì)首入隊(duì)元素e. * 如果隊(duì)列已滿, 則阻塞線程. */ public void putFirst(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); Nodenode = new Node (e); final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkFirst(node)) // 隊(duì)列已滿, 則阻塞線程 notFull.await(); } finally { lock.unlock(); } }
/** * 在隊(duì)首插入一個(gè)結(jié)點(diǎn), 插入失敗則返回null. */ private boolean linkFirst(Nodenode) { // assert lock.isHeldByCurrentThread(); if (count >= capacity) // 隊(duì)列已滿 return false; // 以下是雙鏈表的“頭插”操作 Node f = first; node.next = f; first = node; if (last == null) last = node; else f.prev = node; ++count; // 隊(duì)列元素?cái)?shù)量加1 notEmpty.signal(); // 喚醒一個(gè)等待的出隊(duì)線程 return true; }
初始:
隊(duì)首插入結(jié)點(diǎn)node:
隊(duì)首出隊(duì)的邏輯很簡(jiǎn)單,如果隊(duì)列為空,則阻塞調(diào)用線程:
/** * 從隊(duì)首出隊(duì)一個(gè)元素, 如果隊(duì)列為空, 則阻塞線程. */ public E take() throws InterruptedException { return takeFirst(); } public E takeFirst() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; while ((x = unlinkFirst()) == null) // 隊(duì)列為空, 則阻塞線程 notEmpty.await(); return x; } finally { lock.unlock(); } }
實(shí)際的出隊(duì)由unlinkFirst方法執(zhí)行:
/** * 從隊(duì)首刪除一個(gè)元素, 失敗則返回null. */ private E unlinkFirst() { // assert lock.isHeldByCurrentThread(); Nodef = first; if (f == null) // 隊(duì)列為空 return null; // 以下是雙鏈表的頭部刪除過程 Node n = f.next; E item = f.item; f.item = null; f.next = f; // help GC first = n; if (n == null) last = null; else n.prev = null; --count; // 隊(duì)列元素個(gè)數(shù)減1 notFull.signal(); // 喚醒一個(gè)等待的入隊(duì)線程 return item; }
初始:
刪除隊(duì)首結(jié)點(diǎn):
隊(duì)尾出隊(duì)的邏輯很簡(jiǎn)單,如果隊(duì)列為空,則阻塞調(diào)用線程:
/** * 從隊(duì)尾出隊(duì)一個(gè)元素, 如果隊(duì)列為空, 則阻塞線程. */ public E takeLast() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; while ((x = unlinkLast()) == null) // 隊(duì)列為空, 阻塞線程 notEmpty.await(); return x; } finally { lock.unlock(); } }
實(shí)際的出隊(duì)由unlinkLast方法執(zhí)行:
/** * 刪除隊(duì)尾元素, 如果失敗, 則返回null. */ private E unlinkLast() { // assert lock.isHeldByCurrentThread(); Nodel = last; if (l == null) // 隊(duì)列為空 return null; // 以下為雙鏈表的尾部刪除過程 Node p = l.prev; E item = l.item; l.item = null; l.prev = l; // help GC last = p; if (p == null) first = null; else p.next = null; --count; // 隊(duì)列元素個(gè)數(shù)減1 notFull.signal(); // 喚醒一個(gè)等待的入隊(duì)線程 return item; }
初始:
刪除隊(duì)尾結(jié)點(diǎn):
LinkedBlockingDeque作為一種阻塞雙端隊(duì)列,提供了隊(duì)尾刪除元素和隊(duì)首插入元素的阻塞方法。該類在構(gòu)造時(shí)一般需要指定容量,如果不指定,則最大容量為Integer.MAX_VALUE。另外,由于內(nèi)部通過ReentrantLock來保證線程安全,所以LinkedBlockingDeque的整體實(shí)現(xiàn)時(shí)比較簡(jiǎn)單的。
另外,雙端隊(duì)列相比普通隊(duì)列,主要是多了【隊(duì)尾出隊(duì)元素】/【隊(duì)首入隊(duì)元素】的功能。
阻塞隊(duì)列我們知道一般用于“生產(chǎn)者-消費(fèi)者”模式,而雙端阻塞隊(duì)列在“生產(chǎn)者-消費(fèi)者”就可以利用“雙端”的特性,從隊(duì)尾出隊(duì)元素。
考慮下面這樣一種場(chǎng)景:有多個(gè)消費(fèi)者,每個(gè)消費(fèi)者有自己的一個(gè)消息隊(duì)列,生產(chǎn)者不斷的生產(chǎn)數(shù)據(jù)扔到隊(duì)列中,消費(fèi)者消費(fèi)數(shù)據(jù)有快又慢。為了提升效率,速度快的消費(fèi)者可以從其它消費(fèi)者隊(duì)列的隊(duì)尾出隊(duì)元素放到自己的消息隊(duì)列中,由于是從其它隊(duì)列的隊(duì)尾出隊(duì),這樣可以減少并發(fā)沖突(其它消費(fèi)者從隊(duì)首出隊(duì)元素),又能提升整個(gè)系統(tǒng)的吞吐量。這其實(shí)是一種“工作竊取算法”的思路。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/77132.html
摘要:在章節(jié)中,我們說過,維護(hù)了一把全局鎖,無論是出隊(duì)還是入隊(duì),都共用這把鎖,這就導(dǎo)致任一時(shí)間點(diǎn)只有一個(gè)線程能夠執(zhí)行。入隊(duì)鎖對(duì)應(yīng)的是條件隊(duì)列,出隊(duì)鎖對(duì)應(yīng)的是條件隊(duì)列,所以每入隊(duì)一個(gè)元素,應(yīng)當(dāng)立即去喚醒可能阻塞的其它入隊(duì)線程。 showImg(https://segmentfault.com/img/bVbgCD9?w=1920&h=1080); 本文首發(fā)于一世流云專欄:https://seg...
摘要:整個(gè)包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設(shè)計(jì)模式,設(shè)計(jì)了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對(duì)等進(jìn)行補(bǔ)充增強(qiáng)。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:我們來看下的類繼承圖可以看到,實(shí)現(xiàn)了接口,在多線程進(jìn)階二五之框架中,我們提到過實(shí)現(xiàn)了接口,以提供和排序相關(guān)的功能,維持元素的有序性,所以就是一種為并發(fā)環(huán)境設(shè)計(jì)的有序工具類。唯一的區(qū)別是針對(duì)的僅僅是鍵值,針對(duì)鍵值對(duì)進(jìn)行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發(fā)于一世流云專欄:https://seg...
摘要:僅僅當(dāng)有多個(gè)線程同時(shí)進(jìn)行寫操作時(shí),才會(huì)進(jìn)行同步??梢钥吹?,上述方法返回一個(gè)迭代器對(duì)象,的迭代是在舊數(shù)組上進(jìn)行的,當(dāng)創(chuàng)建迭代器的那一刻就確定了,所以迭代過程中不會(huì)拋出并發(fā)修改異常。另外,迭代器對(duì)象也不支持修改方法,全部會(huì)拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發(fā)于一世流云專欄:https://...
摘要:我們之前已經(jīng)介紹過了,底層基于跳表實(shí)現(xiàn),其操作平均時(shí)間復(fù)雜度均為。事實(shí)上,內(nèi)部引用了一個(gè)對(duì)象,以組合方式,委托對(duì)象實(shí)現(xiàn)了所有功能。線程安全內(nèi)存的使用較多迭代是對(duì)快照進(jìn)行的,不會(huì)拋出,且迭代過程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發(fā)于一世流云專欄:https://segmentfa...
閱讀 1278·2021-11-17 09:33
閱讀 1747·2021-09-09 11:53
閱讀 3216·2021-09-04 16:45
閱讀 1394·2021-08-17 10:12
閱讀 2391·2019-08-30 15:55
閱讀 1782·2019-08-30 15:53
閱讀 2411·2019-08-30 15:52
閱讀 2562·2019-08-29 18:41