摘要:當(dāng)隊(duì)列滿時(shí),存儲(chǔ)元素的線程會(huì)等待隊(duì)列可用。分別是一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列。一個(gè)支持優(yōu)先級(jí)排序的無界阻塞隊(duì)列。此隊(duì)列的默認(rèn)和最大長度為。此隊(duì)列按照先進(jìn)先出的原則對(duì)元素進(jìn)行排序。
最近在看數(shù)據(jù)結(jié)構(gòu)的時(shí)候,看到了隊(duì)列這里,在實(shí)際的開發(fā)中我們很少會(huì)手動(dòng)的去實(shí)現(xiàn)一個(gè)隊(duì)列,甚至很少直接用到隊(duì)列,但是在Java的包中有一些具有特殊屬性的隊(duì)列應(yīng)用的比較廣泛,例如:阻塞隊(duì)列&并發(fā)隊(duì)列.
阻塞隊(duì)列阻塞隊(duì)列(BlockingQueue)是一個(gè)額外支持兩種操作的隊(duì)列。這兩個(gè)附加的操作是:
1、在隊(duì)列為空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强铡?br>2、當(dāng)隊(duì)列滿時(shí),存儲(chǔ)元素的線程會(huì)等待隊(duì)列可用。
阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場景,生產(chǎn)者是往隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里拿元素的線程。阻塞隊(duì)列就是生產(chǎn)者存放元素的容器,而消費(fèi)者也只從容器里拿元素。
阻塞隊(duì)列提供了四種處理方法:
拋出異常
add(e):在添加元素的時(shí)候如果隊(duì)列已滿,那么直接拋出異常。 remove(e):移除元素,如果隊(duì)列為空,那么拋出異常。 element():檢查方法。 public static void test() { ArrayBlockingQueueblockingQueue = new ArrayBlockingQueue<>(10); for (int i=0; i
返回特殊值
1、offer(e) 入隊(duì)的時(shí)候返回特殊值,在不同的阻塞隊(duì)列中實(shí)現(xiàn)有一定的差別 2、poll() 出隊(duì)的時(shí)候返回特殊的值 3、peek() 測試出隊(duì)能否成功
一直阻塞
1、put(e) 如果隊(duì)列已滿,那么會(huì)一直阻塞,直到成功 2、take() 如果隊(duì)列為空,那么出隊(duì)會(huì)一直阻塞,直到成功
阻塞,超時(shí)退出
1、offer(e,time,unit) 2、poll(time,unit)Java中的阻塞隊(duì)列
JDK7提供了7個(gè)阻塞隊(duì)列。分別是ArrayBlockingQueue :一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列。
LinkedBlockingQueue :一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列。
PriorityBlockingQueue :一個(gè)支持優(yōu)先級(jí)排序的無界阻塞隊(duì)列。
DelayQueue:一個(gè)使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的無界阻塞隊(duì)列。
SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。
LinkedTransferQueue:一個(gè)由鏈表結(jié)構(gòu)組成的無界阻塞隊(duì)列。
LinkedBlockingDeque:一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。
下面具體看一下每一種阻塞隊(duì)列的實(shí)現(xiàn)方式以及使用場景:
1. ArrayBlockingQueue特性:用數(shù)組實(shí)現(xiàn)的實(shí)現(xiàn)的有界阻塞隊(duì)列,默認(rèn)情況下不保證線程公平的訪問隊(duì)列(按照阻塞的先后順序訪問隊(duì)列),隊(duì)列可用的時(shí)候,阻塞的線程都可以爭奪隊(duì)列的訪問資格,當(dāng)然也可以使用以下的構(gòu)造方法創(chuàng)建一個(gè)公平的阻塞隊(duì)列。
ArrayBlockingQueueblockingQueue2 = new ArrayBlockingQueue<>(10, true);
下面通過源碼探究以下,這個(gè)阻塞隊(duì)列是如何實(shí)現(xiàn)的?如果實(shí)現(xiàn)公平與非公平的控制。構(gòu)造過程
public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); //基于數(shù)組實(shí)現(xiàn) this.items = new Object[capacity]; /**公平與非公平是通過可重入鎖來實(shí)現(xiàn)的*/ lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } /**阻塞隊(duì)列的公平與非公平是通過可重入鎖來實(shí)現(xiàn)的,關(guān)于為什么可重入鎖可以實(shí)現(xiàn)線程訪問的公平非公平特性,我們晚一點(diǎn)分析一下ReentrantLock的實(shí)現(xiàn)原理。【關(guān)于ReentrantLock的實(shí)現(xiàn)原理】https://segmentfault.com/a/11...
add(E) 操作,如果add失敗,那么拋出異常
public boolean add(E e) { return super.add(e); } /**AbstractQueue 父類的add方法*/ public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } /**通過多態(tài)調(diào)用自己的offer(e)實(shí)現(xiàn)*/ public boolean offer(E e) { checkNotNull(e); //加鎖 final ReentrantLock lock = this.lock; lock.lock(); try { //如果隊(duì)列滿了,那么返回false if (count == items.length) return false; else { //入隊(duì) enqueue(e); return true; } } finally { lock.unlock(); } } private void enqueue(E var1) { Object[] var2 = this.items; //putIndex可以認(rèn)為是隊(duì)列的隊(duì)尾后的一個(gè)位置,數(shù)據(jù)入隊(duì)對(duì)應(yīng)的位置,如果隊(duì)列滿了,那么putIndex設(shè)置為0 var2[this.putIndex] = var1; if (++this.putIndex == var2.length) { this.putIndex = 0; } ++this.count; //喚醒一個(gè)等待在condition上的線程 this.notEmpty.signal(); }put(E e) put操作,如果隊(duì)列已滿,那么會(huì)一直阻塞,直到put成功
public void put(E var1) throws InterruptedException { checkNotNull(var1); ReentrantLock var2 = this.lock; //加鎖,可被線程中斷返回 var2.lockInterruptibly(); try { //如果隊(duì)列已經(jīng)滿了,那么阻塞 while(this.count == this.items.length) { this.notFull.await(); } //進(jìn)行入隊(duì)操作 this.enqueue(var1); } finally { var2.unlock(); } } /** * 在隊(duì)列滿的情況put操作被阻塞,那么什么時(shí)候該操作可以被喚醒呢?很顯然是隊(duì)列中出現(xiàn)空地的情況下,才會(huì)被喚醒在notFull這種條件下 * 阻塞的操作: * 所以在發(fā)生以下操作的時(shí)候,會(huì)被喚醒進(jìn)行入隊(duì)的操作 * 1、dequeue()操作 2、removeAt(int var1)操作?。场lear()?。础rainTo */take() 出隊(duì)操作,如果隊(duì)列為空,那么阻塞,直到隊(duì)列中包含對(duì)應(yīng)元素喚醒
/**實(shí)現(xiàn)比較容易,和上面的操作異曲同工*/ public E take() throws InterruptedException { ReentrantLock var1 = this.lock; var1.lockInterruptibly(); Object var2; try { while(this.count == 0) { this.notEmpty.await(); } var2 = this.dequeue(); } finally { var1.unlock(); } return var2; }個(gè)人總結(jié):實(shí)現(xiàn)阻塞操作和核心在于線程掛起以及線程的喚醒,在Java中提供了兩種線程等待以及線程喚醒的方式。一是基于對(duì)象監(jiān)視器的wait(),notify()方法?!《峭ㄟ^Condition.await()和signal()方法。2. LinkedBlockingQueue基于鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列。此隊(duì)列的默認(rèn)和最大長度為Integer.MAX_VALUE。此隊(duì)列按照先進(jìn)先出的原則對(duì)元素進(jìn)行排序。這個(gè)隊(duì)列的實(shí)現(xiàn)原理和ArrayBlockingQueue實(shí)現(xiàn)基本相同。可以看一下隊(duì)列的定義:隊(duì)列的定義
/**默認(rèn)的構(gòu)造函數(shù)*/ public LinkedBlockingQueue() { this(2147483647); } public LinkedBlockingQueue(int var1) { this.count = new AtomicInteger(); this.takeLock = new ReentrantLock(); this.notEmpty = this.takeLock.newCondition(); this.putLock = new ReentrantLock(); this.notFull = this.putLock.newCondition(); if (var1 <= 0) { throw new IllegalArgumentException(); } else { this.capacity = var1; //鏈表的頭結(jié)點(diǎn)和尾節(jié)點(diǎn),默認(rèn)是空 this.last = this.head = new LinkedBlockingQueue.Node((Object)null); } }3、PriorityBlockingQueue一個(gè)支持優(yōu)先級(jí)的無界隊(duì)列。默認(rèn)情況下元素采取自然順序排列,也可以通過比較器comparator來指定元素的排序規(guī)則。元素按照升序排列。具體是如何實(shí)現(xiàn)的?隊(duì)列的定義以及構(gòu)造方法
/**定義和通常的阻塞隊(duì)列相同,AbstractQueue中定義了隊(duì)列的基本操作,BlockingQueue中定義可阻塞隊(duì)列的相關(guān)操作定義*/ public class PriorityBlockingQueueextends AbstractQueue implements BlockingQueue /**構(gòu)造方法,默認(rèn)的無參構(gòu)造方法,調(diào)用的是另一個(gè)構(gòu)造方法,默認(rèn)定義了一個(gè)隊(duì)列的容量,那為什么說他是無界隊(duì)列呢?接著向下*/ public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } /**所有的構(gòu)造方法最后調(diào)用的構(gòu)造方法, comparator是一個(gè)比較器,通過比較器可以確定隊(duì)列中元素的排列順序*/ public PriorityBlockingQueue(int initialCapacity, Comparator super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; /**隊(duì)列是基于數(shù)組實(shí)現(xiàn)的*/ this.queue = new Object[initialCapacity]; } add 操作以及offer操作
public boolean add(E e) { return offer(e); } /** * offer操作 */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; //如果隊(duì)列已滿,那么嘗試進(jìn)行擴(kuò)容(個(gè)人感覺這里使用 >= 并不是很合理) while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator super E> cmp = comparator; if (cmp == null) //使用默認(rèn)的比較方法將e放到隊(duì)列中 siftUpComparable(n, e, array); else //使用指定的比較順序?qū)?shù)據(jù)插入到隊(duì)列中 siftUpUsingComparator(n, e, array, cmp); size = n + 1; //激活一個(gè)在notEmpty這個(gè)condition上等待的線程 notEmpty.signal(); } finally { lock.unlock(); } return true; } /**tryGrow()實(shí)現(xiàn)*/ private void tryGrow(Object[] array, int oldCap) { //這里先釋放了鎖,最后需要重新獲取鎖,那么這個(gè)時(shí)候所有的add操作都會(huì)執(zhí)行下面的代碼段 lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } if (newArray == null) // back off if another thread is allocating Thread.yield(); lock.lock(); if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }并發(fā)隊(duì)列
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/74576.html
摘要:語言在之前,提供的唯一的并發(fā)原語就是管程,而且之后提供的并發(fā)包,也是以管程技術(shù)為基礎(chǔ)的。但是管程更容易使用,所以選擇了管程。線程進(jìn)入條件變量的等待隊(duì)列后,是允許其他線程進(jìn)入管程的。并發(fā)編程里兩大核心問題互斥和同步,都可以由管程來幫你解決。 并發(fā)編程這個(gè)技術(shù)領(lǐng)域已經(jīng)發(fā)展了半個(gè)世紀(jì)了。有沒有一種核心技術(shù)可以很方便地解決我們的并發(fā)問題呢?這個(gè)問題, 我會(huì)選擇 Monitor(管程)技術(shù)。Ja...
摘要:下面是線程相關(guān)的熱門面試題,你可以用它來好好準(zhǔn)備面試。線程安全問題都是由全局變量及靜態(tài)變量引起的。持有自旋鎖的線程在之前應(yīng)該釋放自旋鎖以便其它線程可以獲得自旋鎖。 最近看到網(wǎng)上流傳著,各種面試經(jīng)驗(yàn)及面試題,往往都是一大堆技術(shù)題目貼上去,而沒有答案。 不管你是新程序員還是老手,你一定在面試中遇到過有關(guān)線程的問題。Java語言一個(gè)重要的特點(diǎn)就是內(nèi)置了對(duì)并發(fā)的支持,讓Java大受企業(yè)和程序員...
摘要:最小初始化容量。它作為堆棧隊(duì)列雙端隊(duì)列的操作和的操作是一致的,只是內(nèi)部的實(shí)現(xiàn)不同。根據(jù)元素內(nèi)容查找和刪除的效率比較低,為。但是接口有對(duì)應(yīng)的并發(fā)實(shí)現(xiàn)類類。 Queue接口的實(shí)現(xiàn)類 Queue接口作為隊(duì)列數(shù)據(jù)結(jié)構(gòu),java在實(shí)現(xiàn)的時(shí)候,直接定義了Deque接口(雙端隊(duì)列)來繼承Queue接口,并且只實(shí)現(xiàn)Deque接口。這樣java中的雙端隊(duì)列就囊括了隊(duì)列、雙端隊(duì)列、堆棧(Deque接口又定...
摘要:單線程集合本部分將重點(diǎn)介紹非線程安全集合。非線程安全集合框架的最新成員是自起推出的。這是標(biāo)準(zhǔn)的單線程陣營中唯一的有序集合。該功能能有效防止運(yùn)行時(shí)造型。檢查個(gè)集合之間不存在共同的元素。基于自然排序或找出集合中的最大或最小元素。 【編者按】本文作者為擁有十年金融軟件開發(fā)經(jīng)驗(yàn)的 Mikhail Vorontsov,文章主要概覽了所有標(biāo)準(zhǔn) Java 集合類型。文章系國內(nèi) ITOM 管理平臺(tái) O...
摘要:除此之外,還有一個(gè)接口,代表一個(gè)雙端隊(duì)列,雙端隊(duì)列可以同時(shí)從兩端刪除添加元素,因此的實(shí)現(xiàn)類既可當(dāng)成隊(duì)列使用,也可當(dāng)成棧使用。相當(dāng)于棧方法將一個(gè)元素進(jìn)該雙端隊(duì)列所表示的棧的棧頂。 Queue用于模擬隊(duì)列這種數(shù)據(jù)結(jié)構(gòu),隊(duì)列通常是指先進(jìn)先出(FIFO)的容器。隊(duì)列的頭部保存在隊(duì)列中存放時(shí)間最長的元素,隊(duì)列的尾部保存在隊(duì)列中存放時(shí)間最短的元素。新元素插入(offer)到隊(duì)列的尾部,訪問元素(p...
閱讀 3254·2021-11-12 10:36
閱讀 1344·2019-08-30 15:56
閱讀 2473·2019-08-30 11:26
閱讀 580·2019-08-29 13:00
閱讀 3635·2019-08-28 18:08
閱讀 2775·2019-08-26 17:18
閱讀 1930·2019-08-26 13:26
閱讀 2459·2019-08-26 11:39