摘要:今天在群上拋出來一個問題,如下我以自帶的數(shù)據(jù)結(jié)構(gòu)為例,用源碼的形式說明,如何阻塞線程通知線程的。一以可重入鎖和兩個對象來控制并發(fā)。四使用來控制并發(fā),同時也使用的對象來與線程交互。
今天在QQ群上拋出來一個問題,如下
我以Java自帶的數(shù)據(jù)結(jié)構(gòu)為例,用源碼的形式說明,如何阻塞線程、通知線程的。
一、Lock & Condition
ArrayBlockingQueue以可重入鎖和兩個Condition對象來控制并發(fā)。
/* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ private final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
構(gòu)造函數(shù)中初始化了notEmpty和notFull.
/** * Creates an ArrayBlockingQueue with the given (fixed) * capacity and the specified access policy. * @param capacity the capacity of this queue * @param fair if true then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if false the access order is unspecified. * @throws IllegalArgumentException if capacity is less than 1 */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
二、線程阻塞
當(dāng)ArrayBlockingQueue存儲的元素是0個的時候,take()方法會阻塞.
public Object take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } Object x = extract(); return x; } finally { lock.unlock(); } }
這里take方法首先獲得可重入鎖lock,然后判斷如果元素為空就執(zhí)行notEmpty.await(); 這個時候線程掛起。
三、通知線程
比如使用put放入一個新元素,
/** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
在enqueue方法中,
/** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
對剛才的notEmptyCondition進(jìn)行通知。
四、ReentrantLock vs AbstractQueuedSynchronizer
ArrayBlockingQueue使用ReentrantLock來控制并發(fā),同時也使用ArrayBlockingQueue的Condition對象來與線程交互。notEmpty和notFull都是由
ReentrantLock的成員變量sync生成的,
public Condition newCondition() { return sync.newCondition(); }
sync可以認(rèn)為是一個抽象類類型,Sync,它是在ReentrantLock內(nèi)部定義的靜態(tài)抽象類,抽象類實現(xiàn)了newCondition方法,
final ConditionObject newCondition() { return new ConditionObject(); }
返回的類型是實現(xiàn)了Condition接口的ConditionObject類,這是在AbstractQueuedSynchronizer內(nèi)部定義的類。在ArrayBlockingQueue中的notEmpty就是ConditionObject實例。
阻塞:
當(dāng)ArrayBlockingQueue為空時,notEmpty.await()將自己掛起,如ConditionObject的await方法,
/** * Implements interruptible condition wait. **
*/ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }- If current thread is interrupted, throw InterruptedException. *
- Save lock state returned by {@link #getState}. *
- Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. *
- Block until signalled or interrupted. *
- Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. *
- If interrupted while blocked in step 4, throw InterruptedException. *
addConditionWaiter是將當(dāng)前線程作為一個node加入到ConditionObject的隊列中,隊列是用鏈表實現(xiàn)的。
如果是初次加入隊列的情況,node.waitStatus == Node.CONDITION成立,方法isOnSyncQueue返回false,那么就將線程park。
while (!isOnSyncQueue(node)) { LockSupport.park(this); .... }
至此線程被掛起,LockSupport.park(this);這里this是指ConditionObject,是notEmpty.
通知:
當(dāng)新的元素put進(jìn)入ArrayBlockingQueue后,notEmpty.signal()通知在這上面等待的線程,如ConditionObject的signal方法,
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
doSignal方法,
/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
doSignal一開始接收到的參數(shù)就是firstWaiter這個參數(shù),在內(nèi)部實現(xiàn)中用了do..while的形式,首先將first的的nextWaiter找出來保存到firstWaiter此時(first和firstWaiter不是一回事),在while的比較條件中可調(diào)用了transferForSignal方法,
整個while比較條件可以看著短路邏輯,如果transferForSignal結(jié)果為true,后面的first = firstWaiter就不執(zhí)行了,整個while循環(huán)就結(jié)束了。
參照注釋,看
transferForSignal方法,
/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */ final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
首先確保想要被signal的等待node還是處于Node.CONDITION狀態(tài),然后調(diào)整狀態(tài)為Node.SIGNAL,這兩個都是采用CAS方法,最后調(diào)用的是
LockSupport.unpark(node.thread);
五、LockSupport
至此,我們已經(jīng)知道了線程的掛起和通知都是使用LockSupport來完成的,并發(fā)數(shù)據(jù)結(jié)構(gòu)與線程直接的交互最終也是需要LockSupport。那么關(guān)于LockSupport,我們又可以了解多少呢?
Ref:
Java中的ReentrantLock和synchronized兩種鎖定機制的對比
Java的LockSupport.park()實現(xiàn)分析
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/68083.html
摘要:線程安全的線程安全的,在讀多寫少的場合性能非常好,遠(yuǎn)遠(yuǎn)好于高效的并發(fā)隊列,使用鏈表實現(xiàn)。這樣帶來的好處是在高并發(fā)的情況下,你會需要一個全局鎖來保證整個平衡樹的線程安全。 該文已加入開源項目:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識的文檔類項目,Star 數(shù)接近 14 k)。地址:https://github.com/Snailclimb... 一 JDK ...
摘要:本人郵箱歡迎轉(zhuǎn)載轉(zhuǎn)載請注明網(wǎng)址代碼已經(jīng)全部托管有需要的同學(xué)自行下載引言做的同學(xué)們或多或少的接觸過集合框架在集合框架中大多的集合類是線程不安全的比如我們常用的等等我們寫一個例子看為什么說是不安全的例子證明是線程不安全的我們開啟個線程每個線程向 本人郵箱: 歡迎轉(zhuǎn)載,轉(zhuǎn)載請注明網(wǎng)址 http://blog.csdn.net/tianshi_kcogithub: https://github...
摘要:如果隊列已滿,這個時候?qū)懖僮鞯木€程進(jìn)入到寫線程隊列排隊,等待讀線程將隊列元素移除騰出空間,然后喚醒寫線程隊列的第一個等待線程。數(shù)據(jù)必須從某個寫線程交給某個讀線程,而不是寫到某個隊列中等待被消費。 前言 本文直接參考 Doug Lea 寫的 Java doc 和注釋,這也是我們在學(xué)習(xí) java 并發(fā)包時最好的材料了。希望大家能有所思、有所悟,學(xué)習(xí) Doug Lea 的代碼風(fēng)格,并將其優(yōu)雅...
摘要:序本文主要簡單介紹下與。有界無界有界,適合已知最大存儲容量的場景可有界可以無界吞吐量在大多數(shù)并發(fā)的場景下吞吐量比,但是性能不穩(wěn)定。測試結(jié)果表明,的可伸縮性要高于。 序 本文主要簡單介紹下ArrayBlockingQueue與LinkedBlockingQueue。 對比 queue 阻塞與否 是否有界 線程安全保障 適用場景 注意事項 ArrayBlockingQueue 阻...
摘要:自己實現(xiàn)在自己實現(xiàn)之前先搞清楚阻塞隊列的幾個特點基本隊列特性先進(jìn)先出。消費隊列空時會阻塞直到寫入線程寫入了隊列數(shù)據(jù)后喚醒消費線程。最終的隊列大小為,可見線程也是安全的。 showImg(https://segmentfault.com/img/remote/1460000018811340); 前言 較長一段時間以來我都發(fā)現(xiàn)不少開發(fā)者對 jdk 中的 J.U.C(java.util.c...
閱讀 2255·2021-11-23 09:51
閱讀 1086·2021-11-22 15:35
閱讀 4880·2021-11-22 09:34
閱讀 1623·2021-10-08 10:13
閱讀 3029·2021-07-22 17:35
閱讀 2554·2019-08-30 15:56
閱讀 3091·2019-08-29 18:44
閱讀 3106·2019-08-29 15:32