摘要:先分析下同步器,這個(gè)是用于鎖實(shí)現(xiàn)的類,就用到了它的方法就是調(diào)用去做事情的。方法簡(jiǎn)單來(lái)說(shuō)是構(gòu)造節(jié)點(diǎn),然后用的方式把這個(gè)節(jié)點(diǎn)加入同步器中節(jié)點(diǎn)鏈表的尾巴。通過(guò)調(diào)用同步器的方法,等待隊(duì)列中的頭節(jié)點(diǎn)線程安全地移動(dòng)到同步隊(duì)列。
AbstractQueuedSynchronizer
先分析下同步器AbstractQueuedSynchronizer,這個(gè)是用于鎖實(shí)現(xiàn)的類,ReentrantLock就用到了它
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
ReentrantLock的lock()方法就是調(diào)用acquire(int arg)去做事情的。
其中protected boolean tryAcquire(int arg) 是留給子類去實(shí)現(xiàn)的,所以這里是采用了模板設(shè)計(jì)模式。這個(gè)方法簡(jiǎn)單來(lái)說(shuō)就是去獲取鎖。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
addWaiter方法簡(jiǎn)單來(lái)說(shuō)是構(gòu)造節(jié)點(diǎn),然后用CAS的方式把這個(gè)節(jié)點(diǎn)加入同步器中節(jié)點(diǎn)鏈表的尾巴。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //如果前驅(qū)結(jié)點(diǎn)是頭節(jié)點(diǎn)的話,那么嘗試獲取鎖(也就是同步狀態(tài)) if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //如果失敗的話 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
acquireQueued()方法中,當(dāng)前線程在“死循環(huán)”中嘗試獲取同步狀態(tài),并且只有前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)才能夠嘗試獲取同步狀態(tài)。
/* * 查詢是否有線程比當(dāng)前線程更早地請(qǐng)求獲取鎖 */ public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; //如果h==t,返回false,表示沒(méi)有。h==t的時(shí)候要么沒(méi)有等待者,要么只有一個(gè)。當(dāng)只有一個(gè)的時(shí)候, //那么這一個(gè)節(jié)點(diǎn)肯定是首節(jié)點(diǎn),而首節(jié)點(diǎn)中的線程必定是獲取了鎖的。 // h!=t&&((s = h.next) == null || s.thread != Thread.currentThread()) //如果h!=t,則進(jìn)入后面的判斷 //當(dāng)頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)為null,但是這個(gè)時(shí)候tail為null(head和tail節(jié)點(diǎn)都是懶初始 //化),或者頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)不為null,但是頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)中的線程不是當(dāng)前線程,則返回true return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
如果等待隊(duì)列為空,或者只有首節(jié)點(diǎn),或者首節(jié)點(diǎn)的后繼節(jié)點(diǎn)中的線程是當(dāng)前線程,那么當(dāng)前線程就可以去獲取同步狀態(tài)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ /* *如果這個(gè)節(jié)點(diǎn)的waitStatus已經(jīng)被標(biāo)記為SIGNAL(-1)了的話,那么 *這個(gè)節(jié)點(diǎn)就需要park,所以方法返回true */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ /* * 該節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)如果被取消了(waitStatus為1時(shí)表示取消狀態(tài),目前只有這個(gè)狀態(tài)的值大于 * 0),那么跳過(guò)前驅(qū)節(jié)點(diǎn)(通過(guò)死循環(huán)的方式把前驅(qū)結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)設(shè)置為自己的前驅(qū)結(jié)點(diǎn))。然 * 后退出if條件語(yǔ)句,調(diào)到末尾,返回false,表示自己還可以搶救一下,可以進(jìn)行獲取的鎖的嘗 * 試,而不是park。 * */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don"t park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ /* * 如果以上情況都不成立的話,那么就把自己的waitStatus標(biāo)記為SIGNAL,返回true,表示自己要 * park,戰(zhàn)略性投降。 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }ReentrantLock
public class ReentrantLock implements Lock, java.io.Serializable { ..... abstract static class Sync extends AbstractQueuedSynchronizer /** public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } ..... }ConditionObject await()
/** * Implements interruptible condition wait. *signal()*
*/ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); long savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { //線程將會(huì)在調(diào)用park方法后阻塞,直到被重新喚醒,從condition隊(duì)列加入同步隊(duì)列,從await()方 //法中的while循環(huán)中退出(isOnSyncQueue(Node node)方法返回true,節(jié)點(diǎn)已經(jīng)在同步隊(duì)列中), //進(jìn)而調(diào)用同步器的acquireQueued()方法加入到獲取同步狀態(tài)的競(jìng)爭(zhēng)中。 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); //成功獲取同步狀態(tài)(或者說(shuō)鎖)之后,被喚醒的線程將從先前調(diào)用的await()方法返回,此時(shí)該線程已 //經(jīng)成功地獲取了鎖。 } /** * Invokes release with current state value; returns saved state. * Cancels node and throws exception on failure. * @param node the condition node for this wait * @return previous sync state */ final long fullyRelease(Node node) { boolean failed = true; try { long savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { //如果釋放同步狀態(tài)失敗,就throws exception,在finally那里還會(huì)Cancels node throw new IllegalMonitorStateException(); } } finally { //如果釋放同步狀態(tài)失敗,就Cancels node if (failed) node.waitStatus = Node.CANCELLED; } }- If current thread is interrupted, throw InterruptedException. *
- Save lock state returned by {@link #getState}. *
- Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. *
- Block until signalled or interrupted. *
- Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. *
- If interrupted while blocked in step 4, throw InterruptedException. *
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
調(diào)用Condition的signal()方法,將會(huì)喚醒在等待隊(duì)列中等待時(shí)間最長(zhǎng)的節(jié)點(diǎn)(首節(jié)點(diǎn)),在喚醒節(jié)點(diǎn)之前,會(huì)將節(jié)點(diǎn)移到同步隊(duì)列中。
調(diào)用該方法的前置條件是當(dāng)前線程必須獲取了鎖,可以看到signal()方法進(jìn)行了isHeldExclusively()檢查,也就是當(dāng)前線程必須是獲取了鎖的線程。接著獲取等待隊(duì)列的首節(jié)點(diǎn),將其移動(dòng)到同步隊(duì)列并使用LockSupport喚醒節(jié)點(diǎn)中的線程。
/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { do { //當(dāng)沒(méi)有waiters時(shí)進(jìn)行的一些優(yōu)化,以便編譯器進(jìn)行方法內(nèi)聯(lián),transferForSignal方法才是重點(diǎn) if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } /** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) 把節(jié)點(diǎn)從condition隊(duì)列“傳輸”到同步隊(duì)列去?!眰鬏敗俺晒蛘咴趩拘亚肮?jié)點(diǎn)已經(jīng)取消,就返回true。 */ final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). * 通過(guò)調(diào)用同步器的enq(Node node)方法,等待隊(duì)列中的頭節(jié)點(diǎn)線程安全地移動(dòng)到同步隊(duì)列。 */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //當(dāng)節(jié)點(diǎn)移動(dòng)到同步隊(duì)列后,當(dāng)前線程再使用LockSupport喚醒該節(jié)點(diǎn)的線程。 LockSupport.unpark(node.thread); return true; }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/68800.html
摘要:的主要功能和關(guān)鍵字一致,均是用于多線程的同步。而僅支持通過(guò)查詢當(dāng)前線程是否持有鎖。由于和使用的是同一把可重入鎖,所以線程可以進(jìn)入方法,并再次獲得鎖,而不會(huì)被阻塞住。公平與非公平公平與非公平指的是線程獲取鎖的方式。 1.簡(jiǎn)介 可重入鎖ReentrantLock自 JDK 1.5 被引入,功能上與synchronized關(guān)鍵字類似。所謂的可重入是指,線程可對(duì)同一把鎖進(jìn)行重復(fù)加鎖,而不會(huì)被阻...
摘要:更新成功返回,否則返回這個(gè)操作是原子的,不會(huì)出現(xiàn)線程安全問(wèn)題,這里面涉及到這個(gè)類的操作,一級(jí)涉及到這個(gè)屬性的意義。 簡(jiǎn)單解釋一下J.U.C,是JDK中提供的并發(fā)工具包,java.util.concurrent。里面提供了很多并發(fā)編程中很常用的實(shí)用工具類,比如atomic原子操作、比如lock同步鎖、fork/join等。 從Lock作為切入點(diǎn) 我想以lock作為切入點(diǎn)來(lái)講解AQS,畢竟...
摘要:概述前面已經(jīng)講解了關(guān)于的非公平鎖模式,關(guān)于非公平鎖,內(nèi)部其實(shí)告訴我們誰(shuí)先爭(zhēng)搶到鎖誰(shuí)就先獲得資源,下面就來(lái)分析一下公平鎖內(nèi)部是如何實(shí)現(xiàn)公平的如果沒(méi)有看過(guò)非公平鎖的先去了解下非公平鎖,因?yàn)檫@篇文章前面不會(huì)講太多內(nèi)部結(jié)構(gòu),直接會(huì)對(duì)源碼進(jìn)行分析前文 概述 前面已經(jīng)講解了關(guān)于AQS的非公平鎖模式,關(guān)于NonfairSync非公平鎖,內(nèi)部其實(shí)告訴我們誰(shuí)先爭(zhēng)搶到鎖誰(shuí)就先獲得資源,下面就來(lái)分析一下公平...
摘要:二什么是重入鎖可重入鎖,顧名思義,支持重新進(jìn)入的鎖,其表示該鎖能支持一個(gè)線程對(duì)資源的重復(fù)加鎖。將由最近成功獲得鎖,并且還沒(méi)有釋放該鎖的線程所擁有。可以使用和方法來(lái)檢查此情況是否發(fā)生。 一、寫(xiě)在前面 前幾篇我們具體的聊了AQS原理以及底層源碼的實(shí)現(xiàn),具體參見(jiàn) 《J.U.C|一文搞懂AQS》《J.U.C|同步隊(duì)列(CLH)》《J.U.C|AQS獨(dú)占式源碼分析》《J.U.C|AQS共享式源...
摘要:之所以使用這種方式是因?yàn)樵诨謴?fù)一個(gè)被掛起的線程與該線程真正運(yùn)行之間存在著嚴(yán)重的延遲。這樣我們就可以把線程恢復(fù)運(yùn)行的這段時(shí)間給利用起來(lái)了,結(jié)果就是線程更早的獲取了鎖,線程獲取鎖的時(shí)刻也沒(méi)有推遲。 前言 系列文章目錄 上一篇 我們學(xué)習(xí)了lock接口,本篇我們就以ReentrantLock為例,學(xué)習(xí)一下Lock鎖的基本的實(shí)現(xiàn)。我們先來(lái)看看Lock接口中的方法與ReentrantLock對(duì)其實(shí)...
閱讀 910·2021-09-03 10:42
閱讀 1521·2019-08-30 15:56
閱讀 1457·2019-08-29 17:27
閱讀 881·2019-08-29 15:25
閱讀 3168·2019-08-26 18:27
閱讀 2490·2019-08-26 13:41
閱讀 1898·2019-08-26 10:39
閱讀 1589·2019-08-23 18:36