摘要:前言本篇文章是基于線程間的同步與通信和這篇文章寫的,在那篇文章中,我們分析了接口所定義的方法,本篇我們就來看看對于接口的這些接口方法的具體實現(xiàn)。因此,條件隊列在出隊時,線程并不持有鎖。
前言
本篇文章是基于線程間的同步與通信(4)——Lock 和 Condtion 這篇文章寫的,在那篇文章中,我們分析了Condition接口所定義的方法,本篇我們就來看看AQS對于Condition接口的這些接口方法的具體實現(xiàn)。
下文中筆者將假設(shè)讀者已經(jīng)閱讀過那篇文章,或者已經(jīng)了解了相關(guān)的背景知識。
系列文章目錄
概述我們在前面介紹Conditon的時候說過,Condition接口的await/signal機制是設(shè)計用來代替監(jiān)視器鎖的wait/notify機制 的,因此,與監(jiān)視器鎖的wait/notify機制對照著學習有助于我們更好的理解Conditon接口:
Object 方法 | Condition 方法 | 區(qū)別 |
---|---|---|
void wait() | void await() | |
void wait(long timeout) | long awaitNanos(long nanosTimeout) | 時間單位,返回值 |
void wait(long timeout, int nanos) | boolean await(long time, TimeUnit unit) | 時間單位,參數(shù)類型,返回值 |
void notify() | void signal() | |
void notifyAll() | void signalAll() | |
- | void awaitUninterruptibly() | Condition獨有 |
- | boolean awaitUntil(Date deadline) | Condition獨有 |
這里先做一下說明,本文說wait方法時,是泛指wait()、wait(long timeout)、wait(long timeout, int nanos) 三個方法,當需要指明某個特定的方法時,會帶上相應(yīng)的參數(shù)。同樣的,說notify方法時,也是泛指notify(),notifyAll()方法,await方法和signal方法以此類推。
首先,我們通過wait/notify機制來類比await/signal機制:
調(diào)用wait方法的線程首先必須是已經(jīng)進入了同步代碼塊,即已經(jīng)獲取了監(jiān)視器鎖;與之類似,調(diào)用await方法的線程首先必須獲得lock鎖
調(diào)用wait方法的線程會釋放已經(jīng)獲得的監(jiān)視器鎖,進入當前監(jiān)視器鎖的等待隊列(wait set)中;與之類似,調(diào)用await方法的線程會釋放已經(jīng)獲得的lock鎖,進入到當前Condtion對應(yīng)的條件隊列中。
調(diào)用監(jiān)視器鎖的notify方法會喚醒等待在該監(jiān)視器鎖上的線程,這些線程將開始參與鎖競爭,并在獲得鎖后,從wait方法處恢復執(zhí)行;與之類似,調(diào)用Condtion的signal方法會喚醒對應(yīng)的條件隊列中的線程,這些線程將開始參與鎖競爭,并在獲得鎖后,從await方法處開始恢復執(zhí)行。
實戰(zhàn)由于前面我們已經(jīng)學習過了監(jiān)視器鎖的wait/notify機制,await/signal的用法基本類似。在正式分析源碼之前,我們先來看一個使用condition的實例:
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; // 生產(chǎn)者方法,往數(shù)組里面寫數(shù)據(jù) public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); //數(shù)組已滿,沒有空間時,掛起等待,直到數(shù)組“非滿”(notFull) items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; // 因為放入了一個數(shù)據(jù),數(shù)組肯定不是空的了 // 此時喚醒等待這notEmpty條件上的線程 notEmpty.signal(); } finally { lock.unlock(); } } // 消費者方法,從數(shù)組里面拿數(shù)據(jù) public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); // 數(shù)組是空的,沒有數(shù)據(jù)可拿時,掛起等待,直到數(shù)組非空(notEmpty) Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; // 因為拿出了一個數(shù)據(jù),數(shù)組肯定不是滿的了 // 此時喚醒等待這notFull條件上的線程 notFull.signal(); return x; } finally { lock.unlock(); } } }
這是java官方文檔提供的例子,是一個典型的生產(chǎn)者-消費者模型。這里在同一個lock鎖上,創(chuàng)建了兩個條件隊列notFull, notEmpty。當數(shù)組已滿,沒有存儲空間時,put方法在notFull條件上等待,直到數(shù)組“not full”;當數(shù)組空了,沒有數(shù)據(jù)可讀時,take方法在notEmpty條件上等待,直到數(shù)組“not empty”,而notEmpty.signal()和notFull.signal()則用來喚醒等待在這個條件上的線程。
注意,上面所說的,在notFull notEmpty條件上等待事實上是指線程在條件隊列(condition queue)上等待,當該線程被相應(yīng)的signal方法喚醒后,將進入到我們前面三篇介紹的sync queue中去爭鎖,爭到鎖后才能能await方法處返回。這里接牽涉到兩種隊列了——condition queue和sync queue,它們都定義在AQS中。
為了防止大家被AQS中的隊列弄暈,這里我們先理理清:
同步隊列 vs 條件隊列 sync queue首先,在逐行分析AQS源碼(1)——獨占鎖的獲取這篇中我們說過,所有等待鎖的線程都會被包裝成Node扔到一個同步隊列中。該同步隊列如下:
sync queue是一個雙向鏈表,我們使用prev、next屬性來串聯(lián)節(jié)點。但是在這個同步隊列中,我們一直沒有用到nextWaiter屬性,即使是在共享鎖模式下,這一屬性也只作為一個標記,指向了一個空節(jié)點,因此,在sync queue中,我們不會用它來串聯(lián)節(jié)點。
condtion queue每創(chuàng)建一個Condtion對象就會對應(yīng)一個Condtion隊列,每一個調(diào)用了Condtion對象的await方法的線程都會被包裝成Node扔進一個條件隊列中,就像這樣:
可見,每一個Condition對象對應(yīng)一個Conditon隊列,每個Condtion隊列都是獨立的,互相不影響的。在上圖中,如果我們對當前線程調(diào)用了notFull.await(), 則當前線程就會被包裝成Node加到notFull隊列的末尾。
值得注意的是,condition queue是一個單向鏈表,在該鏈表中我們使用nextWaiter屬性來串聯(lián)鏈表。但是,就像在sync queue中不會使用nextWaiter屬性來串聯(lián)鏈表一樣,在condition queue中,也并不會用到prev, next屬性,它們的值都為null。也就是說,在條件隊列中,Node節(jié)點真正用到的屬性只有三個:
thread:代表當前正在等待某個條件的線程
waitStatus:條件的等待狀態(tài)
nextWaiter:指向條件隊列中的下一個節(jié)點
既然這里又提到了waitStatus,我們這里再回顧一下它的取值范圍:
volatile int waitStatus; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3;
在條件隊列中,我們只需要關(guān)注一個值即可——CONDITION。它表示線程處于正常的等待狀態(tài),而只要waitStatus不是CONDITION,我們就認為線程不再等待了,此時就要從條件隊列中出隊。
sync queue 和 conditon queue的聯(lián)系一般情況下,等待鎖的sync queue和條件隊列condition queue是相互獨立的,彼此之間并沒有任何關(guān)系。但是,當我們調(diào)用某個條件隊列的signal方法時,會將某個或所有等待在這個條件隊列中的線程喚醒,被喚醒的線程和普通線程一樣需要去爭鎖,如果沒有搶到,則同樣要被加到等待鎖的sync queue中去,此時節(jié)點就從condition queue中被轉(zhuǎn)移到sync queue中:
但是,這里尤其要注意的是,node是被一個一個轉(zhuǎn)移過去的,哪怕我們調(diào)用的是signalAll()方法也是一個一個轉(zhuǎn)移過去的,而不是將整個條件隊列接在sync queue的末尾。
同時要注意的是,我們在sync queue中只使用prev、next來串聯(lián)鏈表,而不使用nextWaiter;我們在condition queue中只使用nextWaiter來串聯(lián)鏈表,而不使用prev、next.事實上,它們就是兩個使用了同樣的Node數(shù)據(jù)結(jié)構(gòu)的完全獨立的兩種鏈表。因此,將節(jié)點從condition queue中轉(zhuǎn)移到sync queue中時,我們需要斷開原來的鏈接(nextWaiter),建立新的鏈接(prev, next),這某種程度上也是需要將節(jié)點一個一個地轉(zhuǎn)移過去的原因之一。
入隊時和出隊時的鎖狀態(tài)sync queue是等待鎖的隊列,當一個線程被包裝成Node加到該隊列中時,必然是沒有獲取到鎖;當處于該隊列中的節(jié)點獲取到了鎖,它將從該隊列中移除(事實上移除操作是將獲取到鎖的節(jié)點設(shè)為新的dummy head,并將thread屬性置為null)。
condition隊列是等待在特定條件下的隊列,因為調(diào)用await方法時,必然是已經(jīng)獲得了lock鎖,所以在進入condtion隊列前線程必然是已經(jīng)獲取了鎖;在被包裝成Node扔進條件隊列中后,線程將釋放鎖,然后掛起;當處于該隊列中的線程被signal方法喚醒后,由于隊列中的節(jié)點在之前掛起的時候已經(jīng)釋放了鎖,所以必須先去再次的競爭鎖,因此,該節(jié)點會被添加到sync queue中。因此,條件隊列在出隊時,線程并不持有鎖。
所以事實上,這兩個隊列的鎖狀態(tài)正好相反:
condition queue:入隊時已經(jīng)持有了鎖 -> 在隊列中釋放鎖 -> 離開隊列時沒有鎖 -> 轉(zhuǎn)移到sync queue
sync queue:入隊時沒有鎖 -> 在隊列中爭鎖 -> 離開隊列時獲得了鎖
通過上面的介紹,我們對條件隊列已經(jīng)有了感性的認識,接下來就讓我們進入到本篇的重頭戲——源碼分析:
CondtionObjectAQS對Condition這個接口的實現(xiàn)主要是通過ConditionObject,上面已經(jīng)說個,它的核心實現(xiàn)就是是一個條件隊列,每一個在某個condition上等待的線程都會被封裝成Node對象扔進這個條件隊列。
核心屬性它的核心屬性只有兩個:
/** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter;
這兩個屬性分別代表了條件隊列的隊頭和隊尾,每當我們新建一個conditionObject對象,都會對應(yīng)一個條件隊列。
構(gòu)造函數(shù)public ConditionObject() { }
構(gòu)造函數(shù)啥也沒干,可見,條件隊列是延時初始化的,在真正用到的時候才會初始化。
Condition接口方法實現(xiàn) await()第一部分分析public final void await() throws InterruptedException { // 如果當前線程在調(diào)動await()方法前已經(jīng)被中斷了,則直接拋出InterruptedException if (Thread.interrupted()) throw new InterruptedException(); // 將當前線程封裝成Node添加到條件隊列 Node node = addConditionWaiter(); // 釋放當前線程所占用的鎖,保存當前的鎖狀態(tài) int savedState = fullyRelease(node); int interruptMode = 0; // 如果當前隊列不在同步隊列中,說明剛剛被await, 還沒有人調(diào)用signal方法,則直接將當前線程掛起 while (!isOnSyncQueue(node)) { LockSupport.park(this); // 線程將在這里被掛起,停止運行 // 能執(zhí)行到這里說明要么是signal方法被調(diào)用了,要么是線程被中斷了 // 所以檢查下線程被喚醒的原因,如果是因為中斷被喚醒,則跳出while循環(huán) if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 第一部分就分析到這里,下面的部分我們到第二部分再看, 先把它注釋起來 /* if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); */ }
我們已經(jīng)在上面的代碼注釋中描述了大體的流程,接下來我們詳細來看看await方法中所調(diào)用方法的具體實現(xiàn)。
首先是將當前線程封裝成Node扔進條件隊列中的addConditionWaiter方法:
addConditionWaiter/** * Adds a new waiter to wait queue. * @return its new wait node */ private Node addConditionWaiter() { Node t = lastWaiter; // 如果尾節(jié)點被cancel了,則先遍歷整個鏈表,清除所有被cancel的節(jié)點 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 將當前線程包裝成Node扔進條件隊列 Node node = new Node(Thread.currentThread(), Node.CONDITION); /* Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } */ if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
首先我們要思考的是,存在兩個不同的線程同時入隊的情況嗎?不存在。為什么呢?因為前面說過了,能調(diào)用await方法的線程必然是已經(jīng)獲得了鎖,而獲得了鎖的線程只有一個,所以這里不存在并發(fā),因此不需要CAS操作。
在這個方法中,我們就是簡單的將當前線程封裝成Node加到條件隊列的末尾。這和將一個線程封裝成Node加入等待隊列略有不同:
節(jié)點加入sync queue時waitStatus的值為0,但節(jié)點加入condition queue時waitStatus的值為Node.CONDTION。
sync queue的頭節(jié)點為dummy節(jié)點,如果隊列為空,則會先創(chuàng)建一個dummy節(jié)點,再創(chuàng)建一個代表當前節(jié)點的Node添加在dummy節(jié)點的后面;而condtion queue 沒有dummy節(jié)點,初始化時,直接將firstWaiter和lastWaiter直接指向新建的節(jié)點就行了。
sync queue是一個雙向隊列,在節(jié)點入隊后,要同時修改當前節(jié)點的前驅(qū)和前驅(qū)節(jié)點的后繼;而在condtion queue中,我們只修改了前驅(qū)節(jié)點的nextWaiter,也就是說,condtion queue是作為單向隊列來使用的。
如果入隊時發(fā)現(xiàn)尾節(jié)點已經(jīng)取消等待了,那么我們就不應(yīng)該接在它的后面,此時需要調(diào)用unlinkCancelledWaiters來剔除那些已經(jīng)取消等待的線程:
private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
該方法將從頭節(jié)點開始遍歷整個隊列,剔除其中waitStatus不為Node.CONDTION的節(jié)點,這里使用了兩個指針firstWaiter和trail來分別記錄第一個和最后一個waitStatus不為Node.CONDTION的節(jié)點,這些都是基礎(chǔ)的鏈表操作,很容易理解,這里不再贅述了。
fullyRelease在節(jié)點被成功添加到隊列的末尾后,我們將調(diào)用fullyRelease來釋放當前線程所占用的鎖:
/** * Invokes release with current state value; returns saved state. * Cancels node and throws exception on failure. * @param node the condition node for this wait * @return previous sync state */ final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
首先,當我們調(diào)用這個方法時,說明當前線程已經(jīng)被封裝成Node扔進條件隊列了。在該方法中,我們通過release方法釋放鎖,還記得release方法嗎,我們在逐行分析AQS源碼(2)——獨占鎖的釋放中已經(jīng)詳細講過了,這里不再贅述了。
值得注意的是,這是一次性釋放了所有的鎖,即對于可重入鎖而言,無論重入了幾次,這里是一次性釋放完的,這也就是為什么該方法的名字叫fullyRelease。但這里尤其要注意的是release(savedState)方法是有可能拋出IllegalMonitorStateException的,這是因為當前線程可能并不是持有鎖的線程。但是咱前面不是說,只有持有鎖的線程才能調(diào)用await方法嗎?既然fullyRelease方法在await方法中,為啥當前線程還有可能并不是持有鎖的線程呢?
雖然話是這么說,但是在調(diào)用await方法時,我們其實并沒有檢測Thread.currentThread() == getExclusiveOwnerThread(),換句話說,也就是執(zhí)行到fullyRelease這一步,我們才會檢測這一點,而這一點檢測是由AQS子類實現(xiàn)tryRelease方法來保證的,例如,ReentrantLock對tryRelease方法的實現(xiàn)如下:
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
當發(fā)現(xiàn)當前線程不是持有鎖的線程時,我們就會進入finally塊,將當前Node的狀態(tài)設(shè)為Node.CANCELLED,這也就是為什么上面的addConditionWaiter在添加新節(jié)點前每次都會檢查尾節(jié)點是否已經(jīng)被取消了。
在當前線程的鎖被完全釋放了之后,我們就可以調(diào)用LockSupport.park(this)把當前線程掛起,等待被signal了。但是,在掛起當前線程之前我們先用isOnSyncQueue確保了它不在sync queue中,這是為什么呢?當前線程不是在一個和sync queue無關(guān)的條件隊列中嗎?怎么可能會出現(xiàn)在sync queue中的情況?
/** * Returns true if a node, always one that was initially placed on * a condition queue, is now waiting to reacquire on sync queue. * @param node the node * @return true if is reacquiring */ final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ return findNodeFromTail(node); }
/** * Returns true if node is on sync queue by searching backwards from tail. * Called only when needed by isOnSyncQueue. * @return true if present */ private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
為了解釋這一問題,我們先來看看signal方法
signalAll()在看signalAll之前,我們首先要區(qū)分調(diào)用signalAll方法的線程與signalAll方法要喚醒的線程(等待在對應(yīng)的條件隊列里的線程):
調(diào)用signalAll方法的線程本身是已經(jīng)持有了鎖,現(xiàn)在準備釋放鎖了;
在條件隊列里的線程是已經(jīng)在對應(yīng)的條件上掛起了,等待著被signal喚醒,然后去爭鎖。
首先,與調(diào)用notify時線程必須是已經(jīng)持有了監(jiān)視器鎖類似,在調(diào)用condition的signal方法時,線程也必須是已經(jīng)持有了lock鎖:
public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); }
該方法首先檢查當前調(diào)用signal方法的線程是不是持有鎖的線程,這是通過isHeldExclusively方法來實現(xiàn)的,該方法由繼承AQS的子類來實現(xiàn),例如,ReentrantLock對該方法的實現(xiàn)為:
protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); }
因為exclusiveOwnerThread保存了當前持有鎖的線程,這里只要檢測它是不是等于當前線程就行了。
接下來先通過firstWaiter是否為空判斷條件隊列是否為空,如果條件隊列不為空,則調(diào)用doSignalAll方法:
private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
首先我們通過lastWaiter = firstWaiter = null;將整個條件隊列清空,然后通過一個do-while循環(huán),將原先的條件隊列里面的節(jié)點一個一個拿出來(令nextWaiter = null),再通過transferForSignal方法一個一個添加到sync queue的末尾:
/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */ final boolean transferForSignal(Node node) { // 如果該節(jié)點在調(diào)用signal方法前已經(jīng)被取消了,則直接跳過這個節(jié)點 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 如果該節(jié)點在條件隊列中正常等待,則利用enq方法將該節(jié)點添加至sync queue隊列的尾部 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
在transferForSignal方法中,我們先使用CAS操作將當前節(jié)點的waitStatus狀態(tài)由CONDTION設(shè)為0,如果修改不成功,則說明該節(jié)點已經(jīng)被CANCEL了,則我們直接返回,操作下一個節(jié)點;如果修改成功,則說明我們已經(jīng)將該節(jié)點從等待的條件隊列中成功“喚醒”了,但此時該節(jié)點對應(yīng)的線程并沒有真正被喚醒,它還要和其他普通線程一樣去爭鎖,因此它將被添加到sync queue的末尾等待獲取鎖。
我們這里通過enq方法將該節(jié)點添加進sync queue的末尾。關(guān)于該方法,我們在逐行分析AQS源碼(1)——獨占鎖的獲取中已經(jīng)詳細講過了,這里不再贅述。不過這里尤其注意的是,enq方法將node節(jié)點添加進隊列時,返回的是node的前驅(qū)節(jié)點。
在將節(jié)點成功添加進sync queue中后,我們得到了該節(jié)點在sync queue中的前驅(qū)節(jié)點。我們前面說過,在sync queque中的節(jié)點都要靠前驅(qū)節(jié)點去喚醒,所以,這里我們要做的就是將前驅(qū)節(jié)點的waitStatus設(shè)為Node.SIGNAL, 這一點和shouldParkAfterFailedAcquire所做的工作類似:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
所不同的是,shouldParkAfterFailedAcquire將會向前查找,跳過那些被cancel的節(jié)點,然后將找到的第一個沒有被cancel的節(jié)點的waitStatus設(shè)成SIGNAL,最后再掛起。而在transferForSignal中,當前Node所代表的線程本身就已經(jīng)被掛起了,所以這里做的更像是一個復合操作——只要前驅(qū)節(jié)點處于被取消的狀態(tài)或者無法將前驅(qū)節(jié)點的狀態(tài)修成Node.SIGNAL,那我們就將Node所代表的線程喚醒,但這個條件并不意味著當前l(fā)ock處于可獲取的狀態(tài),有可能線程被喚醒了,但是鎖還是被占有的狀態(tài),不過這樣做至少是無害的,因為我們在線程被喚醒后還要去爭鎖,如果搶不到鎖,則大不了再次被掛起。
值得注意的是,transferForSignal是有返回值的,但是我們在這個方法中并沒有用到,它將在signal()方法中被使用。
在繼續(xù)往下看signal()方法之前,這里我們再總結(jié)一下signalAll()方法:
將條件隊列清空(只是令lastWaiter = firstWaiter = null,隊列中的節(jié)點和連接關(guān)系仍然還存在)
將條件隊列中的頭節(jié)點取出,使之成為孤立節(jié)點(nextWaiter,prev,next屬性都為null)
如果該節(jié)點處于被Cancelled了的狀態(tài),則直接跳過該節(jié)點(由于是孤立節(jié)點,則會被GC回收)
如果該節(jié)點處于正常狀態(tài),則通過enq方法將它添加到sync queue的末尾
判斷是否需要將該節(jié)點喚醒(包括設(shè)置該節(jié)點的前驅(qū)節(jié)點的狀態(tài)為SIGNAL),如有必要,直接喚醒該節(jié)點
重復2-5,直到整個條件隊列中的節(jié)點都被處理完
signal()與signalAll()方法不同,signal()方法只會喚醒一個節(jié)點,對于AQS的實現(xiàn)來說,就是喚醒條件隊列中第一個沒有被Cancel的節(jié)點,弄懂了signalAll()方法,signal()方法就很容易理解了,因為它們大同小異:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
首先依然是檢查調(diào)用該方法的線程(即當前線程)是不是已經(jīng)持有了鎖,這一點和上面的signalAll()方法一樣,所不一樣的是,接下來調(diào)用的是doSignal方法:
private void doSignal(Node first) { do { // 將firstWaiter指向條件隊列隊頭的下一個節(jié)點 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 將條件隊列原來的隊頭從條件隊列中斷開,則此時該節(jié)點成為一個孤立的節(jié)點 first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
這個方法也是一個do-while循環(huán),目的是遍歷整個條件隊列,找到第一個沒有被cancelled的節(jié)點,并將它添加到條件隊列的末尾。如果條件隊列里面已經(jīng)沒有節(jié)點了,則將條件隊列清空(firstWaiter=lasterWaiter=null)。
在這里,我們用的依然用的是transferForSignal方法,但是用到了它的返回值,只要節(jié)點被成功添加到sync queue中,transferForSignal就返回true, 此時while循環(huán)的條件就不滿足了,整個方法就結(jié)束了,即調(diào)用signal()方法,只會喚醒一個線程。
總結(jié): 調(diào)用signal()方法會從當前條件隊列中取出第一個沒有被cancel的節(jié)點添加到sync隊列的末尾。
await()第二部分分析前面我們已經(jīng)分析了signal方法,它會將節(jié)點添加進sync queue隊列中,并要么立即喚醒線程,要么等待前驅(qū)節(jié)點釋放鎖后將自己喚醒,無論怎樣,被喚醒的線程要從哪里恢復執(zhí)行呢?當然是被掛起的地方呀,我們在哪里被掛起的呢?還記得嗎?當然是調(diào)用了await方法的地方,以await()方法為例:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); // 我們在這里被掛起了,被喚醒后,將從這里繼續(xù)往下運行 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
這里值得注意的是,當我們被喚醒時,其實并不知道是因為什么原因被喚醒,有可能是因為其他線程調(diào)用了signal方法,也有可能是因為當前線程被中斷了。
但是,無論是被中斷喚醒還是被signal喚醒,被喚醒的線程最后都將離開condition queue,進入到sync queue中,這一點我們在下面分析源代碼的時候詳細說。
隨后,線程將在sync queue中利用進行acquireQueued方法進行“阻塞式”爭鎖,搶到鎖就返回,搶不到鎖就繼續(xù)被掛起。因此,當await()方法返回時,必然是保證了當前線程已經(jīng)持有了lock鎖。
另外有一點這里我們提前說明一下,這一點對于我們下面理解源碼很重要,那就是:
如果從線程被喚醒,到線程獲取到鎖這段過程中發(fā)生過中斷,該怎么處理?
我們前面分析中斷的時候說過,中斷對于當前線程只是個建議,由當前線程決定怎么對其做出處理。在acquireQueued方法中,我們對中斷是不響應(yīng)的,只是簡單的記錄搶鎖過程中的中斷狀態(tài),并在搶到鎖后將這個中斷狀態(tài)返回,交于上層調(diào)用的函數(shù)處理,而這里“上層調(diào)用的函數(shù)”就是我們的await()方法。
那么await()方法是怎么對待這個中斷的呢?這取決于:
中斷發(fā)生時,線程是否已經(jīng)被signal過?
如果中斷發(fā)生時,當前線程并沒有被signal過,則說明當前線程還處于條件隊列中,屬于正常在等待中的狀態(tài),此時中斷將導致當前線程的正常等待行為被打斷,進入到sync queue中搶鎖,因此,在我們從await方法返回后,需要拋出InterruptedException,表示當前線程因為中斷而被喚醒。
如果中斷發(fā)生時,當前線程已經(jīng)被signal過了,則說明這個中斷來的太晚了,既然當前線程已經(jīng)被signal過了,那么就說明在中斷發(fā)生前,它就已經(jīng)正常地被從condition queue中喚醒了,所以隨后即使發(fā)生了中斷(注意,這個中斷可以發(fā)生在搶鎖之前,也可以發(fā)生在搶鎖的過程中),我們都將忽略它,僅僅是在await()方法返回后,再自我中斷一下,補一下這個中斷。就好像這個中斷是在await()方法調(diào)用結(jié)束之后才發(fā)生的一樣。這里之所以要“補一下”這個中斷,是因為我們在用Thread.interrupted()方法檢測是否發(fā)生中斷的同時,會將中斷狀態(tài)清除,因此如果選擇了忽略中斷,則應(yīng)該在await()方法退出后將它設(shè)成原來的樣子。
關(guān)于“這個中斷來的太晚了”這一點如果大家不太容易理解的話,這里打個比方:這就好比我們?nèi)ワ埖瓿燥垼伎斐酝炅?,有一個菜到現(xiàn)在還沒有上,于是我們常常會把服務(wù)員叫來問:這個菜有沒有在做?要是還沒做我們就不要了。然后服務(wù)員會跑到廚房去問,之后跑回來說:對不起,這個菜已經(jīng)下鍋在炒了,請再耐心等待一下。這里,這個“這個菜我們不要了”(發(fā)起的中斷)就來的太晚了,因為菜已經(jīng)下鍋了(已經(jīng)被signal過了)。
理清了上面的概念,我們再來看看await()方法是怎么做的,它用中斷模式interruptMode這個變量記錄中斷事件,該變量有三個值:
0 : 代表整個過程中一直沒有中斷發(fā)生。
THROW_IE : 表示退出await()方法時需要拋出InterruptedException,這種模式對應(yīng)于中斷發(fā)生在signal之前
REINTERRUPT : 表示退出await()方法時只需要再自我中斷以下,這種模式對應(yīng)于中斷發(fā)生在signal之后,即中斷來的太晚了。
/** Mode meaning to reinterrupt on exit from wait */ private static final int REINTERRUPT = 1; /** Mode meaning to throw InterruptedException on exit from wait */ private static final int THROW_IE = -1;
接下來我們就從線程被喚醒的地方繼續(xù)往下走,一步步分析源碼:
情況1:中斷發(fā)生時,線程還沒有被signal過線程被喚醒后,我們將首先使用checkInterruptWhileWaiting方法檢測中斷的模式:
/** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. */ private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
這里假設(shè)已經(jīng)發(fā)生過中斷,則Thread.interrupted()方法必然返回true,接下來就是用transferAfterCancelledWait進一步判斷是否發(fā)生了signal:
final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } while (!isOnSyncQueue(node)) Thread.yield(); return false; }
上面已經(jīng)說過,判斷一個node是否被signal過,一個簡單有效的方法就是判斷它是否離開了condition queue, 進入到sync queue中。
換句話說,只要一個節(jié)點的waitStatus還是Node.CONDITION,那就說明它還沒有被signal過。
由于現(xiàn)在我們分析情況1,則當前節(jié)點的waitStatus必然是Node.CONDITION,則會成功執(zhí)行compareAndSetWaitStatus(node, Node.CONDITION, 0),將該節(jié)點的狀態(tài)設(shè)置成0,然后調(diào)用enq(node)方法將當前節(jié)點添加進sync queue中,然后返回true。
這里值得注意的是,我們此時并沒有斷開node的nextWaiter,所以最后一定不要忘記將這個鏈接斷開。
再回到transferAfterCancelledWait調(diào)用處,可知,由于transferAfterCancelledWait將返回true,現(xiàn)在checkInterruptWhileWaiting將返回THROW_IE,這表示我們在離開await方法時應(yīng)當要拋出THROW_IE異常。
再回到checkInterruptWhileWaiting的調(diào)用處:
public final void await() throws InterruptedException { /* if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; */ while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 我們現(xiàn)在在這里?。?! break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
interruptMode現(xiàn)在為THROW_IE,則我們將執(zhí)行break,跳出while循環(huán)。
接下來我們將執(zhí)行acquireQueued(node, savedState)進行爭鎖,注意,這里傳入的需要獲取鎖的重入數(shù)量是savedState,即之前釋放了多少,這里就需要再次獲取多少:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 如果線程獲取不到鎖,則將在這里被阻塞住 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
acquireQueued我們在前面的文章中已經(jīng)詳細分析過了,它是一個阻塞式的方法,獲取到鎖則退出,獲取不到鎖則會被掛起。該方法只有在最終獲取到了鎖后,才會退出,并且退出時會返回當前線程的中斷狀態(tài),如果我們在獲取鎖的過程中又被中斷了,則會返回true,否則會返回false。但是其實這里返回true還是false已經(jīng)不重要了,因為前面已經(jīng)發(fā)生過中斷了,我們就是因為中斷而被喚醒的不是嗎?所以無論如何,我們在退出await()方法時,必然會拋出InterruptedException。
我們這里假設(shè)它獲取到了鎖了,則它將回到上面的調(diào)用處,由于我們這時的interruptMode = THROW_IE,則會跳過if語句。
接下來我們將執(zhí)行:
if (node.nextWaiter != null) unlinkCancelledWaiters();
上面我們說過,當前節(jié)點的nextWaiter是有值的,它并沒有和原來的condition隊列斷開,這里我們已經(jīng)獲取到鎖了,根據(jù)逐行分析AQS源碼(1)——獨占鎖的獲取中的分析,我們通過setHead方法已經(jīng)將它的thread屬性置為null,從而將當前線程從sync queue"移除"了,接下來應(yīng)當將它從condition隊列里面移除。由于condition隊列是一個單向隊列,我們無法獲取到它的前驅(qū)節(jié)點,所以只能從頭開始遍歷整個條件隊列,然后找到這個節(jié)點,再移除它。
然而,事實上呢,我們并沒有這么做。因為既然已經(jīng)必須從頭開始遍歷鏈表了,我們就干脆一次性把鏈表中所有沒有在等待的節(jié)點都拿出去,所以這里調(diào)用了unlinkCancelledWaiters方法,該方法我們在前面await()第一部分的分析的時候已經(jīng)講過了,它就是簡單的遍歷鏈表,找到所有waitStatus不為CONDITION的節(jié)點,并把它們從隊列中移除。
節(jié)點被移除后,接下來就是最后一步了——匯報中斷狀態(tài):
if (interruptMode != 0) reportInterruptAfterWait(interruptMode);
這里我們的interruptMode=THROW_IE,說明發(fā)生了中斷,則將調(diào)用reportInterruptAfterWait:
/** * Throws InterruptedException, reinterrupts current thread, or * does nothing, depending on mode. */ private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); }
可以看出,在interruptMode=THROW_IE時,我們就是簡單的拋出了一個InterruptedException。
至此,情況1(中斷發(fā)生于signal之前)我們就分析完了,這里我們簡單總結(jié)一下:
線程因為中斷,從掛起的地方被喚醒
隨后,我們通過transferAfterCancelledWait確認了線程的waitStatus值為Node.CONDITION,說明并沒有signal發(fā)生過
然后我們修改線程的waitStatus為0,并通過enq(node)方法將其添加到sync queue中
接下來線程將在sync queue中以阻塞的方式獲取,如果獲取不到鎖,將會被再次掛起
線程在sync queue中獲取到鎖后,將調(diào)用unlinkCancelledWaiters方法將自己從條件隊列中移除,該方法還會順便移除其他取消等待的鎖
最后我們通過reportInterruptAfterWait拋出了InterruptedException
由此可以看出,一個調(diào)用了await方法掛起的線程在被中斷后不會立即拋出InterruptedException,而是會被添加到sync queue中去爭鎖,如果爭不到,還是會被掛起;
只有爭到了鎖之后,該線程才得以從sync queue和condition queue中移除,最后拋出InterruptedException。
所以說,一個調(diào)用了await方法的線程,即使被中斷了,它依舊還是會被阻塞住,直到它獲取到鎖之后才能返回,并在返回時拋出InterruptedException。中斷對它意義更多的是體現(xiàn)在將它從condition queue中移除,加入到sync queue中去爭鎖,從這個層面上看,中斷和signal的效果其實很像,所不同的是,在await()方法返回后,如果是因為中斷被喚醒,則await()方法需要拋出InterruptedException異常,表示是它是被非正常喚醒的(正常喚醒是指被signal喚醒)。
情況2:中斷發(fā)生時,線程已經(jīng)被signal過了這種情況對應(yīng)于“中斷來的太晚了”,即REINTERRUPT模式,我們在拿到鎖退出await()方法后,只需要再自我中斷一下,不需要拋出InterruptedException。
值得注意的是這種情況其實包含了兩個子情況:
被喚醒時,已經(jīng)發(fā)生了中斷,但此時線程已經(jīng)被signal過了
被喚醒時,并沒有發(fā)生中斷,但是在搶鎖的過程中發(fā)生了中斷
下面我們分別來分析:
對于這種情況,與前面中斷發(fā)生于signal之前的主要差別在于transferAfterCancelledWait方法:
final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { //線程A執(zhí)行到這里,CAS操作將會失敗 enq(node); return true; } // 由于中斷發(fā)生前,線程已經(jīng)被signal了,則這里只需要等待線程成功進入sync queue即可 while (!isOnSyncQueue(node)) Thread.yield(); return false; }
在這里,由于signal已經(jīng)發(fā)生過了,則由我們之前分析的signal方法可知,此時當前節(jié)點的waitStatus必定不為Node.CONDITION,他將跳過if語句。此時當前線程可能已經(jīng)在sync queue中,或者正在進入到sync queue的路上。
為什么這里會出現(xiàn)“正在進入到sync queue的路上”的情況呢? 這里我們解釋下:
假設(shè)當前線程為線程A, 它被喚醒之后檢測到發(fā)生了中斷,來到了transferAfterCancelledWait這里,而另一個線程B在這之前已經(jīng)調(diào)用了signal方法,該方法會調(diào)用transferForSignal將當前線程添加到sync queue的末尾:
final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 線程B執(zhí)行到這里,CAS操作將會成功 return false; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
因為線程A和線程B是并發(fā)執(zhí)行的,而這里我們分析的是“中斷發(fā)生在signal之后”,則此時,線程B的compareAndSetWaitStatus先于線程A執(zhí)行。這時可能出現(xiàn)線程B已經(jīng)成功修改了node的waitStatus狀態(tài),但是還沒來得及調(diào)用enq(node)方法,線程A就執(zhí)行到了transferAfterCancelledWait方法,此時它發(fā)現(xiàn)waitStatus已經(jīng)不是Condition,但是其實當前節(jié)點還沒有被添加到sync node隊列中,因此,它接下來將通過自旋,等待線程B執(zhí)行完transferForSignal方法。
線程A在自旋過程中會不斷地判斷節(jié)點有沒有被成功添加進sync queue,判斷的方法就是isOnSyncQueue:
/** * Returns true if a node, always one that was initially placed on * a condition queue, is now waiting to reacquire on sync queue. * @param node the node * @return true if is reacquiring */ final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node); }
該方法很好理解,只要waitStatus的值還為Node.CONDITION,則它一定還在condtion隊列中,自然不可能在sync里面;而每一個調(diào)用了enq方法入隊的線程:
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { //即使這一步失敗了next.prev一定是有值的 t.next = node; // 如果t.next有值,說明上面的compareAndSetTail方法一定成功了,則當前節(jié)點成為了新的尾節(jié)點 return t; // 返回了當前節(jié)點的前驅(qū)節(jié)點 } } } }
哪怕在設(shè)置compareAndSetTail這一步失敗了,它的prev必然也是有值的,因此這兩個條件只要有一個滿足,就說明節(jié)點必然不在sync queue隊列中。
另一方面,如果node.next有值,則說明它不僅在sync queue中,并且在它后面還有別的節(jié)點,則只要它有值,該節(jié)點必然在sync queue中。
如果以上都不滿足,說明這里出現(xiàn)了尾部分叉(關(guān)于尾部分叉,參見這里)的情況,我們就從尾節(jié)點向前尋找這個節(jié)點:
/** * Returns true if node is on sync queue by searching backwards from tail. * Called only when needed by isOnSyncQueue. * @return true if present */ private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
這里當然還是有可能出現(xiàn)從尾部反向遍歷找不到的情況,但是不用擔心,我們還在while循環(huán)中,無論如何,節(jié)點最后總會入隊成功的。最終,transferAfterCancelledWait將返回false。
再回到transferAfterCancelledWait調(diào)用處:
private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
則這里,由于transferAfterCancelledWait返回了false,則checkInterruptWhileWaiting方法將返回REINTERRUPT,這說明我們在退出該方法時只需要再次中斷。
再回到checkInterruptWhileWaiting方法的調(diào)用處:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //我們在這里?。?! break; } //當前interruptMode=REINTERRUPT,無論這里是否進入if體,該值不變 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
此時,interruptMode的值為REINTERRUPT,我們將直接跳出while循環(huán)。
接下來就和上面的情況1一樣了,我們依然還是去爭鎖,這一步依然是阻塞式的,獲取到鎖則退出,獲取不到鎖則會被掛起。
另外由于現(xiàn)在interruptMode的值已經(jīng)為REINTERRUPT,因此無論在爭鎖的過程中是否發(fā)生過中斷interruptMode的值都還是REINTERRUPT。
接著就是將節(jié)點從condition queue中剔除,與情況1不同的是,在signal方法成功將node加入到sync queue時,該節(jié)點的nextWaiter已經(jīng)是null了,所以這里這一步不需要執(zhí)行。
再接下來就是報告中斷狀態(tài)了:
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } static void selfInterrupt() { Thread.currentThread().interrupt(); }
注意,這里并沒有拋出中斷異常,而只是將當前線程再中斷一次。
至此,情況2.1(被喚醒時,已經(jīng)發(fā)生了中斷,但此時線程已經(jīng)被signal過了)我們就分析完了,這里我們簡單總結(jié)一下:
線程從掛起的地方被喚醒,此時既發(fā)生過中斷,又發(fā)生過signal
隨后,我們通過transferAfterCancelledWait確認了線程的waitStatus值已經(jīng)不為Node.CONDITION,說明signal發(fā)生于中斷之前
然后,我們通過自旋的方式,等待signal方法執(zhí)行完成,確保當前節(jié)點已經(jīng)被成功添加到sync queue中
接下來線程將在sync queue中以阻塞的方式獲取鎖,如果獲取不到,將會被再次掛起
最后我們通過reportInterruptAfterWait將當前線程再次中斷,但是不會拋出InterruptedException
這種情況就比上面的情況簡單一點了,既然被喚醒時沒有發(fā)生中斷,那基本可以確信線程是被signal喚醒的,但是不要忘記還存在“假喚醒”這種情況,因此我們依然還是要檢測被喚醒的原因。
那么怎么區(qū)分到底是假喚醒還是因為是被signal喚醒了呢?
如果線程是因為signal而被喚醒,則由前面分析的signal方法可知,線程最終都會離開condition queue 進入sync queue中,所以我們只需要判斷被喚醒時,線程是否已經(jīng)在sync queue中即可:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); // 我們在這里,線程將在這里被喚醒 // 由于現(xiàn)在沒有發(fā)生中斷,所以interruptMode目前為0 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
線程被喚醒時,暫時還沒有發(fā)生中斷,所以這里interruptMode = 0, 表示沒有中斷發(fā)生,所以我們將繼續(xù)while循環(huán),這時我們將通過isOnSyncQueue方法判斷當前線程是否已經(jīng)在sync queue中了。由于已經(jīng)發(fā)生過signal了,則此時node必然已經(jīng)在sync queue中了,所以isOnSyncQueue將返回true,我們將退出while循環(huán)。
不過這里插一句,如果isOnSyncQueue檢測到當前節(jié)點不在sync queue中,則說明既沒有發(fā)生中斷,也沒有發(fā)生過signal,則當前線程是被“假喚醒”的,那么我們將再次進入循環(huán)體,將線程掛起。
退出while循環(huán)后接下來還是利用acquireQueued爭鎖,因為前面沒有發(fā)生中斷,則interruptMode=0,這時,如果在爭鎖的過程中發(fā)生了中斷,則acquireQueued將返回true,則此時interruptMode將變?yōu)?b>REINTERRUPT。
接下是判斷node.nextWaiter != null,由于在調(diào)用signal方法時已經(jīng)將節(jié)點移出了隊列,所有這個條件也不成立。
最后就是匯報中斷狀態(tài)了,此時interruptMode的值為REINTERRUPT,說明線程在被signal后又發(fā)生了中斷,這個中斷發(fā)生在搶鎖的過程中,這個中斷來的太晚了,因此我們只是再次自我中斷一下。
至此,情況2.2(被喚醒時,并沒有發(fā)生中斷,但是在搶鎖的過程中發(fā)生了中斷)我們就分析完了,這種情況和2.1很像,區(qū)別就是一個是在喚醒后就被發(fā)現(xiàn)已經(jīng)發(fā)生了中斷,一個個喚醒后沒有發(fā)生中斷,但是在搶鎖的過成中發(fā)生了中斷,但無論如何,這兩種情況都會被歸結(jié)為“中斷來的太晚了”,中斷模式為REINTERRUPT,情況2.2的總結(jié)如下:
線程被signal方法喚醒,此時并沒有發(fā)生過中斷
因為沒有發(fā)生過中斷,我們將從checkInterruptWhileWaiting處返回,此時interruptMode=0
接下來我們回到while循環(huán)中,因為signal方法保證了將節(jié)點添加到sync queue中,此時while循環(huán)條件不成立,循環(huán)退出
接下來線程將在sync queue中以阻塞的方式獲取,如果獲取不到鎖,將會被再次掛起
線程獲取到鎖返回后,我們檢測到在獲取鎖的過程中發(fā)生過中斷,并且此時interruptMode=0,這時,我們將interruptMode修改為REINTERRUPT
最后我們通過reportInterruptAfterWait將當前線程再次中斷,但是不會拋出InterruptedException
這里我們再總結(jié)以下情況2(中斷發(fā)生時,線程已經(jīng)被signal過了),這種情況對應(yīng)于中斷發(fā)生signal之后,我們不管這個中斷是在搶鎖之前就已經(jīng)發(fā)生了還是搶鎖的過程中發(fā)生了,只要它是在signal之后發(fā)生的,我們就認為它來的太晚了,我們將忽略這個中斷。因此,從await()方法返回的時候,我們只會將當前線程重新中斷一下,而不會拋出中斷異常。
情況3: 一直沒有中斷發(fā)生這種情況就更簡單了,它的大體流程和上面的情況2.2差不多,只是在搶鎖的過程中也沒有發(fā)生異常,則interruptMode為0,沒有發(fā)生過中斷,因此不需要匯報中斷。則線程就從await()方法處正常返回。
await()總結(jié)至此,我們總算把await()方法完整的分析完了,這里我們對整個方法做出總結(jié):
進入await()時必須是已經(jīng)持有了鎖
離開await()時同樣必須是已經(jīng)持有了鎖
調(diào)用await()會使得當前線程被封裝成Node扔進條件隊列,然后釋放所持有的鎖
釋放鎖后,當前線程將在condition queue中被掛起,等待signal或者中斷
線程被喚醒后會將會離開condition queue進入sync queue中進行搶鎖
若在線程搶到鎖之前發(fā)生過中斷,則根據(jù)中斷發(fā)生在signal之前還是之后記錄中斷模式
線程在搶到鎖后進行善后工作(離開condition queue, 處理中斷異常)
線程已經(jīng)持有了鎖,從await()方法返回
在這一過程中我們尤其要關(guān)注中斷,如前面所說,中斷和signal所起到的作用都是將線程從condition queue中移除,加入到sync queue中去爭鎖,所不同的是,signal方法被認為是正常喚醒線程,中斷方法被認為是非正常喚醒線程,如果中斷發(fā)生在signal之前,則我們在最終返回時,應(yīng)當拋出InterruptedException;如果中斷發(fā)生在signal之后,我們就認為線程本身已經(jīng)被正常喚醒了,這個中斷來的太晚了,我們直接忽略它,并在await()返回時再自我中斷一下,這種做法相當于將中斷推遲至await()返回時再發(fā)生。
awaitUninterruptibly()在前面我們分析的await()方法中,中斷起到了和signal同樣的效果,但是中斷屬于將一個等待中的線程非正常喚醒,可能即使線程被喚醒后,也搶到了鎖,但是卻發(fā)現(xiàn)當前的等待條件并沒有滿足,則還是得把線程掛起。因此我們有時候并不希望await方法被中斷,awaitUninterruptibly()方法即實現(xiàn)了這個功能:
public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; // 發(fā)生了中斷后線程依舊留在了condition queue中,將會再次被掛起 } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
首先,從方法簽名上就可以看出,這個方法不會拋出中斷異常,我們拿它和await()方法對比一下:
public final void await() throws InterruptedException { if (Thread.interrupted()) // 不同之處 throw new InterruptedException(); // 不同之處 Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; // 不同之處 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 不同之處 break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 不同之處 interruptMode = REINTERRUPT; // 不同之處 if (node.nextWaiter != null) // 不同之處 unlinkCancelledWaiters(); // 不同之處 if (interruptMode != 0) // 不同之處 reportInterruptAfterWait(interruptMode); // 不同之處 }
由此可見,awaitUninterruptibly()全程忽略中斷,即使是當前線程因為中斷被喚醒,該方法也只是簡單的記錄中斷狀態(tài),然后再次被掛起(因為并沒有并沒有任何操作將它添加到sync queue中)
要使當前線程離開condition queue去爭鎖,則必須是發(fā)生了signal事件。
最后,當線程在獲取鎖的過程中發(fā)生了中斷,該方法也是不響應(yīng),只是在最終獲取到鎖返回時,再自我中斷一下??梢钥闯?,該方法和“中斷發(fā)生于signal之后的”REINTERRUPT模式的await()方法很像。
至此,該方法我們就分析完了,如果你之前await()方法已經(jīng)弄懂了,這個awaitUninterruptibly()方法就很容易理解了。它的核心思想是:
中斷雖然會喚醒線程,但是不會導致線程離開condition queue,如果線程只是因為中斷而被喚醒,則他將再次被掛起
只有signal方法會使得線程離開condition queue
調(diào)用該方法時或者調(diào)用過程中如果發(fā)生了中斷,僅僅會在該方法結(jié)束時再自我中斷以下,不會拋出InterruptedException
awaitNanos(long nanosTimeout)前面我們看的方法,無論是await()還是awaitUninterruptibly(),它們在搶鎖的過程中都是阻塞式的,即一直到搶到了鎖才能返回,否則線程還是會被掛起,這樣帶來一個問題就是線程如果長時間搶不到鎖,就會一直被阻塞,因此我們有時候更需要帶超時機制的搶鎖,這一點和帶超時機制的wait(long timeout)是很像的,我們直接來看源碼:
public final long awaitNanos(long nanosTimeout) throws InterruptedException { /*if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node);*/ final long deadline = System.nanoTime() + nanosTimeout; /*int interruptMode = 0; while (!isOnSyncQueue(node)) */{ if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); /*if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break;*/ nanosTimeout = deadline - System.nanoTime(); } /*if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode);*/ return deadline - System.nanoTime(); }
該方法幾乎和await()方法一樣,只是多了超時時間的處理,我們上面已經(jīng)把和await()方法相同的部分注釋起來了,只留下了不同的部分,這樣它們的區(qū)別就變得更明顯了。
該方法的主要設(shè)計思想是,如果設(shè)定的超時時間還沒到,我們就將線程掛起;超過等待的時間了,我們就將線程從condtion queue轉(zhuǎn)移到sync queue中。注意這里對于超時時間有一個小小的優(yōu)化——當設(shè)定的超時時間很短時(小于spinForTimeoutThreshold的值),我們就是簡單的自旋,而不是將線程掛起,以減少掛起線程和喚醒線程所帶來的時間消耗。
不過這里還有一處值得注意,就是awaitNanos(0)的意義,我們在線程間的同步與通信(2)——wait, notify, notifyAll曾經(jīng)提到過,wait(0)的含義是無限期等待,而我們在awaitNanos(long nanosTimeout)方法中是怎么處理awaitNanos(0)的呢?
if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; }
從這里可以看出,如果設(shè)置的等待時間本身就小于等于0,當前線程是會直接從condition queue中轉(zhuǎn)移到sync queue中的,并不會被掛起,也不需要等待signal,這一點確實是更復合邏輯。如果需要線程只有在signal發(fā)生的條件下才會被喚醒,則應(yīng)該用上面的awaitUninterruptibly()方法。
await(long time, TimeUnit unit)看完awaitNanos(long nanosTimeout)再看await(long time, TimeUnit unit)方法就更簡單了,它就是在awaitNanos(long nanosTimeout)的基礎(chǔ)上多了對于超時時間的時間單位的設(shè)置,但是在內(nèi)部實現(xiàn)上還是會把時間轉(zhuǎn)成納秒去執(zhí)行,這里我們直接拿它和上面的awaitNanos(long nano
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/77193.html
摘要:為了避免一篇文章的篇幅過長,于是一些比較大的主題就都分成幾篇來講了,這篇文章是筆者所有文章的目錄,將會持續(xù)更新,以給大家一個查看系列文章的入口。 前言 大家好,筆者是今年才開始寫博客的,寫作的初衷主要是想記錄和分享自己的學習經(jīng)歷。因為寫作的時候發(fā)現(xiàn),為了弄懂一個知識,不得不先去了解另外一些知識,這樣以來,為了說明一個問題,就要把一系列知識都了解一遍,寫出來的文章就特別長。 為了避免一篇...
摘要:本篇我們將以的公平鎖為例來詳細看看使用獲取獨占鎖的流程。本文中的源碼基于。由于本篇我們分析的是獨占鎖,同一時刻,鎖只能被一個線程所持有。由于在整個搶鎖過程中,我們都是不響應(yīng)中斷的。 前言 AQS(AbstractQueuedSynchronizer)是JAVA中眾多鎖以及并發(fā)工具的基礎(chǔ),其底層采用樂觀鎖,大量使用了CAS操作, 并且在沖突時,采用自旋方式重試,以實現(xiàn)輕量級和高效地獲取鎖...
摘要:相較于方法,提供了超時等待機制注意,在方法中,我們用到了的返回值,如果該方法因為超時而退出時,則將返回。的這個返回值有助于我們理解該方法究竟是因為獲取到了鎖而返回,還是因為超時時間到了而返回。 前言 系列文章目錄 CountDownLatch是一個很有用的工具,latch是門閂的意思,該工具是為了解決某些操作只能在一組操作全部執(zhí)行完成后才能執(zhí)行的情景。例如,小組早上開會,只有等所有人...
摘要:前言本來準備做源碼閱讀的幾千行看著太累了看了幾篇大神的文章后才基本搞懂附在這里閱讀本文前請先看懂的介紹和原理分析并發(fā)包源碼學習之框架四源碼分析接口實現(xiàn)接口一般看一個類實現(xiàn)的接口可以看出它的目的其實也是熟悉的目的主要是替代的方法的它是基于實現(xiàn) 前言 本來準備做AbstractQueuedSynchronizer源碼閱讀的,幾千行看著太累了,看了幾篇大神的文章后才基本搞懂,附在這里,閱讀本...
閱讀 556·2021-10-19 11:45
閱讀 1372·2021-09-30 09:48
閱讀 1481·2021-08-16 10:56
閱讀 744·2021-07-26 23:38
閱讀 3216·2019-08-30 13:15
閱讀 2602·2019-08-30 12:45
閱讀 1838·2019-08-29 12:14
閱讀 2087·2019-08-26 18:42