摘要:如果此時(shí),鎖被釋放,需要通知等待線程再次嘗試獲取鎖,公平鎖會(huì)讓最先進(jìn)入隊(duì)列的線程獲得鎖。等待隊(duì)列節(jié)點(diǎn)的操作由于進(jìn)入阻塞狀態(tài)的操作會(huì)降低執(zhí)行效率,所以,會(huì)盡力避免試圖獲取獨(dú)占性變量的線程進(jìn)入阻塞狀態(tài)。
?今天我們來研究學(xué)習(xí)一下AbstractQueuedSynchronizer類的相關(guān)原理,java.util.concurrent包中很多類都依賴于這個(gè)類所提供隊(duì)列式同步器,比如說常用的ReentranLock,Semaphore和CountDownLatch等。
?為了方便理解,我們以一段使用ReentranLock的代碼為例,講解ReentranLock每個(gè)方法中有關(guān)AQS的使用。
?我們都知道ReentranLock的加鎖行為和Synchronized類似,都是可重入的鎖,但是二者的實(shí)現(xiàn)方式確實(shí)完全不同的,我們之后也會(huì)講解Synchronized的原理。除此之外,Synchronized的阻塞無法被中斷,而ReentrantLock則提供了可中斷的阻塞。下面的代碼是ReentranLock的函數(shù),我們就以此為順序,依次講解這些函數(shù)背后的實(shí)現(xiàn)原理。
ReentrantLock lock = new ReentrantLock(); lock.lock(); lock.unlock();公平鎖和非公平鎖
?ReentranLock分為公平鎖和非公平鎖,二者的區(qū)別就在獲取鎖機(jī)會(huì)是否和排隊(duì)順序相關(guān)。我們都知道,如果鎖被另一個(gè)線程持有,那么申請(qǐng)鎖的其他線程會(huì)被掛起等待,加入等待隊(duì)列。理論上,先調(diào)用lock函數(shù)被掛起等待的線程應(yīng)該排在等待隊(duì)列的前端,后調(diào)用的就排在后邊。如果此時(shí),鎖被釋放,需要通知等待線程再次嘗試獲取鎖,公平鎖會(huì)讓最先進(jìn)入隊(duì)列的線程獲得鎖。而非公平鎖則會(huì)喚醒所有線程,讓它們?cè)俅螄L試獲取鎖,所以可能會(huì)導(dǎo)致后來的線程先獲得了鎖,則就是非公平。
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
?我們會(huì)發(fā)現(xiàn)FairSync和NonfairSync都繼承了Sync類,而Sync的父類就是AbstractQueuedSynchronizer(后續(xù)簡稱AQS)。但是AQS的構(gòu)造函數(shù)是空的,并沒有任何操作。
?之后的源碼分析,如果沒有特別說明,就是指公平鎖。
?ReentranLock的lock函數(shù)如下所示,直接調(diào)用了sync的lock函數(shù)。也就是調(diào)用了FairSync的lock函數(shù)。
//ReentranLock public void lock() { sync.lock(); } //FairSync final void lock() { //調(diào)用了AQS的acquire函數(shù),這是關(guān)鍵函數(shù)之一 acquire(1); }
?我們接下來就正式開始AQS相關(guān)的源碼分析了,acquire函數(shù)的作用是獲取同一時(shí)間段內(nèi)只能被一個(gè)線程獲取的量,這個(gè)量就是抽象化的鎖概念。我們先分析代碼,你慢慢就會(huì)明白其中的含義。
public final void acquire(int arg) { // tryAcquire先嘗試獲取"鎖",獲取了就不進(jìn)入后續(xù)流程 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //addWaiter是給當(dāng)前線程創(chuàng)建一個(gè)節(jié)點(diǎn),并將其加入等待隊(duì)列 //acquireQueued是當(dāng)線程已經(jīng)加入等待隊(duì)列之后繼續(xù)嘗試獲取鎖. selfInterrupt(); }
?tryAcquire,addWaiter和acquireQueued都是十分重要的函數(shù),我們接下來依次學(xué)習(xí)一下這些函數(shù),理解它們的作用。
//AQS類中的變量. private volatile int state; //這是FairSync的實(shí)現(xiàn),AQS中未實(shí)現(xiàn),子類按照自己的需要實(shí)現(xiàn)該函數(shù) protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); //獲取AQS中的state變量,代表抽象概念的鎖. int c = getState(); if (c == 0) { //值為0,那么當(dāng)前獨(dú)占性變量還未被線程占有 //如果當(dāng)前阻塞隊(duì)列上沒有先來的線程在等待,UnfairSync這里的實(shí)現(xiàn)就不一致 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //成功cas,那么代表當(dāng)前線程獲得該變量的所有權(quán),也就是說成功獲得鎖 setExclusiveOwnerThread(current); // setExclusiveOwnerThread將本線程設(shè)置為獨(dú)占性變量所有者線程 return true; } } else if (current == getExclusiveOwnerThread()) { //如果該線程已經(jīng)獲取了獨(dú)占性變量的所有權(quán),那么根據(jù)重入性 //原理,將state值進(jìn)行加1,表示多次lock //由于已經(jīng)獲得鎖,該段代碼只會(huì)被一個(gè)線程同時(shí)執(zhí)行,所以不需要 //進(jìn)行任何并行處理 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //上述情況都不符合,說明獲取鎖失敗 return false; }
?由上述代碼我們可以發(fā)現(xiàn),tryAcquire就是嘗試獲取那個(gè)線程獨(dú)占的變量state。state的值表示其狀態(tài):如果是0,那么當(dāng)前還沒有線程獨(dú)占此變量;否在就是已經(jīng)有線程獨(dú)占了這個(gè)變量,也就是代表已經(jīng)有線程獲得了鎖。但是這個(gè)時(shí)候要再進(jìn)行一次判斷,看是否是當(dāng)前線程自己獲得的這個(gè)鎖,如果是,就增加state的值。
?這里有幾點(diǎn)需要說明一下,首先是compareAndSetState函數(shù),這是使用CAS操作來設(shè)置state的值,而且state值設(shè)置了volatile修飾符,通過這兩點(diǎn)來確保修改state的值不會(huì)出現(xiàn)多線程問題。然后是公平鎖和非公平鎖的區(qū)別問題,在UnfairSync的nonfairTryAcquire函數(shù)中不會(huì)在相同的位置上調(diào)用hasQueuedPredecessors來判斷當(dāng)前是否已經(jīng)有線程在排隊(duì)等待獲得鎖。
?如果tryAcquire返回true,那么就是獲取鎖成功;如果返回false,那么就是未獲得鎖,需要加入阻塞等待隊(duì)列。我們下面就來看一下addWaiter的相關(guān)操作。
等待鎖的阻塞隊(duì)列?將保存當(dāng)前線程信息的節(jié)點(diǎn)加入到等待隊(duì)列的相關(guān)函數(shù)中涉及到了無鎖隊(duì)列的相關(guān)算法,由于在AQS中只是將節(jié)點(diǎn)添加到隊(duì)尾,使用到的無鎖算法也相對(duì)簡單。真正的無鎖隊(duì)列的算法我們等到分析ConcurrentSkippedListMap時(shí)在進(jìn)行講解。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); //先使用快速入列法來嘗試一下,如果失敗,則進(jìn)行更加完備的入列算法. //只有在必要的情況下才會(huì)使用更加復(fù)雜耗時(shí)的算法,也就是樂觀的態(tài)度 Node pred = tail; //列尾指針 if (pred != null) { node.prev = pred; //步驟1:該節(jié)點(diǎn)的前趨指針指向tail if (compareAndSetTail(pred, node)){ //步驟二:cas將尾指針指向該節(jié)點(diǎn) pred.next = node;//步驟三:如果成果,讓舊列尾節(jié)點(diǎn)的next指針指向該節(jié)點(diǎn). return node; } } //cas失敗,或在pred == null時(shí)調(diào)用enq enq(node); return node; } private Node enq(final Node node) { for (;;) { //cas無鎖算法的標(biāo)準(zhǔn)for循環(huán),不停的嘗試 Node t = tail; if (t == null) { //初始化 if (compareAndSetHead(new Node())) //需要注意的是head是一個(gè)哨兵的作用,并不代表某個(gè)要獲取鎖的線程節(jié)點(diǎn) tail = head; } else { //和addWaiter中一致,不過有了外側(cè)的無限循環(huán),不停的嘗試,自旋鎖 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
?通過調(diào)用addWaiter函數(shù),AQS將當(dāng)前線程加入到了等待隊(duì)列,但是還沒有阻塞當(dāng)前線程的執(zhí)行,接下來我們就來分析一下acquireQueued函數(shù)。
等待隊(duì)列節(jié)點(diǎn)的操作?由于進(jìn)入阻塞狀態(tài)的操作會(huì)降低執(zhí)行效率,所以,AQS會(huì)盡力避免試圖獲取獨(dú)占性變量的線程進(jìn)入阻塞狀態(tài)。所以,當(dāng)線程加入等待隊(duì)列之后,acquireQueued會(huì)執(zhí)行一個(gè)for循環(huán),每次都判斷當(dāng)前節(jié)點(diǎn)是否應(yīng)該獲得這個(gè)變量(在隊(duì)首了)。如果不應(yīng)該獲取或在再次嘗試獲取失敗,那么就調(diào)用shouldParkAfterFailedAcquire判斷是否應(yīng)該進(jìn)入阻塞狀態(tài)。如果當(dāng)前節(jié)點(diǎn)之前的節(jié)點(diǎn)已經(jīng)進(jìn)入阻塞狀態(tài)了,那么就可以判定當(dāng)前節(jié)點(diǎn)不可能獲取到鎖,為了防止CPU不停的執(zhí)行for循環(huán),消耗CPU資源,調(diào)用parkAndCheckInterrupt函數(shù)來進(jìn)入阻塞狀態(tài)。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //一直執(zhí)行,直到獲取鎖,返回. final Node p = node.predecessor(); //node的前驅(qū)是head,就說明,node是將要獲取鎖的下一個(gè)節(jié)點(diǎn). if (p == head && tryAcquire(arg)) { //所以再次嘗試獲取獨(dú)占性變量 setHead(node); //如果成果,那么就將自己設(shè)置為head p.next = null; // help GC failed = false; return interrupted; //此時(shí),還沒有進(jìn)入阻塞狀態(tài),所以直接返回false,表示不需要中斷調(diào)用selfInterrupt函數(shù) } //判斷是否要進(jìn)入阻塞狀態(tài).如果`shouldParkAfterFailedAcquire` //返回true,表示需要進(jìn)入阻塞 //調(diào)用parkAndCheckInterrupt;否則表示還可以再次嘗試獲取鎖,繼續(xù)進(jìn)行for循環(huán) if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //調(diào)用parkAndCheckInterrupt進(jìn)行阻塞,然后返回是否為中斷狀態(tài) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //前一個(gè)節(jié)點(diǎn)在等待獨(dú)占性變量釋放的通知,所以,當(dāng)前節(jié)點(diǎn)可以阻塞 return true; if (ws > 0) { //前一個(gè)節(jié)點(diǎn)處于取消獲取獨(dú)占性變量的狀態(tài),所以,可以跳過去 //返回false do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //將上一個(gè)節(jié)點(diǎn)的狀態(tài)設(shè)置為signal,返回false, compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); //將AQS對(duì)象自己傳入 return Thread.interrupted(); }阻塞和中斷
?由上述分析,我們知道了AQS通過調(diào)用LockSupport的park方法來執(zhí)行阻塞當(dāng)前進(jìn)程的操作。其實(shí),這里的阻塞就是線程不再執(zhí)行的含義,通過調(diào)用這個(gè)函數(shù),線程進(jìn)入阻塞狀態(tài),上述的lock操作也就阻塞了,等待中斷或在獨(dú)占性變量被釋放。
public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker);//設(shè)置阻塞對(duì)象,用來記錄線程被誰阻塞的,用于線程監(jiān)控和分析工具來定位 UNSAFE.park(false, 0L);//讓當(dāng)前線程不再被線程調(diào)度,就是當(dāng)前線程不再執(zhí)行. setBlocker(t, null); }
?關(guān)于中斷的相關(guān)知識(shí),我們以后再說,就繼續(xù)沿著AQS的主線,看一下釋放獨(dú)占性變量的相關(guān)操作吧。
unlock操作?與lock操作類似,unlock操作調(diào)用了AQS的relase方法,參數(shù)和調(diào)用acquire時(shí)一樣,都是1。
public final boolean release(int arg) { if (tryRelease(arg)) { //釋放獨(dú)占性變量,起始就是將status的值減1,因?yàn)閍cquire時(shí)是加1 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h);//喚醒head的后繼節(jié)點(diǎn) return true; } return false; }
?由上述代碼可知,release就是先調(diào)用tryRelease來釋放獨(dú)占性變量。如果成功,那么就看一下是否有等待鎖的阻塞線程,如果有,就調(diào)用unparkSuccessor來喚醒他們。
protected final boolean tryRelease(int releases) { //由于只有一個(gè)線程可以獲得獨(dú)占先變量,所以,所有操作不需要考慮多線程 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { //如果等于0,那么說明鎖應(yīng)該被釋放了,否則表示當(dāng)前線程有多次lock操作. free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
?我們可以看到tryRelease中的邏輯也體現(xiàn)了可重入鎖的概念,只有等到state的值為0時(shí),才代表鎖真正被釋放了。所以獨(dú)占性變量state的值就代表鎖的有無。當(dāng)state=0時(shí),表示鎖未被占有,否在表示當(dāng)前鎖已經(jīng)被占有。
private void unparkSuccessor(Node node) { ..... //一般來說,需要喚醒的線程就是head的下一個(gè)節(jié)點(diǎn),但是如果它獲取鎖的操作被取消,或在節(jié)點(diǎn)為null時(shí) //就直接繼續(xù)往后遍歷,找到第一個(gè)未取消的后繼節(jié)點(diǎn). Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
?調(diào)用了unpark方法后,進(jìn)行lock操作被阻塞的線程就恢復(fù)到運(yùn)行狀態(tài),就會(huì)再次執(zhí)行acquireQueued中的無限for循環(huán)中的操作,再次嘗試獲取鎖。
后記?有關(guān)AQS和ReentrantLock的分析就差不多結(jié)束了。不得不說,我第一次看到AQS的實(shí)現(xiàn)時(shí)真是震驚,以前都認(rèn)為Synchronized和ReentrantLock的實(shí)現(xiàn)原理是一致的,都是依靠java虛擬機(jī)的功能實(shí)現(xiàn)的。沒有想到還有AQS這樣一個(gè)背后大Boss在提供幫助啊。學(xué)習(xí)了這個(gè)類的原理,我們對(duì)JUC的很多類的分析就簡單了很多。此外,AQS涉及的CAS操作和無鎖隊(duì)列的算法也為我們學(xué)習(xí)其他無鎖算法提供了基礎(chǔ)。知識(shí)的海洋是無限的??!
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/72983.html
摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢娝氖褂茫陂_始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個(gè)名詞,今天我們首先來說說分布式。 探究...
摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢娝氖褂?,在開始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個(gè)名詞,今天我們首先來說說分布式。 探究...
摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)常可見它的使用,在開始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個(gè)名詞,今天我們首先來說說分布式。 探究...
摘要:在中一般來說通過來創(chuàng)建所需要的線程池,如高并發(fā)原理初探后端掘金閱前熱身為了更加形象的說明同步異步阻塞非阻塞,我們以小明去買奶茶為例。 AbstractQueuedSynchronizer 超詳細(xì)原理解析 - 后端 - 掘金今天我們來研究學(xué)習(xí)一下AbstractQueuedSynchronizer類的相關(guān)原理,java.util.concurrent包中很多類都依賴于這個(gè)類所提供的隊(duì)列式...
摘要:在中一般來說通過來創(chuàng)建所需要的線程池,如高并發(fā)原理初探后端掘金閱前熱身為了更加形象的說明同步異步阻塞非阻塞,我們以小明去買奶茶為例。 AbstractQueuedSynchronizer 超詳細(xì)原理解析 - 后端 - 掘金今天我們來研究學(xué)習(xí)一下AbstractQueuedSynchronizer類的相關(guān)原理,java.util.concurrent包中很多類都依賴于這個(gè)類所提供的隊(duì)列式...
閱讀 2254·2021-11-23 09:51
閱讀 1085·2021-11-22 15:35
閱讀 4879·2021-11-22 09:34
閱讀 1622·2021-10-08 10:13
閱讀 3028·2021-07-22 17:35
閱讀 2552·2019-08-30 15:56
閱讀 3090·2019-08-29 18:44
閱讀 3105·2019-08-29 15:32