成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

非阻塞同步算法實(shí)戰(zhàn)(二):BoundlessCyclicBarrier

yintaolaowanzi / 2192人閱讀

摘要:如果停止了版本更新,可使用方法來(lái)解除所有因而阻塞的線程,包括指定版本號(hào)的。如果自己維護(hù)版本號(hào),則應(yīng)該保證遞增。

前言

相比上一篇而言,本文不需要太多的準(zhǔn)備知識(shí),但技巧性更強(qiáng)一些。因?yàn)榉治觥⒃O(shè)計(jì)的過程比較復(fù)雜繁瑣,也限于篇幅,所以,主要展示如何解決這些需求,和講解代碼。另外,所講的內(nèi)容也是后一篇實(shí)戰(zhàn)中需要用到的一個(gè)工具類。

需求介紹

我需要編寫一個(gè)同步工具,它需要提供這樣幾個(gè)方法:await、pass、cancel。某個(gè)線程調(diào)用await時(shí),會(huì)被阻塞;當(dāng)調(diào)用pass方法時(shí),之前因?yàn)閍wait而阻塞的線程將全部被解除阻塞,之后調(diào)用await的線程繼續(xù)被阻塞,直到下一次調(diào)用pass。

該工具同時(shí)還維護(hù)一個(gè)版本號(hào),await方法可以帶一個(gè)目標(biāo)版本號(hào),如果當(dāng)前的版本號(hào)比目標(biāo)版本號(hào)新或相同,則直接通過,否則,阻塞本線程,直到到達(dá)或超過目標(biāo)版本。調(diào)用pass的時(shí)候,更新版本號(hào)。

如果停止了版本更新,可使用cancel方法來(lái)解除所有因await而阻塞的線程,包括指定版本號(hào)的。此方法用于避免無(wú)謂地等待。若await發(fā)生在cancel之后,則仍將被阻塞。

因?yàn)镃ountDownLatch不允許重復(fù)使用,CyclicBarrier只支持固定個(gè)數(shù)的線程,并且都沒有維護(hù)一個(gè)版本號(hào),所以沒有已有的類能實(shí)現(xiàn)上面的需求,需要自己實(shí)現(xiàn)。

問題分析

簡(jiǎn)單分析可知,應(yīng)該維護(hù)一個(gè)隊(duì)列,來(lái)保存當(dāng)前被阻塞的線程,用于在pass時(shí)對(duì)它們一一解除阻塞,pass時(shí)應(yīng)該使用一個(gè)新的隊(duì)列,否則不方便正確處理pass前和pass后調(diào)用await的線程。

至此,問題的關(guān)鍵就明了了:如何將隊(duì)列的替換和版本號(hào)的更新這兩個(gè)操作做成原子的。

解決方案

以前在《JAVA并發(fā)編程實(shí)踐》曾看到過這樣一個(gè)小技巧,如果要原子地更新兩個(gè)變量,那么可以創(chuàng)建一個(gè)新的類將它們封裝起來(lái),將這兩個(gè)變量當(dāng)定義成類成員變量,更新時(shí),用CAS更新這個(gè)類的引用即可。

因?yàn)檩^為復(fù)雜,下面先給出完整的代碼,再講解其中的關(guān)鍵。

注意:上面所說pass,在代碼中的具體實(shí)現(xiàn)為nextCycle,有兩個(gè)版本,一個(gè)自動(dòng)維護(hù)版本號(hào),一個(gè)由調(diào)用者維護(hù)版本號(hào)。

/**
 * @author [email protected]
 * @time 2013-1-31
 */
public class BoundlessCyclicBarrier {
    protected final AtomicReference waitQueueRef;

    public BoundlessCyclicBarrier() {
        this(0);
    }

    public BoundlessCyclicBarrier(int startVersion) {
        waitQueueRef = new AtomicReference(new VersionQueue(startVersion));
    }

    public final void awaitWithAssignedVersion(int myVersion)
            throws InterruptedException {
        awaitImpl(true, myVersion, 0);
    }

    /**
     *
     * @param myVersion
     * @param nanosTimeout
     * @return if timeout, or be canceled and doesn"t reach myVersion, returns false
     * @throws InterruptedException
     */
    public final boolean awaitWithAssignedVersion(int myVersion, long nanosTimeout) throws InterruptedException {
        return awaitImpl(true, myVersion, nanosTimeout);
    }

    public final void await() throws InterruptedException {
        awaitImpl(false, 0, 0);
    }

    /**
     *
     * @param nanosTimeout
     * @return if and only if timeout, returns false
     * @throws InterruptedException
     */
    public final boolean await(long nanosTimeout)
            throws InterruptedException {
        return awaitImpl(false, 0, nanosTimeout);
    }

    /**
     * pass and version++(some threads may not be unparked when awaitImpl is in process, but it"s OK in this Barrier)
     * @return old queue version
     */
    public int nextCycle() {
        VersionQueue oldQueue = waitQueueRef.get();
        VersionQueue newQueue = new VersionQueue(oldQueue.version + 1);
        for(;;){
            if (waitQueueRef.compareAndSet(oldQueue, newQueue)) {
                for (Thread t : oldQueue.queue)
                    LockSupport.unpark(t);
                break;
            }
            oldQueue = waitQueueRef.get();
            newQueue.version = oldQueue.version + 1;
        }
        return oldQueue.version;
    }

    /**
     * pass and assign the next cycle version(caller should make sure that the newAssignVersion is right)
     * @param newAssignVersion
     */
    public void nextCycle(int newAssignVersion) {
        VersionQueue oldQueue = waitQueueRef.getAndSet(new VersionQueue(newAssignVersion));
        for (Thread t : oldQueue.queue)
            LockSupport.unpark(t);
    }

    /**
     * if version update has stopped, invoke this to awake all threads
     */
    public void cancel(){
        VersionQueue oldQueue = waitQueueRef.get();
        if (waitQueueRef.compareAndSet(oldQueue, new VersionQueue(oldQueue.version, true))) {
            for (Thread t : oldQueue.queue)
                LockSupport.unpark(t);
    }

    public final int getVersion() {
        return waitQueueRef.get().version;
    }

    private static final class VersionQueue {
        final private ConcurrentLinkedQueue queue;
        int version;
        final boolean isCancelQueue;

        VersionQueue(int curVersion){
            this(curVersion, false);
        }

        VersionQueue(int curVersion, boolean isCancelQueue) {
            this.version = curVersion;
            this.isCancelQueue = isCancelQueue;
            queue = new ConcurrentLinkedQueue();
        }
    }

    /**
     *
     * @param assignVersion is myVersion available
     * @param myVersion wait for this version
     * @param nanosTimeout wait time(nanosTimeout <=0 means that nanosTimeout is invalid)      * @return if timeout, or be canceled and doesn"t reach myVersion, returns false      * @throws InterruptedException      */     protected boolean awaitImpl(boolean assignVersion, int myVersion,             long nanosTimeout) throws InterruptedException {         boolean timeOutEnable = nanosTimeout > 0;
        long lastTime = System.nanoTime();
        VersionQueue newQueue = waitQueueRef.get();//A
        if (assignVersion && newQueue.version - myVersion >= 0)
            return true;
        while (true) {
            VersionQueue submitQueue = newQueue;//B
            submitQueue.queue.add(Thread.currentThread());//C
            while (true) {
                newQueue = waitQueueRef.get();//D
                if (newQueue != submitQueue){//E: it"s a new cycle
                    if(assignVersion == false)
                        return true;
                    else if(newQueue.version - myVersion >= 0)
                        return true;
                    else if (newQueue.isCancelQueue)//F: be canceled
                        return false;
                    else//just like invoking awaitImpl again
                        break;
                }
                if (timeOutEnable) {
                    if (nanosTimeout <= 0)
                        return false;
                    LockSupport.parkNanos(this, nanosTimeout);
                    long now = System.nanoTime();
                    nanosTimeout -= now - lastTime;
                    lastTime = now;
                } else
                    LockSupport.park(this);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        }
    }
}
代碼分析

先分析一下awaitImpl方法,A和D是該方法的關(guān)鍵點(diǎn),決定著它屬于哪一個(gè)批次,對(duì)應(yīng)哪一個(gè)版本。這里有個(gè)小細(xì)節(jié),在nexeCycle,cancel解除阻塞時(shí),該線程可能并不在隊(duì)列中,因?yàn)椴迦腙?duì)列發(fā)生在C處,這在A和D之后(雖然看起來(lái)C在D之前,但D取到的queue要在下一次循環(huán)時(shí)才被當(dāng)作submitQueue),所以,在E處再進(jìn)行了一次判斷,開始解除阻塞時(shí),舊隊(duì)列肯定被新隊(duì)列所替換,newQueue != submitQueue一定為真,就會(huì)不調(diào)用park進(jìn)行阻塞了,也就不需要解除阻塞,所以即使解除阻塞時(shí),該線程不在隊(duì)列中也是沒問題的。

再看E處,當(dāng)進(jìn)入一個(gè)新的cycle時(shí)(當(dāng)前隊(duì)列與提交的隊(duì)列不同),a)如果沒指定版本,或者到達(dá)或超過了指定版本,則返回true;b)如果當(dāng)前調(diào)用了cancel,則當(dāng)前隊(duì)列的isCancelQueue將為true,則不繼續(xù)傻等,返回false;c)或者還未到達(dá)指定版本,break,插入到當(dāng)前隊(duì)列中,繼續(xù)等待指定版本的到達(dá)。

如果沒有進(jìn)入E處的IF內(nèi),則當(dāng)前線程會(huì)被阻塞,直到超時(shí),然后返回false;或被中斷,然后拋出InterruptedException;或被解除阻塞,重新進(jìn)行E處的判定。

這里還有個(gè)小細(xì)節(jié),既然cancel時(shí),把當(dāng)前的隊(duì)列設(shè)置了isCancelQueue,那么之后指定版本的await會(huì)不會(huì)也直接返回了呢?其實(shí)不會(huì)的,因?yàn)樗粢獔?zhí)行F處的判斷,則先必需通過E處的判定,這意味著,當(dāng)前隊(duì)列已經(jīng)不是提交時(shí)的那個(gè)設(shè)置了isCancelQueue的隊(duì)列了。

代碼中對(duì)于cancel的處理,其實(shí)并不保證cancel后,之前的await都會(huì)被解除阻塞并返回,如果cancel后,緊接著又調(diào)用了nextCycle,那么可能某線程感知不到cancel的調(diào)用,喚醒后又繼續(xù)等待指定的版本。cancel的目的是在于不讓線程傻等,既然恢復(fù)版本更新了,那就繼續(xù)等待吧。

如果自己維護(hù)版本號(hào),則應(yīng)該保證遞增。另外,版本號(hào)的設(shè)計(jì),考慮到了int溢出的情況,版本的前后判斷,我不是使用newVersion>=oldVersion,而是newVersion-oldVersion>=0,這樣,版本號(hào)就相當(dāng)于循環(huán)使用了,只要兩個(gè)比較的版本號(hào)的差不超過int的最大值,那么都是正確的,int的最大值可是20多億,幾乎不可能出現(xiàn)跨度這么大的兩個(gè)版本號(hào)的比較,所以,認(rèn)為它是正確的。

小結(jié)

本文講到了一個(gè)非阻塞同步算法設(shè)計(jì)時(shí)的小技巧,如果多個(gè)變量之間要維護(hù)某種特定關(guān)系,那么可以將它們封裝到一個(gè)類中,再用CAS更新這個(gè)類的引用,這樣就達(dá)到了:要么都被更新,要么都沒被更新,保持了多個(gè)變量之間的一致性。同時(shí)需要注意的是,每次更新都必需創(chuàng)建新的包裝對(duì)象,假如有其它更好的辦法,應(yīng)該避免使用該方法。

via ifeve.com

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/64034.html

相關(guān)文章

  • 阻塞同步算法實(shí)戰(zhàn)(三)-LatestResultsProvider

    摘要:黑色的線表示,可在任意狀態(tài)下發(fā)起主動(dòng)取消,進(jìn)入該狀態(tài)。所以當(dāng)線程阻塞時(shí),可能處于停止?fàn)顟B(tài)或者主動(dòng)取消狀態(tài)。非阻塞同步相對(duì)于鎖同步而言,由代碼塊,轉(zhuǎn)為了點(diǎn),是另一種思考方式。 前言 閱讀本文前,需要讀者對(duì)happens-before比較熟悉,了解非阻塞同步的一些基本概念。本文主要為happens-before法則的靈活運(yùn)用,和一些解決問題的小技巧,分析問題的方式。 背景介紹 原始需...

    CrazyCodes 評(píng)論0 收藏0
  • Week 1 - Java 多線程 - CAS

    摘要:前言學(xué)習(xí)情況記錄時(shí)間子目標(biāo)多線程記錄在學(xué)習(xí)線程安全知識(shí)點(diǎn)中,關(guān)于的有關(guān)知識(shí)點(diǎn)。對(duì)于資源競(jìng)爭(zhēng)嚴(yán)重線程沖突嚴(yán)重的情況,自旋的概率會(huì)比較大,從而浪費(fèi)更多的資源,效率低于。 前言 學(xué)習(xí)情況記錄 時(shí)間:week 1 SMART子目標(biāo) :Java 多線程 記錄在學(xué)習(xí)線程安全知識(shí)點(diǎn)中,關(guān)于CAS的有關(guān)知識(shí)點(diǎn)。 線程安全是指:多個(gè)線程不管以何種方式訪問某個(gè)類,并且在主調(diào)代碼中不需要進(jìn)行同步,都能表...

    ZweiZhao 評(píng)論0 收藏0
  • 阻塞同步算法實(shí)戰(zhàn)(一):ConcurrentLinkedQueue

    摘要:注意這里指的不是當(dāng)次而是之后,所以如果我們使用隊(duì)列的方法返回,就知道隊(duì)列是否為空,但是不知道之后是否為空,并且,當(dāng)關(guān)注的操作發(fā)生時(shí),在插入或取出操作的返回值里告知此信息,來(lái)指導(dǎo)是否繼續(xù)注冊(cè)寫操作。 前言 本文寫給對(duì)ConcurrentLinkedQueue的實(shí)現(xiàn)和非阻塞同步算法的實(shí)現(xiàn)原理有一定了解,但缺少實(shí)踐經(jīng)驗(yàn)的朋友,文中包括了實(shí)戰(zhàn)中的嘗試、所走的彎路,經(jīng)驗(yàn)和教訓(xùn)。 背景介紹 ...

    EscapedDog 評(píng)論0 收藏0
  • Java面試 32個(gè)核心必考點(diǎn)完全解析

    摘要:如問到是否使用某框架,實(shí)際是是問該框架的使用場(chǎng)景,有什么特點(diǎn),和同類可框架對(duì)比一系列的問題。這兩個(gè)方向的區(qū)分點(diǎn)在于工作方向的側(cè)重點(diǎn)不同。 [TOC] 這是一份來(lái)自嗶哩嗶哩的Java面試Java面試 32個(gè)核心必考點(diǎn)完全解析(完) 課程預(yù)習(xí) 1.1 課程內(nèi)容分為三個(gè)模塊 基礎(chǔ)模塊: 技術(shù)崗位與面試 計(jì)算機(jī)基礎(chǔ) JVM原理 多線程 設(shè)計(jì)模式 數(shù)據(jù)結(jié)構(gòu)與算法 應(yīng)用模塊: 常用工具集 ...

    JiaXinYi 評(píng)論0 收藏0
  • Java多線程學(xué)習(xí)(七)并發(fā)編程中一些問題

    摘要:相比與其他操作系統(tǒng)包括其他類系統(tǒng)有很多的優(yōu)點(diǎn),其中有一項(xiàng)就是,其上下文切換和模式切換的時(shí)間消耗非常少。因?yàn)槎嗑€程競(jìng)爭(zhēng)鎖時(shí)會(huì)引起上下文切換。減少線程的使用。很多編程語(yǔ)言中都有協(xié)程。所以如何避免死鎖的產(chǎn)生,在我們使用并發(fā)編程時(shí)至關(guān)重要。 系列文章傳送門: Java多線程學(xué)習(xí)(一)Java多線程入門 Java多線程學(xué)習(xí)(二)synchronized關(guān)鍵字(1) java多線程學(xué)習(xí)(二)syn...

    dingding199389 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<