成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

BlockingQueue與Condition原理解析

TalkingData / 1277人閱讀

摘要:最后一直調(diào)用函數(shù)判斷節(jié)點是否被轉(zhuǎn)移到隊列上,也就是中等待獲取鎖的隊列。這樣的話,函數(shù)中調(diào)用函數(shù)就會返回,導(dǎo)致函數(shù)進(jìn)入最后一步重新獲取鎖的狀態(tài)。函數(shù)其實就做了一件事情,就是不斷嘗試調(diào)用函數(shù),將隊首的一個節(jié)點轉(zhuǎn)移到隊列中,直到轉(zhuǎn)移成功。

?我在前段時間寫了一篇關(guān)于AQS源碼解析的文章
AbstractQueuedSynchronizer超詳細(xì)原理解析,在文章里邊我說JUC包中的大部分多線程相關(guān)的類都和AQS相關(guān),今天我們就學(xué)習(xí)一下依賴于AQS來實現(xiàn)的阻塞隊列BlockingQueue的實現(xiàn)原理。本文中的源碼未加說明即來自于以ArrayBlockingQueue。

阻塞隊列

?相信大多數(shù)同學(xué)在學(xué)習(xí)線程池時會了解阻塞隊列的概念,熟記各種類型的阻塞隊列對線程池初始化的影響。當(dāng)從阻塞隊列獲取元素但是隊列為空時,當(dāng)前線程會阻塞直到另一個線程向阻塞隊列中添加一個元素;類似的,當(dāng)向一個阻塞隊列加入元素時,如果隊列已經(jīng)滿了,當(dāng)前線程也會阻塞直到另外一個線程從隊列中讀取一個元素。阻塞隊列一般都是先進(jìn)先出的,用來實現(xiàn)生產(chǎn)者和消費者模式。當(dāng)發(fā)生上述兩種情況時,阻塞隊列有四種不同的處理方式,這四種方式分別為拋出異常,返回特殊值(null或在是false),阻塞當(dāng)前線程直到執(zhí)行結(jié)束,最后一種是只阻塞固定時間,到時后還無法執(zhí)行成功就放棄操作。這些方法都總結(jié)在下邊這種表中了。

?我們就只分析puttake方法。

put和take函數(shù)

?我們都知道,使用同步隊列可以很輕松的實現(xiàn)生產(chǎn)者-消費者模式,其實,同步隊列就是按照生產(chǎn)者-消費者的模式來實現(xiàn)的,我們可以將put函數(shù)看作生產(chǎn)者的操作,take是消費者的操作。

?我們首先看一下ArrayListBlock的構(gòu)造函數(shù)。它初始化了puttake函數(shù)中使用到的關(guān)鍵成員變量,分別是ReentrantLockCondition。

public ArrayBlockingQueue(int capacity, boolean fair) {
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

?ReentrantLock是AQS的子類,其newCondition函數(shù)返回的Condition接口實例是定義在AQS類內(nèi)部的ConditionObject實現(xiàn)類。它可以直接調(diào)用AQS相關(guān)的函數(shù)。

?put函數(shù)會在隊列末尾添加元素,如果隊列已經(jīng)滿了,無法添加元素的話,就一直阻塞等待到可以加入為止。函數(shù)的源碼如下所示。

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); //先獲得鎖
    try {
        while (count == items.length) 
        //如果隊列滿了,就NotFull這個Condition對象上進(jìn)行等待
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    //這里可以注意的是ArrayBlockingList實際上使用Array實現(xiàn)了一個環(huán)形數(shù)組,
   //當(dāng)putIndex達(dá)到最大時,就返回到起點,繼續(xù)插入,
   //當(dāng)然,如果此時0位置的元素還沒有被取走,
   //下次put時,就會因為cout == item.length未被阻塞。
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    //因為插入了元素,通知等待notEmpty事件的線程。
    notEmpty.signal();
} 

?我們會發(fā)現(xiàn)put函數(shù)使用了wait/notify的機制。與一般生產(chǎn)者-消費者的實現(xiàn)方式不同,同步隊列使用ReentrantLockCondition相結(jié)合的先獲得鎖,再等待的機制;而不是SynchronizedObject.wait的機制。這里的區(qū)別我們下一節(jié)再詳細(xì)講解。
?看完了生產(chǎn)者相關(guān)的put函數(shù),我們再來看一下消費者調(diào)用的take函數(shù)。take函數(shù)在隊列為空時會被阻塞,一直到阻塞隊列加入了新的元素。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
        //如果隊列為空,那么在notEmpty對象上等待,
        //當(dāng)put函數(shù)調(diào)用時,會調(diào)用notEmpty的notify進(jìn)行通知。
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
private E dequeue() {
    E x = (E) items[takeIndex];
    items[takeIndex] = null; //取出takeIndex位置的元素
    if (++takeIndex == items.length)
        //如果到了尾部,將指針重新調(diào)整到頭部
        takeIndex = 0;
    count--;
    ....
    //通知notFull對象上等待的線程
    notFull.signal();
    return x;
}
await操作

?我們發(fā)現(xiàn)ArrayBlockingList并沒有使用Object.wait,而是使用的Condition.await,這是為什么呢?其中又有哪些原因呢?
?Condition對象可以提供和Objectwaitnotify一樣的行為,但是后者必須先獲取synchronized這個內(nèi)置的monitor鎖,才能調(diào)用;而Condition則必須先獲取ReentrantLock。這兩種方式在阻塞等待時都會將相應(yīng)的鎖釋放掉,但是Condition的等待可以中斷,這是二者唯一的區(qū)別。

?我們先來看一下Conditionwait函數(shù),wait函數(shù)的流程大致如下圖所示。

?wait函數(shù)主要有三個步驟。一是調(diào)用addConditionWaiter 函數(shù),在condition wait queue隊列中添加一個節(jié)點,代表當(dāng)前線程在等待一個消息。然后調(diào)用fullyRelease函數(shù),將持有的鎖釋放掉,調(diào)用的是AQS的函數(shù),不清楚的同學(xué)可以查看本篇開頭的介紹的文章。最后一直調(diào)用isOnSyncQueue函數(shù)判斷節(jié)點是否被轉(zhuǎn)移到sync queue隊列上,也就是AQS中等待獲取鎖的隊列。如果沒有,則進(jìn)入阻塞狀態(tài),如果已經(jīng)在隊列上,則調(diào)用acquireQueued函數(shù)重新獲取鎖。

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //在condition wait隊列上添加新的節(jié)點
    Node node = addConditionWaiter();
    //釋放當(dāng)前持有的鎖
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //由于node在之前是添加到condition wait queue上的,現(xiàn)在判斷這個node
    //是否被添加到Sync的獲得鎖的等待隊列上,Sync就是AQS的子類
    //node在condition queue上說明還在等待事件的notify,
    //notify函數(shù)會將condition queue 上的node轉(zhuǎn)化到Sync的隊列上。
    while (!isOnSyncQueue(node)) {
        //node還沒有被添加到Sync Queue上,說明還在等待事件通知
        //所以調(diào)用park函數(shù)來停止線程執(zhí)行
        LockSupport.park(this);
        //判斷是否被中斷,線程從park函數(shù)返回有兩種情況,一種是
        //其他線程調(diào)用了unpark,另外一種是線程被中斷
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //代碼執(zhí)行到這里,已經(jīng)有其他線程調(diào)用notify函數(shù),或則被中斷,該線程可以繼續(xù)執(zhí)行,但是必須先
    //再次獲得調(diào)用await函數(shù)時的鎖.a(chǎn)cquireQueued函數(shù)在AQS文章中做了介紹.
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
   ....
}

final int fullyRelease(Node node) {
    //AQS的方法,當(dāng)前已經(jīng)在鎖中了,所以直接操作
    boolean failed = true;
    try {
        int savedState = getState();
        //獲取state當(dāng)前的值,然后保存,以待以后恢復(fù)
        // release函數(shù)是AQS的函數(shù),不清楚的同學(xué)請看開頭介紹的文章。 
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

private int checkInterruptWhileWaiting(Node node) {
    //中斷可能發(fā)生在兩個階段中,一是在等待signa時,另外一個是在獲得signal之后
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

final boolean transferAfterCancelledWait(Node node) {
    //這里要和下邊的transferForSignal對應(yīng)著看,這是線程中斷進(jìn)入的邏輯.那邊是signal的邏輯
    //兩邊可能有并發(fā)沖突,但是成功的一方必須調(diào)用enq來進(jìn)入acquire lock queue中.
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    //如果失敗了,說明transferForSignal那邊成功了,等待node 進(jìn)入acquire lock queue
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}
signal操作

?signal函數(shù)將condition wait queue隊列中隊首的線程節(jié)點轉(zhuǎn)移等待獲取鎖的sync queue隊列中。這樣的話,wait函數(shù)中調(diào)用isOnSyncQueue函數(shù)就會返回true,導(dǎo)致wait函數(shù)進(jìn)入最后一步重新獲取鎖的狀態(tài)。

?我們這里來詳細(xì)解析一下condition wait queuesync queue兩個隊列的設(shè)計原理。condition wait queue是等待消息的隊列,因為阻塞隊列為空而進(jìn)入阻塞狀態(tài)的take函數(shù)操作就是在等待阻塞隊列不為空的消息。而sync queue隊列則是等待獲取鎖的隊列,take函數(shù)獲得了消息,就可以運行了,但是它還必須等待獲取鎖之后才能真正進(jìn)行運行狀態(tài)。

?signal函數(shù)的示意圖如下所示。

?signal函數(shù)其實就做了一件事情,就是不斷嘗試調(diào)用transferForSignal 函數(shù),將condition wait queue隊首的一個節(jié)點轉(zhuǎn)移到sync queue隊列中,直到轉(zhuǎn)移成功。因為一次轉(zhuǎn)移成功,就代表這個消息被成功通知到了等待消息的節(jié)點。

public final void signal() {
    if (!isHeldExclusively())
    //如果當(dāng)前線程沒有獲得鎖,拋出異常
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        //將Condition wait queue中的第一個node轉(zhuǎn)移到acquire lock queue中.
        doSignal(first);
}

private void doSignal(Node first) {
    do {
   //由于生產(chǎn)者的signal在有消費者等待的情況下,必須要通知
        //一個消費者,所以這里有一個循環(huán),直到隊列為空
        //把first 這個node從condition queue中刪除掉
        //condition queue的頭指針指向node的后繼節(jié)點,如果node后續(xù)節(jié)點為null,那么也將尾指針也置為null
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
     //transferForSignal將node轉(zhuǎn)而添加到Sync的acquire lock 隊列
}

final boolean transferForSignal(Node node) {
    //如果設(shè)置失敗,說明該node已經(jīng)被取消了,所以返回false,讓doSignal繼續(xù)向下通知其他未被取消的node
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    //將node添加到acquire lock queue中.
    Node p = enq(node);
    int ws = p.waitStatus;
    //需要注意的是這里的node進(jìn)行了轉(zhuǎn)化
    //ws>0代表canceled的含義所以直接unpark線程
    //如果compareAndSetWaitStatus失敗,所以直接unpark,讓線程繼續(xù)執(zhí)行await中的
    //進(jìn)行isOnSyncQueue判斷的while循環(huán),然后進(jìn)入acquireQueue函數(shù).
    //這里失敗的原因可能是Lock其他線程釋放掉了鎖,同步設(shè)置p的waitStatus
    //如果compareAndSetWaitStatus成功了呢?那么該node就一直在acquire lock queue中
    //等待鎖被釋放掉再次搶奪鎖,然后再unpark
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
后記

?后邊一篇文章主要講解如何自己使用AQS來創(chuàng)建符合自己業(yè)務(wù)需求的鎖,請大家繼續(xù)關(guān)注我的文章啦.一起進(jìn)步偶。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/73031.html

相關(guān)文章

  • Java 線程通信 線程組 線程異常處理機制

    摘要:線程通信傳統(tǒng)的線程通信方法概述方法導(dǎo)致當(dāng)前線程等待,直到其他線程調(diào)用該同步監(jiān)視器的方法或方法來喚醒該線程。運行結(jié)果如下線程組和未處理的異常表示線程組,可以對一批線程進(jìn)行分類管理。對線程組的控制相當(dāng)于同時控制這批線程。 線程通信 傳統(tǒng)的線程通信 方法概述: wait方法:導(dǎo)致當(dāng)前線程等待,直到其他線程調(diào)用該同步監(jiān)視器的notify()方法或notifyAll()方法來喚醒該線程。 w...

    ivydom 評論0 收藏0
  • 解讀 Java 并發(fā)隊列 BlockingQueue

    摘要:如果隊列已滿,這個時候?qū)懖僮鞯木€程進(jìn)入到寫線程隊列排隊,等待讀線程將隊列元素移除騰出空間,然后喚醒寫線程隊列的第一個等待線程。數(shù)據(jù)必須從某個寫線程交給某個讀線程,而不是寫到某個隊列中等待被消費。 前言 本文直接參考 Doug Lea 寫的 Java doc 和注釋,這也是我們在學(xué)習(xí) java 并發(fā)包時最好的材料了。希望大家能有所思、有所悟,學(xué)習(xí) Doug Lea 的代碼風(fēng)格,并將其優(yōu)雅...

    maochunguang 評論0 收藏0
  • Java精講:生產(chǎn)者-消費者

    摘要:創(chuàng)建一個阻塞隊列生產(chǎn)者生產(chǎn),目前總共有消費者消費,目前總共有原文鏈接更多教程 原文鏈接 更多教程 本文概要 生產(chǎn)者和消費者問題是線程模型中老生常談的問題,也是面試中經(jīng)常遇到的問題。光在Java中的實現(xiàn)方式多達(dá)數(shù)十種,更不用說加上其他語言的實現(xiàn)方式了。那么我們該如何學(xué)習(xí)呢? 本文會通過精講wait()和notify()方法實現(xiàn)生產(chǎn)者-消費者模型,來學(xué)習(xí)生產(chǎn)者和消費者問題的原理。 目的...

    VPointer 評論0 收藏0
  • Java SDK 并發(fā)包全面總結(jié)

    摘要:一和并發(fā)包中的和主要解決的是線程的互斥和同步問題,這兩者的配合使用,相當(dāng)于的使用。寫鎖與讀鎖之間互斥,一個線程在寫時,不允許讀操作。的注意事項不支持重入,即不可反復(fù)獲取同一把鎖。沒有返回值,也就是說無法獲取執(zhí)行結(jié)果。 一、Lock 和 Condition Java 并發(fā)包中的 Lock 和 Condition 主要解決的是線程的互斥和同步問題,這兩者的配合使用,相當(dāng)于 synchron...

    luckyyulin 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<