摘要:在之前,除了類外,并沒有其它適合并發(fā)環(huán)境的棧數(shù)據(jù)結(jié)構(gòu)。作為雙端隊(duì)列,可以當(dāng)作棧來使用,并且高效地支持并發(fā)環(huán)境。
本文首發(fā)于一世流云專欄:https://segmentfault.com/blog...一、引言
在開始講ConcurrentLinkedDeque之前,我們先來了解下Deque這種數(shù)據(jù)結(jié)構(gòu),我們知道Queue是一種具有FIFO特點(diǎn)的數(shù)據(jù)結(jié)構(gòu),元素只能在隊(duì)首進(jìn)行“入隊(duì)”操作,在隊(duì)尾進(jìn)行“出隊(duì)”操作。
而Deque(double-ended queue)是一種雙端隊(duì)列,也就是說可以在任意一端進(jìn)行“入隊(duì)”,也可以在任意一端進(jìn)行“出隊(duì)”:
Deque的數(shù)據(jù)結(jié)構(gòu)示意圖如下:
我們?cè)賮砜聪翵DK中Queue和Deque這兩種數(shù)據(jù)結(jié)構(gòu)的接口定義,看看Deque和Queue相比有哪些增強(qiáng):
Queue接口定義Queue的接口非常簡(jiǎn)單,一共只有三種類型的操作:入隊(duì)、出隊(duì)、讀取。
上述方法,可以劃分如下:
操作類型 | 拋出異常 | 返回特殊值 |
---|---|---|
入隊(duì) | add(e) | offer(e) |
出隊(duì) | remove() | poll() |
讀取 | element() | peek() |
每種操作類型,都給出了兩種方法,區(qū)別就是其中一種操作在隊(duì)列的狀態(tài)不滿足某些要求時(shí),會(huì)拋出異常;另一種,則直接返回特殊值(如null)。
Deque接口定義Queue接口的所有方法Deque都具備,只不過隊(duì)首/隊(duì)尾都可以進(jìn)行“出隊(duì)”和“入隊(duì)”操作:
操作類型 | 拋出異常 | 返回特殊值 |
---|---|---|
隊(duì)首入隊(duì) | addFirst(e) | offerFirst(e) |
隊(duì)首出隊(duì) | removeFirst() | pollFirst() |
隊(duì)首讀取 | getFirst() | peekFirst() |
隊(duì)尾入隊(duì) | addLast(e) | offerLast(e) |
隊(duì)尾出隊(duì) | removeLast() | pollLast() |
隊(duì)尾讀取 | getLast() | peekLast() |
除此之外,Deque還可以當(dāng)作“?!眮硎褂?,我們知道“棧”是一種具有“LIFO”特點(diǎn)的數(shù)據(jù)結(jié)構(gòu)(關(guān)于棧,可以參考我的這篇博文:棧),Deque提供了push、pop、peek這三個(gè)棧方法,一般實(shí)現(xiàn)這三個(gè)方法時(shí),可以利用已有方法,即有如下映射關(guān)系:
棧方法 | Deque方法 |
---|---|
push | addFirst(e) |
pop | removeFirst() |
peek | peekFirst() |
關(guān)于Deque接口的更多細(xì)節(jié),讀者可以參考Oracle的官方文檔:https://docs.oracle.com/javas...
二、ConcurrentLinkedDeque簡(jiǎn)介ConcurrentLinkedDeque是JDK1.7時(shí),J.U.C包引入的一個(gè)集合工具類。在JDK1.7之前,除了Stack類外,并沒有其它適合并發(fā)環(huán)境的“棧”數(shù)據(jù)結(jié)構(gòu)。ConcurrentLinkedDeque作為雙端隊(duì)列,可以當(dāng)作“棧”來使用,并且高效地支持并發(fā)環(huán)境。
ConcurrentLinkedDeque和ConcurrentLinkedQueue一樣,采用了無鎖算法,底層基于自旋+CAS的方式實(shí)現(xiàn)。
三、ConcurrentLinkedDeque原理 隊(duì)列結(jié)構(gòu)我們先來看下ConcurrentLinkedDeque的內(nèi)部結(jié)構(gòu):
public class ConcurrentLinkedDequeextends AbstractCollection implements Deque , java.io.Serializable { /** * 頭指針 */ private transient volatile Node head; /** * 尾指針 */ private transient volatile Node tail; private static final Node
可以看到,ConcurrentLinkedDeque的內(nèi)部和ConcurrentLinkedQueue類似,不過是一個(gè)雙鏈表結(jié)構(gòu),每入隊(duì)一個(gè)元素就是插入一個(gè)Node類型的結(jié)點(diǎn)。字段head指向隊(duì)列頭,tail指向隊(duì)列尾,通過Unsafe來CAS操作字段值以及Node對(duì)象的字段值。
需要特別注意的是ConcurrentLinkedDeque包含兩個(gè)特殊字段:PREV_TERMINATOR、NEXT_TERMINATOR。
這兩個(gè)字段初始時(shí)都指向一個(gè)值為null的空結(jié)點(diǎn),這兩個(gè)字段在結(jié)點(diǎn)刪除時(shí)使用,后面會(huì)詳細(xì)介紹:
ConcurrentLinkedDeque包含兩種構(gòu)造器:
/** * 空構(gòu)造器. */ public ConcurrentLinkedDeque() { head = tail = new Node(null); }
/** * 從已有集合,構(gòu)造隊(duì)列 */ public ConcurrentLinkedDeque(Collection extends E> c) { Nodeh = null, t = null; for (E e : c) { checkNotNull(e); Node newNode = new Node (e); if (h == null) h = t = newNode; else { // 在隊(duì)尾插入元素 t.lazySetNext(newNode); newNode.lazySetPrev(t); t = newNode; } } initHeadTail(h, t); }
我們重點(diǎn)看下空構(gòu)造器,通過空構(gòu)造器建立的ConcurrentLinkedDeque對(duì)象,其head和tail指針并非指向null,而是指向一個(gè)item值為null的Node結(jié)點(diǎn)——哨兵結(jié)點(diǎn),如下圖:
入隊(duì)操作雙端隊(duì)列與普通隊(duì)列的入隊(duì)區(qū)別是:雙端隊(duì)列既可以在“隊(duì)尾”插入元素,也可以在“隊(duì)首”插入元素。ConcurrentLinkedDeque的入隊(duì)方法有很多:addFirst(e)、addLast(e)、offerFirst(e)、offerLast(e):
public void addFirst(E e) { linkFirst(e); } public void addLast(E e) { linkLast(e); } public boolean offerFirst(E e) { linkFirst(e); return true; } public boolean offerLast(E e) { linkLast(e); return true; }
可以看到,隊(duì)首“入隊(duì)”其實(shí)就是調(diào)用了linkFirst(e)方法,而隊(duì)尾“入隊(duì)”是調(diào)用了 linkLast(e)方法。我們先來看下隊(duì)首“入隊(duì)”——linkFirst(e):
/** * 在隊(duì)首插入一個(gè)元素. */ private void linkFirst(E e) { checkNotNull(e); final NodenewNode = new Node (e); // 創(chuàng)建待插入的結(jié)點(diǎn) restartFromHead: for (; ; ) for (Node h = head, p = h, q; ; ) { if ((q = p.prev) != null && (q = (p = q).prev) != null) // Check for head updates every other hop. // If p == q, we are sure to follow head instead. p = (h != (h = head)) ? h : q; else if (p.next == p) // PREV_TERMINATOR continue restartFromHead; else { // p is first node newNode.lazySetNext(p); // CAS piggyback if (p.casPrev(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this deque, // and for newNode to become "live". if (p != h) // hop two nodes at a time casHead(h, newNode); // Failure is OK. return; } // Lost CAS race to another thread; re-read prev } } }
為了便于理解,我們以示例來看:假設(shè)有兩個(gè)線程ThreadA和ThreadB同時(shí)進(jìn)行入隊(duì)操作。
①ThreadA先多帶帶入隊(duì)一個(gè)元素9
此時(shí),ThreadA會(huì)執(zhí)行CASE3分支:
else { // CASE3: p是隊(duì)首結(jié)點(diǎn) newNode.lazySetNext(p); // “新結(jié)點(diǎn)”的next指向隊(duì)首結(jié)點(diǎn) if (p.casPrev(null, newNode)) { // 隊(duì)首結(jié)點(diǎn)的prev指針指向“新結(jié)點(diǎn)” if (p != h) // hop two nodes at a time casHead(h, newNode); // Failure is OK. return; } // 執(zhí)行到此處說明CAS操作失敗,有其它線程也在隊(duì)首插入元素 }
隊(duì)列的結(jié)構(gòu)如下:
②ThreadA入隊(duì)一個(gè)元素2,同時(shí)ThreadB入隊(duì)一個(gè)元素10
此時(shí),依然執(zhí)行CASE3分支,我們假設(shè)ThreadA操作成功,ThreadB操作失?。?/p>
else { // CASE3: p是隊(duì)首結(jié)點(diǎn) newNode.lazySetNext(p); // “新結(jié)點(diǎn)”的next指向隊(duì)首結(jié)點(diǎn) if (p.casPrev(null, newNode)) { // 隊(duì)首結(jié)點(diǎn)的prev指針指向“新結(jié)點(diǎn)” if (p != h) // hop two nodes at a time casHead(h, newNode); // Failure is OK. return; } // 執(zhí)行到此處說明CAS操作失敗,有其它線程也在隊(duì)首插入元素 }
ThreadA的CAS操作成功后,會(huì)進(jìn)入以下判斷:
if (p != h) // hop two nodes at a time casHead(h, newNode); // Failure is OK.
上述判斷的作用就是重置head頭指針,可以看到,ConcurrentLinkedDeque其實(shí)是以每次跳2個(gè)結(jié)點(diǎn)的方式移動(dòng)指針,這主要考慮到并發(fā)環(huán)境以這種hop跳的方式可以提升效率。
此時(shí)隊(duì)列的機(jī)構(gòu)如下:
注意,此時(shí)ThreadB的p.casPrev(null, newNode)操作失敗了,所以會(huì)進(jìn)入下一次自旋,在下一次自旋中繼續(xù)進(jìn)入CASE3。如果ThreadA的casHead操作沒有完成,ThreadB就進(jìn)入了下一次自旋,則會(huì)進(jìn)入分支1,重置指針p指向隊(duì)首。最終隊(duì)列結(jié)構(gòu)如下:
在隊(duì)尾插入元素和隊(duì)首類似,不再贅述,讀者可以自己閱讀源碼。出隊(duì)操作
ConcurrentLinkedDeque的出隊(duì)一樣分為隊(duì)首、隊(duì)尾兩種情況:removeFirst()、pollFirst()、removeLast()、pollLast()。
public E removeFirst() { return screenNullResult(pollFirst()); } public E removeLast() { return screenNullResult(pollLast()); } public E pollFirst() { for (Nodep = first(); p != null; p = succ(p)) { E item = p.item; if (item != null && p.casItem(item, null)) { unlink(p); return item; } } return null; } public E pollLast() { for (Node p = last(); p != null; p = pred(p)) { E item = p.item; if (item != null && p.casItem(item, null)) { unlink(p); return item; } } return null; }
可以看到,兩個(gè)remove方法其實(shí)內(nèi)部都調(diào)用了對(duì)應(yīng)的poll方法,我們重點(diǎn)看下隊(duì)尾的“出隊(duì)”——pollLast方法:
public E pollLast() { for (Nodep = last(); p != null; p = pred(p)) { E item = p.item; if (item != null && p.casItem(item, null)) { unlink(p); return item; } } return null; }
last方法用于尋找隊(duì)尾結(jié)點(diǎn),即滿足p.next == null && p.prev != p的結(jié)點(diǎn):
Nodelast() { restartFromTail: for (; ; ) for (Node t = tail, p = t, q; ; ) { if ((q = p.next) != null && (q = (p = q).next) != null) // Check for tail updates every other hop. // If p == q, we are sure to follow tail instead. p = (t != (t = tail)) ? t : q; else if (p == t // It is possible that p is NEXT_TERMINATOR, // but if so, the CAS is guaranteed to fail. || casTail(t, p)) return p; else continue restartFromTail; } }
pred方法用于尋找當(dāng)前結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)(如果前驅(qū)是自身,則返回隊(duì)尾結(jié)點(diǎn)):
final Nodepred(Node p) { Node q = p.prev; return (p == q) ? last() : q; }
unlink方法斷開結(jié)點(diǎn)的鏈接:
/** * Unlinks non-null node x. */ void unlink(Nodex) { // assert x != null; // assert x.item == null; // assert x != PREV_TERMINATOR; // assert x != NEXT_TERMINATOR; final Node prev = x.prev; final Node next = x.next; if (prev == null) { unlinkFirst(x, next); } else if (next == null) { unlinkLast(x, prev); } else { Node activePred, activeSucc; boolean isFirst, isLast; int hops = 1; // Find active predecessor for (Node p = prev; ; ++hops) { if (p.item != null) { activePred = p; isFirst = false; break; } Node q = p.prev; if (q == null) { if (p.next == p) return; activePred = p; isFirst = true; break; } else if (p == q) return; else p = q; } // Find active successor for (Node p = next; ; ++hops) { if (p.item != null) { activeSucc = p; isLast = false; break; } Node q = p.next; if (q == null) { if (p.prev == p) return; activeSucc = p; isLast = true; break; } else if (p == q) return; else p = q; } // TODO: better HOP heuristics if (hops < HOPS // always squeeze out interior deleted nodes && (isFirst | isLast)) return; // Squeeze out deleted nodes between activePred and // activeSucc, including x. skipDeletedSuccessors(activePred); skipDeletedPredecessors(activeSucc); // Try to gc-unlink, if possible if ((isFirst | isLast) && // Recheck expected state of predecessor and successor (activePred.next == activeSucc) && (activeSucc.prev == activePred) && (isFirst ? activePred.prev == null : activePred.item != null) && (isLast ? activeSucc.next == null : activeSucc.item != null)) { updateHead(); // Ensure x is not reachable from head updateTail(); // Ensure x is not reachable from tail // Finally, actually gc-unlink x.lazySetPrev(isFirst ? prevTerminator() : x); x.lazySetNext(isLast ? nextTerminator() : x); } } }
ConcurrentLinkedDeque相比ConcurrentLinkedQueue,功能更豐富,但是由于底層結(jié)構(gòu)是雙鏈表,且完全采用CAS+自旋的無鎖算法保證線程安全性,所以需要考慮各種并發(fā)情況,源碼比ConcurrentLinkedQueue更加難懂,留待有精力作進(jìn)一步分析。四、總結(jié)
ConcurrentLinkedDeque使用了自旋+CAS的非阻塞算法來保證線程并發(fā)訪問時(shí)的數(shù)據(jù)一致性。由于隊(duì)列本身是一種雙鏈表結(jié)構(gòu),所以雖然算法看起來很簡(jiǎn)單,但其實(shí)需要考慮各種并發(fā)的情況,實(shí)現(xiàn)復(fù)雜度較高,并且ConcurrentLinkedDeque不具備實(shí)時(shí)的數(shù)據(jù)一致性,實(shí)際運(yùn)用中,如果需要一種線程安全的棧結(jié)構(gòu),可以使用ConcurrentLinkedDeque。
另外,關(guān)于ConcurrentLinkedDeque還有以下需要注意的幾點(diǎn):
ConcurrentLinkedDeque的迭代器是弱一致性的,這在并發(fā)容器中是比較普遍的現(xiàn)象,主要是指在一個(gè)線程在遍歷隊(duì)列結(jié)點(diǎn)而另一個(gè)線程嘗試對(duì)某個(gè)隊(duì)列結(jié)點(diǎn)進(jìn)行修改的話不會(huì)拋出ConcurrentModificationException,這也就造成在遍歷某個(gè)尚未被修改的結(jié)點(diǎn)時(shí),在next方法返回時(shí)可以看到該結(jié)點(diǎn)的修改,但在遍歷后再對(duì)該結(jié)點(diǎn)修改時(shí)就看不到這種變化。
size方法需要遍歷鏈表,所以在并發(fā)情況下,其結(jié)果不一定是準(zhǔn)確的,只能供參考。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/77014.html
摘要:所以,在并發(fā)量適中的情況下,一般具有較好的性能。字段指向隊(duì)列頭,指向隊(duì)列尾,通過來操作字段值以及對(duì)象的字段值。單線程的情況下,元素入隊(duì)比較好理解,直接線性地在隊(duì)首插入元素即可。 showImg(https://segmentfault.com/img/bVbguGd?w=1200&h=800); 本文首發(fā)于一世流云專欄:https://segmentfault.com/blog... ...
摘要:整個(gè)包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設(shè)計(jì)模式,設(shè)計(jì)了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對(duì)等進(jìn)行補(bǔ)充增強(qiáng)。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:我們來看下的類繼承圖可以看到,實(shí)現(xiàn)了接口,在多線程進(jìn)階二五之框架中,我們提到過實(shí)現(xiàn)了接口,以提供和排序相關(guān)的功能,維持元素的有序性,所以就是一種為并發(fā)環(huán)境設(shè)計(jì)的有序工具類。唯一的區(qū)別是針對(duì)的僅僅是鍵值,針對(duì)鍵值對(duì)進(jìn)行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發(fā)于一世流云專欄:https://seg...
摘要:僅僅當(dāng)有多個(gè)線程同時(shí)進(jìn)行寫操作時(shí),才會(huì)進(jìn)行同步??梢钥吹?,上述方法返回一個(gè)迭代器對(duì)象,的迭代是在舊數(shù)組上進(jìn)行的,當(dāng)創(chuàng)建迭代器的那一刻就確定了,所以迭代過程中不會(huì)拋出并發(fā)修改異常。另外,迭代器對(duì)象也不支持修改方法,全部會(huì)拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發(fā)于一世流云專欄:https://...
摘要:我們之前已經(jīng)介紹過了,底層基于跳表實(shí)現(xiàn),其操作平均時(shí)間復(fù)雜度均為。事實(shí)上,內(nèi)部引用了一個(gè)對(duì)象,以組合方式,委托對(duì)象實(shí)現(xiàn)了所有功能。線程安全內(nèi)存的使用較多迭代是對(duì)快照進(jìn)行的,不會(huì)拋出,且迭代過程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發(fā)于一世流云專欄:https://segmentfa...
閱讀 1650·2023-04-26 02:11
閱讀 2993·2023-04-25 16:18
閱讀 3722·2021-09-06 15:00
閱讀 2638·2019-08-30 15:55
閱讀 1943·2019-08-30 13:20
閱讀 2060·2019-08-26 18:36
閱讀 3134·2019-08-26 11:40
閱讀 2553·2019-08-26 10:11