成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

AbstractQueuedSynchronizer理解之四(Condition)

leiyi / 2975人閱讀

摘要:總結(jié)在一開是的例子中,假設(shè)有兩個(gè)線程,分別代表生產(chǎn)者和消費(fèi)者線程,生產(chǎn)消費(fèi)元素的隊(duì)列容量為。

什么是Condition

Condition必須要和獨(dú)占鎖一起使用,獨(dú)占鎖代替了原來的synchronized,Condition代替了原來的Object中的監(jiān)視器方法(wait, notify and notifyAll);一個(gè)Lock可以對(duì)應(yīng)多個(gè)Condition,這樣線程之間可以按照條件喚醒指定的線程,而不是簡(jiǎn)單的notifyAll多有的線程,使得我們多線程編程的時(shí)候可以靈活的控制線程。

獨(dú)占鎖和Condition最經(jīng)典的配合使用就是ArrayBlockingQueue.java,典型的生產(chǎn)者消費(fèi)者問題:

/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textbook.
 */

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

這是在許多教科書中能找到的經(jīng)典的雙Condition算法的并發(fā)控制,需要有一個(gè)獨(dú)占鎖ReentrantLock,然后再定義兩個(gè)Condition,notEmpty(隊(duì)列不是空的)表示可以從隊(duì)列中消費(fèi)元素的信號(hào)條件,notFull(隊(duì)列不是滿的)表示可以向隊(duì)列生產(chǎn)元素的信號(hào)條件。這兩個(gè)Condition都是調(diào)用了lock.newCondition()方法實(shí)例化的。

當(dāng)消費(fèi)者線程調(diào)用消費(fèi)方法take時(shí):

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //當(dāng)隊(duì)列的元素?cái)?shù)量為0時(shí),調(diào)用notEmpty.await,阻塞當(dāng)前的消費(fèi)線程
        while (count == 0)
            notEmpty.await();
        //dequeue中調(diào)用了notFull.signal(),通知生產(chǎn)者隊(duì)列還沒滿,可以生產(chǎn)
        return dequeue();
    } finally {
        lock.unlock();
    }
}

當(dāng)生產(chǎn)者線程調(diào)用生產(chǎn)方法put時(shí):

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //當(dāng)隊(duì)列滿時(shí),調(diào)用notFull.await(),阻塞當(dāng)前生產(chǎn)線程,停止生產(chǎn)
        while (count == items.length)
            notFull.await();
        //enqueue中調(diào)用了notEmpty.signal(),通知消費(fèi)者隊(duì)列里有元素,可以消費(fèi)
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
Condition的await

在AQS中有一個(gè)ConditionObject內(nèi)部類實(shí)現(xiàn)了Condition接口,其中有兩個(gè)成員變量:

    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;

Condition也有一個(gè)node隊(duì)列,firstWaiter、lastWaiter分別表示第一個(gè)和最后一個(gè)node。

先看await方法:

    public final void await() throws InterruptedException {
        //如果線程設(shè)置中斷標(biāo)志,拋出中斷異常
        if (Thread.interrupted())
            throw new InterruptedException();
        //往隊(duì)列添加node
        Node node = addConditionWaiter();
        //完全釋放鎖,head的后繼節(jié)點(diǎn)將被喚醒,然后被移出sync隊(duì)列
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        //判斷當(dāng)前節(jié)點(diǎn)是否在sync隊(duì)列中(當(dāng)condition調(diào)用signal是會(huì)將該節(jié)點(diǎn)放入Sync隊(duì)列),如果不在就park當(dāng)前線程,線程在這里開始等待被signal
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            //發(fā)送中斷時(shí)(喚醒了線程)break;checkInterruptWhileWaiting中調(diào)用了transferAfterCancelledWait(貼在下面),這個(gè)方法時(shí)檢測(cè)中斷是發(fā)生在signal之前還是之后
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //當(dāng)前線程被signal后,調(diào)用acquireQueued搶占鎖,如果interruptMode不為拋出異常,設(shè)置為REINTERRUPT
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            //從頭到尾移除取消的節(jié)點(diǎn)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            //繼續(xù)中斷還是拋出異常
            reportInterruptAfterWait(interruptMode);
    }
        
final boolean transferAfterCancelledWait(Node node) {
    //首先CAS設(shè)置node狀態(tài)為0,如果成功說明中斷發(fā)生在signal之前(因?yàn)閟ignal會(huì)將node狀態(tài)設(shè)置為0)
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        //將node入sync隊(duì)列
        enq(node);
        return true;
    }
    /*
     * If we lost out to a signal(), then we can"t proceed
     * until it finishes its enq().  Cancelling during an
     * incomplete transfer is both rare and transient, so just
     * spin.
     */
    //如果node不在sync隊(duì)列中,yield,讓出cpu
    while (!isOnSyncQueue(node))
        Thread.yield();
    //中斷發(fā)生在signal后
    return false;
}

分析一下addConditionWaiter:

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        //如果最后一個(gè)node被取消,清除node
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        //新建一個(gè)node,持有當(dāng)前線程,狀態(tài)為CONDITION
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            //如果尾節(jié)點(diǎn)為null,說明condition隊(duì)列還是空的,將新建的node作為頭節(jié)點(diǎn)
            firstWaiter = node;
        else
            //如果condition隊(duì)列已經(jīng)存在,將新建的node作為尾節(jié)點(diǎn)的next
            t.nextWaiter = node;
        //將新建node設(shè)置為尾節(jié)點(diǎn)
        lastWaiter = node;
        //返回新建的node
        return node;
    }

在這里我們可以看到Condition的隊(duì)列是一個(gè)單鏈表。
看一下unlinkCancelledWaiters,Condition所有操作都是在獲取鎖之后執(zhí)行的,所以不用考慮線程安全問題:

    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }

該方法從隊(duì)列頭開始往后遍歷所有node,移除已經(jīng)取消的node;

在新建了node后,調(diào)用了fullyRelease:

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        //保存當(dāng)前的state
        int savedState = getState();
        //release(savedState)嘗試釋放鎖,這也是為什么叫fullyRelease
        if (release(savedState)) {
            failed = false;
            //返回之前保存的state
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            //如果失敗,將當(dāng)前node設(shè)置為取消狀態(tài)
            node.waitStatus = Node.CANCELLED;
    }
}

看一下release:

public final boolean release(int arg) {
    //嘗試釋放鎖,這里調(diào)用的是ReentrantLock實(shí)現(xiàn)的tryRelease,傳入的arg是當(dāng)前的state,所以會(huì)釋放成功,即state為0
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            //喚醒后繼節(jié)點(diǎn)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

下面的方法是判斷當(dāng)前節(jié)點(diǎn)是否在Sync隊(duì)列中

final boolean isOnSyncQueue(Node node) {
    //如果當(dāng)前節(jié)點(diǎn)狀態(tài)為CONDITION或者節(jié)點(diǎn)前驅(qū)為null,說明該節(jié)點(diǎn)已經(jīng)在CONDITION隊(duì)列中,不在Syc隊(duì)列里
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    //如果節(jié)點(diǎn)后繼不是null,那該節(jié)點(diǎn)一定在Syc隊(duì)列中
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    //此時(shí)節(jié)點(diǎn)入列的CAS動(dòng)作可能失敗,所以要從尾部往前查找該節(jié)點(diǎn)再次確認(rèn)
    return findNodeFromTail(node);
}

Condition的signal
    public final void signal() {
        //如果當(dāng)前線程不是當(dāng)前的獨(dú)占線程,拋出異常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            //signal Condition隊(duì)列的第一個(gè)節(jié)點(diǎn)
            doSignal(first);
    }
    
    private void doSignal(Node first) {
        //如果transferForSignal失?。串?dāng)前節(jié)點(diǎn)取消)且下一個(gè)節(jié)點(diǎn)存在,while繼續(xù)loop
        do {
            //設(shè)置第一個(gè)節(jié)點(diǎn)的next為firstWaiter,此時(shí)如果firstWaiter為null,說明隊(duì)列空了,將lastWaiter也設(shè)置為null
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            //設(shè)置第一個(gè)節(jié)點(diǎn)next為null,help GC
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    
    final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
     //如果為node設(shè)置狀態(tài)失敗,說明node被取消,返回false
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    //將當(dāng)前node入列sync隊(duì)列,返回node的前繼
    Node p = enq(node);
    int ws = p.waitStatus;
    //如果前繼的狀態(tài)為取消或者設(shè)置前繼狀態(tài)為SIGNAL失敗,當(dāng)前node線程unpark
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

signal后,Condition第一個(gè)節(jié)點(diǎn)將入列sync的隊(duì)列,等待搶占到鎖繼續(xù)執(zhí)行。

總結(jié)

在一開是的例子中,假設(shè)有兩個(gè)線程P,C分別代表生產(chǎn)者和消費(fèi)者線程,生產(chǎn)消費(fèi)元素E的隊(duì)列Q容量為1。

C無(wú)限loop調(diào)用take,當(dāng)C搶占到獨(dú)占鎖,發(fā)現(xiàn)Q時(shí)空的,調(diào)用notEmpty.await(),線程C釋放鎖并且入列notEmpty隊(duì)列park,等待別的線程調(diào)用notEmpty.signal();

P無(wú)限loop調(diào)用put,當(dāng)P搶占到獨(dú)占鎖生產(chǎn)了一個(gè)E,調(diào)用notEmpty.signal()通知C,然后釋放了鎖;

C收到signal信號(hào),入列SYC隊(duì)列,并且unpark,嘗試搶占獨(dú)占鎖,成功獲得獨(dú)占鎖后,消費(fèi)了一個(gè)E,然后調(diào)用notFull.signal();

P生產(chǎn)E時(shí)發(fā)現(xiàn)Q已滿(C還沒來得及消費(fèi)),調(diào)用notFull.await()線程P釋放鎖并且入列notFull隊(duì)列park,等待notFull.signal()通知自己unpark并入列AQS隊(duì)列去搶占獨(dú)占鎖進(jìn)行生產(chǎn);

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/76309.html

相關(guān)文章

  • AbstractQueuedSynchronizer理解之四Condition

    摘要:總結(jié)在一開是的例子中,假設(shè)有兩個(gè)線程,分別代表生產(chǎn)者和消費(fèi)者線程,生產(chǎn)消費(fèi)元素的隊(duì)列容量為。 什么是Condition Condition必須要和獨(dú)占鎖一起使用,獨(dú)占鎖代替了原來的synchronized,Condition代替了原來的Object中的監(jiān)視器方法(wait, notify and notifyAll);一個(gè)Lock可以對(duì)應(yīng)多個(gè)Condition,這樣線程之間可以按照條件...

    RiverLi 評(píng)論0 收藏0
  • AbstractQueuedSynchronizer 原理分析 - Condition 實(shí)現(xiàn)原理

    摘要:實(shí)現(xiàn)原理是通過基于單鏈表的條件隊(duì)列來管理等待線程的。中斷在轉(zhuǎn)移到同步隊(duì)列期間或之后發(fā)生,此時(shí)表明有線程正在調(diào)用轉(zhuǎn)移節(jié)點(diǎn)。在該種中斷模式下,再次設(shè)置線程的中斷狀態(tài)。 1. 簡(jiǎn)介 Condition是一個(gè)接口,AbstractQueuedSynchronizer 中的ConditionObject內(nèi)部類實(shí)現(xiàn)了這個(gè)接口。Condition聲明了一組等待/通知的方法,這些方法的功能與Objec...

    leone 評(píng)論0 收藏0
  • AbstractQueuedSynchronizer 原理分析 - Condition 實(shí)現(xiàn)原理

    摘要:實(shí)現(xiàn)原理是通過基于單鏈表的條件隊(duì)列來管理等待線程的。中斷在轉(zhuǎn)移到同步隊(duì)列期間或之后發(fā)生,此時(shí)表明有線程正在調(diào)用轉(zhuǎn)移節(jié)點(diǎn)。在該種中斷模式下,再次設(shè)置線程的中斷狀態(tài)。 1. 簡(jiǎn)介 Condition是一個(gè)接口,AbstractQueuedSynchronizer 中的ConditionObject內(nèi)部類實(shí)現(xiàn)了這個(gè)接口。Condition聲明了一組等待/通知的方法,這些方法的功能與Objec...

    李世贊 評(píng)論0 收藏0
  • AbstractQueuedSynchronizer 原理分析 - Condition 實(shí)現(xiàn)原理

    摘要:實(shí)現(xiàn)原理是通過基于單鏈表的條件隊(duì)列來管理等待線程的。中斷在轉(zhuǎn)移到同步隊(duì)列期間或之后發(fā)生,此時(shí)表明有線程正在調(diào)用轉(zhuǎn)移節(jié)點(diǎn)。在該種中斷模式下,再次設(shè)置線程的中斷狀態(tài)。 1. 簡(jiǎn)介 Condition是一個(gè)接口,AbstractQueuedSynchronizer 中的ConditionObject內(nèi)部類實(shí)現(xiàn)了這個(gè)接口。Condition聲明了一組等待/通知的方法,這些方法的功能與Objec...

    bigdevil_s 評(píng)論0 收藏0
  • AbstractQueuedSynchronizer的介紹和原理分析

    摘要:同步器擁有三個(gè)成員變量隊(duì)列的頭結(jié)點(diǎn)隊(duì)列的尾節(jié)點(diǎn)和狀態(tài)。對(duì)于同步器維護(hù)的狀態(tài),多個(gè)線程對(duì)其的獲取將會(huì)產(chǎn)生一個(gè)鏈?zhǔn)降慕Y(jié)構(gòu)。使用將當(dāng)前線程,關(guān)于后續(xù)會(huì)詳細(xì)介紹。 簡(jiǎn)介提供了一個(gè)基于FIFO隊(duì)列,可以用于構(gòu)建鎖或者其他相關(guān)同步裝置的基礎(chǔ)框架。該同步器(以下簡(jiǎn)稱同步器)利用了一個(gè)int來表示狀態(tài),期望它能夠成為實(shí)現(xiàn)大部分同步需求的基礎(chǔ)。使用的方法是繼承,子類通過繼承同步器并需要實(shí)現(xiàn)它的方法來管理...

    Yuanf 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

leiyi

|高級(jí)講師

TA的文章

閱讀更多
最新活動(dòng)
閱讀需要支付1元查看
<