成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

ArrayBlockingQueue與LinkedBlockingQueue

jackwang / 887人閱讀

摘要:序本文主要簡(jiǎn)單介紹下與。有界無(wú)界有界,適合已知最大存儲(chǔ)容量的場(chǎng)景可有界可以無(wú)界吞吐量在大多數(shù)并發(fā)的場(chǎng)景下吞吐量比,但是性能不穩(wěn)定。測(cè)試結(jié)果表明,的可伸縮性要高于。

本文主要簡(jiǎn)單介紹下ArrayBlockingQueue與LinkedBlockingQueue。

對(duì)比
queue 阻塞與否 是否有界 線(xiàn)程安全保障 適用場(chǎng)景 注意事項(xiàng)
ArrayBlockingQueue 阻塞 有界 一把全局鎖 生產(chǎn)消費(fèi)模型,平衡兩邊處理速度 用于存儲(chǔ)隊(duì)列元素的存儲(chǔ)空間是預(yù)先分配的,使用過(guò)程中內(nèi)存開(kāi)銷(xiāo)較?。o(wú)須動(dòng)態(tài)申請(qǐng)存儲(chǔ)空間)
LinkedBlockingQueue 阻塞 可配置 存取采用2把鎖 生產(chǎn)消費(fèi)模型,平衡兩邊處理速度 無(wú)界的時(shí)候注意內(nèi)存溢出問(wèn)題,用于存儲(chǔ)隊(duì)列元素的存儲(chǔ)空間是在其使用過(guò)程中動(dòng)態(tài)分配的,因此它可能會(huì)增加JVM垃圾回收的負(fù)擔(dān)。
ConcurrentLinkedQueue 非阻塞 無(wú)界 CAS 對(duì)全局的集合進(jìn)行操作的場(chǎng)景 size() 是要遍歷一遍集合,慎用
內(nèi)存方面

ArrayBlockingQueue
用于存儲(chǔ)隊(duì)列元素的存儲(chǔ)空間是預(yù)先分配的,使用過(guò)程中內(nèi)存開(kāi)銷(xiāo)較小(無(wú)須動(dòng)態(tài)申請(qǐng)存儲(chǔ)空間)

LinkedBlockingQueue
用于存儲(chǔ)隊(duì)列元素的存儲(chǔ)空間是在其使用過(guò)程中動(dòng)態(tài)分配的,因此它可能會(huì)增加JVM垃圾回收的負(fù)擔(dān)。

有界無(wú)界

ArrayBlockingQueue
有界,適合已知最大存儲(chǔ)容量的場(chǎng)景

LinkedBlockingQueue
可有界可以無(wú)界

吞吐量

LinkedBlockingQueue在大多數(shù)并發(fā)的場(chǎng)景下吞吐量比ArrayBlockingQueue,但是性能不穩(wěn)定。

Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.

測(cè)試結(jié)果表明,LinkedBlockingQueue的可伸縮性要高于ArrayBlockingQueue。初看起來(lái),這個(gè)結(jié)果有些奇怪:鏈表隊(duì)列在每次插入元素時(shí),都必須分配一個(gè)鏈表節(jié)點(diǎn)對(duì)象,這似乎比基于數(shù)組的隊(duì)列執(zhí)行了更多的工作。然而,雖然它擁有更好的內(nèi)存分配與GC等開(kāi)銷(xiāo),但與基于數(shù)組的隊(duì)列相比,鏈表隊(duì)列的put和take等方法支持并發(fā)性更高的訪(fǎng)問(wèn),因?yàn)橐恍﹥?yōu)化后的鏈接隊(duì)列算法能將隊(duì)列頭節(jié)點(diǎn)的更新操作與尾節(jié)點(diǎn)的更新操作分離開(kāi)來(lái)。由于內(nèi)存分配操作通常是線(xiàn)程本地的,因此如果算法能通過(guò)多執(zhí)行一些內(nèi)存分配操作來(lái)降低競(jìng)爭(zhēng)程度,那么這種算法通常具有更高的可伸縮性。

并發(fā)方面

ArrayBlockingQueue
采用一把鎖,兩個(gè)condition

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
    
    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

此外還支持公平鎖

/**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and the specified access policy.
     *
     * @param capacity the capacity of this queue
     * @param fair if {@code true} then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if {@code false} the access order is unspecified.
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

LinkedBlockingQueue
頭尾各1把鎖

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
    
    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue"s capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.
     * When using a capacity-restricted queue, this method is generally
     * preferable to method {@link BlockingQueue#add add}, which can fail to
     * insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
    
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
應(yīng)用實(shí)例 Executors

里頭用了LinkedBlockingQueue

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
    }
    
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue(),
                                      threadFactory);
    }
    
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue()));
    }
    
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue(),
                                    threadFactory));
    }

使用LinkedBlockingQueue實(shí)現(xiàn)logger

public class BungeeLogger extends Logger {

    private final ColouredWriter writer;
    private final Formatter formatter = new ConciseFormatter();
//    private final LogDispatcher dispatcher = new LogDispatcher(this);

    private final BlockingQueue queue = new LinkedBlockingQueue<>();

    volatile boolean running = true;

    Thread recvThread = new Thread(){
        @Override
        public void run() {
            while (!isInterrupted() && running) {
                LogRecord record;
                try {
                    record = queue.take();
                } catch (InterruptedException ex) {
                    continue;
                }

                doLog(record);
            }
            for (LogRecord record : queue) {
                doLog(record);
            }
        }
    };

    public BungeeLogger() throws IOException {
        super("BungeeCord", null);
        this.writer = new ColouredWriter(new ConsoleReader());

        try {
            FileHandler handler = new FileHandler("proxy.log", 1 << 24, 8, true);
            handler.setFormatter(formatter);
            addHandler(handler);
        } catch (IOException ex) {
            System.err.println("Could not register logger!");
            ex.printStackTrace();
        }
        recvThread.start();

        Runtime.getRuntime().addShutdownHook(new Thread(){
            @Override
            public void run() {
                running = false;
            }
        });
    }

    @Override
    public void log(LogRecord record) {
        if (running) {
            queue.add(record);
        }
    }

    void doLog(LogRecord record) {
        super.log(record);
        writer.print(formatter.format(record));
    }
}
doc

BungeeCord

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/70386.html

相關(guān)文章

  • 通俗易懂,JDK 并發(fā)容器總結(jié)

    摘要:線(xiàn)程安全的線(xiàn)程安全的,在讀多寫(xiě)少的場(chǎng)合性能非常好,遠(yuǎn)遠(yuǎn)好于高效的并發(fā)隊(duì)列,使用鏈表實(shí)現(xiàn)。這樣帶來(lái)的好處是在高并發(fā)的情況下,你會(huì)需要一個(gè)全局鎖來(lái)保證整個(gè)平衡樹(shù)的線(xiàn)程安全。 該文已加入開(kāi)源項(xiàng)目:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識(shí)的文檔類(lèi)項(xiàng)目,Star 數(shù)接近 14 k)。地址:https://github.com/Snailclimb... 一 JDK ...

    curlyCheng 評(píng)論0 收藏0
  • (十五)java多線(xiàn)程之并發(fā)集合ArrayBlockingQueue

    摘要:本人郵箱歡迎轉(zhuǎn)載轉(zhuǎn)載請(qǐng)注明網(wǎng)址代碼已經(jīng)全部托管有需要的同學(xué)自行下載引言做的同學(xué)們或多或少的接觸過(guò)集合框架在集合框架中大多的集合類(lèi)是線(xiàn)程不安全的比如我們常用的等等我們寫(xiě)一個(gè)例子看為什么說(shuō)是不安全的例子證明是線(xiàn)程不安全的我們開(kāi)啟個(gè)線(xiàn)程每個(gè)線(xiàn)程向 本人郵箱: 歡迎轉(zhuǎn)載,轉(zhuǎn)載請(qǐng)注明網(wǎng)址 http://blog.csdn.net/tianshi_kcogithub: https://github...

    stefan 評(píng)論0 收藏0
  • Java多線(xiàn)程進(jìn)階(三三)—— J.U.C之collections框架:LinkedBlocking

    摘要:在章節(jié)中,我們說(shuō)過(guò),維護(hù)了一把全局鎖,無(wú)論是出隊(duì)還是入隊(duì),都共用這把鎖,這就導(dǎo)致任一時(shí)間點(diǎn)只有一個(gè)線(xiàn)程能夠執(zhí)行。入隊(duì)鎖對(duì)應(yīng)的是條件隊(duì)列,出隊(duì)鎖對(duì)應(yīng)的是條件隊(duì)列,所以每入隊(duì)一個(gè)元素,應(yīng)當(dāng)立即去喚醒可能阻塞的其它入隊(duì)線(xiàn)程。 showImg(https://segmentfault.com/img/bVbgCD9?w=1920&h=1080); 本文首發(fā)于一世流云專(zhuān)欄:https://seg...

    W_BinaryTree 評(píng)論0 收藏0
  • BlockingQueue學(xué)習(xí)

    摘要:引言在包中,很好的解決了在多線(xiàn)程中,如何高效安全傳輸數(shù)據(jù)的問(wèn)題。同時(shí),也用于自帶線(xiàn)程池的緩沖隊(duì)列中,了解也有助于理解線(xiàn)程池的工作模型。 引言 在java.util.Concurrent包中,BlockingQueue很好的解決了在多線(xiàn)程中,如何高效安全傳輸數(shù)據(jù)的問(wèn)題。通過(guò)這些高效并且線(xiàn)程安全的隊(duì)列類(lèi),為我們快速搭建高質(zhì)量的多線(xiàn)程程序帶來(lái)極大的便利。同時(shí),BlockingQueue也用于...

    xuhong 評(píng)論0 收藏0
  • 使用 Executors,ThreadPoolExecutor,創(chuàng)建線(xiàn)程池,源碼分析理解

    摘要:源碼分析創(chuàng)建可緩沖的線(xiàn)程池。源碼分析使用創(chuàng)建線(xiàn)程池源碼分析的構(gòu)造函數(shù)構(gòu)造函數(shù)參數(shù)核心線(xiàn)程數(shù)大小,當(dāng)線(xiàn)程數(shù),會(huì)創(chuàng)建線(xiàn)程執(zhí)行最大線(xiàn)程數(shù),當(dāng)線(xiàn)程數(shù)的時(shí)候,會(huì)把放入中保持存活時(shí)間,當(dāng)線(xiàn)程數(shù)大于的空閑線(xiàn)程能保持的最大時(shí)間。 之前創(chuàng)建線(xiàn)程的時(shí)候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThr...

    Chiclaim 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<