成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

非阻塞同步算法實(shí)戰(zhàn)(一):ConcurrentLinkedQueue

EscapedDog / 3281人閱讀

摘要:注意這里指的不是當(dāng)次而是之后,所以如果我們使用隊(duì)列的方法返回,就知道隊(duì)列是否為空,但是不知道之后是否為空,并且,當(dāng)關(guān)注的操作發(fā)生時(shí),在插入或取出操作的返回值里告知此信息,來指導(dǎo)是否繼續(xù)注冊寫操作。

前言

本文寫給對ConcurrentLinkedQueue的實(shí)現(xiàn)和非阻塞同步算法的實(shí)現(xiàn)原理有一定了解,但缺少實(shí)踐經(jīng)驗(yàn)的朋友,文中包括了實(shí)戰(zhàn)中的嘗試、所走的彎路,經(jīng)驗(yàn)和教訓(xùn)。

背景介紹

上個(gè)月,我被安排獨(dú)自負(fù)責(zé)一個(gè)聊天系統(tǒng)的服務(wù)端,因?yàn)橐恍┰?,我沒使用現(xiàn)成的開源框架,網(wǎng)絡(luò)那塊直接使用AIO,收數(shù)據(jù)時(shí),因?yàn)橹粫腸hannel里過來,所以不需要考慮同步問題;但是發(fā)送數(shù)據(jù)時(shí),因?yàn)橛辛奶煜⒌霓D(zhuǎn)發(fā),所以必需處理這個(gè)同步問題。AIO中,是處理完一個(gè)注冊的操作后,再執(zhí)行我們定義的方法,此時(shí),如果還有數(shù)據(jù)需要寫,則繼續(xù)注冊寫操作,如果沒有則不注冊;提交數(shù)據(jù)時(shí),如果當(dāng)前沒有注冊寫操作,則注冊一個(gè),否則僅提交(此時(shí)再注冊則會報(bào)異常)。這樣,需要同步的點(diǎn)就是:如何知道當(dāng)前還有沒有數(shù)據(jù)需要發(fā)送(因?yàn)槠渌€程也可以提交數(shù)據(jù)到此處),和如何知道此次提交時(shí),有沒有注冊寫操作??傊?,要完成:有數(shù)據(jù)要發(fā)送時(shí),必需有寫操作被注冊,并且只能注冊一次;沒有數(shù)據(jù)時(shí),不能有寫操作被注冊。

問題分析

經(jīng)過分析,上面的問題,可以抽象成:我需要知道當(dāng)往隊(duì)列里插入一條數(shù)據(jù)之前,該隊(duì)列是否為空,如果為空則應(yīng)該注冊新的寫操作。當(dāng)從隊(duì)列里取出一條數(shù)據(jù)后,該隊(duì)列是否為非空,如果非空則應(yīng)該繼續(xù)注冊寫操作。(本文之后以“關(guān)注的操作”來表示這種場景下的插入或取出操作)

目前的問題是,我使用的隊(duì)列是ConcurrentLinkedQueue,但是它的取出數(shù)據(jù)的方法,沒有返回值告訴我們從隊(duì)列里取出數(shù)據(jù)之后隊(duì)列是否為空,如果是使用size或peek方法來執(zhí)行判斷,那就必需加鎖了,否則在拿到隊(duì)列大小時(shí),可能隊(duì)列大小已經(jīng)變化了。所以我首先想到的是,如何對該隊(duì)列進(jìn)行改造,讓它提供該信息。

注意:這里指的不是當(dāng)次而是之后,所以如果我們使用隊(duì)列的peek()方法返回null,就知道隊(duì)列是否為空,但是不知道之后是否為空 ,并且,當(dāng)關(guān)注的操作發(fā)生時(shí),在插入或取出操作的返回值里告知此信息,來指導(dǎo)是否繼續(xù)注冊寫操作。

用鎖實(shí)現(xiàn)

如果使用鎖的話很容易處理,用鎖同步插入和取出方法,下面是鎖實(shí)現(xiàn)的參考:

    public E poll() {
        synchronized (this) {
            E re = q.poll();
            // 獲取元素后,隊(duì)列是空,表示是我關(guān)注的操作
            if (q.peek() == null) {

            }
            return re;
        }
    }

    public void offer(E e) {
        synchronized (this) {
            // 插入元素前,隊(duì)列是空,表示是我關(guān)注的操作
            if (q.peek() == null) {

            }
            q.offer(e);
        }
    }

但因?yàn)槭欠?wù)端,我想用非阻塞同步算法來實(shí)現(xiàn)。

嘗試方案一

我第一次想到的改造辦法是,將head占位節(jié)點(diǎn)改成固定的,頭節(jié)點(diǎn)移動時(shí),只將head節(jié)點(diǎn)的next指向新的節(jié)點(diǎn),在插入數(shù)據(jù)時(shí),如果是在head節(jié)點(diǎn)上成功執(zhí)行的該操作,那么該插入就是關(guān)注的的操作;在取出時(shí),如果將head節(jié)點(diǎn)的next置為了null,那么該取出就是關(guān)注的操作(?因?yàn)橹暗恼嘉还?jié)點(diǎn)是變化的,所以沒法比較,必需用同步,現(xiàn)在是固定的了,所以可以直接與head節(jié)點(diǎn)進(jìn)行比較?)。如此一來,問題好像被解決了。改造完之后,出于嚴(yán)謹(jǐn),我仔細(xì)讀了一遍代碼,發(fā)現(xiàn)引入了新的問題,我的取出操作是這樣寫的

/**
 * @author [email protected]
 */
public E poll(){
    for(;;){
        Node n = head.nextRef.get();//head指向固定的head節(jié)點(diǎn),為final
        if(n == null)
            return null;
        Node m = n.nextRef.get();
        if(head.next.compareAndSet(n,m){
            if(m==null)
                ;//此時(shí)為關(guān)注的操作(為了簡化代碼顯示,不將該信息當(dāng)作返回值返回了,僅注釋)
            return n.itemRef.get();
        }
    }
}

這里有一個(gè)致命的問題:如果m為null,在CAS期間,插入了新節(jié)點(diǎn),n的next由null變成了非null,緊接著又把head的next更新為了null,那么鏈就斷了,該方法還存在一些其它的問題,如當(dāng)隊(duì)列為空的時(shí)候,尾節(jié)點(diǎn)指向了錯(cuò)誤的位置,本應(yīng)該是head的。我認(rèn)為最根本的原因在于,head不能設(shè)為固定的,否則會引發(fā)一堆問題。第一次嘗試宣告失敗。

嘗試方案二

這次我嘗試將head跟tail兩個(gè)引用包裝成一個(gè)對象,然后對這個(gè)對象進(jìn)行CAS更新(這僅為一處理論上的嘗試,因?yàn)閺男阅苌厦鎭碇v,已經(jīng)大打折扣了,創(chuàng)建了很多額外的對象),如果head跟tail指向了同一個(gè)節(jié)點(diǎn),則認(rèn)為隊(duì)列是空的,根據(jù)此信息來判斷一個(gè)操作是不是關(guān)注的操作。但該嘗試僅停留在了理論分析階段,期間發(fā)現(xiàn)了一些小問題,沒法解決,后來我發(fā)現(xiàn),我把ConcurrentLinkedQueue原本可以分成兩步執(zhí)行的插入和取出操作(更新節(jié)點(diǎn)的next或item引用,然后再更新head或tail引用),變成了必需一步完成,ConcurrentLinkedQueue尚不能一步完成,我何德何能,可將它們一步完成?所以直接放棄了。

解決方案一

經(jīng)過兩次的失敗嘗試,我?guī)缀踅^望了,我懷疑這是不是不能判斷出是否為關(guān)注的操作。

因?yàn)槭窃谧鲰?xiàng)目,周末已經(jīng)過去了,不能再搞這些“研究”了,所以我退而求其次,想了個(gè)不太漂亮的辦法,在隊(duì)列外部維護(hù)一個(gè)變量,記錄隊(duì)列有多大,在插入或取出后,更新該變量,使用的是AtomicInteger,如果更新時(shí),將變量從1變成0,或從0變成了1,就認(rèn)為該插入或取出為關(guān)注的操作。

    private AtomicInteger size = new AtomicInteger(0);
    public E poll() {
        E re = q.poll();
        if (re == null)
            return null;
        for(int old;;){
            old = size.get();
            if(size.compareAndSet(old,old-1)){
                // 獲取元素后,隊(duì)列是空,表示是我關(guān)注的操作
                if(old == 1){

                }
                break;
            }
        }
        return re;
    }

    public void offer(E e) {
        q.offer(e);
        for(int old;;){
            old = size.get();
            if(size.compareAndSet(old,old+1)){
                // 插入元素前,隊(duì)列是空,表示是我關(guān)注的操作
                if(old == 0){

                }
                break;
            }
        }
    }

此時(shí),也許細(xì)心的朋友會問,因?yàn)闆]有使用鎖,這個(gè)變量并不能真實(shí)反映隊(duì)列的大小,也就不能確定它是不是關(guān)注的操作。沒錯(cuò),是不能真實(shí)反映,但是,我獲取關(guān)注的操作的目的是用來指導(dǎo)我:該不該注冊新的寫操作,該變量的值變化就能提供正確的指導(dǎo),所以,同樣是正確的,只不過途徑不同而已。理論上的分析和后來的項(xiàng)目正確運(yùn)行都印證了該方法的正確性。

解決方案二

因?yàn)樯厦娴姆椒~外加了一次lock-free級別的CAS操作,我心里總不太舒服,空余時(shí)間總在琢磨,真的就沒有辦法,在不增加額外lock-free級別CAS開支的情況下,知曉一個(gè)操作是不是關(guān)注的操作?

后來經(jīng)分析,如果要知曉是不是關(guān)注的操作,跟兩個(gè)數(shù)據(jù)有關(guān),真實(shí)的頭節(jié)點(diǎn)跟尾節(jié)點(diǎn)(不同于head跟tail,因?yàn)樗鼈兪菧蟮模皩⑺鼈儼b成一個(gè)對象就是犯了該錯(cuò)誤),ConcurrentLinkedQueue的實(shí)現(xiàn)中,這兩個(gè)節(jié)點(diǎn)是沒有競爭的,一個(gè)是更新item,一個(gè)是更新next,必需得讓他們競爭同一個(gè)東西,才能解決該問題,于是我想到了一個(gè)辦法,取出完成后,如果該節(jié)點(diǎn)的next為null,就將其用CAS置為一個(gè)特殊的值,若成功則認(rèn)為是關(guān)注的操作;插入成功后,如果next被替換掉的值不是null而是這個(gè)特殊值,那么該插入也為關(guān)注的操作。這僅增加了一次wait-free級別的CAS操作(取出后的那次CAS),perfect!

因?yàn)镃oncurrentLinkedQueue的很多變量、內(nèi)部類都是私有的,沒法通過繼承來改造,沒辦法,只得自行實(shí)現(xiàn)。對于隊(duì)列里使用的Node,實(shí)現(xiàn)的方式有很多種,可以使用AtomicReference、AtomicReferenceFieldUpdater來實(shí)現(xiàn),如果你愿意的話,甚至是像ConcurrentLinkedQueue一樣,用sun.misc.Unsafe來實(shí)現(xiàn)(注意:一般來說,sun包下的類是不推薦使用的),各有優(yōu)缺點(diǎn)吧,所以我就不提供該隊(duì)列的具體實(shí)現(xiàn)了,下面給出在ConcurrentLinkedQueue(版本:1.7.0_10)基礎(chǔ)上進(jìn)行的改造,供參考。注意,如果需要用到size等方法,因?yàn)樘厥庵档囊?,影響了之前的判斷邏輯,?yīng)重新編寫。

   /**
    * @author [email protected]
    */
    private static final Node MARK_NODE = new Node(null);

    public boolean offer(E e) {
        checkNotNull(e);
        final Node newNode = new Node(e);

        for (Node t = tail, p = t;;) {
            Node q = p.next;
            if (q == null || q == MARK_NODE) {//修改1:加入條件:或q == MARK_NODE
                // p is last node
                if (p.casNext(q, newNode)) {//修改2:將null改為q
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (q == MARK_NODE)//修改3:
                        ;//此時(shí)為關(guān)注的操作(為了簡化代碼顯示,僅注釋)
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode); // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list. If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable. Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

    public E poll() {
        restartFromHead:
        for (;;) {
            for (Node h = head, p = h, q;;) {
                E item = p.item;

                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    if (p.casNext(null,MARK_NODE))//修改1:
                        ;//此時(shí)為關(guān)注的操作
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }
小結(jié)

設(shè)計(jì)非阻塞算法的關(guān)鍵在于,找出競爭點(diǎn),如果獲取的某個(gè)信息跟兩個(gè)操作有關(guān),那么應(yīng)該讓這兩個(gè)操作競爭同一個(gè)東西,這樣才能反應(yīng)出它們的關(guān)系。


by trytocache via ifeve

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/64036.html

相關(guān)文章

  • 阻塞同步算法實(shí)戰(zhàn)(二):BoundlessCyclicBarrier

    摘要:如果停止了版本更新,可使用方法來解除所有因而阻塞的線程,包括指定版本號的。如果自己維護(hù)版本號,則應(yīng)該保證遞增。 前言 相比上一篇而言,本文不需要太多的準(zhǔn)備知識,但技巧性更強(qiáng)一些。因?yàn)榉治?、設(shè)計(jì)的過程比較復(fù)雜繁瑣,也限于篇幅,所以,主要展示如何解決這些需求,和講解代碼。另外,所講的內(nèi)容也是后一篇實(shí)戰(zhàn)中需要用到的一個(gè)工具類。 需求介紹 我需要編寫一個(gè)同步工具,它需要提供這樣幾個(gè)方法:...

    yintaolaowanzi 評論0 收藏0
  • 通俗易懂,JDK 并發(fā)容器總結(jié)

    摘要:線程安全的線程安全的,在讀多寫少的場合性能非常好,遠(yuǎn)遠(yuǎn)好于高效的并發(fā)隊(duì)列,使用鏈表實(shí)現(xiàn)。這樣帶來的好處是在高并發(fā)的情況下,你會需要一個(gè)全局鎖來保證整個(gè)平衡樹的線程安全。 該文已加入開源項(xiàng)目:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識的文檔類項(xiàng)目,Star 數(shù)接近 14 k)。地址:https://github.com/Snailclimb... 一 JDK ...

    curlyCheng 評論0 收藏0
  • 阻塞隊(duì)列ConcurrentLinkedQueue與CAS算法應(yīng)用分析

    摘要:是無阻塞隊(duì)列的一種實(shí)現(xiàn),依賴與算法實(shí)現(xiàn)。這樣比從往后找更有效率出隊(duì)規(guī)則定義補(bǔ)充一項(xiàng),也表示節(jié)點(diǎn)已經(jīng)被刪除參考方法。 ConcurrentLinkedQueue是無阻塞隊(duì)列的一種實(shí)現(xiàn), 依賴與CAS算法實(shí)現(xiàn)。 入隊(duì)offer if(q==null)當(dāng)前是尾節(jié)點(diǎn) -> CAS賦值tail.next = newNode, 成功就跳出循環(huán) elseif(p == q)尾節(jié)點(diǎn)被移除 -> ...

    Ali_ 評論0 收藏0
  • Java并發(fā)

    摘要:對象改變條件對象當(dāng)前線程要等待線程終止之后才能從返回。如果線程在上的操作中被中斷,通道會被關(guān)閉,線程的中斷狀態(tài)會被設(shè)置,并得到一個(gè)。清除線程的中斷狀態(tài)。非公平性鎖雖然可能造成饑餓,但極少的線程切換,保證其更大的吞吐量。 聲明:Java并發(fā)的內(nèi)容是自己閱讀《Java并發(fā)編程實(shí)戰(zhàn)》和《Java并發(fā)編程的藝術(shù)》整理來的。 showImg(https://segmentfault.com/im...

    SKYZACK 評論0 收藏0
  • java 隊(duì)列

    摘要:是基于鏈接節(jié)點(diǎn)的線程安全的隊(duì)列。通過這些高效并且線程安全的隊(duì)列類,為我們快速搭建高質(zhì)量的多線程程序帶來極大的便利。隊(duì)列內(nèi)部僅允許容納一個(gè)元素。該隊(duì)列的頭部是延遲期滿后保存時(shí)間最長的元素。 隊(duì)列簡述 Queue: 基本上,一個(gè)隊(duì)列就是一個(gè)先入先出(FIFO)的數(shù)據(jù)結(jié)構(gòu)Queue接口與List、Set同一級別,都是繼承了Collection接口。LinkedList實(shí)現(xiàn)了Deque接 口。...

    goji 評論0 收藏0

發(fā)表評論

0條評論

EscapedDog

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<