摘要:初始狀態(tài)對應(yīng)二叉樹結(jié)構(gòu)將頂點與最后一個結(jié)點調(diào)換即將頂點與最后一個結(jié)點交換,然后將索引為止置。
本文首發(fā)于一世流云專欄:https://segmentfault.com/blog...一、PriorityBlockingQueue簡介
PriorityBlockingQueue,是在JDK1.5時,隨著J.U.C包引入的一種阻塞隊列,它實現(xiàn)了BlockingQueue接口,底層基于堆實現(xiàn):
PriorityBlockingQueue是一種無界阻塞隊列,在構(gòu)造的時候可以指定隊列的初始容量。具有如下特點:
PriorityBlockingQueue與之前介紹的阻塞隊列最大的不同之處就是:它是一種優(yōu)先級隊列,也就是說元素并不是以FIFO的方式出/入隊,而是以按照權(quán)重大小的順序出隊;
PriorityBlockingQueue是真正的無界隊列(僅受內(nèi)存大小限制),它不像ArrayBlockingQueue那樣構(gòu)造時必須指定最大容量,也不像LinkedBlockingQueue默認(rèn)最大容量為Integer.MAX_VALUE;
由于PriorityBlockingQueue是按照元素的權(quán)重進入排序,所以隊列中的元素必須是可以比較的,也就是說元素必須實現(xiàn)Comparable接口;
由于PriorityBlockingQueue無界隊列,所以插入元素永遠不會阻塞線程;
PriorityBlockingQueue底層是一種基于數(shù)組實現(xiàn)的堆結(jié)構(gòu)。
關(guān)于堆,如果讀者不了解,可以參考下我的這篇博文預(yù)熱下——優(yōu)先級隊列。
注意:堆分為“大頂堆”和“小頂堆”,PriorityBlockingQueue會依據(jù)元素的比較方式選擇構(gòu)建大頂堆或小頂堆。比如:如果元素是Integer這種引用類型,那么默認(rèn)就是“小頂堆”,也就是每次出隊都會是當(dāng)前隊列最小的元素。二、PriorityBlockingQueue原理 構(gòu)造
PriorityBlockingQueue提供了四種構(gòu)造器:
/** * 默認(rèn)構(gòu)造器. * 默認(rèn)初始容量11, 以元素自然順序比較(元素必須實現(xiàn)Comparable接口) */ public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); }
/** * 指定初始容量的構(gòu)造器. * 以元素自然順序比較(元素必須實現(xiàn)Comparable接口) */ public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); }
/** * 指定初始容量和比較器的構(gòu)造器. */ public PriorityBlockingQueue(int initialCapacity, Comparator super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; }
/** * 從已有集合構(gòu)造隊列. * 如果已經(jīng)集合是SortedSet或者PriorityBlockingQueue, 則保持原來的元素順序 */ public PriorityBlockingQueue(Collection extends E> c) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); boolean heapify = true; // true if not known to be in heap order boolean screen = true; // true if must screen for nulls ? if (c instanceof SortedSet>) { // 如果是有序集合 SortedSet extends E> ss = (SortedSet extends E>) c; this.comparator = (Comparator super E>) ss.comparator(); heapify = false; } else if (c instanceof PriorityBlockingQueue>) { // 如果是優(yōu)先級隊列 PriorityBlockingQueue extends E> pq = (PriorityBlockingQueue extends E>) c; this.comparator = (Comparator super E>) pq.comparator(); screen = false; if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; } ? Object[] a = c.toArray(); int n = a.length; if (a.getClass() != Object[].class) a = Arrays.copyOf(a, n, Object[].class); if (screen && (n == 1 || this.comparator != null)) { // 校驗是否存在null元素 for (int i = 0; i < n; ++i) if (a[i] == null) throw new NullPointerException(); } this.queue = a; this.size = n; if (heapify) // 堆排序 heapify(); }
重點是第三種構(gòu)造器,可以看到,PriorityBlockingQueue內(nèi)部也是利用了ReentrantLock來保證并發(fā)訪問時的線程安全。
PriorityBlockingQueue如果不指定容量,默認(rèn)容量為11,內(nèi)部數(shù)組queue其實是一種二叉樹,后續(xù)我們會詳細(xì)介紹。
需要注意的是,PriorityBlockingQueue只有一個條件等待隊列——notEmpty,因為構(gòu)造時不會限制最大容量且會自動擴容,所以插入元素并不會阻塞,僅當(dāng)隊列為空時,才可能阻塞“出隊”線程。
public class PriorityBlockingQueue插入元素——put(E e)extends AbstractQueue implements BlockingQueue , java.io.Serializable { ? /** * 默認(rèn)容量. */ private static final int DEFAULT_INITIAL_CAPACITY = 11; ? /** * 最大容量. */ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; ? /** * 內(nèi)部堆數(shù)組, 保存實際數(shù)據(jù), 可以看成一顆二叉樹: * 對于頂點queue[n], queue[2*n+1]表示左子結(jié)點, queue[2*(n+1)]表示右子結(jié)點. */ private transient Object[] queue; ? /** * 隊列中的元素個數(shù). */ private transient int size; ? /** * 比較器, 如果為null, 表示以元素自身的自然順序進行比較(元素必須實現(xiàn)Comparable接口). */ private transient Comparator super E> comparator; ? /** * 全局鎖. */ private final ReentrantLock lock; ? /** * 當(dāng)隊列為空時,出隊線程在該條件隊列上等待. */ private final Condition notEmpty; ? // ... }
PriorityBlockingQueue插入元素不會阻塞線程,put(E e)方法內(nèi)部其實是調(diào)用了offer(E e)方法:
首先獲取全局鎖(對于隊列的修改都要獲取這把鎖),然后判斷下隊列是否已經(jīng)滿了,如果滿了就先進行一次內(nèi)部數(shù)組的擴容(關(guān)于擴容,我們后面會專門講):
/** * 向隊列中插入指定元素. * 由于隊列是無界的,所以不會阻塞線程. */ public void put(E e) { offer(e); // never need to block } ? public boolean offer(E e) { if (e == null) throw new NullPointerException(); ? final ReentrantLock lock = this.lock; // 加鎖 lock.lock(); ? int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) // 隊列已滿, 則進行擴容 tryGrow(array, cap); ? try { Comparator super E> cmp = comparator; if (cmp == null) // 比較器為空, 則按照元素的自然順序進行堆調(diào)整 siftUpComparable(n, e, array); else // 比較器非空, 則按照比較器進行堆調(diào)整 siftUpUsingComparator(n, e, array, cmp); size = n + 1; // 隊列元素總數(shù)+1 notEmpty.signal(); // 喚醒一個可能正在等待的"出隊線程" } finally { lock.unlock(); } return true; }
上面最關(guān)鍵的是siftUpComparable和siftUpUsingComparator方法,這兩個方法內(nèi)部幾乎一樣,只不過前者是一個根據(jù)元素的自然順序比較,后者則根據(jù)外部比較器比較,我們重點看下siftUpComparable方法:
/** * 將元素x插入到array[k]的位置. * 然后按照元素的自然順序進行堆調(diào)整——"上浮",以維持"堆"有序. * 最終的結(jié)果是一個"小頂堆". */ private staticvoid siftUpComparable(int k, T x, Object[] array) { Comparable super T> key = (Comparable super T>) x; while (k > 0) { int parent = (k - 1) >>> 1; // 相當(dāng)于(k-1)除2, 就是求k結(jié)點的父結(jié)點索引parent Object e = array[parent]; if (key.compareTo((T) e) >= 0) // 如果插入的結(jié)點值大于父結(jié)點, 則退出 break; ? // 否則,交換父結(jié)點和當(dāng)前結(jié)點的值 array[k] = e; k = parent; } array[k] = key; }
siftUpComparable方法的作用其實就是堆的“上浮調(diào)整”,可以把堆可以想象成一棵完全二叉樹,每次插入元素都鏈接到二叉樹的最右下方,然后將插入的元素與其父結(jié)點比較,如果父結(jié)點大,則交換元素,直到?jīng)]有父結(jié)點比插入的結(jié)點大為止。這樣就保證了堆頂(二叉樹的根結(jié)點)一定是最小的元素。(注:以上僅針對“小頂堆”)
堆的“上浮”調(diào)整我們通過示例來理解下入隊的整個過程:假設(shè)初始構(gòu)造的隊列大小為6,依次插入9、2、93、10、25、90。
①初始隊列情況
②插入元素9(索引0處)
將上述數(shù)組想象成一棵完全二叉樹,其實就是下面的結(jié)構(gòu):
③插入元素2(索引1處)
對應(yīng)的二叉樹:
由于結(jié)點2的父結(jié)點為9,所以要進行“上浮調(diào)整”,最終隊列結(jié)構(gòu)如下:
④插入元素93(索引2處)
⑤插入元素10(索引3處)
⑥插入元素25(索引4處)
⑦插入元素90(索引5處)
此時,堆不滿足有序條件,因為“90”的父結(jié)點“93”大于它,所以需要“上浮調(diào)整”:
最終,堆的結(jié)構(gòu)如上,可以看到,經(jīng)過調(diào)整后,堆頂元素一定是最小的。
擴容在入隊過程中,如果隊列內(nèi)部的queue數(shù)組已經(jīng)滿了,就需要進行擴容:
public boolean offer(E e) { ? // ... while ((n = size) >= (cap = (array = queue).length)) // 隊列已滿, 則進行擴容 tryGrow(array, cap); ? // ... }
我們來看下tryGrow方法:
private void tryGrow(Object[] array, int oldCap) { lock.unlock(); // 擴容和入隊/出隊可以同時進行, 所以先釋放全局鎖 Object[] newArray = null; if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { // allocationSpinLock置1表示正在擴容 try { // 計算新的數(shù)組大小 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { // 溢出判斷 int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; // 分配新數(shù)組 } finally { allocationSpinLock = 0; } } if (newArray == null) // 擴容失?。赡苡衅渌€程正在擴容,導(dǎo)致allocationSpinLock競爭失?。? Thread.yield(); lock.lock(); // 獲取全局鎖(因為要修改內(nèi)部數(shù)組queue) if (newArray != null && queue == array) { queue = newArray; // 指向新的內(nèi)部數(shù)組 System.arraycopy(array, 0, newArray, 0, oldCap); } }
上述整個過程還是比較清晰的,由于調(diào)用tryGrow的方法一定會先獲取全局鎖,所以先釋放鎖,因為可能有線程正在出隊,擴容/出隊是可以并發(fā)執(zhí)行的(擴容的前半部分只是新建一個內(nèi)部數(shù)組,不會對出隊產(chǎn)生影響)。擴容后的內(nèi)部數(shù)組大小一般為原來的2倍。
上述需要注意的是allocationSpinLock字段,該字段通過CAS操作,置1表示有線程正在進行擴容。
刪除元素——take()刪除元素(出隊)的整個過程比較簡單,也是先獲取全局鎖,然后判斷隊列狀態(tài),如果是空,則阻塞線程,否則調(diào)用dequeue方法出隊:
/** * 出隊一個元素. * 如果隊列為空, 則阻塞線程. */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 獲取全局鎖 E result; try { while ((result = dequeue()) == null) // 隊列為空 notEmpty.await(); // 線程在noEmpty條件隊列等待 } finally { lock.unlock(); } return result; } ? private E dequeue() { int n = size - 1; // n表示出隊后的剩余元素個數(shù) if (n < 0) // 隊列為空, 則返回null return null; else { Object[] array = queue; E result = (E) array[0]; // array[0]是堆頂結(jié)點, 每次出隊都刪除堆頂結(jié)點 E x = (E) array[n]; // array[n]是堆的最后一個結(jié)點, 也就是二叉樹的最右下結(jié)點 array[n] = null; Comparator super E> cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } }
從dequeue方法可以看出,每次出隊的元素都是“堆頂結(jié)點”,對于“小頂堆”就是隊列中的最小值,對于“大頂堆”就是隊列中的最大值。
我們看下siftDownComparable方法如何實現(xiàn)堆頂點的刪除:
/** * 堆的"下沉"調(diào)整. * 刪除array[k]對應(yīng)的結(jié)點,并重新調(diào)整堆使其有序. * * @param k 待刪除的位置 * @param x 待比較的健 * @param array 堆數(shù)組 * @param n 堆的大小 */ private staticvoid siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable super T> key = (Comparable super T>) x; int half = n >>> 1; // 相當(dāng)于n除2, 即找到索引n對應(yīng)結(jié)點的父結(jié)點 while (k < half) { /** * 下述代碼中: * c保存k的左右子結(jié)點中的較小結(jié)點值 * child保存較小結(jié)點對應(yīng)的索引 */ int child = (k << 1) + 1; // k的左子結(jié)點 Object c = array[child]; ? int right = child + 1; // k的右子結(jié)點 if (right < n && ((Comparable super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; } }
上述代碼其實是經(jīng)典的堆“下沉”操作,對堆中某個頂點下沉,步驟如下:
找到該頂點的左右子結(jié)點中較小的那個;
與當(dāng)前結(jié)點交換;
重復(fù)前2步直到當(dāng)前結(jié)點沒有左右子結(jié)點或比左右子結(jié)點都小。
堆的“下沉”調(diào)整來看個示例,假設(shè)堆的初始結(jié)構(gòu)如下,現(xiàn)在出隊一個元素(索引0位置的元素2)。
①初始狀態(tài)
對應(yīng)二叉樹結(jié)構(gòu):
②將頂點與最后一個結(jié)點調(diào)換
即將頂點“2”與最后一個結(jié)點“93”交換,然后將索引5為止置null。
注意:為了提升效率(比如siftDownComparable的源碼所示)并不一定要真正交換,可以用一個變量保存索引5處的結(jié)點值,在整個下沉操作完成后再替換。但是為了理解這一過程,示例圖中全是以交換進行的。
③下沉索引0處結(jié)點
比較元素“93”和左右子結(jié)點中的最小者,發(fā)現(xiàn)“93”大于“9”,違反了“小頂堆”的規(guī)則,所以交換“93”和“9”,這一過程稱為siftdown(下沉):
④繼續(xù)下沉索引1處結(jié)點
比較元素“93”和左右子結(jié)點中的最小者,發(fā)現(xiàn)“93”大于“10”,違反了“小頂堆”的規(guī)則,所以交換“93”和“10”:
⑤比較結(jié)束
由于“93”已經(jīng)沒有左右子結(jié)點了,所以下沉結(jié)束,可以看到,此時堆恢復(fù)了有序狀態(tài),最終隊列結(jié)構(gòu)如下:
三、總結(jié)PriorityBlockingQueue屬于比較特殊的阻塞隊列,適用于有元素優(yōu)先級要求的場景。它的內(nèi)部和ArrayBlockingQueue一樣,使用一個了全局獨占鎖來控制同時只有一個線程可以進行入隊和出隊,另外由于該隊列是無界隊列,所以入隊線程并不會阻塞。
PriorityBlockingQueue始終保證出隊的元素是優(yōu)先級最高的元素,并且可以定制優(yōu)先級的規(guī)則,內(nèi)部通過使用堆(數(shù)組形式)來維護元素順序,它的內(nèi)部數(shù)組是可擴容的,擴容和出/入隊可以并發(fā)進行。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/77084.html
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設(shè)計模式,設(shè)計了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:我們來看下的類繼承圖可以看到,實現(xiàn)了接口,在多線程進階二五之框架中,我們提到過實現(xiàn)了接口,以提供和排序相關(guān)的功能,維持元素的有序性,所以就是一種為并發(fā)環(huán)境設(shè)計的有序工具類。唯一的區(qū)別是針對的僅僅是鍵值,針對鍵值對進行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發(fā)于一世流云專欄:https://seg...
摘要:僅僅當(dāng)有多個線程同時進行寫操作時,才會進行同步。可以看到,上述方法返回一個迭代器對象,的迭代是在舊數(shù)組上進行的,當(dāng)創(chuàng)建迭代器的那一刻就確定了,所以迭代過程中不會拋出并發(fā)修改異常。另外,迭代器對象也不支持修改方法,全部會拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發(fā)于一世流云專欄:https://...
摘要:我們之前已經(jīng)介紹過了,底層基于跳表實現(xiàn),其操作平均時間復(fù)雜度均為。事實上,內(nèi)部引用了一個對象,以組合方式,委托對象實現(xiàn)了所有功能。線程安全內(nèi)存的使用較多迭代是對快照進行的,不會拋出,且迭代過程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發(fā)于一世流云專欄:https://segmentfa...
摘要:接口截止目前為止,我們介紹的阻塞隊列都是實現(xiàn)了接口。該類在構(gòu)造時一般需要指定容量,如果不指定,則最大容量為。另外,由于內(nèi)部通過來保證線程安全,所以的整體實現(xiàn)時比較簡單的。另外,雙端隊列相比普通隊列,主要是多了隊尾出隊元素隊首入隊元素的功能。 showImg(https://segmentfault.com/img/bVbgZ7j?w=770&h=514); 本文首發(fā)于一世流云專欄:ht...
閱讀 1781·2021-11-11 16:55
閱讀 2579·2021-08-27 13:11
閱讀 3641·2019-08-30 15:53
閱讀 2314·2019-08-30 15:44
閱讀 1402·2019-08-30 11:20
閱讀 1049·2019-08-30 10:55
閱讀 953·2019-08-29 18:40
閱讀 3048·2019-08-29 16:13