摘要:如果停止了版本更新,可使用方法來(lái)解除所有因而阻塞的線程,包括指定版本號(hào)的。如果自己維護(hù)版本號(hào),則應(yīng)該保證遞增。
前言
相比上一篇而言,本文不需要太多的準(zhǔn)備知識(shí),但技巧性更強(qiáng)一些。因?yàn)榉治觥⒃O(shè)計(jì)的過程比較復(fù)雜繁瑣,也限于篇幅,所以,主要展示如何解決這些需求,和講解代碼。另外,所講的內(nèi)容也是后一篇實(shí)戰(zhàn)中需要用到的一個(gè)工具類。
需求介紹我需要編寫一個(gè)同步工具,它需要提供這樣幾個(gè)方法:await、pass、cancel。某個(gè)線程調(diào)用await時(shí),會(huì)被阻塞;當(dāng)調(diào)用pass方法時(shí),之前因?yàn)閍wait而阻塞的線程將全部被解除阻塞,之后調(diào)用await的線程繼續(xù)被阻塞,直到下一次調(diào)用pass。
該工具同時(shí)還維護(hù)一個(gè)版本號(hào),await方法可以帶一個(gè)目標(biāo)版本號(hào),如果當(dāng)前的版本號(hào)比目標(biāo)版本號(hào)新或相同,則直接通過,否則,阻塞本線程,直到到達(dá)或超過目標(biāo)版本。調(diào)用pass的時(shí)候,更新版本號(hào)。
如果停止了版本更新,可使用cancel方法來(lái)解除所有因await而阻塞的線程,包括指定版本號(hào)的。此方法用于避免無(wú)謂地等待。若await發(fā)生在cancel之后,則仍將被阻塞。
因?yàn)镃ountDownLatch不允許重復(fù)使用,CyclicBarrier只支持固定個(gè)數(shù)的線程,并且都沒有維護(hù)一個(gè)版本號(hào),所以沒有已有的類能實(shí)現(xiàn)上面的需求,需要自己實(shí)現(xiàn)。
問題分析簡(jiǎn)單分析可知,應(yīng)該維護(hù)一個(gè)隊(duì)列,來(lái)保存當(dāng)前被阻塞的線程,用于在pass時(shí)對(duì)它們一一解除阻塞,pass時(shí)應(yīng)該使用一個(gè)新的隊(duì)列,否則不方便正確處理pass前和pass后調(diào)用await的線程。
至此,問題的關(guān)鍵就明了了:如何將隊(duì)列的替換和版本號(hào)的更新這兩個(gè)操作做成原子的。
解決方案以前在《JAVA并發(fā)編程實(shí)踐》曾看到過這樣一個(gè)小技巧,如果要原子地更新兩個(gè)變量,那么可以創(chuàng)建一個(gè)新的類將它們封裝起來(lái),將這兩個(gè)變量當(dāng)定義成類成員變量,更新時(shí),用CAS更新這個(gè)類的引用即可。
因?yàn)檩^為復(fù)雜,下面先給出完整的代碼,再講解其中的關(guān)鍵。
注意:上面所說pass,在代碼中的具體實(shí)現(xiàn)為nextCycle,有兩個(gè)版本,一個(gè)自動(dòng)維護(hù)版本號(hào),一個(gè)由調(diào)用者維護(hù)版本號(hào)。
/** * @author [email protected] * @time 2013-1-31 */ public class BoundlessCyclicBarrier { protected final AtomicReference代碼分析waitQueueRef; public BoundlessCyclicBarrier() { this(0); } public BoundlessCyclicBarrier(int startVersion) { waitQueueRef = new AtomicReference (new VersionQueue(startVersion)); } public final void awaitWithAssignedVersion(int myVersion) throws InterruptedException { awaitImpl(true, myVersion, 0); } /** * * @param myVersion * @param nanosTimeout * @return if timeout, or be canceled and doesn"t reach myVersion, returns false * @throws InterruptedException */ public final boolean awaitWithAssignedVersion(int myVersion, long nanosTimeout) throws InterruptedException { return awaitImpl(true, myVersion, nanosTimeout); } public final void await() throws InterruptedException { awaitImpl(false, 0, 0); } /** * * @param nanosTimeout * @return if and only if timeout, returns false * @throws InterruptedException */ public final boolean await(long nanosTimeout) throws InterruptedException { return awaitImpl(false, 0, nanosTimeout); } /** * pass and version++(some threads may not be unparked when awaitImpl is in process, but it"s OK in this Barrier) * @return old queue version */ public int nextCycle() { VersionQueue oldQueue = waitQueueRef.get(); VersionQueue newQueue = new VersionQueue(oldQueue.version + 1); for(;;){ if (waitQueueRef.compareAndSet(oldQueue, newQueue)) { for (Thread t : oldQueue.queue) LockSupport.unpark(t); break; } oldQueue = waitQueueRef.get(); newQueue.version = oldQueue.version + 1; } return oldQueue.version; } /** * pass and assign the next cycle version(caller should make sure that the newAssignVersion is right) * @param newAssignVersion */ public void nextCycle(int newAssignVersion) { VersionQueue oldQueue = waitQueueRef.getAndSet(new VersionQueue(newAssignVersion)); for (Thread t : oldQueue.queue) LockSupport.unpark(t); } /** * if version update has stopped, invoke this to awake all threads */ public void cancel(){ VersionQueue oldQueue = waitQueueRef.get(); if (waitQueueRef.compareAndSet(oldQueue, new VersionQueue(oldQueue.version, true))) { for (Thread t : oldQueue.queue) LockSupport.unpark(t); } public final int getVersion() { return waitQueueRef.get().version; } private static final class VersionQueue { final private ConcurrentLinkedQueue queue; int version; final boolean isCancelQueue; VersionQueue(int curVersion){ this(curVersion, false); } VersionQueue(int curVersion, boolean isCancelQueue) { this.version = curVersion; this.isCancelQueue = isCancelQueue; queue = new ConcurrentLinkedQueue(); } } /** * * @param assignVersion is myVersion available * @param myVersion wait for this version * @param nanosTimeout wait time(nanosTimeout <=0 means that nanosTimeout is invalid) * @return if timeout, or be canceled and doesn"t reach myVersion, returns false * @throws InterruptedException */ protected boolean awaitImpl(boolean assignVersion, int myVersion, long nanosTimeout) throws InterruptedException { boolean timeOutEnable = nanosTimeout > 0; long lastTime = System.nanoTime(); VersionQueue newQueue = waitQueueRef.get();//A if (assignVersion && newQueue.version - myVersion >= 0) return true; while (true) { VersionQueue submitQueue = newQueue;//B submitQueue.queue.add(Thread.currentThread());//C while (true) { newQueue = waitQueueRef.get();//D if (newQueue != submitQueue){//E: it"s a new cycle if(assignVersion == false) return true; else if(newQueue.version - myVersion >= 0) return true; else if (newQueue.isCancelQueue)//F: be canceled return false; else//just like invoking awaitImpl again break; } if (timeOutEnable) { if (nanosTimeout <= 0) return false; LockSupport.parkNanos(this, nanosTimeout); long now = System.nanoTime(); nanosTimeout -= now - lastTime; lastTime = now; } else LockSupport.park(this); if (Thread.interrupted()) throw new InterruptedException(); } } } }
先分析一下awaitImpl方法,A和D是該方法的關(guān)鍵點(diǎn),決定著它屬于哪一個(gè)批次,對(duì)應(yīng)哪一個(gè)版本。這里有個(gè)小細(xì)節(jié),在nexeCycle,cancel解除阻塞時(shí),該線程可能并不在隊(duì)列中,因?yàn)椴迦腙?duì)列發(fā)生在C處,這在A和D之后(雖然看起來(lái)C在D之前,但D取到的queue要在下一次循環(huán)時(shí)才被當(dāng)作submitQueue),所以,在E處再進(jìn)行了一次判斷,開始解除阻塞時(shí),舊隊(duì)列肯定被新隊(duì)列所替換,newQueue != submitQueue一定為真,就會(huì)不調(diào)用park進(jìn)行阻塞了,也就不需要解除阻塞,所以即使解除阻塞時(shí),該線程不在隊(duì)列中也是沒問題的。
再看E處,當(dāng)進(jìn)入一個(gè)新的cycle時(shí)(當(dāng)前隊(duì)列與提交的隊(duì)列不同),a)如果沒指定版本,或者到達(dá)或超過了指定版本,則返回true;b)如果當(dāng)前調(diào)用了cancel,則當(dāng)前隊(duì)列的isCancelQueue將為true,則不繼續(xù)傻等,返回false;c)或者還未到達(dá)指定版本,break,插入到當(dāng)前隊(duì)列中,繼續(xù)等待指定版本的到達(dá)。
如果沒有進(jìn)入E處的IF內(nèi),則當(dāng)前線程會(huì)被阻塞,直到超時(shí),然后返回false;或被中斷,然后拋出InterruptedException;或被解除阻塞,重新進(jìn)行E處的判定。
這里還有個(gè)小細(xì)節(jié),既然cancel時(shí),把當(dāng)前的隊(duì)列設(shè)置了isCancelQueue,那么之后指定版本的await會(huì)不會(huì)也直接返回了呢?其實(shí)不會(huì)的,因?yàn)樗粢獔?zhí)行F處的判斷,則先必需通過E處的判定,這意味著,當(dāng)前隊(duì)列已經(jīng)不是提交時(shí)的那個(gè)設(shè)置了isCancelQueue的隊(duì)列了。
代碼中對(duì)于cancel的處理,其實(shí)并不保證cancel后,之前的await都會(huì)被解除阻塞并返回,如果cancel后,緊接著又調(diào)用了nextCycle,那么可能某線程感知不到cancel的調(diào)用,喚醒后又繼續(xù)等待指定的版本。cancel的目的是在于不讓線程傻等,既然恢復(fù)版本更新了,那就繼續(xù)等待吧。
如果自己維護(hù)版本號(hào),則應(yīng)該保證遞增。另外,版本號(hào)的設(shè)計(jì),考慮到了int溢出的情況,版本的前后判斷,我不是使用newVersion>=oldVersion,而是newVersion-oldVersion>=0,這樣,版本號(hào)就相當(dāng)于循環(huán)使用了,只要兩個(gè)比較的版本號(hào)的差不超過int的最大值,那么都是正確的,int的最大值可是20多億,幾乎不可能出現(xiàn)跨度這么大的兩個(gè)版本號(hào)的比較,所以,認(rèn)為它是正確的。
小結(jié)本文講到了一個(gè)非阻塞同步算法設(shè)計(jì)時(shí)的小技巧,如果多個(gè)變量之間要維護(hù)某種特定關(guān)系,那么可以將它們封裝到一個(gè)類中,再用CAS更新這個(gè)類的引用,這樣就達(dá)到了:要么都被更新,要么都沒被更新,保持了多個(gè)變量之間的一致性。同時(shí)需要注意的是,每次更新都必需創(chuàng)建新的包裝對(duì)象,假如有其它更好的辦法,應(yīng)該避免使用該方法。
via ifeve.com
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/64034.html
摘要:黑色的線表示,可在任意狀態(tài)下發(fā)起主動(dòng)取消,進(jìn)入該狀態(tài)。所以當(dāng)線程阻塞時(shí),可能處于停止?fàn)顟B(tài)或者主動(dòng)取消狀態(tài)。非阻塞同步相對(duì)于鎖同步而言,由代碼塊,轉(zhuǎn)為了點(diǎn),是另一種思考方式。 前言 閱讀本文前,需要讀者對(duì)happens-before比較熟悉,了解非阻塞同步的一些基本概念。本文主要為happens-before法則的靈活運(yùn)用,和一些解決問題的小技巧,分析問題的方式。 背景介紹 原始需...
摘要:前言學(xué)習(xí)情況記錄時(shí)間子目標(biāo)多線程記錄在學(xué)習(xí)線程安全知識(shí)點(diǎn)中,關(guān)于的有關(guān)知識(shí)點(diǎn)。對(duì)于資源競(jìng)爭(zhēng)嚴(yán)重線程沖突嚴(yán)重的情況,自旋的概率會(huì)比較大,從而浪費(fèi)更多的資源,效率低于。 前言 學(xué)習(xí)情況記錄 時(shí)間:week 1 SMART子目標(biāo) :Java 多線程 記錄在學(xué)習(xí)線程安全知識(shí)點(diǎn)中,關(guān)于CAS的有關(guān)知識(shí)點(diǎn)。 線程安全是指:多個(gè)線程不管以何種方式訪問某個(gè)類,并且在主調(diào)代碼中不需要進(jìn)行同步,都能表...
摘要:注意這里指的不是當(dāng)次而是之后,所以如果我們使用隊(duì)列的方法返回,就知道隊(duì)列是否為空,但是不知道之后是否為空,并且,當(dāng)關(guān)注的操作發(fā)生時(shí),在插入或取出操作的返回值里告知此信息,來(lái)指導(dǎo)是否繼續(xù)注冊(cè)寫操作。 前言 本文寫給對(duì)ConcurrentLinkedQueue的實(shí)現(xiàn)和非阻塞同步算法的實(shí)現(xiàn)原理有一定了解,但缺少實(shí)踐經(jīng)驗(yàn)的朋友,文中包括了實(shí)戰(zhàn)中的嘗試、所走的彎路,經(jīng)驗(yàn)和教訓(xùn)。 背景介紹 ...
摘要:如問到是否使用某框架,實(shí)際是是問該框架的使用場(chǎng)景,有什么特點(diǎn),和同類可框架對(duì)比一系列的問題。這兩個(gè)方向的區(qū)分點(diǎn)在于工作方向的側(cè)重點(diǎn)不同。 [TOC] 這是一份來(lái)自嗶哩嗶哩的Java面試Java面試 32個(gè)核心必考點(diǎn)完全解析(完) 課程預(yù)習(xí) 1.1 課程內(nèi)容分為三個(gè)模塊 基礎(chǔ)模塊: 技術(shù)崗位與面試 計(jì)算機(jī)基礎(chǔ) JVM原理 多線程 設(shè)計(jì)模式 數(shù)據(jù)結(jié)構(gòu)與算法 應(yīng)用模塊: 常用工具集 ...
摘要:相比與其他操作系統(tǒng)包括其他類系統(tǒng)有很多的優(yōu)點(diǎn),其中有一項(xiàng)就是,其上下文切換和模式切換的時(shí)間消耗非常少。因?yàn)槎嗑€程競(jìng)爭(zhēng)鎖時(shí)會(huì)引起上下文切換。減少線程的使用。很多編程語(yǔ)言中都有協(xié)程。所以如何避免死鎖的產(chǎn)生,在我們使用并發(fā)編程時(shí)至關(guān)重要。 系列文章傳送門: Java多線程學(xué)習(xí)(一)Java多線程入門 Java多線程學(xué)習(xí)(二)synchronized關(guān)鍵字(1) java多線程學(xué)習(xí)(二)syn...
閱讀 1960·2023-04-26 01:56
閱讀 3124·2021-11-18 10:02
閱讀 3076·2021-09-09 11:35
閱讀 1314·2021-09-03 10:28
閱讀 3433·2019-08-29 18:36
閱讀 2864·2019-08-29 17:14
閱讀 846·2019-08-29 16:10
閱讀 1625·2019-08-26 13:45