摘要:序本文主要簡(jiǎn)單介紹下與。有界無(wú)界有界,適合已知最大存儲(chǔ)容量的場(chǎng)景可有界可以無(wú)界吞吐量在大多數(shù)并發(fā)的場(chǎng)景下吞吐量比,但是性能不穩(wěn)定。測(cè)試結(jié)果表明,的可伸縮性要高于。
序
本文主要簡(jiǎn)單介紹下ArrayBlockingQueue與LinkedBlockingQueue。
對(duì)比queue | 阻塞與否 | 是否有界 | 線(xiàn)程安全保障 | 適用場(chǎng)景 | 注意事項(xiàng) |
---|---|---|---|---|---|
ArrayBlockingQueue | 阻塞 | 有界 | 一把全局鎖 | 生產(chǎn)消費(fèi)模型,平衡兩邊處理速度 | 用于存儲(chǔ)隊(duì)列元素的存儲(chǔ)空間是預(yù)先分配的,使用過(guò)程中內(nèi)存開(kāi)銷(xiāo)較?。o(wú)須動(dòng)態(tài)申請(qǐng)存儲(chǔ)空間) |
LinkedBlockingQueue | 阻塞 | 可配置 | 存取采用2把鎖 | 生產(chǎn)消費(fèi)模型,平衡兩邊處理速度 | 無(wú)界的時(shí)候注意內(nèi)存溢出問(wèn)題,用于存儲(chǔ)隊(duì)列元素的存儲(chǔ)空間是在其使用過(guò)程中動(dòng)態(tài)分配的,因此它可能會(huì)增加JVM垃圾回收的負(fù)擔(dān)。 |
ConcurrentLinkedQueue | 非阻塞 | 無(wú)界 | CAS | 對(duì)全局的集合進(jìn)行操作的場(chǎng)景 | size() 是要遍歷一遍集合,慎用 |
ArrayBlockingQueue
用于存儲(chǔ)隊(duì)列元素的存儲(chǔ)空間是預(yù)先分配的,使用過(guò)程中內(nèi)存開(kāi)銷(xiāo)較小(無(wú)須動(dòng)態(tài)申請(qǐng)存儲(chǔ)空間)
LinkedBlockingQueue
用于存儲(chǔ)隊(duì)列元素的存儲(chǔ)空間是在其使用過(guò)程中動(dòng)態(tài)分配的,因此它可能會(huì)增加JVM垃圾回收的負(fù)擔(dān)。
ArrayBlockingQueue
有界,適合已知最大存儲(chǔ)容量的場(chǎng)景
LinkedBlockingQueue
可有界可以無(wú)界
LinkedBlockingQueue在大多數(shù)并發(fā)的場(chǎng)景下吞吐量比ArrayBlockingQueue,但是性能不穩(wěn)定。
Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.
測(cè)試結(jié)果表明,LinkedBlockingQueue的可伸縮性要高于ArrayBlockingQueue。初看起來(lái),這個(gè)結(jié)果有些奇怪:鏈表隊(duì)列在每次插入元素時(shí),都必須分配一個(gè)鏈表節(jié)點(diǎn)對(duì)象,這似乎比基于數(shù)組的隊(duì)列執(zhí)行了更多的工作。然而,雖然它擁有更好的內(nèi)存分配與GC等開(kāi)銷(xiāo),但與基于數(shù)組的隊(duì)列相比,鏈表隊(duì)列的put和take等方法支持并發(fā)性更高的訪(fǎng)問(wèn),因?yàn)橐恍﹥?yōu)化后的鏈接隊(duì)列算法能將隊(duì)列頭節(jié)點(diǎn)的更新操作與尾節(jié)點(diǎn)的更新操作分離開(kāi)來(lái)。由于內(nèi)存分配操作通常是線(xiàn)程本地的,因此如果算法能通過(guò)多執(zhí)行一些內(nèi)存分配操作來(lái)降低競(jìng)爭(zhēng)程度,那么這種算法通常具有更高的可伸縮性。
并發(fā)方面ArrayBlockingQueue
采用一把鎖,兩個(gè)condition
/** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; /** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } /** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */ private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
此外還支持公平鎖
/** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity and the specified access policy. * * @param capacity the capacity of this queue * @param fair if {@code true} then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if {@code false} the access order is unspecified. * @throws IllegalArgumentException if {@code capacity < 1} */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
LinkedBlockingQueue
頭尾各1把鎖
/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition(); /** * Inserts the specified element at the tail of this queue if it is * possible to do so immediately without exceeding the queue"s capacity, * returning {@code true} upon success and {@code false} if this queue * is full. * When using a capacity-restricted queue, this method is generally * preferable to method {@link BlockingQueue#add add}, which can fail to * insert an element only by throwing an exception. * * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node應(yīng)用實(shí)例 Executorsnode = new Node (e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; } public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
里頭用了LinkedBlockingQueue
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue (), threadFactory); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue ())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue (), threadFactory)); }
使用LinkedBlockingQueue實(shí)現(xiàn)logger
public class BungeeLogger extends Logger { private final ColouredWriter writer; private final Formatter formatter = new ConciseFormatter(); // private final LogDispatcher dispatcher = new LogDispatcher(this); private final BlockingQueuedocqueue = new LinkedBlockingQueue<>(); volatile boolean running = true; Thread recvThread = new Thread(){ @Override public void run() { while (!isInterrupted() && running) { LogRecord record; try { record = queue.take(); } catch (InterruptedException ex) { continue; } doLog(record); } for (LogRecord record : queue) { doLog(record); } } }; public BungeeLogger() throws IOException { super("BungeeCord", null); this.writer = new ColouredWriter(new ConsoleReader()); try { FileHandler handler = new FileHandler("proxy.log", 1 << 24, 8, true); handler.setFormatter(formatter); addHandler(handler); } catch (IOException ex) { System.err.println("Could not register logger!"); ex.printStackTrace(); } recvThread.start(); Runtime.getRuntime().addShutdownHook(new Thread(){ @Override public void run() { running = false; } }); } @Override public void log(LogRecord record) { if (running) { queue.add(record); } } void doLog(LogRecord record) { super.log(record); writer.print(formatter.format(record)); } }
BungeeCord
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/70386.html
摘要:線(xiàn)程安全的線(xiàn)程安全的,在讀多寫(xiě)少的場(chǎng)合性能非常好,遠(yuǎn)遠(yuǎn)好于高效的并發(fā)隊(duì)列,使用鏈表實(shí)現(xiàn)。這樣帶來(lái)的好處是在高并發(fā)的情況下,你會(huì)需要一個(gè)全局鎖來(lái)保證整個(gè)平衡樹(shù)的線(xiàn)程安全。 該文已加入開(kāi)源項(xiàng)目:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識(shí)的文檔類(lèi)項(xiàng)目,Star 數(shù)接近 14 k)。地址:https://github.com/Snailclimb... 一 JDK ...
摘要:本人郵箱歡迎轉(zhuǎn)載轉(zhuǎn)載請(qǐng)注明網(wǎng)址代碼已經(jīng)全部托管有需要的同學(xué)自行下載引言做的同學(xué)們或多或少的接觸過(guò)集合框架在集合框架中大多的集合類(lèi)是線(xiàn)程不安全的比如我們常用的等等我們寫(xiě)一個(gè)例子看為什么說(shuō)是不安全的例子證明是線(xiàn)程不安全的我們開(kāi)啟個(gè)線(xiàn)程每個(gè)線(xiàn)程向 本人郵箱: 歡迎轉(zhuǎn)載,轉(zhuǎn)載請(qǐng)注明網(wǎng)址 http://blog.csdn.net/tianshi_kcogithub: https://github...
摘要:在章節(jié)中,我們說(shuō)過(guò),維護(hù)了一把全局鎖,無(wú)論是出隊(duì)還是入隊(duì),都共用這把鎖,這就導(dǎo)致任一時(shí)間點(diǎn)只有一個(gè)線(xiàn)程能夠執(zhí)行。入隊(duì)鎖對(duì)應(yīng)的是條件隊(duì)列,出隊(duì)鎖對(duì)應(yīng)的是條件隊(duì)列,所以每入隊(duì)一個(gè)元素,應(yīng)當(dāng)立即去喚醒可能阻塞的其它入隊(duì)線(xiàn)程。 showImg(https://segmentfault.com/img/bVbgCD9?w=1920&h=1080); 本文首發(fā)于一世流云專(zhuān)欄:https://seg...
摘要:引言在包中,很好的解決了在多線(xiàn)程中,如何高效安全傳輸數(shù)據(jù)的問(wèn)題。同時(shí),也用于自帶線(xiàn)程池的緩沖隊(duì)列中,了解也有助于理解線(xiàn)程池的工作模型。 引言 在java.util.Concurrent包中,BlockingQueue很好的解決了在多線(xiàn)程中,如何高效安全傳輸數(shù)據(jù)的問(wèn)題。通過(guò)這些高效并且線(xiàn)程安全的隊(duì)列類(lèi),為我們快速搭建高質(zhì)量的多線(xiàn)程程序帶來(lái)極大的便利。同時(shí),BlockingQueue也用于...
摘要:源碼分析創(chuàng)建可緩沖的線(xiàn)程池。源碼分析使用創(chuàng)建線(xiàn)程池源碼分析的構(gòu)造函數(shù)構(gòu)造函數(shù)參數(shù)核心線(xiàn)程數(shù)大小,當(dāng)線(xiàn)程數(shù),會(huì)創(chuàng)建線(xiàn)程執(zhí)行最大線(xiàn)程數(shù),當(dāng)線(xiàn)程數(shù)的時(shí)候,會(huì)把放入中保持存活時(shí)間,當(dāng)線(xiàn)程數(shù)大于的空閑線(xiàn)程能保持的最大時(shí)間。 之前創(chuàng)建線(xiàn)程的時(shí)候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThr...
閱讀 1847·2021-09-03 10:50
閱讀 1356·2019-08-30 15:55
閱讀 3401·2019-08-30 15:52
閱讀 1255·2019-08-30 15:44
閱讀 981·2019-08-30 15:44
閱讀 3344·2019-08-30 14:23
閱讀 3583·2019-08-28 17:51
閱讀 2313·2019-08-26 13:52