摘要:有了這個(gè)基礎(chǔ),才能發(fā)揮作用,使得在節(jié)點(diǎn)取消和異常時(shí)能夠保證隊(duì)列在多線程下的完整性。
Doug Lea是JDK中concurrent工具包的作者,這位大神是誰可以自行g(shù)oogle。
本文淺析ReentrantLock(可重入鎖)的原理
Lock接口Lock接口定義了這幾個(gè)方法:
lock()
用來獲取鎖,如果鎖已經(jīng)被其他線程占有,則進(jìn)行等待,直到搶占到鎖;該方法在發(fā)送異常時(shí)不會(huì)自動(dòng)釋放鎖,所以在使用時(shí)需要在finall塊中釋放鎖;
tryLock()和tryLock(long time, TimeUnit unit)
嘗試獲得鎖,如果鎖已經(jīng)被其他線程占有,返回false,成功獲取鎖返回true;該方法不會(huì)等待,立即返回;而帶有參數(shù)的tryLock在等待時(shí)長(zhǎng)內(nèi)拿到鎖返回true,超時(shí)或者沒拿到鎖返回false;帶參數(shù)的方法還支持響應(yīng)中斷;
lockInterruptibly()
支持中斷的lock();
unlock()
釋放鎖;
newCondition()
新建Condition,Condition以后會(huì)分析;
ReentrantLock實(shí)現(xiàn)了Lock接口,ReentrantLock中有一個(gè)重要的成員變量,同步器sync繼承了AbstractQueuedSynchronizer簡(jiǎn)稱AQS,我們先介紹AQS;
AQS用一個(gè)隊(duì)列(結(jié)構(gòu)是一個(gè)FIFO隊(duì)列)來管理同步狀態(tài),當(dāng)線程獲取同步狀態(tài)失敗時(shí),會(huì)將當(dāng)前線程包裝成一個(gè)Node放入隊(duì)列,當(dāng)前線程進(jìn)入阻塞狀態(tài);當(dāng)同步狀態(tài)釋放時(shí),會(huì)從隊(duì)列去出線程獲取同步狀態(tài)。
AQS里定義了head、tail、state,他們都是volatile修飾的,head指向隊(duì)列的第一個(gè)元素,tail指向隊(duì)列的最后一個(gè)元素,state表示了同步狀態(tài),這個(gè)狀態(tài)非常重要,在ReentrantLock中,state為0的時(shí)候代表鎖被釋放,state為1時(shí)代表鎖已經(jīng)被占用;
看下面代碼:
private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } }
這一段靜態(tài)初始化代碼初始了state、head、tail等變量的在內(nèi)存中的偏移量;Unsafe類是sun.misc下的類,不屬于java標(biāo)準(zhǔn)。Unsafe讓java可以像C語(yǔ)言一樣操作內(nèi)存指針,其中就提供了CAS的一些原子操作和park、unpark對(duì)線程掛起與恢復(fù)的操作;關(guān)于CAS是concurrent工具包的基礎(chǔ),以后會(huì)多帶帶介紹,其主要作用就是在硬件級(jí)別提供了compareAndSwap的功能,從而實(shí)現(xiàn)了比較和交換的原子性操作。
AQS還有一個(gè)內(nèi)部類叫Node,它將線程封裝,利用prev和next可以將Node串連成雙向鏈表,這就是一開始說的FIFO的結(jié)構(gòu);
ReentrantLock提供了公平鎖和非公平鎖,我們這里從非公平鎖分析AQS的應(yīng)用;
Lock調(diào)用lock()方法時(shí)調(diào)用了AQS的lock()方法,我們來看這個(gè)非公平鎖NonfairSync的lock方法:
final void lock() { //首先調(diào)用CAS搶占同步狀態(tài)state,如果成功則將當(dāng)前線程設(shè)置為同步器的獨(dú)占線程, //這也是非公平的體現(xiàn),因?yàn)樾聛淼木€程沒有馬上加入隊(duì)列尾部,而是先嘗試搶占同步狀態(tài)。 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //搶占同步狀態(tài)失敗,調(diào)用AQS的acquire acquire(1); }
瞄一眼acquire方法:
public final void acquire(int arg) { //在這里還是先試著搶占一下同步狀態(tài) if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire調(diào)用的是NonfairSync的實(shí)現(xiàn),然后又調(diào)用了Sync的nonfairTryAcquire方法:
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //和之前一樣,利用CAS搶占同步狀態(tài),成功則設(shè)置當(dāng)前線程為獨(dú)占線程并且返回true if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果當(dāng)前線程已經(jīng)是獨(dú)占線程,即當(dāng)前線程已經(jīng)獲得了同步狀態(tài)則將同步狀態(tài)state加1, //這里是可重入鎖的體現(xiàn) else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //沒有搶占到同步狀態(tài)返回false return false; }
再看addWaiter方法:
private Node addWaiter(Node mode) { //新建一個(gè)Node,封裝了當(dāng)前線程和模式,這里傳入的是獨(dú)占模式Node.EXCLUSIVE Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; //如果tail不為空就不需要初始化node隊(duì)列了 if (pred != null) { //將node作為隊(duì)列最后一個(gè)元素入列 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; //返回新建的node return node; } } //如果tail為空則表示node隊(duì)列還沒有初始化,此時(shí)初始化隊(duì)列 enq(node); return node; }
瞄一眼enq方法:
private Node enq(final Node node) { //無限loop直到CAS成功,其他地方也大量使用了無限loop for (;;) { Node t = tail; if (t == null) { // Must initialize //隊(duì)列尾部為空,必須初始化,head初始化為一個(gè)空node,不包含線程,tail = head if (compareAndSetHead(new Node())) tail = head; } else { //隊(duì)列已經(jīng)初始化,將當(dāng)前node加在列尾 node.prev = t; //將當(dāng)前node設(shè)置為tail,CAS操作,enqueue安全 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
拿到新建的node后傳給acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { //標(biāo)記是否中斷狀態(tài) boolean interrupted = false; for (;;) { //拿到當(dāng)前node的前驅(qū) final Node p = node.predecessor(); //如果前驅(qū)正好為head,即當(dāng)前線程在列首,馬上tryAcquire搶占同步狀態(tài) if (p == head && tryAcquire(arg)) { //搶占成功后,將當(dāng)前節(jié)點(diǎn)的thread、prev清空作為head setHead(node); p.next = null; // help GC 原來的head等待GC回收 failed = false; return interrupted; } //沒有搶占成功后,判斷是否要park if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
瞄一眼shouldParkAfterFailedAcquire方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //如果前驅(qū)node的狀態(tài)為SIGNAL,說明當(dāng)前node可以park /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { //如果前驅(qū)的狀態(tài)大于0說明前驅(qū)node的thread已經(jīng)被取消 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { //從前驅(qū)node開始,將取消的node移出隊(duì)列 //當(dāng)前節(jié)點(diǎn)之前的節(jié)點(diǎn)不會(huì)變化,所以這里可以更新prev,而且不必用CAS來更新。 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //前驅(qū)node狀態(tài)等于0或者為PROPAGATE(以后會(huì)介紹) //將前驅(qū)node狀態(tài)設(shè)置為SIGNAL,返回false,表示當(dāng)前node暫不需要park, //可以再嘗試一下?lián)屨纪綘顟B(tài) /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don"t park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
看一下parkAndCheckInterrupt方法:
private final boolean parkAndCheckInterrupt() { //阻塞當(dāng)前線程 LockSupport.park(this); //返回當(dāng)前線程是否設(shè)置中斷標(biāo)志,并清空中斷標(biāo)志 return Thread.interrupted(); }
這里解釋一下為什么要保存一下中斷標(biāo)志:中斷會(huì)喚醒被park的阻塞線程,但被park的阻塞線程不會(huì)響應(yīng)中斷,所以這里保存一下中斷狀態(tài)并返回,如果狀態(tài)為true說明發(fā)生過中斷,會(huì)補(bǔ)發(fā)一次中斷,即調(diào)用interrupt()方法
在acquireQueued中發(fā)生異常時(shí)執(zhí)行cancelAcquire:
private void cancelAcquire(Node node) { // Ignore if node doesn"t exist if (node == null) return; //清空node的線程 node.thread = null; // Skip cancelled predecessors //移除被取消的前繼node,這里只移動(dòng)了node的prev,沒有改變next Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. //獲取前繼node的后繼node Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. //設(shè)置當(dāng)前node等待狀態(tài)為取消,其他線程檢測(cè)到取消狀態(tài)會(huì)移除它們 node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { //如果當(dāng)前node為tail,將前驅(qū)node設(shè)置為tail(CAS) //設(shè)置前驅(qū)node(即現(xiàn)在的tail)的后繼為null(CAS) //此時(shí),如果中間有取消的node,將沒有引用指向它,將被GC回收 compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred"s next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { //如果當(dāng)前node既不是head也不是tail,設(shè)置前繼node的后繼為當(dāng)前node后繼 Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { //喚醒當(dāng)前node后繼 unparkSuccessor(node); } //當(dāng)前node的next設(shè)置為自己 //注意現(xiàn)在當(dāng)前node的后繼的prev還指向當(dāng)前node,所以當(dāng)前node還未被刪除,prev是在移除取消節(jié)點(diǎn)時(shí)更新的 //這里就是為什么在前面要從后往前找可換新的node原因了,next會(huì)導(dǎo)致死循環(huán) node.next = node; // help GC } }
畫圖描述解析一下cancelAcquire:
首先看如何跳過取消的前驅(qū)
這時(shí),前驅(qū)被取消的node并沒有被移出隊(duì)列,前驅(qū)的前驅(qū)的next還指向前驅(qū);
如果當(dāng)前node是tail的情況:
這時(shí),沒有任何引用指向當(dāng)前node;
如果當(dāng)前node既不是tail也不是head:
這時(shí),當(dāng)前node的前驅(qū)的next指向當(dāng)前node的后繼,當(dāng)前node的next指向自己,pre都沒有更新;
如果當(dāng)前node是head的后繼:
這時(shí),只是簡(jiǎn)單的將當(dāng)前node的next指向自己;
到這里,當(dāng)線程搶占同步狀態(tài)的時(shí)候,會(huì)進(jìn)入FIFO隊(duì)列等待同步狀態(tài)被釋放。在unlock()方法中調(diào)用了同步器的release方法;看一下release方法:
public final boolean release(int arg) { //判斷是否釋放同步狀態(tài)成功 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //如果head不為null,且head的等待狀態(tài)不為0, //喚醒后繼node的線程 unparkSuccessor(h); return true; } return false; }
再來看一下tryRelease方法(在Sync類中實(shí)現(xiàn)):
protected final boolean tryRelease(int releases) { int c = getState() - releases; //當(dāng)前thread不是獨(dú)占模式的那個(gè)線程,拋出異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { //如果同步狀態(tài)state為0,釋放成功,將獨(dú)占線程設(shè)置為null free = true; setExclusiveOwnerThread(null); } //更新同步狀態(tài)state setState(c); return free; }
繼續(xù)看unparkSuccessor(喚醒后繼node的tread)方法:
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) //head的等待狀態(tài)為負(fù)數(shù),設(shè)置head的等待狀態(tài)為0 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { //如果head的后繼node不存在或者后繼node等待狀態(tài)大于0(即取消) //從尾部往當(dāng)前node迭代找到等待狀態(tài)為負(fù)數(shù)的node,unpark //因?yàn)闀?huì)有取消的節(jié)點(diǎn) s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }總結(jié)
介紹完ReentrantLock后,我們大體了解了AQS的工作原理。AQS主要就是使用了同步狀態(tài)和隊(duì)列實(shí)現(xiàn)了鎖的功能。有了CAS這個(gè)基礎(chǔ),AQS才能發(fā)揮作用,使得在enqueue、dequeque、節(jié)點(diǎn)取消和異常時(shí)能夠保證隊(duì)列在多線程下的完整性。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/70915.html
摘要:有了這個(gè)基礎(chǔ),才能發(fā)揮作用,使得在節(jié)點(diǎn)取消和異常時(shí)能夠保證隊(duì)列在多線程下的完整性。 Doug Lea是JDK中concurrent工具包的作者,這位大神是誰可以自行g(shù)oogle。 本文淺析ReentrantLock(可重入鎖)的原理 Lock接口 showImg(https://segmentfault.com/img/bV2671?w=276&h=176); Lock接口定義了這幾個(gè)...
摘要:有了這個(gè)基礎(chǔ),才能發(fā)揮作用,使得在節(jié)點(diǎn)取消和異常時(shí)能夠保證隊(duì)列在多線程下的完整性。 Doug Lea是JDK中concurrent工具包的作者,這位大神是誰可以自行g(shù)oogle。 本文淺析ReentrantLock(可重入鎖)的原理 Lock接口 showImg(https://segmentfault.com/img/bV2671?w=276&h=176); Lock接口定義了這幾個(gè)...
摘要:當(dāng)前線程已經(jīng)獲取過這個(gè)鎖,則此時(shí)是重入,改變的計(jì)數(shù)即可,返回表示加鎖成功。的核心在于使用更新鎖的狀態(tài),并利用一個(gè)同步隊(duì)列將獲取鎖失敗的線程進(jìn)行排隊(duì),當(dāng)前驅(qū)節(jié)點(diǎn)解鎖后再喚醒后繼節(jié)點(diǎn),是一個(gè)幾乎純實(shí)現(xiàn)的加鎖與解鎖。 簡(jiǎn)介 Java 并發(fā)編程離不開鎖, Synchronized 是常用的一種實(shí)現(xiàn)加鎖的方式,使用比較簡(jiǎn)單快捷。在 Java 中還有另一種鎖,即 Lock 鎖。 Lock 是一個(gè)接...
摘要:開始獲取鎖終于輪到出場(chǎng)了,的調(diào)用過程和完全一樣,同樣拿不到鎖,然后加入到等待隊(duì)列隊(duì)尾然后,在阻塞前需要把前驅(qū)結(jié)點(diǎn)的狀態(tài)置為,以確保將來可以被喚醒至此,的執(zhí)行也暫告一段落了安心得在等待隊(duì)列中睡覺。 showImg(https://segmentfault.com/img/remote/1460000016012467); 本文首發(fā)于一世流云的專欄:https://segmentfault...
摘要:的主要功能和關(guān)鍵字一致,均是用于多線程的同步。而僅支持通過查詢當(dāng)前線程是否持有鎖。由于和使用的是同一把可重入鎖,所以線程可以進(jìn)入方法,并再次獲得鎖,而不會(huì)被阻塞住。公平與非公平公平與非公平指的是線程獲取鎖的方式。 1.簡(jiǎn)介 可重入鎖ReentrantLock自 JDK 1.5 被引入,功能上與synchronized關(guān)鍵字類似。所謂的可重入是指,線程可對(duì)同一把鎖進(jìn)行重復(fù)加鎖,而不會(huì)被阻...
閱讀 2336·2021-09-29 09:42
閱讀 573·2021-09-06 15:02
閱讀 2623·2021-09-02 15:40
閱讀 2127·2019-08-30 14:23
閱讀 1874·2019-08-30 13:48
閱讀 1300·2019-08-26 12:01
閱讀 973·2019-08-26 11:53
閱讀 2159·2019-08-23 18:31