摘要:的引入先來看下,為什么有了,還要引入使得多個(gè)讀線程同時(shí)持有讀鎖只要寫鎖未被占用,而寫鎖是獨(dú)占的。部分常量的比特位表示如下另外,相比,對(duì)多核進(jìn)行了優(yōu)化,可以看到,當(dāng)核數(shù)超過時(shí),會(huì)有一些自旋操作示例分析假設(shè)現(xiàn)在有三個(gè)線程。
本文首發(fā)于一世流云的專欄:https://segmentfault.com/blog...一、StampedLock類簡介
StampedLock類,在JDK1.8時(shí)引入,是對(duì)讀寫鎖ReentrantReadWriteLock的增強(qiáng),該類提供了一些功能,優(yōu)化了讀鎖、寫鎖的訪問,同時(shí)使讀寫鎖之間可以互相轉(zhuǎn)換,更細(xì)粒度控制并發(fā)。
首先明確下,該類的設(shè)計(jì)初衷是作為一個(gè)內(nèi)部工具類,用于輔助開發(fā)其它線程安全組件,用得好,該類可以提升系統(tǒng)性能,用不好,容易產(chǎn)生死鎖和其它莫名其妙的問題。
1.1 StampedLock的引入先來看下,為什么有了ReentrantReadWriteLock,還要引入StampedLock?
ReentrantReadWriteLock使得多個(gè)讀線程同時(shí)持有讀鎖(只要寫鎖未被占用),而寫鎖是獨(dú)占的。
但是,讀寫鎖如果使用不當(dāng),很容易產(chǎn)生“饑餓”問題:
比如在讀線程非常多,寫線程很少的情況下,很容易導(dǎo)致寫線程“饑餓”,雖然使用“公平”策略可以一定程度上緩解這個(gè)問題,但是“公平”策略是以犧牲系統(tǒng)吞吐量為代價(jià)的。(在ReentrantLock類的介紹章節(jié)中,介紹過這種情況)
1.2 StampedLock的特點(diǎn)StampedLock的主要特點(diǎn)概括一下,有以下幾點(diǎn):
所有獲取鎖的方法,都返回一個(gè)郵戳(Stamp),Stamp為0表示獲取失敗,其余都表示成功;
所有釋放鎖的方法,都需要一個(gè)郵戳(Stamp),這個(gè)Stamp必須是和成功獲取鎖時(shí)得到的Stamp一致;
StampedLock是不可重入的;(如果一個(gè)線程已經(jīng)持有了寫鎖,再去獲取寫鎖的話就會(huì)造成死鎖)
StampedLock有三種訪問模式:
①Reading(讀模式):功能和ReentrantReadWriteLock的讀鎖類似
②Writing(寫模式):功能和ReentrantReadWriteLock的寫鎖類似
③Optimistic reading(樂觀讀模式):這是一種優(yōu)化的讀模式。
StampedLock支持讀鎖和寫鎖的相互轉(zhuǎn)換
我們知道RRW中,當(dāng)線程獲取到寫鎖后,可以降級(jí)為讀鎖,但是讀鎖是不能直接升級(jí)為寫鎖的。
StampedLock提供了讀鎖和寫鎖相互轉(zhuǎn)換的功能,使得該類支持更多的應(yīng)用場景。
無論寫鎖還是讀鎖,都不支持Conditon等待
我們知道,在ReentrantReadWriteLock中,當(dāng)讀鎖被使用時(shí),如果有線程嘗試獲取寫鎖,該寫線程會(huì)阻塞。二、StampedLock使用示例
但是,在Optimistic reading中,即使讀線程獲取到了讀鎖,寫線程嘗試獲取寫鎖也不會(huì)阻塞,這相當(dāng)于對(duì)讀模式的優(yōu)化,但是可能會(huì)導(dǎo)致數(shù)據(jù)不一致的問題。所以,當(dāng)使用Optimistic reading獲取到讀鎖時(shí),必須對(duì)獲取結(jié)果進(jìn)行校驗(yàn)。
先來看一個(gè)Oracle官方的例子:
class Point { private double x, y; private final StampedLock sl = new StampedLock(); void move(double deltaX, double deltaY) { long stamp = sl.writeLock(); //涉及對(duì)共享資源的修改,使用寫鎖-獨(dú)占操作 try { x += deltaX; y += deltaY; } finally { sl.unlockWrite(stamp); } } /** * 使用樂觀讀鎖訪問共享資源 * 注意:樂觀讀鎖在保證數(shù)據(jù)一致性上需要拷貝一份要操作的變量到方法棧,并且在操作數(shù)據(jù)時(shí)候可能其他寫線程已經(jīng)修改了數(shù)據(jù), * 而我們操作的是方法棧里面的數(shù)據(jù),也就是一個(gè)快照,所以最多返回的不是最新的數(shù)據(jù),但是一致性還是得到保障的。 * * @return */ double distanceFromOrigin() { long stamp = sl.tryOptimisticRead(); // 使用樂觀讀鎖 double currentX = x, currentY = y; // 拷貝共享資源到本地方法棧中 if (!sl.validate(stamp)) { // 如果有寫鎖被占用,可能造成數(shù)據(jù)不一致,所以要切換到普通讀鎖模式 stamp = sl.readLock(); try { currentX = x; currentY = y; } finally { sl.unlockRead(stamp); } } return Math.sqrt(currentX * currentX + currentY * currentY); } void moveIfAtOrigin(double newX, double newY) { // upgrade // Could instead start with optimistic, not read mode long stamp = sl.readLock(); try { while (x == 0.0 && y == 0.0) { long ws = sl.tryConvertToWriteLock(stamp); //讀鎖轉(zhuǎn)換為寫鎖 if (ws != 0L) { stamp = ws; x = newX; y = newY; break; } else { sl.unlockRead(stamp); stamp = sl.writeLock(); } } } finally { sl.unlock(stamp); } } }
可以看到,上述示例最特殊的其實(shí)是distanceFromOrigin方法,這個(gè)方法中使用了“Optimistic reading”樂觀讀鎖,使得讀寫可以并發(fā)執(zhí)行,但是“Optimistic reading”的使用必須遵循以下模式:
long stamp = lock.tryOptimisticRead(); // 非阻塞獲取版本信息 copyVaraibale2ThreadMemory(); // 拷貝變量到線程本地堆棧 if(!lock.validate(stamp)){ // 校驗(yàn) long stamp = lock.readLock(); // 獲取讀鎖 try { copyVaraibale2ThreadMemory(); // 拷貝變量到線程本地堆棧 } finally { lock.unlock(stamp); // 釋放悲觀鎖 } } useThreadMemoryVarables(); // 使用線程本地堆棧里面的數(shù)據(jù)進(jìn)行操作三、StampedLock原理 3.1 StampedLock的內(nèi)部常量
StampedLock雖然不像其它鎖一樣定義了內(nèi)部類來實(shí)現(xiàn)AQS框架,但是StampedLock的基本實(shí)現(xiàn)思路還是利用CLH隊(duì)列進(jìn)行線程的管理,通過同步狀態(tài)值來表示鎖的狀態(tài)和類型。
StampedLock內(nèi)部定義了很多常量,定義這些常量的根本目的還是和ReentrantReadWriteLock一樣,對(duì)同步狀態(tài)值按位切分,以通過位運(yùn)算對(duì)State進(jìn)行操作:
對(duì)于StampedLock來說,寫鎖被占用的標(biāo)志是第8位為1,讀鎖使用0-7位,正常情況下讀鎖數(shù)目為1-126,超過126時(shí),使用一個(gè)名為readerOverflow的int整型保存超出數(shù)。
部分常量的比特位表示如下:
另外,StampedLock相比ReentrantReadWriteLock,對(duì)多核CPU進(jìn)行了優(yōu)化,可以看到,當(dāng)CPU核數(shù)超過1時(shí),會(huì)有一些自旋操作:
假設(shè)現(xiàn)在有三個(gè)線程:ThreadA、ThreadB、ThreadC、ThreadD。操作如下:1. StampedLock對(duì)象的創(chuàng)建
//ThreadA調(diào)用writeLock, 獲取寫鎖
//ThreadB調(diào)用readLock, 獲取讀鎖
//ThreadC調(diào)用readLock, 獲取讀鎖
//ThreadD調(diào)用writeLock, 獲取寫鎖
//ThreadE調(diào)用readLock, 獲取讀鎖
StampedLock的構(gòu)造器很簡單,構(gòu)造時(shí)設(shè)置下同步狀態(tài)值:
另外,StamedLock提供了三類視圖:
這些視圖其實(shí)是對(duì)StamedLock方法的封裝,便于習(xí)慣了ReentrantReadWriteLock的用戶使用:
例如,ReadLockView其實(shí)相當(dāng)于ReentrantReadWriteLock.readLock()返回的讀鎖;
來看下writeLock方法:
StampedLock中大量運(yùn)用了位運(yùn)算,這里(s = state) & ABITS == 0L 表示讀鎖和寫鎖都未被使用,這里寫鎖可以立即獲取成功,然后CAS操作更新同步狀態(tài)值State。
操作完成后,等待隊(duì)列的結(jié)構(gòu)如下:
注意:StampedLock中,等待隊(duì)列的結(jié)點(diǎn)要比AQS中簡單些,僅僅三種狀態(tài)。
0:初始狀態(tài)
-1:等待中
1:取消
另外,結(jié)點(diǎn)的定義中有個(gè)cowait字段,該字段指向一個(gè)棧,用于保存讀線程,這個(gè)后續(xù)會(huì)講到。
來看下readLock方法:
由于ThreadA此時(shí)持有寫鎖,所以ThreadB獲取讀鎖失敗,將調(diào)用acquireRead方法,加入等待隊(duì)列:
acquireRead方法非常復(fù)雜,用到了大量自旋操作:
/** * 嘗試自旋的獲取讀鎖, 獲取不到則加入等待隊(duì)列, 并阻塞線程 * * @param interruptible true 表示檢測中斷, 如果線程被中斷過, 則最終返回INTERRUPTED * @param deadline 如果非0, 則表示限時(shí)獲取 * @return 非0表示獲取成功, INTERRUPTED表示中途被中斷過 */ private long acquireRead(boolean interruptible, long deadline) { WNode node = null, p; // node指向入隊(duì)結(jié)點(diǎn), p指向入隊(duì)前的隊(duì)尾結(jié)點(diǎn) /** * 自旋入隊(duì)操作 * 如果寫鎖未被占用, 則立即嘗試獲取讀鎖, 獲取成功則返回. * 如果寫鎖被占用, 則將當(dāng)前讀線程包裝成結(jié)點(diǎn), 并插入等待隊(duì)列(如果隊(duì)尾是寫結(jié)點(diǎn),直接鏈接到隊(duì)尾;否則,鏈接到隊(duì)尾讀結(jié)點(diǎn)的棧中) */ for (int spins = -1; ; ) { WNode h; if ((h = whead) == (p = wtail)) { // 如果隊(duì)列為空或只有頭結(jié)點(diǎn), 則會(huì)立即嘗試獲取讀鎖 for (long m, s, ns; ; ) { if ((m = (s = state) & ABITS) < RFULL ? // 判斷寫鎖是否被占用 U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : //寫鎖未占用,且讀鎖數(shù)量未超限, 則更新同步狀態(tài) (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) //寫鎖未占用,但讀鎖數(shù)量超限, 超出部分放到readerOverflow字段中 return ns; // 獲取成功后, 直接返回 else if (m >= WBIT) { // 寫鎖被占用,以隨機(jī)方式探測是否要退出自旋 if (spins > 0) { if (LockSupport.nextSecondarySeed() >= 0) --spins; } else { if (spins == 0) { WNode nh = whead, np = wtail; if ((nh == h && np == p) || (h = nh) != (p = np)) break; } spins = SPINS; } } } } if (p == null) { // p == null表示隊(duì)列為空, 則初始化隊(duì)列(構(gòu)造頭結(jié)點(diǎn)) WNode hd = new WNode(WMODE, null); if (U.compareAndSwapObject(this, WHEAD, null, hd)) wtail = hd; } else if (node == null) { // 將當(dāng)前線程包裝成讀結(jié)點(diǎn) node = new WNode(RMODE, p); } else if (h == p || p.mode != RMODE) { // 如果隊(duì)列只有一個(gè)頭結(jié)點(diǎn), 或隊(duì)尾結(jié)點(diǎn)不是讀結(jié)點(diǎn), 則直接將結(jié)點(diǎn)鏈接到隊(duì)尾, 鏈接完成后退出自旋 if (node.prev != p) node.prev = p; else if (U.compareAndSwapObject(this, WTAIL, p, node)) { p.next = node; break; } } // 隊(duì)列不為空, 且隊(duì)尾是讀結(jié)點(diǎn), 則將添加當(dāng)前結(jié)點(diǎn)鏈接到隊(duì)尾結(jié)點(diǎn)的cowait鏈中(實(shí)際上構(gòu)成一個(gè)棧, p是棧頂指針 ) else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node)) { // CAS操作隊(duì)尾結(jié)點(diǎn)p的cowait字段,實(shí)際上就是頭插法插入結(jié)點(diǎn) node.cowait = null; } else { for (; ; ) { WNode pp, c; Thread w; // 嘗試喚醒頭結(jié)點(diǎn)的cowait中的第一個(gè)元素, 假如是讀鎖會(huì)通過循環(huán)釋放cowait鏈 if ((h = whead) != null && (c = h.cowait) != null && U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) // help release U.unpark(w); if (h == (pp = p.prev) || h == p || pp == null) { long m, s, ns; do { if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) return ns; } while (m < WBIT); } if (whead == h && p.prev == pp) { long time; if (pp == null || h == p || p.status > 0) { node = null; // throw away break; } if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, p, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if ((h != pp || (state & ABITS) == WBIT) && whead == h && p.prev == pp) { // 寫鎖被占用, 且當(dāng)前結(jié)點(diǎn)不是隊(duì)首結(jié)點(diǎn), 則阻塞當(dāng)前線程 U.park(false, time); } node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, p, true); } } } } for (int spins = -1; ; ) { WNode h, np, pp; int ps; if ((h = whead) == p) { // 如果當(dāng)前線程是隊(duì)首結(jié)點(diǎn), 則嘗試獲取讀鎖 if (spins < 0) spins = HEAD_SPINS; else if (spins < MAX_HEAD_SPINS) spins <<= 1; for (int k = spins; ; ) { // spin at head long m, s, ns; if ((m = (s = state) & ABITS) < RFULL ? // 判斷寫鎖是否被占用 U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : //寫鎖未占用,且讀鎖數(shù)量未超限, 則更新同步狀態(tài) (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { //寫鎖未占用,但讀鎖數(shù)量超限, 超出部分放到readerOverflow字段中 // 獲取讀鎖成功, 釋放cowait鏈中的所有讀結(jié)點(diǎn) WNode c; Thread w; // 釋放頭結(jié)點(diǎn), 當(dāng)前隊(duì)首結(jié)點(diǎn)成為新的頭結(jié)點(diǎn) whead = node; node.prev = null; // 從棧頂開始(node.cowait指向的結(jié)點(diǎn)), 依次喚醒所有讀結(jié)點(diǎn), 最終node.cowait==null, node成為新的頭結(jié)點(diǎn) while ((c = node.cowait) != null) { if (U.compareAndSwapObject(node, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } return ns; } else if (m >= WBIT && LockSupport.nextSecondarySeed() >= 0 && --k <= 0) break; } } else if (h != null) { // 如果頭結(jié)點(diǎn)存在cowait鏈, 則喚醒鏈中所有讀線程 WNode c; Thread w; while ((c = h.cowait) != null) { if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } } if (whead == h) { if ((np = node.prev) != p) { if (np != null) (p = np).next = node; // stale } else if ((ps = p.status) == 0) // 將前驅(qū)結(jié)點(diǎn)的等待狀態(tài)置為WAITING, 表示之后將喚醒當(dāng)前結(jié)點(diǎn) U.compareAndSwapInt(p, WSTATUS, 0, WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; } } else { // 阻塞當(dāng)前讀線程 long time; if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) //限時(shí)等待超時(shí), 取消等待 return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p) { // 如果前驅(qū)的等待狀態(tài)為WAITING, 且寫鎖被占用, 則阻塞當(dāng)前調(diào)用線程 U.park(false, time); } node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); } } } }
我們來分析下這個(gè)方法。
該方法會(huì)首先自旋的嘗試獲取讀鎖,獲取成功后,就直接返回;否則,會(huì)將當(dāng)前線程包裝成一個(gè)讀結(jié)點(diǎn),插入到等待隊(duì)列。
由于,目前等待隊(duì)列還是空,所以ThreadB會(huì)初始化隊(duì)列,然后將自身包裝成一個(gè)讀結(jié)點(diǎn),插入隊(duì)尾,然后在下面這個(gè)地方跳出自旋:
此時(shí),等待隊(duì)列的結(jié)構(gòu)如下:
跳出自旋后,ThreadB會(huì)繼續(xù)向下執(zhí)行,進(jìn)入下一個(gè)自旋,在下一個(gè)自旋中,依然會(huì)再次嘗試獲取讀鎖,如果這次再獲取不到,就會(huì)將前驅(qū)的等待狀態(tài)置為WAITING, 表示我(當(dāng)前線程)要去睡了(阻塞),到時(shí)記得叫醒我:
最終, ThreadB進(jìn)入阻塞狀態(tài):
最終,等待隊(duì)列的結(jié)構(gòu)如下:
4. ThreadC調(diào)用readLock獲取讀鎖這個(gè)過程和ThreadB獲取讀鎖一樣,區(qū)別在于ThreadC被包裝成結(jié)點(diǎn)加入等待隊(duì)列后,是鏈接到ThreadB結(jié)點(diǎn)的棧指針中的。調(diào)用完下面這段代碼后,ThreadC會(huì)鏈接到以Thread B為棧頂指針的棧中:
注意:讀結(jié)點(diǎn)的cowait字段其實(shí)構(gòu)成了一個(gè)棧,入棧的過程其實(shí)是個(gè)“頭插法”插入單鏈表的過程。比如,再來個(gè)ThreadX讀結(jié)點(diǎn),則cowait鏈表結(jié)構(gòu)為:ThreadB - > ThreadX -> ThreadC。最終喚醒讀結(jié)點(diǎn)時(shí),將從棧頂開始。
然后會(huì)在下一次自旋中,阻塞當(dāng)前讀線程:
最終,等待隊(duì)列的結(jié)構(gòu)如下:
可以看到,此時(shí)ThreadC結(jié)點(diǎn)并沒有把它的前驅(qū)的等待狀態(tài)置為-1,因?yàn)門hreadC是鏈接到棧中的,當(dāng)寫鎖釋放的時(shí)候,會(huì)從棧底元素開始,喚醒棧中所有讀結(jié)點(diǎn)。
5. ThreadD調(diào)用writeLock獲取寫鎖ThreadD調(diào)用writeLock方法獲取寫鎖失敗后(ThreadA依然占用著寫鎖),會(huì)調(diào)用acquireWrite方法,該方法整體邏輯和acquireRead差不多,首先自旋的嘗試獲取寫鎖,獲取成功后,就直接返回;否則,會(huì)將當(dāng)前線程包裝成一個(gè)寫結(jié)點(diǎn),插入到等待隊(duì)列。
acquireWrite源碼:
/** * 嘗試自旋的獲取寫鎖, 獲取不到則阻塞線程 * * @param interruptible true 表示檢測中斷, 如果線程被中斷過, 則最終返回INTERRUPTED * @param deadline 如果非0, 則表示限時(shí)獲取 * @return 非0表示獲取成功, INTERRUPTED表示中途被中斷過 */ private long acquireWrite(boolean interruptible, long deadline) { WNode node = null, p; /** * 自旋入隊(duì)操作 * 如果沒有任何鎖被占用, 則立即嘗試獲取寫鎖, 獲取成功則返回. * 如果存在鎖被使用, 則將當(dāng)前線程包裝成獨(dú)占結(jié)點(diǎn), 并插入等待隊(duì)列尾部 */ for (int spins = -1; ; ) { long m, s, ns; if ((m = (s = state) & ABITS) == 0L) { // 沒有任何鎖被占用 if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) // 嘗試立即獲取寫鎖 return ns; // 獲取成功直接返回 } else if (spins < 0) spins = (m == WBIT && wtail == whead) ? SPINS : 0; else if (spins > 0) { if (LockSupport.nextSecondarySeed() >= 0) --spins; } else if ((p = wtail) == null) { // 隊(duì)列為空, 則初始化隊(duì)列, 構(gòu)造隊(duì)列的頭結(jié)點(diǎn) WNode hd = new WNode(WMODE, null); if (U.compareAndSwapObject(this, WHEAD, null, hd)) wtail = hd; } else if (node == null) // 將當(dāng)前線程包裝成寫結(jié)點(diǎn) node = new WNode(WMODE, p); else if (node.prev != p) node.prev = p; else if (U.compareAndSwapObject(this, WTAIL, p, node)) { // 鏈接結(jié)點(diǎn)至隊(duì)尾 p.next = node; break; } } for (int spins = -1; ; ) { WNode h, np, pp; int ps; if ((h = whead) == p) { // 如果當(dāng)前結(jié)點(diǎn)是隊(duì)首結(jié)點(diǎn), 則立即嘗試獲取寫鎖 if (spins < 0) spins = HEAD_SPINS; else if (spins < MAX_HEAD_SPINS) spins <<= 1; for (int k = spins; ; ) { // spin at head long s, ns; if (((s = state) & ABITS) == 0L) { // 寫鎖未被占用 if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) { // CAS修改State: 占用寫鎖 // 將隊(duì)首結(jié)點(diǎn)從隊(duì)列移除 whead = node; node.prev = null; return ns; } } else if (LockSupport.nextSecondarySeed() >= 0 && --k <= 0) break; } } else if (h != null) { // 喚醒頭結(jié)點(diǎn)的棧中的所有讀線程 WNode c; Thread w; while ((c = h.cowait) != null) { if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } } if (whead == h) { if ((np = node.prev) != p) { if (np != null) (p = np).next = node; // stale } else if ((ps = p.status) == 0) // 將當(dāng)前結(jié)點(diǎn)的前驅(qū)置為WAITING, 表示當(dāng)前結(jié)點(diǎn)會(huì)進(jìn)入阻塞, 前驅(qū)將來需要喚醒我 U.compareAndSwapInt(p, WSTATUS, 0, WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; } } else { // 阻塞當(dāng)前調(diào)用線程 long time; // 0 argument to park means no timeout if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p) U.park(false, time); // emulate LockSupport.park node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); } } } }
acquireWrite中的下面這個(gè)自旋操作,用于將線程包裝成寫結(jié)點(diǎn),插入隊(duì)尾:
插入完成后,隊(duì)列結(jié)構(gòu)如下:
然后,進(jìn)入下一個(gè)自旋,并在下一個(gè)自旋中阻塞ThreadD,最終隊(duì)列結(jié)構(gòu)如下:
同樣,由于寫鎖被ThreadA占用著,所以最終會(huì)調(diào)用acquireRead方法,在該方法的第一個(gè)自旋中,會(huì)將ThreadE加入等待隊(duì)列:
注意,由于隊(duì)尾結(jié)點(diǎn)是寫結(jié)點(diǎn),所以當(dāng)前讀結(jié)點(diǎn)會(huì)直接鏈接到隊(duì)尾;如果隊(duì)尾是讀結(jié)點(diǎn),則會(huì)鏈接到隊(duì)尾讀結(jié)點(diǎn)的cowait鏈中。
然后進(jìn)入第二個(gè)自旋,阻塞ThreadE,最終隊(duì)列結(jié)構(gòu)如下:
通過CAS操作,修改State成功后,會(huì)調(diào)用release方法喚醒等待隊(duì)列的隊(duì)首結(jié)點(diǎn):
release方法非常簡單,先將頭結(jié)點(diǎn)的等待狀態(tài)置為0,表示即將喚醒后繼結(jié)點(diǎn),然后立即喚醒隊(duì)首結(jié)點(diǎn):
此時(shí),等待隊(duì)列的結(jié)構(gòu)如下:
ThreadB被喚醒后,會(huì)從原阻塞處繼續(xù)向下執(zhí)行,然后開始下一次自旋:
第二次自旋時(shí),ThreadB發(fā)現(xiàn)寫鎖未被占用,則成功獲取到讀鎖,然后從棧頂(ThreadB的cowait指針指向的結(jié)點(diǎn))開始喚醒棧中所有線程,
最后返回:
最終,等待隊(duì)列的結(jié)構(gòu)如下:
ThreadC被喚醒后,繼續(xù)執(zhí)行,并進(jìn)入下一次自旋,下一次自旋時(shí),會(huì)成功獲取到讀鎖。
注意,此時(shí)ThreadB和ThreadC已經(jīng)拿到了讀鎖,ThreadD(寫線程)和ThreadE(讀線程)依然阻塞中,原來ThreadC對(duì)應(yīng)的結(jié)點(diǎn)是個(gè)孤立結(jié)點(diǎn),會(huì)被GC回收。
最終,等待隊(duì)列的結(jié)構(gòu)如下:
ThreadB和ThreadC調(diào)用unlockRead方法釋放讀鎖,CAS操作State將讀鎖數(shù)量減1:
注意,當(dāng)讀鎖的數(shù)量變?yōu)?時(shí)才會(huì)調(diào)用release方法,喚醒隊(duì)首結(jié)點(diǎn):
隊(duì)首結(jié)點(diǎn)(ThreadD寫結(jié)點(diǎn)被喚醒),最終等待隊(duì)列的結(jié)構(gòu)如下:
ThreadD會(huì)從原阻塞處繼續(xù)向下執(zhí)行,并在下一次自旋中獲取到寫鎖,然后返回:
最終,等待隊(duì)列的結(jié)構(gòu)如下:
ThreadD釋放寫鎖的過程和步驟7完全相同,會(huì)調(diào)用unlockWrite喚醒隊(duì)首結(jié)點(diǎn)(ThreadE)。
ThreadE被喚醒后會(huì)從原阻塞處繼續(xù)向下執(zhí)行,但由于ThreadE是個(gè)讀結(jié)點(diǎn),所以同時(shí)會(huì)喚醒cowait棧中的所有讀結(jié)點(diǎn),過程和步驟8完全一樣。最終,等待隊(duì)列的結(jié)構(gòu)如下:
至此,全部執(zhí)行完成。
四、StampedLock類/方法聲明參考Oracle官方文檔:https://docs.oracle.com/javas...
類聲明:
方法聲明:
StampedLock的等待隊(duì)列與RRW的CLH隊(duì)列相比,有以下特點(diǎn):
當(dāng)入隊(duì)一個(gè)線程時(shí),如果隊(duì)尾是讀結(jié)點(diǎn),不會(huì)直接鏈接到隊(duì)尾,而是鏈接到該讀結(jié)點(diǎn)的cowait鏈中,cowait鏈本質(zhì)是一個(gè)棧;
當(dāng)入隊(duì)一個(gè)線程時(shí),如果隊(duì)尾是寫結(jié)點(diǎn),則直接鏈接到隊(duì)尾;
喚醒線程的規(guī)則和AQS類似,都是首先喚醒隊(duì)首結(jié)點(diǎn)。區(qū)別是StampedLock中,當(dāng)喚醒的結(jié)點(diǎn)是讀結(jié)點(diǎn)時(shí),會(huì)喚醒該讀結(jié)點(diǎn)的cowait鏈中的所有讀結(jié)點(diǎn)(順序和入棧順序相反,也就是后進(jìn)先出)。
另外,StampedLock使用時(shí)要特別小心,避免鎖重入的操作,在使用樂觀讀鎖時(shí)也需要遵循相應(yīng)的調(diào)用模板,防止出現(xiàn)數(shù)據(jù)不一致的問題。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/76548.html
摘要:整個(gè)包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設(shè)計(jì)模式,設(shè)計(jì)了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對(duì)等進(jìn)行補(bǔ)充增強(qiáng)。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:公平策略在多個(gè)線程爭用鎖的情況下,公平策略傾向于將訪問權(quán)授予等待時(shí)間最長的線程。使用方式的典型調(diào)用方式如下二類原理的源碼非常簡單,它通過內(nèi)部類實(shí)現(xiàn)了框架,接口的實(shí)現(xiàn)僅僅是對(duì)的的簡單封裝,參見原理多線程進(jìn)階七鎖框架獨(dú)占功能剖析 showImg(https://segmentfault.com/img/remote/1460000016012582); 本文首發(fā)于一世流云的專欄:https...
摘要:關(guān)于接口的介紹,可以參見多線程進(jìn)階二鎖框架接口。最終線程釋放了鎖,并進(jìn)入阻塞狀態(tài)。當(dāng)線程被通知喚醒時(shí),則是將條件隊(duì)列中的結(jié)點(diǎn)轉(zhuǎn)換成等待隊(duì)列中的結(jié)點(diǎn),之后的處理就和獨(dú)占功能完全一樣。 showImg(https://segmentfault.com/img/remote/1460000016012490); 本文首發(fā)于一世流云的專欄:https://segmentfault.com/bl...
摘要:二接口簡介可以看做是類的方法的替代品,與配合使用。當(dāng)線程執(zhí)行對(duì)象的方法時(shí),當(dāng)前線程會(huì)立即釋放鎖,并進(jìn)入對(duì)象的等待區(qū),等待其它線程喚醒或中斷。 showImg(https://segmentfault.com/img/remote/1460000016012601); 本文首發(fā)于一世流云的專欄:https://segmentfault.com/blog... 本系列文章中所說的juc-...
摘要:我們知道,的作用其實(shí)是對(duì)類的和的增強(qiáng),是為了讓線程在指定對(duì)象上等待,是一種線程之間進(jìn)行協(xié)調(diào)的工具。當(dāng)線程調(diào)用對(duì)象的方法時(shí),必須拿到和這個(gè)對(duì)象關(guān)聯(lián)的鎖。 showImg(https://segmentfault.com/img/remote/1460000016012566); 本文首發(fā)于一世流云的專欄:https://segmentfault.com/blog... 一、Reentr...
閱讀 2813·2021-11-22 14:44
閱讀 560·2021-11-22 12:00
閱讀 3694·2019-08-30 15:54
閱讀 1589·2019-08-29 17:15
閱讀 1910·2019-08-29 13:50
閱讀 1126·2019-08-29 13:17
閱讀 3524·2019-08-29 13:05
閱讀 1192·2019-08-29 11:31