成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Java多線程進(jìn)階(三五)—— J.U.C之collections框架:SynchronousQue

missonce / 1458人閱讀

摘要:三總結(jié)主要用于線程之間的數(shù)據(jù)交換,由于采用無(wú)鎖算法,其性能一般比單純的其它阻塞隊(duì)列要高。它的最大特點(diǎn)時(shí)不存儲(chǔ)實(shí)際元素,而是在內(nèi)部通過(guò)?;蜿?duì)列結(jié)構(gòu)保存阻塞線程。

本文首發(fā)于一世流云專欄:https://segmentfault.com/blog...
一、SynchronousQueue簡(jiǎn)介

SynchronousQueue是JDK1.5時(shí),隨著J.U.C包一起引入的一種阻塞隊(duì)列,它實(shí)現(xiàn)了BlockingQueue接口,底層基于隊(duì)列實(shí)現(xiàn):

沒(méi)有看錯(cuò),SynchronousQueue的底層實(shí)現(xiàn)包含兩種數(shù)據(jù)結(jié)構(gòu)——隊(duì)列。這是一種非常特殊的阻塞隊(duì)列,它的特點(diǎn)簡(jiǎn)要概括如下:

入隊(duì)線程和出隊(duì)線程必須一一匹配,否則任意先到達(dá)的線程會(huì)阻塞。比如ThreadA進(jìn)行入隊(duì)操作,在有其它線程執(zhí)行出隊(duì)操作之前,ThreadA會(huì)一直等待,反之亦然;

SynchronousQueue內(nèi)部不保存任何元素,也就是說(shuō)它的容量為0,數(shù)據(jù)直接在配對(duì)的生產(chǎn)者和消費(fèi)者線程之間傳遞,不會(huì)將數(shù)據(jù)緩沖到隊(duì)列中。

SynchronousQueue支持公平/非公平策略。其中非公平模式,基于內(nèi)部數(shù)據(jù)結(jié)構(gòu)——“?!眮?lái)實(shí)現(xiàn),公平模式,基于內(nèi)部數(shù)據(jù)結(jié)構(gòu)——“隊(duì)列”來(lái)實(shí)現(xiàn);

SynchronousQueue基于一種名為“Dual stack and Dual queue”的無(wú)鎖算法實(shí)現(xiàn)。

注意:上述的特點(diǎn)1,和我們之前介紹的Exchanger其實(shí)非常相似,可以類比Exchanger的功能來(lái)理解。
二、SynchronousQueue原理 構(gòu)造

之前提到,SynchronousQueue根據(jù)公平/非公平訪問(wèn)策略的不同,內(nèi)部使用了兩種不同的數(shù)據(jù)結(jié)構(gòu):棧和隊(duì)列。我們先來(lái)看下對(duì)象的構(gòu)造,SynchronousQueue只有2種構(gòu)造器:

/**
 * 默認(rèn)構(gòu)造器.
 * 默認(rèn)使用非公平策略.
 */
public SynchronousQueue() {
    this(false);
}
/**
 * 指定策略的構(gòu)造器.
 */
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue() : new TransferStack();
}

可以看到,對(duì)于公平策略,內(nèi)部構(gòu)造了一個(gè)TransferQueue對(duì)象,而非公平策略則是構(gòu)造了TransferStack對(duì)象。這兩個(gè)類都繼承了內(nèi)部類Transferer,SynchronousQueue中的所有方法,其實(shí)都是委托調(diào)用了TransferQueue/TransferStack的方法:

public class SynchronousQueue extends AbstractQueue
        implements BlockingQueue, java.io.Serializable {
?
    /**
     * tranferer對(duì)象, 構(gòu)造時(shí)根據(jù)策略類型確定.
     */
    private transient volatile Transferer transferer;
?
    /**
     * Shared internal API for dual stacks and queues.
     */
    abstract static class Transferer {
        /**
         * Performs a put or take.
         *
         * @param e 非null表示 生產(chǎn)者 -> 消費(fèi)者;
         *          null表示, 消費(fèi)者 -> 生產(chǎn)者.
         * @return 非null表示傳遞的數(shù)據(jù); null表示傳遞失?。ǔ瑫r(shí)或中斷).
         */
        abstract E transfer(E e, boolean timed, long nanos);
    }
?
    /**
     * Dual stack(雙棧結(jié)構(gòu)).
     * 非公平策略時(shí)使用.
     */
    static final class TransferStack extends Transferer {
        // ...
    }
?
    /**
     * Dual Queue(雙端隊(duì)列).
     * 公平策略時(shí)使用.
     */
    static final class TransferQueue extends Transferer {
        // ...
    }
?
    // ...
}
棧結(jié)構(gòu)

非公平策略由TransferStack類實(shí)現(xiàn),既然TransferStack是棧,那就有結(jié)點(diǎn)。TransferStack內(nèi)部定義了名為SNode的結(jié)點(diǎn):

static final class SNode {
    volatile SNode next;
    volatile SNode match;       // 與當(dāng)前結(jié)點(diǎn)配對(duì)的結(jié)點(diǎn)
    volatile Thread waiter;     // 當(dāng)前結(jié)點(diǎn)對(duì)應(yīng)的線程
    Object item;                // 實(shí)際數(shù)據(jù)或null
    int mode;                   // 結(jié)點(diǎn)類型
?
    SNode(Object item) {
        this.item = item;
    }
??
    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long matchOffset;
    private static final long nextOffset;
?
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = SNode.class;
            matchOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("match"));
            nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    // ...

}

上述SNode結(jié)點(diǎn)的定義中有個(gè)mode字段,表示結(jié)點(diǎn)的類型。TransferStack一共定義了三種結(jié)點(diǎn)類型,任何線程對(duì)TransferStack的操作都會(huì)創(chuàng)建下述三種類型的某種結(jié)點(diǎn):

REQUEST:表示未配對(duì)的消費(fèi)者(當(dāng)線程進(jìn)行出隊(duì)操作時(shí),會(huì)創(chuàng)建一個(gè)mode值為REQUEST的SNode結(jié)點(diǎn) )

DATA:表示未配對(duì)的生產(chǎn)者(當(dāng)線程進(jìn)行入隊(duì)操作時(shí),會(huì)創(chuàng)建一個(gè)mode值為DATA的SNode結(jié)點(diǎn) )

FULFILLING:表示配對(duì)成功的消費(fèi)者/生產(chǎn)者

static final class TransferStack extends Transferer {
?
    /**
     * 未配對(duì)的消費(fèi)者
     */
    static final int REQUEST = 0;
    /**
     * 未配對(duì)的生產(chǎn)者
     */
    static final int DATA = 1;
    /**
     * 配對(duì)成功的消費(fèi)者/生產(chǎn)者
     */
    static final int FULFILLING = 2;
?
     volatile SNode head;
?
    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
?
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = TransferStack.class;
            headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
?
    // ...
}
核心操作——put/take

SynchronousQueue的入隊(duì)操作調(diào)用了put方法:

/**
 * 入隊(duì)指定元素e.
 * 如果沒(méi)有另一個(gè)線程進(jìn)行出隊(duì)操作, 則阻塞該入隊(duì)線程.
 */
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

SynchronousQueue的出隊(duì)操作調(diào)用了take方法:

/**
 * 出隊(duì)一個(gè)元素.
 * 如果沒(méi)有另一個(gè)線程進(jìn)行出隊(duì)操作, 則阻塞該入隊(duì)線程.
 */
public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

可以看到,SynchronousQueue一樣不支持null元素,實(shí)際的入隊(duì)/出隊(duì)操作都是委托給了transfer方法,該方法返回null表示出/入隊(duì)失?。ㄍǔJ蔷€程被中斷或超時(shí)):

/**
 * 入隊(duì)/出隊(duì)一個(gè)元素.
 */
E transfer(E e, boolean timed, long nanos) {
    SNode s = null; // s表示新創(chuàng)建的結(jié)點(diǎn)
    // 入?yún)==null, 說(shuō)明當(dāng)前是出隊(duì)線程(消費(fèi)者), 否則是入隊(duì)線程(生產(chǎn)者)
    // 入隊(duì)線程創(chuàng)建一個(gè)DATA結(jié)點(diǎn), 出隊(duì)線程創(chuàng)建一個(gè)REQUEST結(jié)點(diǎn)
    int mode = (e == null) ? REQUEST : DATA;

    for (; ; ) {    // 自旋
        SNode h = head;
        if (h == null || h.mode == mode) {          // CASE1: 棧為空 或 棧頂結(jié)點(diǎn)類型與當(dāng)前mode相同
            if (timed && nanos <= 0) {              // case1.1: 限時(shí)等待的情況
                if (h != null && h.isCancelled())
                    casHead(h, h.next);
                else
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {  // case1.2 將當(dāng)前結(jié)點(diǎn)壓入棧
                SNode m = awaitFulfill(s, timed, nanos);        // 阻塞當(dāng)前調(diào)用線程
                if (m == s) {                                   // 阻塞過(guò)程中被中斷
                    clean(s);
                    return null;
                }

                // 此時(shí)m為配對(duì)結(jié)點(diǎn)
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);

                // 入隊(duì)線程null, 出隊(duì)線程返回配對(duì)結(jié)點(diǎn)的值
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
            // 執(zhí)行到此處說(shuō)明入棧失敗(多個(gè)線程同時(shí)入棧導(dǎo)致CAS操作head失敗),則進(jìn)入下一次自旋繼續(xù)執(zhí)行

        } else if (!isFulfilling(h.mode)) {          // CASE2: 棧頂結(jié)點(diǎn)還未配對(duì)成功
            if (h.isCancelled())                     // case2.1: 元素取消情況(因中斷或超時(shí))的處理
                casHead(h, h.next);
            else if (casHead(h, s = snode(s, e,
                h, FULFILLING | mode))) {      // case2.2: 將當(dāng)前結(jié)點(diǎn)壓入棧中
                for (; ; ) {
                    SNode m = s.next;       // s.next指向原棧頂結(jié)點(diǎn)(也就是與當(dāng)前結(jié)點(diǎn)匹配的結(jié)點(diǎn))
                    if (m == null) {        // m==null說(shuō)明被其它線程搶先匹配了, 則跳出循環(huán), 重新下一次自旋
                        casHead(s, null);
                        s = null;
                        break;
                    }

                    SNode mn = m.next;
                    if (m.tryMatch(s)) {    // 進(jìn)行結(jié)點(diǎn)匹配
                        casHead(s, mn);     // 匹配成功, 將匹配的兩個(gè)結(jié)點(diǎn)全部彈出棧
                        return (E) ((mode == REQUEST) ? m.item : s.item);   // 返回匹配值
                    } else                  // 匹配失敗
                        s.casNext(m, mn);   // 移除原待匹配結(jié)點(diǎn)
                }
            }
        } else {                            // CASE3: 其它線程正在匹配
            SNode m = h.next;
            if (m == null)                  // 棧頂?shù)膎ext==null, 則直接彈出, 重新進(jìn)入下一次自旋
                casHead(h, null);
            else {                          // 嘗試和其它線程競(jìng)爭(zhēng)匹配
                SNode mn = m.next;
                if (m.tryMatch(h))
                    casHead(h, mn);         // 匹配成功
                else
                    h.casNext(m, mn);       // 匹配失?。ū黄渌€程搶先匹配成功了)
            }
        }
    }
}

整個(gè)transfer方法考慮了限時(shí)等待的情況,且入隊(duì)/出隊(duì)其實(shí)都是調(diào)用了同一個(gè)方法,其主干邏輯就是在一個(gè)自旋中完成以下三種情況之一的操作,直到成功,或者被中斷或超時(shí)取消:

棧為空,或棧頂結(jié)點(diǎn)類型與當(dāng)前入隊(duì)結(jié)點(diǎn)相同。這種情況,調(diào)用線程會(huì)阻塞;

棧頂結(jié)點(diǎn)還未配對(duì)成功,且與當(dāng)前入隊(duì)結(jié)點(diǎn)可以配對(duì)。這種情況,直接進(jìn)行配對(duì)操作;

棧頂結(jié)點(diǎn)正在配對(duì)中。這種情況,直接進(jìn)行下一個(gè)結(jié)點(diǎn)的配對(duì)。

出/入隊(duì)示例講解

為了便于理解,我們來(lái)看下面這個(gè)調(diào)用示例(假設(shè)不考慮限時(shí)等待的情況),假設(shè)一共有三個(gè)線程ThreadA、ThreadB、ThreadC:

①初始棧結(jié)構(gòu)

初始棧為空,head為棧頂指針,始終指向棧頂結(jié)點(diǎn):

②ThreadA(生產(chǎn)者)執(zhí)行入隊(duì)操作

由于此時(shí)棧為空,所以ThreadA會(huì)進(jìn)入CASE1,創(chuàng)建一個(gè)類型為DATA的結(jié)點(diǎn):

if (h == null || h.mode == mode) {          // CASE1: 棧為空 或 棧頂結(jié)點(diǎn)類型與當(dāng)前mode相同
    if (timed && nanos <= 0) {              // case1.1: 限時(shí)等待的情況
        if (h != null && h.isCancelled())
            casHead(h, h.next);
        else
            return null;
    } else if (casHead(h, s = snode(s, e, h, mode))) {  // case1.2 將當(dāng)前結(jié)點(diǎn)壓入棧
        SNode m = awaitFulfill(s, timed, nanos);        // 阻塞當(dāng)前調(diào)用線程
        if (m == s) {                                   // 阻塞過(guò)程中被中斷
            clean(s);
            return null;
        }

        // 此時(shí)m為配對(duì)結(jié)點(diǎn)
        if ((h = head) != null && h.next == s)
            casHead(h, s.next);

        // 入隊(duì)線程null, 出隊(duì)線程返回配對(duì)結(jié)點(diǎn)的值
        return (E) ((mode == REQUEST) ? m.item : s.item);
    }
    // 執(zhí)行到此處說(shuō)明入棧失敗(多個(gè)線程同時(shí)入棧導(dǎo)致CAS操作head失敗),則進(jìn)入下一次自旋繼續(xù)執(zhí)行
}

CASE1分支中,將結(jié)點(diǎn)壓入棧后,會(huì)調(diào)用awaitFulfill方法,該方法會(huì)阻塞調(diào)用線程:

/**
 * 阻塞當(dāng)前調(diào)用線程, 并將線程信息記錄在s.waiter字段上.
 *
 * @param s 等待的結(jié)點(diǎn)
 * @return 返回配對(duì)的結(jié)點(diǎn) 或 當(dāng)前結(jié)點(diǎn)(說(shuō)明線程被中斷了)
 */
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();

    // 性能優(yōu)化操作(計(jì)算自旋次數(shù))
    int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (; ; ) {
        if (w.isInterrupted())
            s.tryCancel();
        /**
         * s.match保存當(dāng)前結(jié)點(diǎn)的匹配結(jié)點(diǎn).
         * s.match==null說(shuō)明還沒(méi)有匹配結(jié)點(diǎn)
         * s.match==s說(shuō)明當(dāng)前結(jié)點(diǎn)s對(duì)應(yīng)的線程被中斷了
         */
        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0)
            spins = shouldSpin(s) ? (spins - 1) : 0;
        else if (s.waiter == null)  // 還沒(méi)有匹配結(jié)點(diǎn), 則保存當(dāng)前線程
            s.waiter = w;           // s.waiter保存當(dāng)前阻塞線程
        else if (!timed)
            LockSupport.park(this); // 阻塞當(dāng)前線程
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}      

此時(shí)棧結(jié)構(gòu)如下,結(jié)點(diǎn)的waiter字段保存著創(chuàng)建該結(jié)點(diǎn)的線程ThreadA,ThreadA等待著被配對(duì)消費(fèi)者線程喚醒:

③ThreadB(生產(chǎn)者)執(zhí)行入隊(duì)操作

此時(shí)棧頂結(jié)點(diǎn)的類型和ThreadB創(chuàng)建的結(jié)點(diǎn)相同(都是DATA類型的結(jié)點(diǎn)),所以依然走CASE1分支,直接將結(jié)點(diǎn)壓入棧:

④ThreadC(消費(fèi)者)執(zhí)行出隊(duì)操作

此時(shí)棧頂結(jié)點(diǎn)的類型和ThreadC創(chuàng)建的結(jié)點(diǎn)匹配(棧頂DATA類型,ThreadC創(chuàng)建的是REQUEST類型),所以走CASE2分支,該分支會(huì)將匹配的兩個(gè)結(jié)點(diǎn)彈出棧:

else if (!isFulfilling(h.mode)) {          // CASE2: 棧頂結(jié)點(diǎn)還未配對(duì)成功
    if (h.isCancelled())                     // case2.1: 元素取消情況(因中斷或超時(shí))的處理
        casHead(h, h.next);
    else if (casHead(h, s = snode(s, e,
        h, FULFILLING | mode))) {      // case2.2: 將當(dāng)前結(jié)點(diǎn)壓入棧中
        for (; ; ) {
            SNode m = s.next;       // s.next指向原棧頂結(jié)點(diǎn)(也就是與當(dāng)前結(jié)點(diǎn)匹配的結(jié)點(diǎn))
            if (m == null) {        // m==null說(shuō)明被其它線程搶先匹配了, 則跳出循環(huán), 重新下一次自旋
                casHead(s, null);
                s = null;
                break;
            }

            SNode mn = m.next;
            if (m.tryMatch(s)) {    // 進(jìn)行結(jié)點(diǎn)匹配
                casHead(s, mn);     // 匹配成功, 將匹配的兩個(gè)結(jié)點(diǎn)全部彈出棧
                return (E) ((mode == REQUEST) ? m.item : s.item);   // 返回匹配值
            } else                  // 匹配失敗
                s.casNext(m, mn);   // 移除原待匹配結(jié)點(diǎn)
        }
    }
} 

上述isFulfilling方法就是判斷結(jié)點(diǎn)是否匹配:

/**
 * 判斷m是否已經(jīng)配對(duì)成功.
 */
static boolean isFulfilling(int m) {
    return (m & FULFILLING) != 0;
}

ThreadC創(chuàng)建結(jié)點(diǎn)并壓入棧后,棧的結(jié)構(gòu)如下:

此時(shí),ThreadC會(huì)調(diào)用tryMatch方法進(jìn)行匹配,該方法的主要作用有兩點(diǎn):

將待結(jié)點(diǎn)的match字段置為與當(dāng)前配對(duì)的結(jié)點(diǎn)(如上圖中,結(jié)點(diǎn)m是待配對(duì)結(jié)點(diǎn),最終m.math == s

喚醒待配對(duì)結(jié)點(diǎn)中的線程(如上圖中,喚醒結(jié)點(diǎn)m中ThreadB線程)

/**
 * 嘗試將當(dāng)前結(jié)點(diǎn)和s結(jié)點(diǎn)配對(duì).
 */
boolean tryMatch(SNode s) {
    if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
        Thread w = waiter;
        if (w != null) {    // 喚醒當(dāng)前結(jié)點(diǎn)對(duì)應(yīng)的線程
            waiter = null;
            LockSupport.unpark(w);
        }
        return true;
    }
    return match == s;      // 配對(duì)成功返回true
}

匹配完成后,會(huì)將匹配的兩個(gè)結(jié)點(diǎn)彈出棧,并返回匹配值:

if (m.tryMatch(s)) {    // 進(jìn)行結(jié)點(diǎn)匹配
    casHead(s, mn);     // 匹配成功, 將匹配的兩個(gè)結(jié)點(diǎn)全部彈出棧
    return (E) ((mode == REQUEST) ? m.item : s.item);   // 返回匹配值
}

最終,ThreadC拿到了等待配對(duì)結(jié)點(diǎn)中的數(shù)據(jù)并返回,此時(shí)棧的結(jié)構(gòu)如下:

注意: CASE2分支中ThreadC創(chuàng)建的結(jié)點(diǎn)的mode值并不是REQUEST,其mode值為FULFILLING | mode,FULFILLING | mode的主要作用就是給棧頂結(jié)點(diǎn)置一個(gè)標(biāo)識(shí)(二進(jìn)制為11或10),表示當(dāng)前有線程正在對(duì)棧頂匹配,這時(shí)如果有其它線程進(jìn)入自旋(并發(fā)情況),則CASE2一定失敗,因?yàn)?b>isFulfilling的結(jié)果必然為true,所以會(huì)進(jìn)入CASE3分支——跳過(guò)棧頂結(jié)點(diǎn)進(jìn)行匹配。
casHead(h, s = snode(s, e, h, FULFILLING | mode))

⑤ThreadB(生產(chǎn)者)喚醒后繼續(xù)執(zhí)行

ThreadB被喚醒后,會(huì)從原阻塞處繼續(xù)執(zhí)行,并進(jìn)入下一次自旋,在下一次自旋中,由于結(jié)點(diǎn)的match字段已經(jīng)有了匹配結(jié)點(diǎn),所以直接返回配對(duì)結(jié)點(diǎn):

/**
 * 阻塞當(dāng)前調(diào)用線程, 并將線程信息記錄在s.waiter字段上.
 *
 * @param s 等待的結(jié)點(diǎn)
 * @return 返回配對(duì)的結(jié)點(diǎn) 或 當(dāng)前結(jié)點(diǎn)(說(shuō)明線程被中斷了)
 */
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();

    // 性能優(yōu)化操作(計(jì)算自旋次數(shù))
    int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (; ; ) {
        if (w.isInterrupted())
            s.tryCancel();
        /**
         * s.match保存當(dāng)前結(jié)點(diǎn)的匹配結(jié)點(diǎn).
         * s.match==null說(shuō)明還沒(méi)有匹配結(jié)點(diǎn)
         * s.match==s說(shuō)明當(dāng)前結(jié)點(diǎn)s對(duì)應(yīng)的線程被中斷了
         */
        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0)
            spins = shouldSpin(s) ? (spins - 1) : 0;
        else if (s.waiter == null)  // 還沒(méi)有匹配結(jié)點(diǎn), 則保存當(dāng)前線程
            s.waiter = w;           // s.waiter保存當(dāng)前阻塞線程
        else if (!timed)
            LockSupport.park(this); // 阻塞當(dāng)前線程
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

最終,在下面分支中返回:

else if (casHead(h, s = snode(s, e, h, mode))) {  // case1.2 將當(dāng)前結(jié)點(diǎn)壓入棧
    SNode m = awaitFulfill(s, timed, nanos);        // 阻塞當(dāng)前調(diào)用線程
    if (m == s) {                                   // 阻塞過(guò)程中被中斷
        clean(s);
        return null;
    }

    // 此時(shí)m為配對(duì)結(jié)點(diǎn)
    if ((h = head) != null && h.next == s)
        casHead(h, s.next);

    // 入隊(duì)線程null, 出隊(duì)線程返回配對(duì)結(jié)點(diǎn)的值
    return (E) ((mode == REQUEST) ? m.item : s.item);
}
注意:對(duì)于入隊(duì)線程(生產(chǎn)者),返回的是它入隊(duì)時(shí)攜帶的原有元素值。
隊(duì)列結(jié)構(gòu)

SynchronousQueue的公平策略由TransferQueue類實(shí)現(xiàn),TransferQueue內(nèi)部定義了名為QNode的結(jié)點(diǎn),一個(gè)head隊(duì)首指針,一個(gè)tail隊(duì)尾指針:

/**
 * Dual Queue(雙端隊(duì)列).
 * 公平策略時(shí)使用.
 */
static final class TransferQueue extends Transferer {

    /**
     * Head of queue
     */
    transient volatile QNode head;
    /**
     * Tail of queue
     */
    transient volatile QNode tail;
    /**
     * Reference to a cancelled node that might not yet have been
     * unlinked from queue because it was the last inserted node
     * when it was cancelled.
     */
    transient volatile QNode cleanMe;

    /**
     * 隊(duì)列結(jié)點(diǎn)定義.
     */
    static final class QNode {
        volatile QNode next;          // next node in queue
        volatile Object item;         // CAS"ed to or from null
        volatile Thread waiter;       // to control park/unpark
        final boolean isData;
        // ...
    }
    
    // ...
}
關(guān)于TransferQueue的transfer方法就不再贅述了,其思路和TransferStack大致相同,總之就是入隊(duì)/出隊(duì)必須一一匹配,否則任意一方就會(huì)加入隊(duì)列并等待匹配線程喚醒。讀者可以自行閱讀TransferQueued的源碼。
三、總結(jié)

TransferQueue主要用于線程之間的數(shù)據(jù)交換,由于采用無(wú)鎖算法,其性能一般比單純的其它阻塞隊(duì)列要高。它的最大特點(diǎn)時(shí)不存儲(chǔ)實(shí)際元素,而是在內(nèi)部通過(guò)?;蜿?duì)列結(jié)構(gòu)保存阻塞線程。后面我們講JUC線程池框架的時(shí)候,還會(huì)再次看到它的身影。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/77078.html

相關(guān)文章

  • Java線程進(jìn)階(一)—— J.U.C并發(fā)包概述

    摘要:整個(gè)包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見(jiàn)的多線程設(shè)計(jì)模式,設(shè)計(jì)了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對(duì)等進(jìn)行補(bǔ)充增強(qiáng)。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...

    anonymoussf 評(píng)論0 收藏0
  • Java線程進(jìn)階(二六)—— J.U.Ccollections框架:ConcurrentSkip

    摘要:我們來(lái)看下的類繼承圖可以看到,實(shí)現(xiàn)了接口,在多線程進(jìn)階二五之框架中,我們提到過(guò)實(shí)現(xiàn)了接口,以提供和排序相關(guān)的功能,維持元素的有序性,所以就是一種為并發(fā)環(huán)境設(shè)計(jì)的有序工具類。唯一的區(qū)別是針對(duì)的僅僅是鍵值,針對(duì)鍵值對(duì)進(jìn)行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發(fā)于一世流云專欄:https://seg...

    levius 評(píng)論0 收藏0
  • Java線程進(jìn)階(二七)—— J.U.Ccollections框架:CopyOnWriteArr

    摘要:僅僅當(dāng)有多個(gè)線程同時(shí)進(jìn)行寫(xiě)操作時(shí),才會(huì)進(jìn)行同步??梢钥吹?,上述方法返回一個(gè)迭代器對(duì)象,的迭代是在舊數(shù)組上進(jìn)行的,當(dāng)創(chuàng)建迭代器的那一刻就確定了,所以迭代過(guò)程中不會(huì)拋出并發(fā)修改異常。另外,迭代器對(duì)象也不支持修改方法,全部會(huì)拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發(fā)于一世流云專欄:https://...

    garfileo 評(píng)論0 收藏0
  • Java線程進(jìn)階(二八)—— J.U.Ccollections框架:CopyOnWriteArr

    摘要:我們之前已經(jīng)介紹過(guò)了,底層基于跳表實(shí)現(xiàn),其操作平均時(shí)間復(fù)雜度均為。事實(shí)上,內(nèi)部引用了一個(gè)對(duì)象,以組合方式,委托對(duì)象實(shí)現(xiàn)了所有功能。線程安全內(nèi)存的使用較多迭代是對(duì)快照進(jìn)行的,不會(huì)拋出,且迭代過(guò)程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發(fā)于一世流云專欄:https://segmentfa...

    NeverSayNever 評(píng)論0 收藏0
  • Java線程進(jìn)階(三七)—— J.U.Ccollections框架:LinkedBlocking

    摘要:接口截止目前為止,我們介紹的阻塞隊(duì)列都是實(shí)現(xiàn)了接口。該類在構(gòu)造時(shí)一般需要指定容量,如果不指定,則最大容量為。另外,由于內(nèi)部通過(guò)來(lái)保證線程安全,所以的整體實(shí)現(xiàn)時(shí)比較簡(jiǎn)單的。另外,雙端隊(duì)列相比普通隊(duì)列,主要是多了隊(duì)尾出隊(duì)元素隊(duì)首入隊(duì)元素的功能。 showImg(https://segmentfault.com/img/bVbgZ7j?w=770&h=514); 本文首發(fā)于一世流云專欄:ht...

    light 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<