摘要:當(dāng)生產(chǎn)者線程調(diào)用方法時(shí),如果沒有消費(fèi)者等待接收元素,則會(huì)立即返回。方法方法,用于將指定元素傳遞給消費(fèi)者線程調(diào)用方法。
本文首發(fā)于一世流云專欄:https://segmentfault.com/blog...一、LinkedTransferQueue簡(jiǎn)介
LinkedTransferQueue是在JDK1.7時(shí),J.U.C包新增的一種比較特殊的阻塞隊(duì)列,它除了具備阻塞隊(duì)列的常用功能外,還有一個(gè)比較特殊的transfer方法。
我們知道,在普通阻塞隊(duì)列中,當(dāng)隊(duì)列為空時(shí),消費(fèi)者線程(調(diào)用take或poll方法的線程)一般會(huì)阻塞等待生產(chǎn)者線程往隊(duì)列中存入元素。而LinkedTransferQueue的transfer方法則比較特殊:
當(dāng)有消費(fèi)者線程阻塞等待時(shí),調(diào)用transfer方法的生產(chǎn)者線程不會(huì)將元素存入隊(duì)列,而是直接將元素傳遞給消費(fèi)者;
如果調(diào)用transfer方法的生產(chǎn)者線程發(fā)現(xiàn)沒有正在等待的消費(fèi)者線程,則會(huì)將元素入隊(duì),然后會(huì)阻塞等待,直到有一個(gè)消費(fèi)者線程來(lái)獲取該元素。
TransferQueue接口可以看到,LinkedTransferQueue實(shí)現(xiàn)了一個(gè)名為TransferQueue的接口,TransferQueue也是JDK1.7時(shí)J.U.C包新增的接口,正是該接口提供了上述的transfer方法:
除了transfer方法外,TransferQueue還提供了兩個(gè)變種方法:tryTransfer(E e)、tryTransfer(E e, long timeout, TimeUnit unit)。
tryTransfer(E e)
當(dāng)生產(chǎn)者線程調(diào)用tryTransfer方法時(shí),如果沒有消費(fèi)者等待接收元素,則會(huì)立即返回false。該方法和transfer方法的區(qū)別就是tryTransfer方法無(wú)論消費(fèi)者是否接收,方法立即返回,而transfer方法必須等到消費(fèi)者消費(fèi)后才返回。
tryTransfer(E e, long timeout, TimeUnit unit)
tryTransfer(E e,long timeout,TimeUnit unit)方法則是加上了限時(shí)等待功能,如果沒有消費(fèi)者消費(fèi)該元素,則等待指定的時(shí)間再返回;如果超時(shí)還沒消費(fèi)元素,則返回false,如果在超時(shí)時(shí)間內(nèi)消費(fèi)了元素,則返回true。
TransferQueue接口定義:
LinkedTransferQueue的特點(diǎn)簡(jiǎn)要概括如下:
LinkedTransferQueue是一種無(wú)界阻塞隊(duì)列,底層基于單鏈表實(shí)現(xiàn);
LinkedTransferQueue中的結(jié)點(diǎn)有兩種類型:數(shù)據(jù)結(jié)點(diǎn)、請(qǐng)求結(jié)點(diǎn);
LinkedTransferQueue基于無(wú)鎖算法實(shí)現(xiàn)。
二、LinkedTransferQueue原理 內(nèi)部結(jié)構(gòu)LinkedTransferQueue提供了兩種構(gòu)造器,也沒有參數(shù)設(shè)置隊(duì)列初始容量,所以是一種無(wú)界隊(duì)列:
/** * 隊(duì)列結(jié)點(diǎn)定義. */ static final class Node { final boolean isData; // true: 數(shù)據(jù)結(jié)點(diǎn); false: 請(qǐng)求結(jié)點(diǎn) volatile Object item; // 結(jié)點(diǎn)值 volatile Node next; // 后驅(qū)結(jié)點(diǎn)指針 volatile Thread waiter; // 等待線程 // 設(shè)置當(dāng)前結(jié)點(diǎn)的后驅(qū)結(jié)點(diǎn)為val final boolean casNext(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // 設(shè)置當(dāng)前結(jié)點(diǎn)的值為val final boolean casItem(Object cmp, Object val) { // assert cmp == null || cmp.getClass() != Node.class; return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } Node(Object item, boolean isData) { UNSAFE.putObject(this, itemOffset, item); // relaxed write this.isData = isData; } // 設(shè)置當(dāng)前結(jié)點(diǎn)的后驅(qū)結(jié)點(diǎn)為自身 final void forgetNext() { UNSAFE.putObject(this, nextOffset, this); } /** * 設(shè)置當(dāng)前結(jié)點(diǎn)的值為自身. * 設(shè)置當(dāng)前結(jié)點(diǎn)的等待線程為null. */ final void forgetContents() { UNSAFE.putObject(this, itemOffset, this); UNSAFE.putObject(this, waiterOffset, null); } /** * 判斷當(dāng)前結(jié)點(diǎn)是否匹配成功. * Node.item == this || (Node.isData == true && Node.item == null) */ final boolean isMatched() { Object x = item; return (x == this) || ((x == null) == isData); } /** * 判斷是否為未匹配的請(qǐng)求結(jié)點(diǎn). * Node.isData == false && Node.item == null */ final boolean isUnmatchedRequest() { return !isData && item == null; } /** * 當(dāng)該結(jié)點(diǎn)(havaData)是未匹配結(jié)點(diǎn), 且與當(dāng)前的結(jié)點(diǎn)類型不同時(shí), 返回true. */ final boolean cannotPrecede(boolean haveData) { boolean d = isData; Object x; return d != haveData && (x = item) != this && (x != null) == d; } /** * 嘗試匹配數(shù)據(jù)結(jié)點(diǎn). */ final boolean tryMatchData() { // assert isData; 當(dāng)前結(jié)點(diǎn)必須為數(shù)據(jù)結(jié)點(diǎn) Object x = item; if (x != null && x != this && casItem(x, null)) { LockSupport.unpark(waiter); // 喚醒等待線程 return true; } return false; } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; private static final long waiterOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class> k = Node.class; itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next")); waiterOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiter")); } catch (Exception e) { throw new Error(e); } } }
關(guān)于Node結(jié)點(diǎn),有以下幾點(diǎn)需要特別注意:
Node結(jié)點(diǎn)有兩種類型:數(shù)據(jù)結(jié)點(diǎn)、請(qǐng)求結(jié)點(diǎn),通過(guò)字段isData區(qū)分,只有不同類型的結(jié)點(diǎn)才能相互匹配;
Node結(jié)點(diǎn)的值保存在item字段,匹配前后值會(huì)發(fā)生變化;
Node結(jié)點(diǎn)的狀態(tài)變化如下表:
結(jié)點(diǎn)/狀態(tài) | 數(shù)據(jù)結(jié)點(diǎn) | 請(qǐng)求結(jié)點(diǎn) |
---|---|---|
匹配前 | isData = true; item = 數(shù)據(jù)結(jié)點(diǎn)值 | isData = false; item = null |
匹配后 | isData = true; item = null | isData = false; item = this |
從上表也可以看出,對(duì)于一個(gè)數(shù)據(jù)結(jié)點(diǎn),當(dāng)item == null表示匹配成功;對(duì)于一個(gè)請(qǐng)求結(jié)點(diǎn),當(dāng)item == this表示匹配成功。歸納起來(lái),匹配成功的結(jié)點(diǎn)Node就是滿足(Node.item == this) || ((Node.item == null) == Node.isData)。
LinkedTransferQueue內(nèi)部的其余字段定義如下,主要就是通過(guò)Unsafe類操作字段值,內(nèi)部定義了很多常量字段,比如自旋,這些都是為了非阻塞算法的鎖優(yōu)化而定義的:
public class LinkedTransferQueueextends AbstractQueue implements TransferQueue , java.io.Serializable { /** * True如果是多核CPU */ private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; /** * 線程自旋次數(shù)(僅多核CPU時(shí)用到). */ private static final int FRONT_SPINS = 1 << 7; /** * 線程自旋次數(shù)(僅多核CPU時(shí)用到). */ private static final int CHAINED_SPINS = FRONT_SPINS >>> 1; /** * The maximum number of estimated removal failures (sweepVotes) * to tolerate before sweeping through the queue unlinking * cancelled nodes that were not unlinked upon initial * removal. See above for explanation. The value must be at least * two to avoid useless sweeps when removing trailing nodes. */ static final int SWEEP_THRESHOLD = 32; /** * 隊(duì)首結(jié)點(diǎn)指針. */ transient volatile Node head; /** * 隊(duì)尾結(jié)點(diǎn)指針. */ private transient volatile Node tail; /** * The number of apparent failures to unsplice removed nodes */ private transient volatile int sweepVotes; // CAS設(shè)置隊(duì)尾tail指針為val private boolean casTail(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val); } // CAS設(shè)置隊(duì)首head指針為val private boolean casHead(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val); } private boolean casSweepVotes(int cmp, int val) { return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val); } /* * xfer方法的入?yún)? 不同類型的方法內(nèi)部調(diào)用xfer方法時(shí)入?yún)⒉煌? */ private static final int NOW = 0; // for untimed poll, tryTransfer private static final int ASYNC = 1; // for offer, put, add private static final int SYNC = 2; // for transfer, take private static final int TIMED = 3; // for timed poll, tryTransfer // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; private static final long sweepVotesOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class> k = LinkedTransferQueue.class; headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("tail")); sweepVotesOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("sweepVotes")); } catch (Exception e) { throw new Error(e); } } //... }
上述比較重要的就是4個(gè)常量值的定義:
/* * xfer方法的入?yún)? 不同類型的方法內(nèi)部調(diào)用xfer方法時(shí)入?yún)⒉煌? */ private static final int NOW = 0; // for untimed poll, tryTransfer private static final int ASYNC = 1; // for offer, put, add private static final int SYNC = 2; // for transfer, take private static final int TIMED = 3; // for timed poll, tryTransfer
這四個(gè)常量值,作為xfer方法的入?yún)ⅲ糜跇?biāo)識(shí)不同操作類型。其實(shí)從常量的命名也可以看出它們對(duì)應(yīng)的操作含義:
NOW表示即時(shí)操作(可能失?。?,即不會(huì)阻塞調(diào)用線程:
poll(獲取并移除隊(duì)首元素,如果隊(duì)列為空,直接返回null);tryTransfer(嘗試將元素傳遞給消費(fèi)者,如果沒有等待的消費(fèi)者,則立即返回false,也不會(huì)將元素入隊(duì))
ASYNC表示異步操作(必然成功):
offer(插入指定元素至隊(duì)尾,由于是無(wú)界隊(duì)列,所以會(huì)立即返回true);put(插入指定元素至隊(duì)尾,由于是無(wú)界隊(duì)列,所以會(huì)立即返回);add(插入指定元素至隊(duì)尾,由于是無(wú)界隊(duì)列,所以會(huì)立即返回true)
SYNC表示同步操作(阻塞調(diào)用線程):
transfer(阻塞直到出現(xiàn)一個(gè)消費(fèi)者線程);take(從隊(duì)首移除一個(gè)元素,如果隊(duì)列為空,則阻塞線程)
TIMED表示限時(shí)同步操作(限時(shí)阻塞調(diào)用線程):
poll(long timeout, TimeUnit unit);tryTransfer(E e, long timeout, TimeUnit unit)
關(guān)于xfer方法,它是LinkedTransferQueued的核心內(nèi)部方法,我們后面會(huì)詳細(xì)介紹。
transfer方法transfer方法,用于將指定元素e傳遞給消費(fèi)者線程(調(diào)用take/poll方法)。如果有消費(fèi)者線程正在阻塞等待,則調(diào)用transfer方法的線程會(huì)直接將元素傳遞給它;如果沒有消費(fèi)者線程等待獲取元素,則調(diào)用transfer方法的線程會(huì)將元素插入到隊(duì)尾,然后阻塞等待,直到出現(xiàn)一個(gè)消費(fèi)者線程獲取元素:
/** * 將指定元素e傳遞給消費(fèi)者線程(調(diào)用take/poll方法). */ public void transfer(E e) throws InterruptedException { if (xfer(e, true, SYNC, 0) != null) { // 進(jìn)入到此處, 說(shuō)明調(diào)用線程被中斷了 Thread.interrupted(); // 清除中斷狀態(tài), 然后拋出中斷異常 throw new InterruptedException(); } }
transfer方法的內(nèi)部實(shí)際是調(diào)用了xfer方法,入?yún)?b>SYNC=2:
/** * 入隊(duì)/出隊(duì)元素的真正實(shí)現(xiàn). * * @param e 入隊(duì)操作, e非null; 出隊(duì)操作, e為null * @param haveData true表示入隊(duì)元素, false表示出隊(duì)元素 * @param how NOW, ASYNC, SYNC, TIMED 四種常量定義 * @param nanos 限時(shí)模式下使用(納秒) * @return 匹配成功則返回匹配的元素, 否則返回e本身 */ private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) // 入隊(duì)操作, 元素e不能為null throw new NullPointerException(); Node s = null; retry: for (; ; ) { for (Node h = head, p = h; p != null; ) { // 嘗試匹配p指向的結(jié)點(diǎn) boolean isData = p.isData; // 結(jié)點(diǎn)類型 Object item = p.item; // 結(jié)點(diǎn)值 if (item != p && (item != null) == isData) { // 如果結(jié)點(diǎn)還未匹配過(guò) if (isData == haveData) // 同種類型結(jié)點(diǎn)不能匹配 break; if (p.casItem(item, e)) { // p指向從隊(duì)首開始向后的第一個(gè)匹配結(jié)點(diǎn) for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); // 喚醒匹配結(jié)點(diǎn)上的等待線程 return LinkedTransferQueue.cast(item); // 返回匹配結(jié)點(diǎn)的值 } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } if (how != NOW) { if (s == null) s = new Node(e, haveData); // 創(chuàng)建一個(gè)入隊(duì)結(jié)點(diǎn), 添加到隊(duì)尾 Node pred = tryAppend(s, haveData); // pred指向s的前驅(qū)結(jié)點(diǎn)或s(隊(duì)列中只有一個(gè)結(jié)點(diǎn))或null(tryAppend失?。? if (pred == null) continue retry; // 入隊(duì)失敗,則重試 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊(duì)線程 } return e; } }
我們通過(guò)示例看下xfer方法到底做了哪些事:
①隊(duì)列初始狀態(tài)
②ThreadA線程調(diào)用transfer入隊(duì)元素“9”
注意,此時(shí)入隊(duì)一個(gè)數(shù)據(jù)結(jié)點(diǎn),且隊(duì)列為空,所以會(huì)直接進(jìn)入xfer中的下述代碼:
if (how != NOW) { if (s == null) s = new Node(e, haveData); // 創(chuàng)建一個(gè)入隊(duì)結(jié)點(diǎn), 添加到隊(duì)尾 Node pred = tryAppend(s, haveData); // pred指向s的前驅(qū)結(jié)點(diǎn)或s(隊(duì)列中只有一個(gè)結(jié)點(diǎn))或null(tryAppend失?。? if (pred == null) continue retry; // 入隊(duì)失敗,則重試 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊(duì)線程 }
上述代碼會(huì)插入一個(gè)結(jié)點(diǎn)至隊(duì)尾,然后線程進(jìn)入阻塞,等待一個(gè)出隊(duì)線程(消費(fèi)者)的到來(lái)。
隊(duì)尾插入結(jié)點(diǎn)的方法是tryAppend,由于此時(shí)隊(duì)列為空,會(huì)進(jìn)入CASE1分支,設(shè)置隊(duì)首指針head指向新結(jié)點(diǎn),tryAppend方法的返回值有三種情況:
入隊(duì)失敗,返回null;
入隊(duì)成功且隊(duì)列只有一個(gè)結(jié)點(diǎn),返回該結(jié)點(diǎn)自身;
入隊(duì)成功且隊(duì)列不止一個(gè)結(jié)點(diǎn),返回該入隊(duì)結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)。
/** * 嘗試將結(jié)點(diǎn)s添加到隊(duì)尾. * * @param s 待添加的結(jié)點(diǎn) * @param haveData true: 數(shù)據(jù)結(jié)點(diǎn) * @return 返回null表示失敗; 否則返回s的前驅(qū)結(jié)點(diǎn)(沒有前驅(qū)則返回s自身) */ private Node tryAppend(Node s, boolean haveData) { for (Node t = tail, p = t; ; ) { Node n, u; if (p == null && (p = head) == null) { // CASE1: 隊(duì)列為空 if (casHead(null, s)) // 設(shè)置隊(duì)首指針head return s; } else if (p.cannotPrecede(haveData)) // CASE2: 結(jié)點(diǎn)s不能鏈接到結(jié)點(diǎn)p return null; else if ((n = p.next) != null) // CASE3: 遍歷至隊(duì)尾結(jié)點(diǎn) p = p != t && t != (u = tail) ? (t = u) : // stale tail (p != n) ? n : null; // restart if off list else if (!p.casNext(null, s)) // CASE4: 插入結(jié)點(diǎn)s p = p.next; // re-read on CAS failure else { // CASE5: 嘗試進(jìn)行松弛操作 if (p != t) { // update if slack now >= 2 while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t) ; } return p; } } }
等待出隊(duì)線程方法awaitMatch,該方法核心作用就是進(jìn)行結(jié)點(diǎn)匹配:
匹配成功,返回匹配值;
匹配失?。ㄖ袛嗷蛳迺r(shí)等待的超時(shí)情況),返回原匹配結(jié)點(diǎn)的值;
阻塞線程,等待與之匹配的結(jié)點(diǎn)的到來(lái)。
從awaitMatch方法其實(shí)可以看到一種經(jīng)典的“鎖優(yōu)化”思路,就是 自旋 -> yield -> 阻塞,線程不會(huì)立即進(jìn)入阻塞,因?yàn)榫€程上下文切換的開銷往往比較大,所以會(huì)先自旋一定次數(shù),中途可能伴隨隨機(jī)的yield操作,讓出cpu時(shí)間片,如果自旋次數(shù)用完后,還是沒有匹配線程出現(xiàn),再真正阻塞線程。
經(jīng)過(guò)上述步驟,ThreadA最終會(huì)進(jìn)入CASE4分支中等待,此時(shí)的隊(duì)列結(jié)構(gòu)如下:
注意,此時(shí)的隊(duì)列中tail隊(duì)尾指針并不指向結(jié)點(diǎn)“9”,這是一種“松弛”策略,后面會(huì)講到。
③ThreadB線程調(diào)用transfer入隊(duì)元素“2”
由于此時(shí)隊(duì)首head指針不為null,所以會(huì)進(jìn)入transfer方法中的以下循環(huán):
for (Node h = head, p = h; p != null; ) { boolean isData = p.isData; // 結(jié)點(diǎn)類型 Object item = p.item; // 結(jié)點(diǎn)值 if (item != p && (item != null) == isData) { // 如果結(jié)點(diǎn)還未匹配過(guò) if (isData == haveData) // 同種類型結(jié)點(diǎn)不能匹配 break; if (p.casItem(item, e)) { // match for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); return LinkedTransferQueue.cast(item); } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist }
上述方法會(huì)讀取隊(duì)首結(jié)點(diǎn),判斷該結(jié)點(diǎn)有沒被匹配過(guò)(item != p && (item != null) == isData):
如果已經(jīng)被其它線程匹配過(guò)了,則繼續(xù)判斷下一個(gè)結(jié)點(diǎn)(p.next);
如果還沒有被匹配,則判斷下當(dāng)前的入隊(duì)結(jié)點(diǎn)類型是否和隊(duì)首中的一致;如果一致(isData == haveData)就匹配失敗,跳出循環(huán),否則進(jìn)行匹配操作。
顯然,目前隊(duì)首結(jié)點(diǎn)是“數(shù)據(jù)結(jié)點(diǎn)”,ThreadB線程的入隊(duì)結(jié)點(diǎn)也是“數(shù)據(jù)結(jié)點(diǎn)”,結(jié)點(diǎn)類型一致,所以匹配失敗,直接跳過(guò)循環(huán),也進(jìn)入以下代碼塊:
if (how != NOW) { if (s == null) s = new Node(e, haveData); // 創(chuàng)建一個(gè)入隊(duì)結(jié)點(diǎn), 添加到隊(duì)尾 Node pred = tryAppend(s, haveData); // pred指向s的前驅(qū)結(jié)點(diǎn)或s(隊(duì)列中只有一個(gè)結(jié)點(diǎn))或null(tryAppend失?。? if (pred == null) continue retry; // 入隊(duì)失敗,則重試 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊(duì)線程 }
再次調(diào)用tryAppend方法, 會(huì)在CASE4分支中將元素“2”插入隊(duì)尾,然后在CASE5分支中重新設(shè)置隊(duì)尾指針tail:
/** * 嘗試將結(jié)點(diǎn)s添加到隊(duì)尾. * * @param s 待添加的結(jié)點(diǎn) * @param haveData true: 數(shù)據(jù)結(jié)點(diǎn) * @return 返回null表示失敗; 否則返回s的前驅(qū)結(jié)點(diǎn)(沒有前驅(qū)則返回s自身) */ private Node tryAppend(Node s, boolean haveData) { for (Node t = tail, p = t; ; ) { Node n, u; if (p == null && (p = head) == null) { // CASE1: 隊(duì)列為空 if (casHead(null, s)) // 設(shè)置隊(duì)首指針head return s; } else if (p.cannotPrecede(haveData)) // CASE2: 結(jié)點(diǎn)s不能鏈接到結(jié)點(diǎn)p return null; else if ((n = p.next) != null) // CASE3: 遍歷至隊(duì)尾結(jié)點(diǎn) p = p != t && t != (u = tail) ? (t = u) : // stale tail (p != n) ? n : null; // restart if off list else if (!p.casNext(null, s)) // CASE4: 插入結(jié)點(diǎn)s p = p.next; // re-read on CAS failure else { // CASE5: 嘗試進(jìn)行松弛操作 if (p != t) { // update if slack now >= 2 while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t) ; } return p; } } }
此時(shí)隊(duì)列結(jié)構(gòu)如下:
最終,ThreadB也會(huì)在awaitMatch方法中進(jìn)入阻塞,最終隊(duì)列結(jié)構(gòu)如下:
④ThreadC線程調(diào)用transfer入隊(duì)元素“93”
過(guò)程和前幾步幾乎相同,不再贅述,最終隊(duì)列結(jié)構(gòu)如下:
可以看到,隊(duì)尾指針tail的設(shè)置實(shí)際是滯后的,這是一種“松弛”策略,用以提升無(wú)鎖算法并發(fā)修改過(guò)程中的性能。
take方法再來(lái)看下消費(fèi)者線程調(diào)用的take方法,該方法會(huì)從隊(duì)首取出一個(gè)元素,如果隊(duì)列為空,則線程會(huì)阻塞:
/** * 從隊(duì)首出隊(duì)一個(gè)元素. */ public E take() throws InterruptedException { E e = xfer(null, false, SYNC, 0); // (e == null && isData=false)表示一個(gè)請(qǐng)求結(jié)點(diǎn) if (e != null) // 如果e!=null, 則表示匹配成功, 此時(shí)e為與之匹配的數(shù)據(jù)結(jié)點(diǎn)的值 return e; Thread.interrupted(); throw new InterruptedException(); }
內(nèi)部依然調(diào)用了xfer方法,不過(guò)此時(shí)入?yún)⒂兴煌?,由于是消費(fèi)線程調(diào)用,所以入?yún)?b>e == null && hasData == false,表示一個(gè)“請(qǐng)求結(jié)點(diǎn)”:
/** * 入隊(duì)/出隊(duì)元素的真正實(shí)現(xiàn). * * @param e 入隊(duì)操作, e非null; 出隊(duì)操作, e為null * @param haveData true表示入隊(duì)元素, false表示出隊(duì)元素 * @param how NOW, ASYNC, SYNC, TIMED 四種常量定義 * @param nanos 限時(shí)模式下使用(納秒) * @return 匹配成功則返回匹配的元素, 否則返回e本身 */ private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) // 入隊(duì)操作, 元素e不能為null throw new NullPointerException(); Node s = null; retry: for (; ; ) { for (Node h = head, p = h; p != null; ) { // 嘗試匹配p指向的結(jié)點(diǎn) boolean isData = p.isData; // 結(jié)點(diǎn)類型 Object item = p.item; // 結(jié)點(diǎn)值 if (item != p && (item != null) == isData) { // 如果結(jié)點(diǎn)還未匹配過(guò) if (isData == haveData) // 同種類型結(jié)點(diǎn)不能匹配 break; if (p.casItem(item, e)) { // p指向從隊(duì)首開始向后的第一個(gè)匹配結(jié)點(diǎn) for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); // 喚醒匹配結(jié)點(diǎn)上的等待線程 return LinkedTransferQueue.cast(item); // 返回匹配結(jié)點(diǎn)的值 } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } if (how != NOW) { if (s == null) s = new Node(e, haveData); // 創(chuàng)建一個(gè)入隊(duì)結(jié)點(diǎn), 添加到隊(duì)尾 Node pred = tryAppend(s, haveData); // pred指向s的前驅(qū)結(jié)點(diǎn)或s(隊(duì)列中只有一個(gè)結(jié)點(diǎn))或null(tryAppend失?。? if (pred == null) continue retry; // 入隊(duì)失敗,則重試 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊(duì)線程 } return e; } }
還是通過(guò)示例看:
①隊(duì)列初始狀態(tài)
②ThreadD調(diào)用take方法,消費(fèi)元素
此時(shí),在xfer方法中,會(huì)從隊(duì)首開始,向后找到第一個(gè)匹配結(jié)點(diǎn),并交換元素值,然后喚醒隊(duì)列中匹配結(jié)點(diǎn)上的等待線程:
/** * 入隊(duì)/出隊(duì)元素的真正實(shí)現(xiàn). * * @param e 入隊(duì)操作, e非null; 出隊(duì)操作, e為null * @param haveData true表示入隊(duì)元素, false表示出隊(duì)元素 * @param how NOW, ASYNC, SYNC, TIMED 四種常量定義 * @param nanos 限時(shí)模式下使用(納秒) * @return 匹配成功則返回匹配的元素, 否則返回e本身 */ private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) // 入隊(duì)操作, 元素e不能為null throw new NullPointerException(); Node s = null; retry: for (; ; ) { for (Node h = head, p = h; p != null; ) { // 嘗試匹配p指向的結(jié)點(diǎn) boolean isData = p.isData; // 結(jié)點(diǎn)類型 Object item = p.item; // 結(jié)點(diǎn)值 if (item != p && (item != null) == isData) { // 如果結(jié)點(diǎn)還未匹配過(guò) if (isData == haveData) // 同種類型結(jié)點(diǎn)不能匹配 break; if (p.casItem(item, e)) { // p指向從隊(duì)首開始向后的第一個(gè)匹配結(jié)點(diǎn) for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); // 喚醒匹配結(jié)點(diǎn)上的等待線程 return LinkedTransferQueue.cast(item); // 返回匹配結(jié)點(diǎn)的值 } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } if (how != NOW) { if (s == null) s = new Node(e, haveData); // 創(chuàng)建一個(gè)入隊(duì)結(jié)點(diǎn), 添加到隊(duì)尾 Node pred = tryAppend(s, haveData); // pred指向s的前驅(qū)結(jié)點(diǎn)或s(隊(duì)列中只有一個(gè)結(jié)點(diǎn))或null(tryAppend失敗) if (pred == null) continue retry; // 入隊(duì)失敗,則重試 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊(duì)線程 } return e; } }
最終隊(duì)列結(jié)構(gòu)如下,匹配結(jié)點(diǎn)的值被置換為null,ThreadA被喚醒,ThreadD拿到匹配結(jié)點(diǎn)上的元素值“9”并返回:
③ThreadA被喚醒后繼續(xù)執(zhí)行
ThreadA被喚醒后,從原阻塞處——繼續(xù)向下執(zhí)行,然后進(jìn)入下一次自旋,進(jìn)入CASE1分支:
/** * 自旋/yield/阻塞,直到結(jié)點(diǎn)s被匹配. * * @param s 等待被匹配的結(jié)點(diǎn)s * @param pred s的前驅(qū)結(jié)點(diǎn)或s自身(隊(duì)列中只有一個(gè)結(jié)點(diǎn)的情況) * @param e 結(jié)點(diǎn)s的值 * @return 匹配值, 或e本身(中斷或超時(shí)情況) */ private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; // 限時(shí)等待情況下使用 Thread w = Thread.currentThread(); int spins = -1; // 自旋次數(shù), 鎖優(yōu)化操作 ThreadLocalRandom randomYields = null; // bound if needed for (; ; ) { Object item = s.item; if (item != e) { // CASE1: 匹配成功 // assert item != s; s.forgetContents(); // avoid garbage return LinkedTransferQueue.cast(item); } if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { // CASE2: 取消(線程被中斷或超時(shí)) unsplice(pred, s); return e; } // CASE3: 設(shè)置輕量級(jí)鎖(自旋 -> yield) if (spins < 0) { // 初始化自旋次數(shù) if ((spins = spinsFor(pred, s.isData)) > 0) randomYields = ThreadLocalRandom.current(); } else if (spins > 0) { // 自選次數(shù)減1 --spins; if (randomYields.nextInt(CHAINED_SPINS) == 0) Thread.yield(); // 隨機(jī)yield線程 } else if (s.waiter == null) { // waiter保存待阻塞線程 s.waiter = w; } else if (timed) { // 限時(shí)等待情況, 計(jì)算剩余有效時(shí)間 nanos = deadline - System.nanoTime(); if (nanos > 0L) LockSupport.parkNanos(this, nanos); } else { // CASE4: 阻塞線程 LockSupport.park(this); } } }
在CASE1分支中,由于結(jié)點(diǎn)的item項(xiàng)已經(jīng)被替換成了null,所以調(diào)用s.forgetContents(),并返回null
/** * 設(shè)置當(dāng)前結(jié)點(diǎn)的值為自身. * 設(shè)置當(dāng)前結(jié)點(diǎn)的等待線程為null. */ final void forgetContents() { UNSAFE.putObject(this, itemOffset, this); UNSAFE.putObject(this, waiterOffset, null); }
最終隊(duì)列結(jié)構(gòu)如下:
④ThreadE調(diào)用take方法出隊(duì)元素
ThreadE調(diào)用take方法出隊(duì)元素,過(guò)程和步驟②相同,進(jìn)入xfer方法(e == null,hasData == false),由于head指針指向的元素已經(jīng)匹配過(guò)了,所以
向后繼續(xù)查找,找到第一個(gè)未匹配過(guò)的結(jié)點(diǎn)“2”,然后置換結(jié)點(diǎn)“2”中的元素值為null,喚醒線程ThreadB,返回匹配結(jié)點(diǎn)的元素值“2”:
for (Node h = head, p = h; p != null; ) { // 嘗試匹配p指向的結(jié)點(diǎn) boolean isData = p.isData; // 結(jié)點(diǎn)類型 Object item = p.item; // 結(jié)點(diǎn)值 if (item != p && (item != null) == isData) { // 如果結(jié)點(diǎn)還未匹配過(guò) if (isData == haveData) // 同種類型結(jié)點(diǎn)不能匹配 break; if (p.casItem(item, e)) { // p指向從隊(duì)首開始向后的第一個(gè)匹配結(jié)點(diǎn) for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); // 喚醒匹配結(jié)點(diǎn)上的等待線程 return LinkedTransferQueue.cast(item); // 返回匹配結(jié)點(diǎn)的值 } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist }
此時(shí)隊(duì)列狀態(tài)如下,可以看到,隊(duì)首指針head一次性向后跳了2個(gè)位置,原來(lái)已經(jīng)匹配過(guò)的元素的next指針指向自身,等待被GC回收,這其實(shí)就是LinkedTransferQueue的“松弛”策略:
⑤ThreadB被喚醒后繼續(xù)執(zhí)行
過(guò)程和步驟③完全相同,在awaitMatch方法中,將結(jié)點(diǎn)的item置為this,然后返回匹配結(jié)點(diǎn)值——null,最終隊(duì)列結(jié)構(gòu)如下:
⑥ThreadF調(diào)用take方法出隊(duì)元素
ThreadF調(diào)用take方法出隊(duì)元素,過(guò)程和步驟②相同,進(jìn)入xfer方法(e == null,hasData == false),由于head指針指向的元素此時(shí)沒有匹配,所以不用像步驟②那樣向后查找,而是直接置換匹配結(jié)點(diǎn)的元素值“93”,然后喚醒ThreadC,返回匹配值“93”。最終隊(duì)列結(jié)構(gòu)如下:
⑦ThreadC被喚醒后繼續(xù)執(zhí)行
過(guò)程和步驟③完全相同,在awaitMatch方法中,將結(jié)點(diǎn)的item置為this,然后返回匹配結(jié)點(diǎn)值——null,最終隊(duì)列結(jié)構(gòu)如下:
此時(shí)的隊(duì)列結(jié)構(gòu),讀者移一定感到非常奇怪,并不嚴(yán)格遵守隊(duì)列的定義,這其實(shí)就是“Dual Queue”算法的實(shí)現(xiàn),為了對(duì)自旋優(yōu)化,做了很多看似別扭的操作,不必奇怪。
假設(shè)此時(shí)再有一個(gè)線程ThreadH調(diào)用take方法出隊(duì)元素會(huì)怎么樣?其實(shí)這是隊(duì)列已經(jīng)空了,ThreadH會(huì)被阻塞,但是會(huì)創(chuàng)建一個(gè)“請(qǐng)求結(jié)點(diǎn)”入隊(duì):
/** * 嘗試將結(jié)點(diǎn)s添加到隊(duì)尾. * * @param s 待添加的結(jié)點(diǎn) * @param haveData true: 數(shù)據(jù)結(jié)點(diǎn) * @return 返回null表示失敗; 否則返回s的前驅(qū)結(jié)點(diǎn)(沒有前驅(qū)則返回s自身) */ private Node tryAppend(Node s, boolean haveData) { for (Node t = tail, p = t; ; ) { Node n, u; if (p == null && (p = head) == null) { // CASE1: 隊(duì)列為空 if (casHead(null, s)) // 設(shè)置隊(duì)首指針head return s; } else if (p.cannotPrecede(haveData)) // CASE2: 結(jié)點(diǎn)s不能鏈接到結(jié)點(diǎn)p return null; else if ((n = p.next) != null) // CASE3: 遍歷至隊(duì)尾結(jié)點(diǎn) p = p != t && t != (u = tail) ? (t = u) : // stale tail (p != n) ? n : null; // restart if off list else if (!p.casNext(null, s)) // CASE4: 插入結(jié)點(diǎn)s p = p.next; // re-read on CAS failure else { // CASE5: 嘗試進(jìn)行松弛操作 if (p != t) { // update if slack now >= 2 while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t) ; } return p; } } }
調(diào)用完tryAppend方法后,隊(duì)列結(jié)構(gòu)如下,橙色的為“請(qǐng)求結(jié)點(diǎn)”—— item==null && isData==false:
然后ThreadH也會(huì)進(jìn)入在awaitMatch方法后進(jìn)入阻塞,并等待一個(gè)入隊(duì)線程的到來(lái)。最終隊(duì)列結(jié)構(gòu)如下:
三、總結(jié)截止本篇為止,我們已經(jīng)學(xué)習(xí)完了juc-collection框架中的所有阻塞隊(duì)列,如下表所示:
隊(duì)列特性 | 有界隊(duì)列 | 近似無(wú)界隊(duì)列 | 無(wú)界隊(duì)列 | 特殊隊(duì)列 |
---|---|---|---|---|
有鎖算法 | ArrayBlockingQueue | LinkedBlockingQueue、LinkedBlockingDeque | / | PriorityBlockingQueue、DelayQueue |
無(wú)鎖算法 | / | / | LinkedTransferQueue | SynchronousQueue |
可以看到,LinkedTransferQueue其實(shí)兼具了SynchronousQueue的特性以及無(wú)鎖算法的性能,并且是一種無(wú)界隊(duì)列:
和SynchronousQueue相比,LinkedTransferQueue可以存儲(chǔ)實(shí)際的數(shù)據(jù);
和其它阻塞隊(duì)列相比,LinkedTransferQueue直接用無(wú)鎖算法實(shí)現(xiàn),性能有所提升。
另外,由于LinkedTransferQueue可以存放兩種不同類型的結(jié)點(diǎn),所以稱之為“Dual Queue”:
內(nèi)部Node結(jié)點(diǎn)定義了一個(gè) boolean 型字段——isData,表示該結(jié)點(diǎn)是“數(shù)據(jù)結(jié)點(diǎn)”還是“請(qǐng)求結(jié)點(diǎn)”。
為了節(jié)省 CAS 操作的開銷,LinkedTransferQueue使用了松弛(slack)操作:
在結(jié)點(diǎn)被匹配(被刪除)之后,不會(huì)立即更新隊(duì)列的head、tail,而是當(dāng) head、tail結(jié)點(diǎn)與最近一個(gè)未匹配的結(jié)點(diǎn)之間的距離超過(guò)“松弛閥值”后才會(huì)更新(默認(rèn)為 2)。這個(gè)“松弛閥值”一般為1到3,如果太大會(huì)增加沿鏈表查找未匹配結(jié)點(diǎn)的時(shí)間,太小會(huì)增加 CAS 的開銷。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/77196.html
摘要:整個(gè)包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設(shè)計(jì)模式,設(shè)計(jì)了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對(duì)等進(jìn)行補(bǔ)充增強(qiáng)。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:我們來(lái)看下的類繼承圖可以看到,實(shí)現(xiàn)了接口,在多線程進(jìn)階二五之框架中,我們提到過(guò)實(shí)現(xiàn)了接口,以提供和排序相關(guān)的功能,維持元素的有序性,所以就是一種為并發(fā)環(huán)境設(shè)計(jì)的有序工具類。唯一的區(qū)別是針對(duì)的僅僅是鍵值,針對(duì)鍵值對(duì)進(jìn)行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發(fā)于一世流云專欄:https://seg...
摘要:僅僅當(dāng)有多個(gè)線程同時(shí)進(jìn)行寫操作時(shí),才會(huì)進(jìn)行同步??梢钥吹?,上述方法返回一個(gè)迭代器對(duì)象,的迭代是在舊數(shù)組上進(jìn)行的,當(dāng)創(chuàng)建迭代器的那一刻就確定了,所以迭代過(guò)程中不會(huì)拋出并發(fā)修改異常。另外,迭代器對(duì)象也不支持修改方法,全部會(huì)拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發(fā)于一世流云專欄:https://...
摘要:我們之前已經(jīng)介紹過(guò)了,底層基于跳表實(shí)現(xiàn),其操作平均時(shí)間復(fù)雜度均為。事實(shí)上,內(nèi)部引用了一個(gè)對(duì)象,以組合方式,委托對(duì)象實(shí)現(xiàn)了所有功能。線程安全內(nèi)存的使用較多迭代是對(duì)快照進(jìn)行的,不會(huì)拋出,且迭代過(guò)程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發(fā)于一世流云專欄:https://segmentfa...
摘要:接口截止目前為止,我們介紹的阻塞隊(duì)列都是實(shí)現(xiàn)了接口。該類在構(gòu)造時(shí)一般需要指定容量,如果不指定,則最大容量為。另外,由于內(nèi)部通過(guò)來(lái)保證線程安全,所以的整體實(shí)現(xiàn)時(shí)比較簡(jiǎn)單的。另外,雙端隊(duì)列相比普通隊(duì)列,主要是多了隊(duì)尾出隊(duì)元素隊(duì)首入隊(duì)元素的功能。 showImg(https://segmentfault.com/img/bVbgZ7j?w=770&h=514); 本文首發(fā)于一世流云專欄:ht...
閱讀 1451·2023-04-25 16:31
閱讀 2054·2021-11-24 10:33
閱讀 2755·2021-09-23 11:33
閱讀 2545·2021-09-23 11:31
閱讀 2926·2021-09-08 09:45
閱讀 2350·2021-09-06 15:02
閱讀 2658·2019-08-30 14:21
閱讀 2323·2019-08-30 12:56