成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

解讀 Java 并發(fā)隊列 BlockingQueue

maochunguang / 396人閱讀

摘要:如果隊列已滿,這個時候?qū)懖僮鞯木€程進入到寫線程隊列排隊,等待讀線程將隊列元素移除騰出空間,然后喚醒寫線程隊列的第一個等待線程。數(shù)據(jù)必須從某個寫線程交給某個讀線程,而不是寫到某個隊列中等待被消費。

前言

本文直接參考 Doug Lea 寫的 Java doc 和注釋,這也是我們在學習 java 并發(fā)包時最好的材料了。希望大家能有所思、有所悟,學習 Doug Lea 的代碼風格,并將其優(yōu)雅、嚴謹?shù)淖黠L應(yīng)用到我們寫的每一行代碼中。

BlockingQueue

首先,最基本的來說, BlockingQueue 是一個先進先出的隊列(Queue),為什么說是阻塞(Blocking)的呢?是因為 BlockingQueue 支持當獲取隊列元素但是隊列為空時,會阻塞等待隊列中有元素再返回;也支持添加元素時,如果隊列已滿,那么等到隊列可以放入新元素時再放入。

BlockingQueue 是一個接口,繼承自 Queue,所以其實現(xiàn)類也可以作為 Queue 的實現(xiàn)來使用,而 Queue 又繼承自 Collection 接口。

BlockingQueue 對插入操作、移除操作、獲取元素操作提供了四種不同的方法用于不同的場景中使用:1、拋出異常;2、返回特殊值(null 或 true/false,取決于具體的操作);3、阻塞等待此操作,直到這個操作成功;4、阻塞等待此操作,直到成功或者超時指定時間。

對于 BlockingQueue,我們的關(guān)注點應(yīng)該在 put(e) 和 take() 這兩個方法,因為這兩個方法是帶阻塞的。
BlockingQueue 不接受 null 值的插入,相應(yīng)的方法在碰到 null 的插入時會拋出 NullPointerException 異常。null 值在這里通常用于作為特殊值返回(表格中的第三列),代表 poll 失敗。所以,如果允許插入 null 值的話,那獲取的時候,就不能很好地用 null 來判斷到底是代表失敗,還是獲取的值就是 null 值。

一個 BlockingQueue 可能是有界的,如果在插入的時候,發(fā)現(xiàn)隊列滿了,那么 put 操作將會阻塞。通常,在這里我們說的無界隊列也不是說真正的無界,而是它的容量是 Integer.MAX_VALUE(21億多)。

BlockingQueue 是設(shè)計用來實現(xiàn)生產(chǎn)者-消費者隊列的,當然,你也可以將它當做普通的 Collection 來用,前面說了,它實現(xiàn)了 java.util.Collection 接口。例如,我們可以用 remove(x) 來刪除任意一個元素,但是,這類操作通常并不高效,所以盡量只在少數(shù)的場合使用,比如一條消息已經(jīng)入隊,但是需要做取消操作的時候。

BlockingQueue 的實現(xiàn)都是線程安全的,但是批量的集合操作如 addAll, containsAll, retainAll 和 removeAll 不一定是原子操作。如 addAll(c) 有可能在添加了一些元素后中途拋出異常,此時 BlockingQueue 中已經(jīng)添加了部分元素,這個是允許的,取決于具體的實現(xiàn)。

BlockingQueue 不支持 close 或 shutdown 等關(guān)閉操作,因為開發(fā)者可能希望不會有新的元素添加進去,此特性取決于具體的實現(xiàn),不做強制約束。

最后,BlockingQueue 在生產(chǎn)者-消費者的場景中,是支持多消費者和多生產(chǎn)者的,說的其實就是線程安全問題。

相信上面說的每一句都很清楚了,BlockingQueue 是一個比較簡單的線程安全容器,下面我會分析其具體的在 JDK 中的實現(xiàn),這里又到了 Doug Lea 表演時間了。

BlockingQueue 實現(xiàn)之 ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 接口的有界隊列實現(xiàn)類,底層采用數(shù)組來實現(xiàn)。

其并發(fā)控制采用可重入鎖來控制,不管是插入操作還是讀取操作,都需要獲取到鎖才能進行操作。
ArrayBlockingQueue 共有以下幾個屬性:

// 用于存放元素的數(shù)組
final Object[] items;
// 下一次讀取操作的位置
int takeIndex;
// 下一次寫入操作的位置
int putIndex;
// 隊列中的元素數(shù)量
int count;

// 以下幾個就是控制并發(fā)用的同步器
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

ArrayBlockingQueue 實現(xiàn)并發(fā)同步的原理就是,讀操作和寫操作都需要獲取到 AQS 獨占鎖才能進行操作。如果隊列為空,這個時候讀操作的線程進入到讀線程隊列排隊,等待寫線程寫入新的元素,然后喚醒讀線程隊列的第一個等待線程。如果隊列已滿,這個時候?qū)懖僮鞯木€程進入到寫線程隊列排隊,等待讀線程將隊列元素移除騰出空間,然后喚醒寫線程隊列的第一個等待線程。

對于 ArrayBlockingQueue,我們可以在構(gòu)造的時候指定以下三個參數(shù):

1.隊列容量,其限制了隊列中最多允許的元素個數(shù);
2.指定獨占鎖是公平鎖還是非公平鎖。非公平鎖的吞吐量比較高,公平鎖可以保證每次都是等待最久的線程獲取到鎖;
3.可以指定用一個集合來初始化,將此集合中的元素在構(gòu)造方法期間就先添加到隊列中。

BlockingQueue 實現(xiàn)之 LinkedBlockingQueue

底層基于單向鏈表實現(xiàn)的阻塞隊列,可以當做無界隊列也可以當做有界隊列來使用??礃?gòu)造方法:

// 傳說中的無界隊列
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
// 傳說中的有界隊列
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node(null);
}

我們看看這個類有哪些屬性:

// 隊列容量
private final int capacity;

// 隊列中的元素數(shù)量
private final AtomicInteger count = new AtomicInteger(0);

// 隊頭
private transient Node head;

// 隊尾
private transient Node last;

// take, poll, peek 等讀操作的方法需要獲取到這個鎖
private final ReentrantLock takeLock = new ReentrantLock();

// 如果讀操作的時候隊列是空的,那么等待 notEmpty 條件
private final Condition notEmpty = takeLock.newCondition();

// put, offer 等寫操作的方法需要獲取到這個鎖
private final ReentrantLock putLock = new ReentrantLock();

// 如果寫操作的時候隊列是滿的,那么等待 notFull 條件
private final Condition notFull = putLock.newCondition();

這里用了兩個鎖,兩個 Condition,簡單介紹如下:

takeLock 和 notEmpty 怎么搭配:如果要獲?。╰ake)一個元素,需要獲取 takeLock 鎖,但是獲取了鎖還不夠,如果隊列此時為空,還需要隊列不為空(notEmpty)這個條件(Condition)。

putLock 需要和 notFull 搭配:如果要插入(put)一個元素,需要獲取 putLock 鎖,但是獲取了鎖還不夠,如果隊列此時已滿,還需要隊列不是滿的(notFull)這個條件(Condition)。

首先,這里用一個示意圖來看看 LinkedBlockingQueue 的并發(fā)讀寫控制,然后再開始分析源碼:

看懂這個示意圖,源碼也就簡單了,讀操作是排好隊的,寫操作也是排好隊的,唯一的并發(fā)問題在于一個寫操作和一個讀操作同時進行,只要控制好這個就可以了。

先上構(gòu)造方法:

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node(null);
}

注意,這里會初始化一個空的頭結(jié)點,那么第一個元素入隊的時候,隊列中就會有兩個元素。讀取元素時,也總是獲取頭節(jié)點后面的一個節(jié)點。count 的計數(shù)值不包括這個頭節(jié)點。

我們來看下 put 方法是怎么將元素插入到隊尾的:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // 如果你糾結(jié)這里為什么是 -1,可以看看 offer 方法。這就是個標識成功、失敗的標志而已。
    int c = -1;
    Node node = new Node(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 必須要獲取到 putLock 才可以進行插入操作
    putLock.lockInterruptibly();
    try {
        // 如果隊列滿,等待 notFull 的條件滿足。
        while (count.get() == capacity) {
            notFull.await();
        }
        // 入隊
        enqueue(node);
        // count 原子加 1,c 還是加 1 前的值
        c = count.getAndIncrement();
        // 如果這個元素入隊后,還有至少一個槽可以使用,調(diào)用 notFull.signal() 喚醒等待線程。
        // 哪些線程會等待在 notFull 這個 Condition 上呢?
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        // 入隊后,釋放掉 putLock
        putLock.unlock();
    }
    // 如果 c == 0,那么代表隊列在這個元素入隊前是空的(不包括head空節(jié)點),
    // 那么所有的讀線程都在等待 notEmpty 這個條件,等待喚醒,這里做一次喚醒操作
    if (c == 0)
        signalNotEmpty();
}

// 入隊的代碼非常簡單,就是將 last 屬性指向這個新元素,并且讓原隊尾的 next 指向這個元素
// 這里入隊沒有并發(fā)問題,因為只有獲取到 putLock 獨占鎖以后,才可以進行此操作
private void enqueue(Node node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

// 元素入隊后,如果需要,調(diào)用這個方法喚醒讀線程來讀
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

我們再看看 take 方法:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 首先,需要獲取到 takeLock 才能進行出隊操作
    takeLock.lockInterruptibly();
    try {
        // 如果隊列為空,等待 notEmpty 這個條件滿足再繼續(xù)執(zhí)行
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 出隊
        x = dequeue();
        // count 進行原子減 1
        c = count.getAndDecrement();
        // 如果這次出隊后,隊列中至少還有一個元素,那么調(diào)用 notEmpty.signal() 喚醒其他的讀線程
        if (c > 1)
            notEmpty.signal();
    } finally {
        // 出隊后釋放掉 takeLock
        takeLock.unlock();
    }
    // 如果 c == capacity,那么說明在這個 take 方法發(fā)生的時候,隊列是滿的
    // 既然出隊了一個,那么意味著隊列不滿了,喚醒寫線程去寫
    if (c == capacity)
        signalNotFull();
    return x;
}
// 取隊頭,出隊
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    // 之前說了,頭結(jié)點是空的
    Node h = head;
    Node first = h.next;
    h.next = h; // help GC
    // 設(shè)置這個為新的頭結(jié)點
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
// 元素出隊后,如果需要,調(diào)用這個方法喚醒寫線程來寫
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}
BlockingQueue 實現(xiàn)之 SynchronousQueue

它是一個特殊的隊列,它的名字其實就蘊含了它的特征 - - 同步的隊列。為什么說是同步的呢?這里說的并不是多線程的并發(fā)問題,而是因為當一個線程往隊列中寫入一個元素時,寫入操作不會立即返回,需要等待另一個線程來將這個元素拿走;同理,當一個讀線程做讀操作的時候,同樣需要一個相匹配的寫線程的寫操作。這里的 Synchronous 指的就是讀線程和寫線程需要同步,一個讀線程匹配一個寫線程。

我們比較少使用到 SynchronousQueue 這個類,不過它在線程池的實現(xiàn)類 ScheduledThreadPoolExecutor 中得到了應(yīng)用,感興趣的讀者可以在看完這個后去看看相應(yīng)的使用。

雖然上面我說了隊列,但是 SynchronousQueue 的隊列其實是虛的,其不提供任何空間(一個都沒有)來存儲元素。數(shù)據(jù)必須從某個寫線程交給某個讀線程,而不是寫到某個隊列中等待被消費。

你不能在 SynchronousQueue 中使用 peek 方法(在這里這個方法直接返回 null),peek 方法的語義是只讀取不移除,顯然,這個方法的語義是不符合 SynchronousQueue 的特征的。SynchronousQueue 也不能被迭代,因為根本就沒有元素可以拿來迭代的。雖然 SynchronousQueue 間接地實現(xiàn)了 Collection 接口,但是如果你將其當做 Collection 來用的話,那么集合是空的。當然,這個類也是不允許傳遞 null 值的(并發(fā)包中的容器類好像都不支持插入 null 值,因為 null 值往往用作其他用途,比如用于方法的返回值代表操作失敗)。

接下來,我們來看看具體的源碼實現(xiàn)吧,它的源碼不是很簡單的那種,我們需要先搞清楚它的設(shè)計思想。

源碼加注釋大概有 1200 行,我們先看大框架:

// 構(gòu)造時,我們可以指定公平模式還是非公平模式,區(qū)別之后再說
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue() : new TransferStack();
}
abstract static class Transferer {
    // 從方法名上大概就知道,這個方法用于轉(zhuǎn)移元素,從生產(chǎn)者手上轉(zhuǎn)到消費者手上
    // 也可以被動地,消費者調(diào)用這個方法來從生產(chǎn)者手上取元素
    // 第一個參數(shù) e 如果不是 null,代表場景為:將元素從生產(chǎn)者轉(zhuǎn)移給消費者
    // 如果是 null,代表消費者等待生產(chǎn)者提供元素,然后返回值就是相應(yīng)的生產(chǎn)者提供的元素
    // 第二個參數(shù)代表是否設(shè)置超時,如果設(shè)置超時,超時時間是第三個參數(shù)的值
    // 返回值如果是 null,代表超時,或者中斷。具體是哪個,可以通過檢測中斷狀態(tài)得到。
    abstract Object transfer(Object e, boolean timed, long nanos);
}

Transferer 有兩個內(nèi)部實現(xiàn)類,是因為構(gòu)造 SynchronousQueue 的時候,我們可以指定公平策略。公平模式意味著,所有的讀寫線程都遵守先來后到,F(xiàn)IFO 嘛,對應(yīng) TransferQueue。而非公平模式則對應(yīng) TransferStack。


我們先采用公平模式分析源碼,然后再說說公平模式和非公平模式的區(qū)別。

接下來,我們看看 put 方法和 take 方法:

// 寫入值
public void put(E o) throws InterruptedException {
    if (o == null) throw new NullPointerException();
    if (transferer.transfer(o, false, 0) == null) { // 1
        Thread.interrupted();
        throw new InterruptedException();
    }
}
// 讀取值并移除
public E take() throws InterruptedException {
    Object e = transferer.transfer(null, false, 0); // 2
    if (e != null)
        return (E)e;
    Thread.interrupted();
    throw new InterruptedException();
}

我們看到,寫操作 put(E o) 和讀操作 take() 都是調(diào)用 Transferer.transfer(…) 方法,區(qū)別在于第一個參數(shù)是否為 null 值。

我們來看看 transfer 的設(shè)計思路,其基本算法如下:

當調(diào)用這個方法時,如果隊列是空的,或者隊列中的節(jié)點和當前的線程操作類型一致(如當前操作是 put 操作,而隊列中的元素也都是寫線程)。這種情況下,將當前線程加入到等待隊列即可。
如果隊列中有等待節(jié)點,而且與當前操作可以匹配(如隊列中都是讀操作線程,當前線程是寫操作線程,反之亦然)。這種情況下,匹配等待隊列的隊頭,出隊,返回相應(yīng)數(shù)據(jù)。
其實這里有個隱含的條件被滿足了,隊列如果不為空,肯定都是同種類型的節(jié)點,要么都是讀操作,要么都是寫操作。這個就要看到底是讀線程積壓了,還是寫線程積壓了。

我們可以假設(shè)出一個男女配對的場景:一個男的過來,如果一個人都沒有,那么他需要等待;如果發(fā)現(xiàn)有一堆男的在等待,那么他需要排到隊列后面;如果發(fā)現(xiàn)是一堆女的在排隊,那么他直接牽走隊頭的那個女的。

既然這里說到了等待隊列,我們先看看其實現(xiàn),也就是 QNode:

static final class QNode {
    volatile QNode next;          // 可以看出來,等待隊列是單向鏈表
    volatile Object item;         // CAS"ed to or from null
    volatile Thread waiter;       // 將線程對象保存在這里,用于掛起和喚醒
    final boolean isData;         // 用于判斷是寫線程節(jié)點(isData == true),還是讀線程節(jié)點

    QNode(Object item, boolean isData) {
        this.item = item;
        this.isData = isData;
    }
  ......

相信說了這么多以后,我們再來看 transfer 方法的代碼就輕松多了。

/**
 * Puts or takes an item.
 */
Object transfer(Object e, boolean timed, long nanos) {

    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin

        // 隊列空,或隊列中節(jié)點類型和當前節(jié)點一致,
        // 即我們說的第一種情況,將節(jié)點入隊即可。讀者要想著這塊 if 里面方法其實就是入隊
        if (h == t || t.isData == isData) { // empty or same-mode
            QNode tn = t.next;
            // t != tail 說明剛剛有節(jié)點入隊,continue 即可
            if (t != tail)                  // inconsistent read
                continue;
            // 有其他節(jié)點入隊,但是 tail 還是指向原來的,此時設(shè)置 tail 即可
            if (tn != null) {               // lagging tail
                // 這個方法就是:如果 tail 此時為 t 的話,設(shè)置為 tn
                advanceTail(t, tn);
                continue;
            }
            // 
            if (timed && nanos <= 0)        // can"t wait
                return null;
            if (s == null)
                s = new QNode(e, isData);
            // 將當前節(jié)點,插入到 tail 的后面
            if (!t.casNext(null, s))        // failed to link in
                continue;

            // 將當前節(jié)點設(shè)置為新的 tail
            advanceTail(t, s);              // swing tail and wait
            // 看到這里,請讀者先往下滑到這個方法,看完了以后再回來這里,思路也就不會斷了
            Object x = awaitFulfill(s, e, timed, nanos);
            // 到這里,說明之前入隊的線程被喚醒了,準備往下執(zhí)行
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? x : e;

        // 這里的 else 分支就是上面說的第二種情況,有相應(yīng)的讀或?qū)懴嗥ヅ涞那闆r
        } else {                            // complementary-mode
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }

            advanceHead(h, m);              // successfully fulfilled
            LockSupport.unpark(m.waiter);
            return (x != null) ? x : e;
        }
    }
}

void advanceTail(QNode t, QNode nt) {
    if (tail == t)
        UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
// 自旋或阻塞,直到滿足條件,這個方法返回
Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {

    long lastTime = timed ? System.nanoTime() : 0;
    Thread w = Thread.currentThread();
    // 判斷需要自旋的次數(shù),
    int spins = ((head.next == s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 如果被中斷了,那么取消這個節(jié)點
        if (w.isInterrupted())
            // 就是將當前節(jié)點 s 中的 item 屬性設(shè)置為 this
            s.tryCancel(e);
        Object x = s.item;
        // 這里是這個方法的唯一的出口
        if (x != e)
            return x;
        // 如果需要,檢測是否超時
        if (timed) {
            long now = System.nanoTime();
            nanos -= now - lastTime;
            lastTime = now;
            if (nanos <= 0) {
                s.tryCancel(e);
                continue;
            }
        }
        if (spins > 0)
            --spins;
        // 如果自旋達到了最大的次數(shù),那么檢測
        else if (s.waiter == null)
            s.waiter = w;
        // 如果自旋到了最大的次數(shù),那么線程掛起,等待喚醒
        else if (!timed)
            LockSupport.park(this);
        // spinForTimeoutThreshold 這個之前講 AQS 的時候其實也說過,剩余時間小于這個閾值的時候,就
        // 不要進行掛起了,自旋的性能會比較好
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

Doug Lea 的巧妙之處在于,將各個代碼湊在了一起,使得代碼非常簡潔,當然也同時增加了我們的閱讀負擔,看代碼的時候,還是得仔細想想各種可能的情況。

下面,再說說前面說的公平模式和非公平模式的區(qū)別。

相信大家心里面已經(jīng)有了公平模式的工作流程的概念了,我就簡單說說 TransferStack 的算法,就不分析源碼了。

1.當調(diào)用這個方法時,如果隊列是空的,或者隊列中的節(jié)點和當前的線程操作類型一致(如當前操作是 put 操作,而棧中的元素也都是寫線程)。這種情況下,將當前線程加入到等待棧中,等待配對。然后返回相應(yīng)的元素,或者如果被取消了的話,返回 null。
2.如果棧中有等待節(jié)點,而且與當前操作可以匹配(如棧里面都是讀操作線程,當前線程是寫操作線程,反之亦然)。將當前節(jié)點壓入棧頂,和棧中的節(jié)點進行匹配,然后將這兩個節(jié)點出棧。配對和出棧的動作其實也不是必須的,因為下面的一條會執(zhí)行同樣的事情。
3.如果棧頂是進行匹配而入棧的節(jié)點,幫助其進行匹配并出棧,然后再繼續(xù)操作。

應(yīng)該說,TransferStack 的源碼要比 TransferQueue 的復(fù)雜一些,如果讀者感興趣,請自行進行源碼閱讀。

BlockingQueue 實現(xiàn)之 PriorityBlockingQueue

帶排序的 BlockingQueue 實現(xiàn),其并發(fā)控制采用的是 ReentrantLock,隊列為無界隊列(ArrayBlockingQueue 是有界隊列,LinkedBlockingQueue 也可以通過在構(gòu)造函數(shù)中傳入 capacity 指定隊列最大的容量,但是 PriorityBlockingQueue 只能指定初始的隊列大小,后面插入元素的時候,如果空間不夠的話會自動擴容)。

簡單地說,它就是 PriorityQueue 的線程安全版本。不可以插入 null 值,同時,插入隊列的對象必須是可比較大小的(comparable),否則報 ClassCastException 異常。它的插入操作 put 方法不會 block,因為它是無界隊列(take 方法在隊列為空的時候會阻塞)。

它的源碼相對比較簡單,本節(jié)將介紹其核心源碼部分。

我們來看看它有哪些屬性:

// 構(gòu)造方法中,如果不指定大小的話,默認大小為 11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 數(shù)組的最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

// 這個就是存放數(shù)據(jù)的數(shù)組
private transient Object[] queue;

// 隊列當前大小
private transient int size;

// 大小比較器,如果按照自然序排序,那么此屬性可設(shè)置為 null
private transient Comparator comparator;

// 并發(fā)控制所用的鎖,所有的 public 且涉及到線程安全的方法,都必須先獲取到這個鎖
private final ReentrantLock lock;

// 這個很好理解,其實例由上面的 lock 屬性創(chuàng)建
private final Condition notEmpty;

// 這個也是用于鎖,用于數(shù)組擴容的時候,需要先獲取到這個鎖,才能進行擴容操作
// 其使用 CAS 操作
private transient volatile int allocationSpinLock;

// 用于序列化和反序列化的時候用,對于 PriorityBlockingQueue 我們應(yīng)該比較少使用到序列化
private PriorityQueue q;

此類實現(xiàn)了 Collection 和 Iterator 接口中的所有接口方法,對其對象進行迭代并遍歷時,不能保證有序性。如果你想要實現(xiàn)有序遍歷,建議采用 Arrays.sort(queue.toArray()) 進行處理。PriorityBlockingQueue 提供了 drainTo 方法用于將部分或全部元素有序地填充(準確說是轉(zhuǎn)移,會刪除原隊列中的元素)到另一個集合中。還有一個需要說明的是,如果兩個對象的優(yōu)先級相同(compare 方法返回 0),此隊列并不保證它們之間的順序。

PriorityBlockingQueue 使用了基于數(shù)組的二叉堆來存放元素,所有的 public 方法采用同一個 lock 進行并發(fā)控制。

二叉堆:一顆完全二叉樹,它非常適合用數(shù)組進行存儲,對于數(shù)組中的元素 a[i],其左子節(jié)點為 a[2i+1],其右子節(jié)點為 a[2i + 2],其父節(jié)點為 a[(i-1)/2],其堆序性質(zhì)為,每個節(jié)點的值都小于其左右子節(jié)點的值。二叉堆中最小的值就是根節(jié)點,但是刪除根節(jié)點是比較麻煩的,因為需要調(diào)整樹。

簡單用個圖解釋一下二叉堆,我就不說太多專業(yè)的嚴謹?shù)男g(shù)語了,這種數(shù)據(jù)結(jié)構(gòu)的優(yōu)點是一目了然的,最小的元素一定是根元素,它是一棵滿的樹,除了最后一層,最后一層的節(jié)點從左到右緊密排列。

下面開始 PriorityBlockingQueue 的源碼分析,首先我們來看看構(gòu)造方法:

// 默認構(gòu)造方法,采用默認值(11)來進行初始化
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
// 指定數(shù)組的初始大小
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}
// 指定比較器
public PriorityBlockingQueue(int initialCapacity,
                             Comparator comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}
// 在構(gòu)造方法中就先填充指定的集合中的元素
public PriorityBlockingQueue(Collection c) {
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    // 
    boolean heapify = true; // true if not known to be in heap order
    boolean screen = true;  // true if must screen for nulls
    if (c instanceof SortedSet) {
        SortedSet ss = (SortedSet) c;
        this.comparator = (Comparator) ss.comparator();
        heapify = false;
    }
    else if (c instanceof PriorityBlockingQueue) {
        PriorityBlockingQueue pq =
            (PriorityBlockingQueue) c;
        this.comparator = (Comparator) pq.comparator();
        screen = false;
        if (pq.getClass() == PriorityBlockingQueue.class) // exact match
            heapify = false;
    }
    Object[] a = c.toArray();
    int n = a.length;
    // If c.toArray incorrectly doesn"t return Object[], copy it.
    if (a.getClass() != Object[].class)
        a = Arrays.copyOf(a, n, Object[].class);
    if (screen && (n == 1 || this.comparator != null)) {
        for (int i = 0; i < n; ++i)
            if (a[i] == null)
                throw new NullPointerException();
    }
    this.queue = a;
    this.size = n;
    if (heapify)
        heapify();
}

接下來,我們來看看其內(nèi)部的自動擴容實現(xiàn):

private void tryGrow(Object[] array, int oldCap) {
    // 這邊做了釋放鎖的操作
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    // 用 CAS 操作將 allocationSpinLock 由 0 變?yōu)?1,也算是獲取鎖
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            // 如果節(jié)點個數(shù)小于 64,那么增加的 oldCap + 2 的容量
            // 如果節(jié)點數(shù)大于等于 64,那么增加 oldCap 的一半
            // 所以節(jié)點數(shù)較小時,增長得快一些
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) :
                                   (oldCap >> 1));
            // 這里有可能溢出
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            // 如果 queue != array,那么說明有其他線程給 queue 分配了其他的空間
            if (newCap > oldCap && queue == array)
                // 分配一個新的大數(shù)組
                newArray = new Object[newCap];
        } finally {
            // 重置,也就是釋放鎖
            allocationSpinLock = 0;
        }
    }
    // 如果有其他的線程也在做擴容的操作
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    // 重新獲取鎖
    lock.lock();
    // 將原來數(shù)組中的元素復(fù)制到新分配的大數(shù)組中
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

擴容方法對并發(fā)的控制也非常的巧妙,釋放了原來的獨占鎖 lock,這樣的話,擴容操作和讀操作可以同時進行,提高吞吐量。

下面,我們來分析下寫操作 put 方法和讀操作 take 方法。

public void put(E e) {
    // 直接調(diào)用 offer 方法,因為前面我們也說了,在這里,put 方法不會阻塞
    offer(e); 
}
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    // 首先獲取到獨占鎖
    lock.lock();
    int n, cap;
    Object[] array;
    // 如果當前隊列中的元素個數(shù) >= 數(shù)組的大小,那么需要擴容了
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator cmp = comparator;
        // 節(jié)點添加到二叉堆中
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        // 更新 size
        size = n + 1;
        // 喚醒等待的讀線程
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

對于二叉堆而言,插入一個節(jié)點是簡單的,插入的節(jié)點如果比父節(jié)點小,交換它們,然后繼續(xù)和父節(jié)點比較。

// 這個方法就是將數(shù)據(jù) x 插入到數(shù)組 array 的位置 k 處,然后再調(diào)整樹
private static  void siftUpComparable(int k, T x, Object[] array) {
    Comparable key = (Comparable) x;
    while (k > 0) {
        // 二叉堆中 a[k] 節(jié)點的父節(jié)點位置
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

我們用圖來示意一下,我們接下來要將 11 插入到隊列中,看看 siftUp 是怎么操作的。

我們再看看 take 方法:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 獨占鎖
    lock.lockInterruptibly();
    E result;
    try {
        // dequeue 出隊
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

private E dequeue() {
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        // 隊頭,用于返回
        E result = (E) array[0];
        // 隊尾元素先取出
        E x = (E) array[n];
        // 隊尾置空
        array[n] = null;
        Comparator cmp = comparator;
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

dequeue 方法返回隊頭,并調(diào)整二叉堆的樹,調(diào)用這個方法必須先獲取獨占鎖。

廢話不多說,出隊是非常簡單的,因為隊頭就是最小的元素,對應(yīng)的是數(shù)組的第一個元素。難點是隊頭出隊后,需要調(diào)整樹。

private static  void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable key = (Comparable)x;
        // 這里得到的 half 肯定是非葉節(jié)點
        // a[n] 是最后一個元素,其父節(jié)點是 a[(n-1)/2]。所以 n >>> 1 代表的節(jié)點肯定不是葉子節(jié)點
        // 下面,我們結(jié)合圖來一行行分析,這樣比較直觀簡單
        // 此時 k 為 0, x 為 17,n 為 9
        int half = n >>> 1; // 得到 half = 4
        while (k < half) {
            // 先取左子節(jié)點
            int child = (k << 1) + 1; // 得到 child = 1
            Object c = array[child];  // c = 12
            int right = child + 1;  // right = 2
            // 如果右子節(jié)點存在,而且比左子節(jié)點小
            // 此時 array[right] = 20,所以條件不滿足
            if (right < n &&
                ((Comparable) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            // key = 17, c = 12,所以條件不滿足
            if (key.compareTo((T) c) <= 0)
                break;
            // 把 12 填充到根節(jié)點
            array[k] = c;
            // k 賦值后為 1
            k = child;
            // 一輪過后,我們發(fā)現(xiàn),12 左邊的子樹和剛剛的差不多,都是缺少根節(jié)點,接下來處理就簡單了
        }
        array[k] = key;
    }
}


記住二叉堆是一棵完全二叉樹,那么根節(jié)點 10 拿掉后,最后面的元素 17 必須找到合適的地方放置。首先,17 和 10 不能直接交換,那么先將根節(jié)點 10 的左右子節(jié)點中較小的節(jié)點往上滑,即 12 往上滑,然后原來 12 留下了一個空節(jié)點,然后再把這個空節(jié)點的較小的子節(jié)點往上滑,即 13 往上滑,最后,留出了位子,17 補上即可。

我稍微調(diào)整下這個樹,以便讀者能更明白:

總結(jié)

我知道本文過長,相信一字不漏看完的讀者肯定是少數(shù)。

ArrayBlockingQueue 底層是數(shù)組,有界隊列,如果我們要使用生產(chǎn)者-消費者模式,這是非常好的選擇。

LinkedBlockingQueue 底層是鏈表,可以當做無界和有界隊列來使用,所以大家不要以為它就是無界隊列。

SynchronousQueue 本身不帶有空間來存儲任何元素,使用上可以選擇公平模式和非公平模式。

PriorityBlockingQueue 是無界隊列,基于數(shù)組,數(shù)據(jù)結(jié)構(gòu)為二叉堆,數(shù)組第一個也是樹的根節(jié)點總是最小值。

(全文完)

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/72338.html

相關(guān)文章

  • 解讀線程池

    摘要:為了讓大家理解線程池的整個設(shè)計方案,我會按照的設(shè)計思路來多說一些相關(guān)的東西。也是因為線程池的需要,所以才有了這個接口。 線程池是非常重要的工具,如果你要成為一個好的工程師,還是得比較好地掌握這個知識。即使你為了謀生,也要知道,這基本上是面試必問的題目,而且面試官很容易從被面試者的回答中捕捉到被面試者的技術(shù)水平。 本文略長,建議在 pc 上閱讀,邊看文章邊翻源碼(Java7 和 Java...

    imccl 評論0 收藏0
  • 通俗易懂,JDK 并發(fā)容器總結(jié)

    摘要:線程安全的線程安全的,在讀多寫少的場合性能非常好,遠遠好于高效的并發(fā)隊列,使用鏈表實現(xiàn)。這樣帶來的好處是在高并發(fā)的情況下,你會需要一個全局鎖來保證整個平衡樹的線程安全。 該文已加入開源項目:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識的文檔類項目,Star 數(shù)接近 14 k)。地址:https://github.com/Snailclimb... 一 JDK ...

    curlyCheng 評論0 收藏0
  • 譯:Java中生產(chǎn)者與消費者問題的演變

    摘要:生產(chǎn)者消費者問題是一個典型的多進程同步問題。生產(chǎn)者線程開始產(chǎn)生新的元素并將它們存儲在緩沖區(qū)。否則,生產(chǎn)者線程將會在緩沖區(qū)創(chuàng)建一個新元素然后通知消費者。我們建立一個線程池,它將收到兩個任務(wù),生產(chǎn)者和消費者的任務(wù)。 原文鏈接:https://dzone.com/articles/th... 作者:Ioan Tinca 譯者:liumapp 想要了解更多關(guān)于Java生產(chǎn)者消費者問題的演變嗎?...

    王偉廷 評論0 收藏0
  • java 隊列

    摘要:是基于鏈接節(jié)點的線程安全的隊列。通過這些高效并且線程安全的隊列類,為我們快速搭建高質(zhì)量的多線程程序帶來極大的便利。隊列內(nèi)部僅允許容納一個元素。該隊列的頭部是延遲期滿后保存時間最長的元素。 隊列簡述 Queue: 基本上,一個隊列就是一個先入先出(FIFO)的數(shù)據(jù)結(jié)構(gòu)Queue接口與List、Set同一級別,都是繼承了Collection接口。LinkedList實現(xiàn)了Deque接 口。...

    goji 評論0 收藏0
  • BlockingQueue學習

    摘要:引言在包中,很好的解決了在多線程中,如何高效安全傳輸數(shù)據(jù)的問題。同時,也用于自帶線程池的緩沖隊列中,了解也有助于理解線程池的工作模型。 引言 在java.util.Concurrent包中,BlockingQueue很好的解決了在多線程中,如何高效安全傳輸數(shù)據(jù)的問題。通過這些高效并且線程安全的隊列類,為我們快速搭建高質(zhì)量的多線程程序帶來極大的便利。同時,BlockingQueue也用于...

    xuhong 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<