摘要:總結(jié)在一開是的例子中,假設(shè)有兩個線程,分別代表生產(chǎn)者和消費者線程,生產(chǎn)消費元素的隊列容量為。
什么是Condition
Condition必須要和獨占鎖一起使用,獨占鎖代替了原來的synchronized,Condition代替了原來的Object中的監(jiān)視器方法(wait, notify and notifyAll);一個Lock可以對應多個Condition,這樣線程之間可以按照條件喚醒指定的線程,而不是簡單的notifyAll多有的線程,使得我們多線程編程的時候可以靈活的控制線程。
獨占鎖和Condition最經(jīng)典的配合使用就是ArrayBlockingQueue.java,典型的生產(chǎn)者消費者問題:
/* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
這是在許多教科書中能找到的經(jīng)典的雙Condition算法的并發(fā)控制,需要有一個獨占鎖ReentrantLock,然后再定義兩個Condition,notEmpty(隊列不是空的)表示可以從隊列中消費元素的信號條件,notFull(隊列不是滿的)表示可以向隊列生產(chǎn)元素的信號條件。這兩個Condition都是調(diào)用了lock.newCondition()方法實例化的。
當消費者線程調(diào)用消費方法take時:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //當隊列的元素數(shù)量為0時,調(diào)用notEmpty.await,阻塞當前的消費線程 while (count == 0) notEmpty.await(); //dequeue中調(diào)用了notFull.signal(),通知生產(chǎn)者隊列還沒滿,可以生產(chǎn) return dequeue(); } finally { lock.unlock(); } }
當生產(chǎn)者線程調(diào)用生產(chǎn)方法put時:
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //當隊列滿時,調(diào)用notFull.await(),阻塞當前生產(chǎn)線程,停止生產(chǎn) while (count == items.length) notFull.await(); //enqueue中調(diào)用了notEmpty.signal(),通知消費者隊列里有元素,可以消費 enqueue(e); } finally { lock.unlock(); } }Condition的await
在AQS中有一個ConditionObject內(nèi)部類實現(xiàn)了Condition接口,其中有兩個成員變量:
/** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter;
Condition也有一個node隊列,firstWaiter、lastWaiter分別表示第一個和最后一個node。
先看await方法:
public final void await() throws InterruptedException { //如果線程設(shè)置中斷標志,拋出中斷異常 if (Thread.interrupted()) throw new InterruptedException(); //往隊列添加node Node node = addConditionWaiter(); //完全釋放鎖,head的后繼節(jié)點將被喚醒,然后被移出sync隊列 int savedState = fullyRelease(node); int interruptMode = 0; //判斷當前節(jié)點是否在sync隊列中(當condition調(diào)用signal是會將該節(jié)點放入Sync隊列),如果不在就park當前線程,線程在這里開始等待被signal while (!isOnSyncQueue(node)) { LockSupport.park(this); //發(fā)送中斷時(喚醒了線程)break;checkInterruptWhileWaiting中調(diào)用了transferAfterCancelledWait(貼在下面),這個方法時檢測中斷是發(fā)生在signal之前還是之后 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //當前線程被signal后,調(diào)用acquireQueued搶占鎖,如果interruptMode不為拋出異常,設(shè)置為REINTERRUPT if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled //從頭到尾移除取消的節(jié)點 unlinkCancelledWaiters(); if (interruptMode != 0) //繼續(xù)中斷還是拋出異常 reportInterruptAfterWait(interruptMode); } final boolean transferAfterCancelledWait(Node node) { //首先CAS設(shè)置node狀態(tài)為0,如果成功說明中斷發(fā)生在signal之前(因為signal會將node狀態(tài)設(shè)置為0) if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { //將node入sync隊列 enq(node); return true; } /* * If we lost out to a signal(), then we can"t proceed * until it finishes its enq(). Cancelling during an * incomplete transfer is both rare and transient, so just * spin. */ //如果node不在sync隊列中,yield,讓出cpu while (!isOnSyncQueue(node)) Thread.yield(); //中斷發(fā)生在signal后 return false; }
分析一下addConditionWaiter:
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. //如果最后一個node被取消,清除node if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //新建一個node,持有當前線程,狀態(tài)為CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) //如果尾節(jié)點為null,說明condition隊列還是空的,將新建的node作為頭節(jié)點 firstWaiter = node; else //如果condition隊列已經(jīng)存在,將新建的node作為尾節(jié)點的next t.nextWaiter = node; //將新建node設(shè)置為尾節(jié)點 lastWaiter = node; //返回新建的node return node; }
在這里我們可以看到Condition的隊列是一個單鏈表。
看一下unlinkCancelledWaiters,Condition所有操作都是在獲取鎖之后執(zhí)行的,所以不用考慮線程安全問題:
private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
該方法從隊列頭開始往后遍歷所有node,移除已經(jīng)取消的node;
在新建了node后,調(diào)用了fullyRelease:
final int fullyRelease(Node node) { boolean failed = true; try { //保存當前的state int savedState = getState(); //release(savedState)嘗試釋放鎖,這也是為什么叫fullyRelease if (release(savedState)) { failed = false; //返回之前保存的state return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) //如果失敗,將當前node設(shè)置為取消狀態(tài) node.waitStatus = Node.CANCELLED; } }
看一下release:
public final boolean release(int arg) { //嘗試釋放鎖,這里調(diào)用的是ReentrantLock實現(xiàn)的tryRelease,傳入的arg是當前的state,所以會釋放成功,即state為0 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //喚醒后繼節(jié)點 unparkSuccessor(h); return true; } return false; }
下面的方法是判斷當前節(jié)點是否在Sync隊列中
final boolean isOnSyncQueue(Node node) { //如果當前節(jié)點狀態(tài)為CONDITION或者節(jié)點前驅(qū)為null,說明該節(jié)點已經(jīng)在CONDITION隊列中,不在Syc隊列里 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; //如果節(jié)點后繼不是null,那該節(jié)點一定在Syc隊列中 if (node.next != null) // If has successor, it must be on queue return true; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ //此時節(jié)點入列的CAS動作可能失敗,所以要從尾部往前查找該節(jié)點再次確認 return findNodeFromTail(node); }Condition的signal
public final void signal() { //如果當前線程不是當前的獨占線程,拋出異常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) //signal Condition隊列的第一個節(jié)點 doSignal(first); } private void doSignal(Node first) { //如果transferForSignal失?。串斍肮?jié)點取消)且下一個節(jié)點存在,while繼續(xù)loop do { //設(shè)置第一個節(jié)點的next為firstWaiter,此時如果firstWaiter為null,說明隊列空了,將lastWaiter也設(shè)置為null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; //設(shè)置第一個節(jié)點next為null,help GC first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ //如果為node設(shè)置狀態(tài)失敗,說明node被取消,返回false if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ //將當前node入列sync隊列,返回node的前繼 Node p = enq(node); int ws = p.waitStatus; //如果前繼的狀態(tài)為取消或者設(shè)置前繼狀態(tài)為SIGNAL失敗,當前node線程unpark if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
signal后,Condition第一個節(jié)點將入列sync的隊列,等待搶占到鎖繼續(xù)執(zhí)行。
總結(jié)在一開是的例子中,假設(shè)有兩個線程P,C分別代表生產(chǎn)者和消費者線程,生產(chǎn)消費元素E的隊列Q容量為1。
C無限loop調(diào)用take,當C搶占到獨占鎖,發(fā)現(xiàn)Q時空的,調(diào)用notEmpty.await(),線程C釋放鎖并且入列notEmpty隊列park,等待別的線程調(diào)用notEmpty.signal();
P無限loop調(diào)用put,當P搶占到獨占鎖生產(chǎn)了一個E,調(diào)用notEmpty.signal()通知C,然后釋放了鎖;
C收到signal信號,入列SYC隊列,并且unpark,嘗試搶占獨占鎖,成功獲得獨占鎖后,消費了一個E,然后調(diào)用notFull.signal();
P生產(chǎn)E時發(fā)現(xiàn)Q已滿(C還沒來得及消費),調(diào)用notFull.await()線程P釋放鎖并且入列notFull隊列park,等待notFull.signal()通知自己unpark并入列AQS隊列去搶占獨占鎖進行生產(chǎn);
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/70922.html
摘要:總結(jié)在一開是的例子中,假設(shè)有兩個線程,分別代表生產(chǎn)者和消費者線程,生產(chǎn)消費元素的隊列容量為。 什么是Condition Condition必須要和獨占鎖一起使用,獨占鎖代替了原來的synchronized,Condition代替了原來的Object中的監(jiān)視器方法(wait, notify and notifyAll);一個Lock可以對應多個Condition,這樣線程之間可以按照條件...
摘要:實現(xiàn)原理是通過基于單鏈表的條件隊列來管理等待線程的。中斷在轉(zhuǎn)移到同步隊列期間或之后發(fā)生,此時表明有線程正在調(diào)用轉(zhuǎn)移節(jié)點。在該種中斷模式下,再次設(shè)置線程的中斷狀態(tài)。 1. 簡介 Condition是一個接口,AbstractQueuedSynchronizer 中的ConditionObject內(nèi)部類實現(xiàn)了這個接口。Condition聲明了一組等待/通知的方法,這些方法的功能與Objec...
摘要:實現(xiàn)原理是通過基于單鏈表的條件隊列來管理等待線程的。中斷在轉(zhuǎn)移到同步隊列期間或之后發(fā)生,此時表明有線程正在調(diào)用轉(zhuǎn)移節(jié)點。在該種中斷模式下,再次設(shè)置線程的中斷狀態(tài)。 1. 簡介 Condition是一個接口,AbstractQueuedSynchronizer 中的ConditionObject內(nèi)部類實現(xiàn)了這個接口。Condition聲明了一組等待/通知的方法,這些方法的功能與Objec...
摘要:實現(xiàn)原理是通過基于單鏈表的條件隊列來管理等待線程的。中斷在轉(zhuǎn)移到同步隊列期間或之后發(fā)生,此時表明有線程正在調(diào)用轉(zhuǎn)移節(jié)點。在該種中斷模式下,再次設(shè)置線程的中斷狀態(tài)。 1. 簡介 Condition是一個接口,AbstractQueuedSynchronizer 中的ConditionObject內(nèi)部類實現(xiàn)了這個接口。Condition聲明了一組等待/通知的方法,這些方法的功能與Objec...
摘要:同步器擁有三個成員變量隊列的頭結(jié)點隊列的尾節(jié)點和狀態(tài)。對于同步器維護的狀態(tài),多個線程對其的獲取將會產(chǎn)生一個鏈式的結(jié)構(gòu)。使用將當前線程,關(guān)于后續(xù)會詳細介紹。 簡介提供了一個基于FIFO隊列,可以用于構(gòu)建鎖或者其他相關(guān)同步裝置的基礎(chǔ)框架。該同步器(以下簡稱同步器)利用了一個int來表示狀態(tài),期望它能夠成為實現(xiàn)大部分同步需求的基礎(chǔ)。使用的方法是繼承,子類通過繼承同步器并需要實現(xiàn)它的方法來管理...
閱讀 3478·2021-11-25 09:43
閱讀 2627·2021-09-22 15:54
閱讀 604·2019-08-30 15:55
閱讀 984·2019-08-30 15:55
閱讀 2008·2019-08-30 15:55
閱讀 1752·2019-08-30 15:53
閱讀 3477·2019-08-30 15:52
閱讀 2048·2019-08-30 12:55