摘要:同步器擁有三個(gè)成員變量隊(duì)列的頭結(jié)點(diǎn)隊(duì)列的尾節(jié)點(diǎn)和狀態(tài)。對(duì)于同步器維護(hù)的狀態(tài),多個(gè)線(xiàn)程對(duì)其的獲取將會(huì)產(chǎn)生一個(gè)鏈?zhǔn)降慕Y(jié)構(gòu)。使用將當(dāng)前線(xiàn)程,關(guān)于后續(xù)會(huì)詳細(xì)介紹。
簡(jiǎn)介
下面通過(guò)一個(gè)排它鎖的例子來(lái)深入理解一下同步器的工作原理,而只有掌握同步器的工作原理才能夠更加深入了解其他的并發(fā)組件。
排他鎖的實(shí)現(xiàn),一次只能一個(gè)線(xiàn)程獲取到鎖。
class Mutex implements Lock, java.io.Serializable { // 內(nèi)部類(lèi),自定義同步器 private static class Sync extends AbstractQueuedSynchronizer { // 是否處于占用狀態(tài) protected boolean isHeldExclusively() { return getState() == 1; } // 當(dāng)狀態(tài)為0的時(shí)候獲取鎖 public boolean tryAcquire(int acquires) { assert acquires == 1; // Otherwise unused if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 釋放鎖,將狀態(tài)設(shè)置為0 protected boolean tryRelease(int releases) { assert releases == 1; // Otherwise unused if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); return true; } // 返回一個(gè)Condition,每個(gè)condition都包含了一個(gè)condition隊(duì)列 Condition newCondition() { return new ConditionObject(); } } // 僅需要將操作代理到Sync上即可 private final Sync sync = new Sync(); public void lock() { sync.acquire(1); } public boolean tryLock() { return sync.tryAcquire(1); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public boolean isLocked() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }
可以看到Mutex將Lock接口均代理給了同步器的實(shí)現(xiàn)。
使用方將Mutex構(gòu)造出來(lái)之后,調(diào)用lock獲取鎖,調(diào)用unlock進(jìn)行解鎖。下面以Mutex為例子,詳細(xì)分析以下同步器的實(shí)現(xiàn)邏輯。
實(shí)現(xiàn)分析
如果獲取不到,將當(dāng)前線(xiàn)程構(gòu)造成節(jié)點(diǎn)Node并加入sync隊(duì)列;
進(jìn)入隊(duì)列的每個(gè)線(xiàn)程都是一個(gè)節(jié)點(diǎn)Node,從而形成了一個(gè)雙向隊(duì)列,類(lèi)似CLH隊(duì)列,這樣做的目的是線(xiàn)程間的通信會(huì)被限制在較小規(guī)模(也就是兩個(gè)節(jié)點(diǎn)左右)。
再次嘗試獲取,如果沒(méi)有獲取到那么將當(dāng)前線(xiàn)程從線(xiàn)程調(diào)度器上摘下,進(jìn)入等待狀態(tài)。
使用LockSupport將當(dāng)前線(xiàn)程unpark,關(guān)于LockSupport后續(xù)會(huì)詳細(xì)介紹。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 快速?lài)L試在尾部添加 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } }
上述邏輯主要包括:
使用當(dāng)前線(xiàn)程構(gòu)造Node;
對(duì)于一個(gè)節(jié)點(diǎn)需要做的是將當(dāng)節(jié)點(diǎn)前驅(qū)節(jié)點(diǎn)指向尾節(jié)點(diǎn)(current.prev = tail),尾節(jié)點(diǎn)指向它(tail = current),原有的尾節(jié)點(diǎn)的后繼節(jié)點(diǎn)指向它(t.next = current)而這些操作要求是原子的。上面的操作是利用尾節(jié)點(diǎn)的設(shè)置來(lái)保證的,也就是compareAndSetTail來(lái)完成的。
先行嘗試在隊(duì)尾添加;
如果尾節(jié)點(diǎn)已經(jīng)有了,然后做如下操作:
分配引用T指向尾節(jié)點(diǎn);
將節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)更新為尾節(jié)點(diǎn)(current.prev = tail);
如果尾節(jié)點(diǎn)是T,那么將當(dāng)尾節(jié)點(diǎn)設(shè)置為該節(jié)點(diǎn)(tail = current,原子更新);
T的后繼節(jié)點(diǎn)指向當(dāng)前節(jié)點(diǎn)(T.next = current)。
注意第3點(diǎn)是要求原子的。
這樣可以以最短路徑O(1)的效果來(lái)完成線(xiàn)程入隊(duì),是最大化減少開(kāi)銷(xiāo)的一種方式。
如果隊(duì)尾添加失敗或者是第一個(gè)入隊(duì)的節(jié)點(diǎn)。
如果是第1個(gè)節(jié)點(diǎn),也就是sync隊(duì)列沒(méi)有初始化,那么會(huì)進(jìn)入到enq這個(gè)方法,進(jìn)入的線(xiàn)程可能有多個(gè),或者說(shuō)在addWaiter中沒(méi)有成功入隊(duì)的線(xiàn)程都將進(jìn)入enq這個(gè)方法。
可以看到enq的邏輯是確保進(jìn)入的Node都會(huì)有機(jī)會(huì)順序的添加到sync隊(duì)列中,而加入的步驟如下:
如果尾節(jié)點(diǎn)為空,那么原子化的分配一個(gè)頭節(jié)點(diǎn),并將尾節(jié)點(diǎn)指向頭節(jié)點(diǎn),這一步是初始化;
然后是重復(fù)在addWaiter中做的工作,但是在一個(gè)while(true)的循環(huán)中,直到當(dāng)前節(jié)點(diǎn)入隊(duì)為止。
進(jìn)入sync隊(duì)列之后,接下來(lái)就是要進(jìn)行鎖的獲取,或者說(shuō)是訪(fǎng)問(wèn)控制了,只有一個(gè)線(xiàn)程能夠在同一時(shí)刻繼續(xù)的運(yùn)行,而其他的進(jìn)入等待狀態(tài)。而每個(gè)線(xiàn)程都是一個(gè)獨(dú)立的個(gè)體,它們自省的觀(guān)察,當(dāng)條件滿(mǎn)足的時(shí)候(自己的前驅(qū)是頭結(jié)點(diǎn)并且原子性的獲取了狀態(tài)),那么這個(gè)線(xiàn)程能夠繼續(xù)運(yùn)行。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
上述邏輯主要包括:
獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn);
需要獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),而頭結(jié)點(diǎn)所對(duì)應(yīng)的含義是當(dāng)前站有鎖且正在運(yùn)行。
當(dāng)前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)并且能夠獲取狀態(tài),代表該當(dāng)前節(jié)點(diǎn)占有鎖;
如果滿(mǎn)足上述條件,那么代表能夠占有鎖,根據(jù)節(jié)點(diǎn)對(duì)鎖占有的含義,設(shè)置頭結(jié)點(diǎn)為當(dāng)前節(jié)點(diǎn)。
否則進(jìn)入等待狀態(tài)。
如果沒(méi)有輪到當(dāng)前節(jié)點(diǎn)運(yùn)行,那么將當(dāng)前線(xiàn)程從線(xiàn)程調(diào)度器上摘下,也就是進(jìn)入等待狀態(tài)。
這里針對(duì)acquire做一下總結(jié):
狀態(tài)的維護(hù);
需要在鎖定時(shí),需要維護(hù)一個(gè)狀態(tài)(int類(lèi)型),而對(duì)狀態(tài)的操作是原子和非阻塞的,通過(guò)同步器提供的對(duì)狀態(tài)訪(fǎng)問(wèn)的方法對(duì)狀態(tài)進(jìn)行操縱,并且利用compareAndSet來(lái)確保原子性的修改。
狀態(tài)的獲?。?/p>
一旦成功的修改了狀態(tài),當(dāng)前線(xiàn)程或者說(shuō)節(jié)點(diǎn),就被設(shè)置為頭節(jié)點(diǎn)。
sync隊(duì)列的維護(hù)。
在獲取資源未果的過(guò)程中條件不符合的情況下(不該自己,前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn)或者沒(méi)有獲取到資源)進(jìn)入睡眠狀態(tài),停止線(xiàn)程調(diào)度器對(duì)當(dāng)前節(jié)點(diǎn)線(xiàn)程的調(diào)度。
這時(shí)引入的一個(gè)釋放的問(wèn)題,也就是說(shuō)使睡眠中的Node或者說(shuō)線(xiàn)程獲得通知的關(guān)鍵,就是前驅(qū)節(jié)點(diǎn)的通知,而這一個(gè)過(guò)程就是釋放,釋放會(huì)通知它的后繼節(jié)點(diǎn)從睡眠中返回準(zhǔn)備運(yùn)行。
下面的流程圖基本描述了一次acquire所需要經(jīng)歷的過(guò)程:
如上圖所示,其中的判定退出隊(duì)列的條件,判定條件是否滿(mǎn)足和休眠當(dāng)前線(xiàn)程就是完成了自旋spin的過(guò)程。
public final boolean release(int arg)在unlock方法的實(shí)現(xiàn)中,使用了同步器的release方法。相對(duì)于在之前的acquire方法中可以得出調(diào)用acquire,保證能夠獲取到鎖(成功獲取狀態(tài)),而release則表示將狀態(tài)設(shè)置回去,也就是將資源釋放,或者說(shuō)將鎖釋放。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
上述邏輯主要包括:
嘗試釋放狀態(tài);
tryRelease能夠保證原子化的將狀態(tài)設(shè)置回去,當(dāng)然需要使用compareAndSet來(lái)保證。如果釋放狀態(tài)成功過(guò)之后,將會(huì)進(jìn)入后繼節(jié)點(diǎn)的喚醒過(guò)程。
喚醒當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)所包含的線(xiàn)程。
通過(guò)LockSupport的unpark方法將休眠中的線(xiàn)程喚醒,讓其繼續(xù)acquire狀態(tài)。
private void unparkSuccessor(Node node) { // 將狀態(tài)設(shè)置為同步狀態(tài) int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 獲取當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn),如果滿(mǎn)足狀態(tài),那么進(jìn)行喚醒操作 // 如果沒(méi)有滿(mǎn)足狀態(tài),從尾部開(kāi)始找尋符合要求的節(jié)點(diǎn)并將其喚醒 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
上述邏輯主要包括,該方法取出了當(dāng)前節(jié)點(diǎn)的next引用,然后對(duì)其線(xiàn)程(Node)進(jìn)行了喚醒,這時(shí)就只有一個(gè)或合理個(gè)數(shù)的線(xiàn)程被喚醒,被喚醒的線(xiàn)程繼續(xù)進(jìn)行對(duì)資源的獲取與爭(zhēng)奪。
回顧整個(gè)資源的獲取和釋放過(guò)程:
在獲取時(shí),維護(hù)了一個(gè)sync隊(duì)列,每個(gè)節(jié)點(diǎn)都是一個(gè)線(xiàn)程在進(jìn)行自旋,而依據(jù)就是自己是否是首節(jié)點(diǎn)的后繼并且能夠獲取資源; 在釋放時(shí),僅僅需要將資源還回去,然后通知一下后繼節(jié)點(diǎn)并將其喚醒。
這里需要注意,隊(duì)列的維護(hù)(首節(jié)點(diǎn)的更換)是依靠消費(fèi)者(獲取時(shí))來(lái)完成的,也就是說(shuō)在滿(mǎn)足了自旋退出的條件時(shí)的一刻,這個(gè)節(jié)點(diǎn)就會(huì)被設(shè)置成為首節(jié)點(diǎn)。
protected boolean tryAcquire(int arg)tryAcquire是自定義同步器需要實(shí)現(xiàn)的方法,也就是自定義同步器非阻塞原子化的獲取狀態(tài),如果鎖該方法一般用于Lock的tryLock實(shí)現(xiàn)中,這個(gè)特性是synchronized無(wú)法提供的。
public final void acquireInterruptibly(int arg)該方法提供獲取狀態(tài)能力,當(dāng)然在無(wú)法獲取狀態(tài)的情況下會(huì)進(jìn)入sync隊(duì)列進(jìn)行排隊(duì),這類(lèi)似acquire,但是和acquire不同的地方在于它能夠在外界對(duì)當(dāng)前線(xiàn)程進(jìn)行中斷的時(shí)候提前結(jié)束獲取狀態(tài)的操作,換句話(huà)說(shuō),就是在類(lèi)似synchronized獲取鎖時(shí),外界能夠?qū)Ξ?dāng)前線(xiàn)程進(jìn)行中斷,并且獲取鎖的這個(gè)操作能夠響應(yīng)中斷并提前返回。一個(gè)線(xiàn)程處于synchronized塊中或者進(jìn)行同步I/O操作時(shí),對(duì)該線(xiàn)程進(jìn)行中斷操作,這時(shí)該線(xiàn)程的中斷標(biāo)識(shí)位被設(shè)置為true,但是線(xiàn)程依舊繼續(xù)運(yùn)行。
如果在獲取一個(gè)通過(guò)網(wǎng)絡(luò)交互實(shí)現(xiàn)的鎖時(shí),這個(gè)鎖資源突然進(jìn)行了銷(xiāo)毀,那么使用acquireInterruptibly的獲取方式就能夠讓該時(shí)刻嘗試獲取鎖的線(xiàn)程提前返回。而同步器的這個(gè)特性被實(shí)現(xiàn)Lock接口中的lockInterruptibly方法。根據(jù)Lock的語(yǔ)義,在被中斷時(shí),lockInterruptibly將會(huì)拋出InterruptedException來(lái)告知使用者。
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } // 檢測(cè)中斷標(biāo)志位 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
上述邏輯主要包括:
檢測(cè)當(dāng)前線(xiàn)程是否被中斷;
判斷當(dāng)前線(xiàn)程的中斷標(biāo)志位,如果已經(jīng)被中斷了,那么直接拋出異常并將中斷標(biāo)志位設(shè)置為false。
嘗試獲取狀態(tài);
調(diào)用tryAcquire獲取狀態(tài),如果順利會(huì)獲取成功并返回。
構(gòu)造節(jié)點(diǎn)并加入sync隊(duì)列;
獲取狀態(tài)失敗后,將當(dāng)前線(xiàn)程引用構(gòu)造為節(jié)點(diǎn)并加入到sync隊(duì)列中。退出隊(duì)列的方式在沒(méi)有中斷的場(chǎng)景下和acquireQueued類(lèi)似,當(dāng)頭結(jié)點(diǎn)是自己的前驅(qū)節(jié)點(diǎn)并且能夠獲取到狀態(tài)時(shí),即可以運(yùn)行,當(dāng)然要將本節(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn),表示正在運(yùn)行。
中斷檢測(cè)。
在每次被喚醒時(shí),進(jìn)行中斷檢測(cè),如果發(fā)現(xiàn)當(dāng)前線(xiàn)程被中斷,那么拋出InterruptedException并退出循環(huán)。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException該方法提供了具備有超時(shí)功能的獲取狀態(tài)的調(diào)用,如果在指定的nanosTimeout內(nèi)沒(méi)有獲取到狀態(tài),那么返回false,反之返回true??梢詫⒃摲椒醋鯽cquireInterruptibly的升級(jí)版,也就是在判斷是否被中斷的基礎(chǔ)上增加了超時(shí)控制。
針對(duì)超時(shí)控制這部分的實(shí)現(xiàn),主要需要計(jì)算出睡眠的delta,也就是間隔值。間隔可以表示為nanosTimeout = 原有nanosTimeout – now(當(dāng)前時(shí)間)+
lastTime(睡眠之前記錄的時(shí)間)。如果nanosTimeout大于0,那么還需要使當(dāng)前線(xiàn)程睡眠,反之則返回false。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { long lastTime = System.nanoTime(); final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } if (nanosTimeout <= 0) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); long now = System.nanoTime(); //計(jì)算時(shí)間,當(dāng)前時(shí)間減去睡眠之前的時(shí)間得到睡眠的時(shí)間,然后被 //原有超時(shí)時(shí)間減去,得到了還應(yīng)該睡眠的時(shí)間 nanosTimeout -= now - lastTime; lastTime = now; if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
上述邏輯主要包括:
加入sync隊(duì)列;
將當(dāng)前線(xiàn)程構(gòu)造成為節(jié)點(diǎn)Node加入到sync隊(duì)列中。
條件滿(mǎn)足直接返回;
退出條件判斷,如果前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)并且成功獲取到狀態(tài),那么設(shè)置自己為頭結(jié)點(diǎn)并退出,返回true,也就是在指定的nanosTimeout之前獲取了鎖。
獲取狀態(tài)失敗休眠一段時(shí)間;
通過(guò)LockSupport.unpark來(lái)指定當(dāng)前線(xiàn)程休眠一段時(shí)間。
計(jì)算再次休眠的時(shí)間;
喚醒后的線(xiàn)程,計(jì)算仍需要休眠的時(shí)間,該時(shí)間表示為nanosTimeout = 原有nanosTimeout – now(當(dāng)前時(shí)間)+ lastTime(睡眠之前記錄的時(shí)間)。其中now
– lastTime表示這次睡眠所持續(xù)的時(shí)間。
休眠時(shí)間的判定。
喚醒后的線(xiàn)程,計(jì)算仍需要休眠的時(shí)間,并無(wú)阻塞的嘗試再獲取狀態(tài),如果失敗后查看其nanosTimeout是否大于0,如果小于0,那么返回完全超時(shí),沒(méi)有獲取到鎖。 如果nanosTimeout小于等于1000L納秒,則進(jìn)入快速的自旋過(guò)程。那么快速自旋會(huì)造成處理器資源緊張嗎?結(jié)果是不會(huì),經(jīng)過(guò)測(cè)算,開(kāi)銷(xiāo)看起來(lái)很小,幾乎微乎其微。Doug Lea應(yīng)該測(cè)算了在線(xiàn)程調(diào)度器上的切換造成的額外開(kāi)銷(xiāo),因此在短時(shí)1000納秒內(nèi)就讓當(dāng)前線(xiàn)程進(jìn)入快速自旋狀態(tài),如果這時(shí)再休眠相反會(huì)讓nanosTimeout的獲取時(shí)間變得更加不精確。
上述過(guò)程可以如下圖所示:
上述這個(gè)圖中可以理解為在類(lèi)似獲取狀態(tài)需要排隊(duì)的基礎(chǔ)上增加了一個(gè)超時(shí)控制的邏輯。每次超時(shí)的時(shí)間就是當(dāng)前超時(shí)剩余的時(shí)間減去睡眠的時(shí)間,而在這個(gè)超時(shí)時(shí)間的基礎(chǔ)上進(jìn)行了判斷,如果大于0那么繼續(xù)睡眠(等待),可以看出這個(gè)超時(shí)版本的獲取狀態(tài)只是一個(gè)近似超時(shí)的獲取狀態(tài),因此任何含有超時(shí)的調(diào)用基本結(jié)果就是近似于給定超時(shí)。
public final void acquireShared(int arg)調(diào)用該方法能夠以共享模式獲取狀態(tài),共享模式和之前的獨(dú)占模式有所區(qū)別。以文件的查看為例,如果一個(gè)程序在對(duì)其進(jìn)行讀取操作,那么這一時(shí)刻,對(duì)這個(gè)文件的寫(xiě)操作就被阻塞,相反,這一時(shí)刻另一個(gè)程序?qū)ζ溥M(jìn)行同樣的讀操作是可以進(jìn)行的。如果一個(gè)程序在對(duì)其進(jìn)行寫(xiě)操作,那么所有的讀與寫(xiě)操作在這一時(shí)刻就被阻塞,直到這個(gè)程序完成寫(xiě)操作。
以讀寫(xiě)場(chǎng)景為例,描述共享和獨(dú)占的訪(fǎng)問(wèn)模式,如下圖所示:
上圖中,紅色代表被阻塞,綠色代表可以通過(guò)。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
上述邏輯主要包括:
嘗試獲取共享狀態(tài);
調(diào)用tryAcquireShared來(lái)獲取共享狀態(tài),該方法是非阻塞的,如果獲取成功則立刻返回,也就表示獲取共享鎖成功。
獲取失敗進(jìn)入sync隊(duì)列;
在獲取共享狀態(tài)失敗后,當(dāng)前時(shí)刻有可能是獨(dú)占鎖被其他線(xiàn)程所把持,那么將當(dāng)前線(xiàn)程構(gòu)造成為節(jié)點(diǎn)(共享模式)加入到sync隊(duì)列中。
循環(huán)內(nèi)判斷退出隊(duì)列條件;
如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)并且獲取共享狀態(tài)成功,這里和獨(dú)占鎖acquire的退出隊(duì)列條件類(lèi)似。
獲取共享狀態(tài)成功;
在退出隊(duì)列的條件上,和獨(dú)占鎖之間的主要區(qū)別在于獲取共享狀態(tài)成功之后的行為,而如果共享狀態(tài)獲取成功之后會(huì)判斷后繼節(jié)點(diǎn)是否是共享模式,如果是共享模式,那么就直接對(duì)其進(jìn)行喚醒操作,也就是同時(shí)激發(fā)多個(gè)線(xiàn)程并發(fā)的運(yùn)行。
獲取共享狀態(tài)失敗。
通過(guò)使用LockSupport將當(dāng)前線(xiàn)程從線(xiàn)程調(diào)度器上摘下,進(jìn)入休眠狀態(tài)。
對(duì)于上述邏輯中,節(jié)點(diǎn)之間的通知過(guò)程如下圖所示:
上圖中,綠色表示共享節(jié)點(diǎn),它們之間的通知和喚醒操作是在前驅(qū)節(jié)點(diǎn)獲取狀態(tài)時(shí)就進(jìn)行的,紅色表示獨(dú)占節(jié)點(diǎn),它的被喚醒必須取決于前驅(qū)節(jié)點(diǎn)的釋放,也就是release操作,可以看出來(lái)圖中的獨(dú)占節(jié)點(diǎn)如果要運(yùn)行,必須等待前面的共享節(jié)點(diǎn)均釋放了狀態(tài)才可以。而獨(dú)占節(jié)點(diǎn)如果獲取了狀態(tài),那么后續(xù)的獨(dú)占式獲取和共享式獲取均被阻塞。
public final boolean releaseShared(int arg)調(diào)用該方法釋放共享狀態(tài),每次獲取共享狀態(tài)acquireShared都會(huì)操作狀態(tài),同樣在共享鎖釋放的時(shí)候,也需要將狀態(tài)釋放。比如說(shuō),一個(gè)限定一定數(shù)量訪(fǎng)問(wèn)的同步工具,每次獲取都是共享的,但是如果超過(guò)了一定的數(shù)量,將會(huì)阻塞后續(xù)的獲取操作,只有當(dāng)之前獲取的消費(fèi)者將狀態(tài)釋放才可以使阻塞的獲取操作得以運(yùn)行。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
上述邏輯主要就是調(diào)用同步器的tryReleaseShared方法來(lái)釋放狀態(tài),并同時(shí)在doReleaseShared方法中喚醒其后繼節(jié)點(diǎn)。
一個(gè)例子
在上述對(duì)同步器AbstractQueuedSynchronizer進(jìn)行了實(shí)現(xiàn)層面的分析之后,我們通過(guò)一個(gè)例子來(lái)加深對(duì)同步器的理解:
設(shè)計(jì)一個(gè)同步工具,該工具在同一時(shí)刻,只能有兩個(gè)線(xiàn)程能夠并行訪(fǎng)問(wèn),超過(guò)限制的其他線(xiàn)程進(jìn)入阻塞狀態(tài)。
對(duì)于這個(gè)需求,可以利用同步器完成一個(gè)這樣的設(shè)定,定義一個(gè)初始狀態(tài),為2,一個(gè)線(xiàn)程進(jìn)行獲取那么減1,一個(gè)線(xiàn)程釋放那么加1,狀態(tài)正確的范圍在[0,1,2]三個(gè)之間,當(dāng)在0時(shí),代表再有新的線(xiàn)程對(duì)資源進(jìn)行獲取時(shí)只能進(jìn)入阻塞狀態(tài)(注意在任何時(shí)候進(jìn)行狀態(tài)變更的時(shí)候均需要以CAS作為原子性保障)。
public class TwinsLock implements Lock { private static final Sync sync = new Sync(); private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -7889272986162341211L; { setState(2); } protected boolean tryAcquire(int arg) { if (arg != 1) { return false; } int currentStats = getState(); if (currentStats <= 0) { return false; } if (compareAndSetState(currentStats, currentStats - 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int arg) { if (arg != 1) { return false; } for(;;) { int currentStats = getState(); if (compareAndSetState(currentStats, currentStats + 1)) { setExclusiveOwnerThread(null); return true; } } } protected boolean isHeldExclusively() { return getState() < 2; } } public void lock() { sync.acquire(1); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock() { return sync.tryAcquire(1); } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } public void unlock() { sync.release(1); } }
這里我們編寫(xiě)一個(gè)測(cè)試來(lái)驗(yàn)證TwinsLock是否能夠正常工作并達(dá)到預(yù)期。
public class TwinsLockTest { @Test public void test() { final Lock lock = new TwinsLock(); class Worker extends Thread { public void run() { while (true) { lock.lock(); try { Thread.sleep(1000L); System.out.println(Thread.currentThread()); Thread.sleep(1000L); } catch (Exception ex) { } finally { lock.unlock(); } } } } for (int i = 0; i < 10; i++) { Worker w = new Worker(); w.start(); } new Thread() { public void run() { while (true) { try { Thread.sleep(200L); System.out.println(); } catch (Exception ex) { } } } }.start(); try { Thread.sleep(20000L); } catch (InterruptedException e) { e.printStackTrace(); } } }
上述測(cè)試用例的邏輯主要包括:
打印線(xiàn)程
Worker在兩次睡眠之間打印自身線(xiàn)程,如果一個(gè)時(shí)刻只能有兩個(gè)線(xiàn)程同時(shí)訪(fǎng)問(wèn),那么打印出來(lái)的內(nèi)容將是成對(duì)出現(xiàn)。
分隔線(xiàn)程
不停的打印換行,能讓W(xué)orker的輸出看起來(lái)更加直觀(guān)。
該測(cè)試的結(jié)果是在一個(gè)時(shí)刻,僅有兩個(gè)線(xiàn)程能夠獲得到鎖,并完成打印,而表象就是打印的內(nèi)容成對(duì)出現(xiàn)。
by 魏鵬 via ifeve
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/64024.html
摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線(xiàn)程安全并且高效的,在并發(fā)編程中經(jīng)??梢?jiàn)它的使用,在開(kāi)始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話(huà),看看它是如何被引入的。電商秒殺和搶購(gòu),是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽(tīng)名字多牛逼,來(lái),我們一步一步來(lái)?yè)羝魄皟蓚€(gè)名詞,今天我們首先來(lái)說(shuō)說(shuō)分布式。 探究...
摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線(xiàn)程安全并且高效的,在并發(fā)編程中經(jīng)??梢?jiàn)它的使用,在開(kāi)始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話(huà),看看它是如何被引入的。電商秒殺和搶購(gòu),是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽(tīng)名字多牛逼,來(lái),我們一步一步來(lái)?yè)羝魄皟蓚€(gè)名詞,今天我們首先來(lái)說(shuō)說(shuō)分布式。 探究...
摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線(xiàn)程安全并且高效的,在并發(fā)編程中經(jīng)??梢?jiàn)它的使用,在開(kāi)始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話(huà),看看它是如何被引入的。電商秒殺和搶購(gòu),是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽(tīng)名字多牛逼,來(lái),我們一步一步來(lái)?yè)羝魄皟蓚€(gè)名詞,今天我們首先來(lái)說(shuō)說(shuō)分布式。 探究...
摘要:的主要功能和關(guān)鍵字一致,均是用于多線(xiàn)程的同步。而僅支持通過(guò)查詢(xún)當(dāng)前線(xiàn)程是否持有鎖。由于和使用的是同一把可重入鎖,所以線(xiàn)程可以進(jìn)入方法,并再次獲得鎖,而不會(huì)被阻塞住。公平與非公平公平與非公平指的是線(xiàn)程獲取鎖的方式。 1.簡(jiǎn)介 可重入鎖ReentrantLock自 JDK 1.5 被引入,功能上與synchronized關(guān)鍵字類(lèi)似。所謂的可重入是指,線(xiàn)程可對(duì)同一把鎖進(jìn)行重復(fù)加鎖,而不會(huì)被阻...
摘要:簡(jiǎn)介抽象隊(duì)列同步器,以下簡(jiǎn)稱(chēng)出現(xiàn)在中,由大師所創(chuàng)作。獲取成功則返回,獲取失敗,線(xiàn)程進(jìn)入同步隊(duì)列等待。響應(yīng)中斷版的超時(shí)響應(yīng)中斷版的共享式獲取同步狀態(tài),同一時(shí)刻可能會(huì)有多個(gè)線(xiàn)程獲得同步狀態(tài)。 1.簡(jiǎn)介 AbstractQueuedSynchronizer (抽象隊(duì)列同步器,以下簡(jiǎn)稱(chēng) AQS)出現(xiàn)在 JDK 1.5 中,由大師 Doug Lea 所創(chuàng)作。AQS 是很多同步器的基礎(chǔ)框架,比如 ...
閱讀 1464·2023-04-25 17:18
閱讀 1894·2021-10-27 14:18
閱讀 2134·2021-09-09 09:33
閱讀 1852·2019-08-30 15:55
閱讀 2025·2019-08-30 15:53
閱讀 3449·2019-08-29 16:17
閱讀 3436·2019-08-26 13:57
閱讀 1739·2019-08-26 13:46