摘要:引言在包中,很好的解決了在多線程中,如何高效安全傳輸數(shù)據(jù)的問(wèn)題。同時(shí),也用于自帶線程池的緩沖隊(duì)列中,了解也有助于理解線程池的工作模型。
引言
在java.util.Concurrent包中,BlockingQueue很好的解決了在多線程中,如何高效安全“傳輸”數(shù)據(jù)的問(wèn)題。通過(guò)這些高效并且線程安全的隊(duì)列類,為我們快速搭建高質(zhì)量的多線程程序帶來(lái)極大的便利。同時(shí),BlockingQueue也用于java自帶線程池的緩沖隊(duì)列中,了解BlockingQueue也有助于理解線程池的工作模型。
一 BlockingQueue接口該接口屬于隊(duì)列,所以繼承了Queue接口,該接口最重要的五個(gè)方法分別是offer方法,poll方法,put方法,take方法和drainTo方法。
offer方法和poll方法分別有一個(gè)靜態(tài)重載方法,分別是offer(E e, long timeout, TimeUnit unit)和poll(long timeout, TimeUnit unit)方法。其意義是在限定時(shí)間內(nèi)存入或取出對(duì)象,如果不能存入取出則返回false。
put方法會(huì)在當(dāng)隊(duì)列存儲(chǔ)對(duì)象達(dá)到限定值時(shí)阻塞線程,而在隊(duì)列不為空時(shí)喚醒被take方法所阻塞的線程。take方法是相反的。
drainTo方法可批量獲取隊(duì)列中的元素。
二 常見(jiàn)的BlockingQueue實(shí)現(xiàn) 一 LinkedBlockingQueueLinkedBlockingQueue是比較常見(jiàn)的BlockingQueue的實(shí)現(xiàn),他是基于鏈表的阻塞隊(duì)列。在創(chuàng)建該對(duì)象時(shí)如果不指定可存儲(chǔ)對(duì)象個(gè)數(shù)大小時(shí),默認(rèn)為Integer.MAX_VALUE。當(dāng)生產(chǎn)者往隊(duì)列中放入一個(gè)數(shù)據(jù)時(shí),隊(duì)列會(huì)從生產(chǎn)者手中獲取數(shù)據(jù),并緩存在隊(duì)列內(nèi)部,而生產(chǎn)者立即返回;只有當(dāng)隊(duì)列緩沖區(qū)達(dá)到最大值緩存容量時(shí),才會(huì)阻塞生產(chǎn)者隊(duì)列,直到消費(fèi)者從隊(duì)列中消費(fèi)掉一份數(shù)據(jù),生產(chǎn)者線程會(huì)被喚醒,反之對(duì)于消費(fèi)者這端的處理也基于同樣的原理。
LinkedBlockingQueue內(nèi)部使用了獨(dú)立的兩把鎖來(lái)控制數(shù)據(jù)同步,這也意味著在高并發(fā)的情況下生產(chǎn)者和消費(fèi)者可以并行地操作隊(duì)列中的數(shù)據(jù),以此來(lái)提高整個(gè)隊(duì)列的并發(fā)性能。
put方法和offer方法:
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Nodenode = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node node = new Node(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; } }
這兩個(gè)方法的區(qū)別是put方法在容量達(dá)到上限時(shí)會(huì)阻塞,而offer方法則會(huì)直接返回false。
二 ArrayBlockingQueueArrayBlockingQueue是基于數(shù)組的阻塞隊(duì)列,除了有一個(gè)定長(zhǎng)數(shù)組外,ArrayBlockingQueue內(nèi)部還保存著兩個(gè)整形變量,分別標(biāo)識(shí)著隊(duì)列的頭部和尾部在數(shù)組中的位置。ArrayBlockingQueue在生產(chǎn)者放入數(shù)據(jù)和消費(fèi)者獲取數(shù)據(jù),都是共用同一個(gè)鎖對(duì)象,由此也意味著兩者無(wú)法真正并行運(yùn)行,這點(diǎn)尤其不同于LinkedBlockingQueue。
ArrayBlockingQueue和LinkedBlockingQueue間還有一個(gè)明顯的不同之處在于,前者在插入或刪除元素時(shí)不會(huì)產(chǎn)生或銷毀任何額外的對(duì)象實(shí)例,而后者則會(huì)生成一個(gè)額外的Node對(duì)象。
是一種沒(méi)有緩沖的阻塞隊(duì)列,在生產(chǎn)者put的同時(shí)必須要有一個(gè)消費(fèi)者進(jìn)行take,否則就會(huì)阻塞。聲明一個(gè)SynchronousQueue有兩種不同的方式。公平模式和非公平模式的區(qū)別:如果采用公平模式:SynchronousQueue會(huì)采用公平鎖,并配合一個(gè)FIFO隊(duì)列來(lái)阻塞多余的生產(chǎn)者和消費(fèi)者,從而體系整體的公平策略;但如果是非公平模式(SynchronousQueue默認(rèn)):SynchronousQueue采用非公平鎖,同時(shí)配合一個(gè)LIFO隊(duì)列來(lái)管理多余的生產(chǎn)者和消費(fèi)者,而后一種模式,如果生產(chǎn)者和消費(fèi)者的處理速度有差距,則很容易出現(xiàn)饑渴的情況,即可能有某些生產(chǎn)者或者是消費(fèi)者的數(shù)據(jù)永遠(yuǎn)都得不到處理。
四 PriorityBlockingQueue和DelayQueuePriorityBlockingQueue是基于優(yōu)先級(jí)的阻塞隊(duì)列,該隊(duì)列不會(huì)阻塞生產(chǎn)者,只會(huì)阻塞消費(fèi)者。
DelayQueue隊(duì)列存儲(chǔ)的對(duì)象只有指定的延遲時(shí)間到了才能被取出,該隊(duì)列也不會(huì)阻塞生產(chǎn)者。
三 BlockingQueue的使用在處理多線程生產(chǎn)者消費(fèi)者問(wèn)題時(shí)的演示代碼:
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * Created by gavin on 15-8-30. */ public class BlockingQueueTest { public static void main(String[] args) { BlockingQueue四 總結(jié)queue = new ArrayBlockingQueue (1000); Thread p1 = new Thread(new Producer(queue),"producer1"); Thread p2 = new Thread(new Producer(queue),"producer2"); Thread c1 = new Thread(new Consumer(queue),"consumer1"); Thread c2 = new Thread(new Consumer(queue),"consumer2"); p1.start(); p2.start(); c1.start(); c2.start(); } } class Producer implements Runnable{ private BlockingQueue queue; public Producer(BlockingQueue queue) { this.queue = queue; } public void run() { int i = 0; while (!Thread.currentThread().isInterrupted()) { try { queue.put(Thread.currentThread().getName()+" product "+i); } catch (InterruptedException e) { System.err.println(Thread.currentThread().getName() + " error"); } i++; try { Thread.sleep(1000); } catch (InterruptedException e) { } } } } class Consumer implements Runnable{ private BlockingQueue queue; public Consumer(BlockingQueue queue) { this.queue = queue; } public void run() { int i = 0; while (!Thread.currentThread().isInterrupted()) { try { String str = queue.take(); System.out.println(str); } catch (InterruptedException e) { } try { Thread.sleep(300); } catch (InterruptedException e) { } } } }
BlockingQueue在并發(fā)編程中扮演著重要的角色,既可以自己用來(lái)解決生產(chǎn)者消費(fèi)者問(wèn)題,也用于java自帶線程池的緩沖隊(duì)列。
參考:
BlockingQueue
更多文章:http://blog.gavinzh.com
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/64465.html
摘要:如果隊(duì)列已滿,這個(gè)時(shí)候?qū)懖僮鞯木€程進(jìn)入到寫(xiě)線程隊(duì)列排隊(duì),等待讀線程將隊(duì)列元素移除騰出空間,然后喚醒寫(xiě)線程隊(duì)列的第一個(gè)等待線程。數(shù)據(jù)必須從某個(gè)寫(xiě)線程交給某個(gè)讀線程,而不是寫(xiě)到某個(gè)隊(duì)列中等待被消費(fèi)。 前言 本文直接參考 Doug Lea 寫(xiě)的 Java doc 和注釋,這也是我們?cè)趯W(xué)習(xí) java 并發(fā)包時(shí)最好的材料了。希望大家能有所思、有所悟,學(xué)習(xí) Doug Lea 的代碼風(fēng)格,并將其優(yōu)雅...
摘要:當(dāng)活動(dòng)線程核心線程非核心線程達(dá)到這個(gè)數(shù)值后,后續(xù)任務(wù)將會(huì)根據(jù)來(lái)進(jìn)行拒絕策略處理。線程池工作原則當(dāng)線程池中線程數(shù)量小于則創(chuàng)建線程,并處理請(qǐng)求。當(dāng)線程池中的數(shù)量等于最大線程數(shù)時(shí)默默丟棄不能執(zhí)行的新加任務(wù),不報(bào)任何異常。 spring-cache使用記錄 spring-cache的使用記錄,坑點(diǎn)記錄以及采用的解決方案 深入分析 java 線程池的實(shí)現(xiàn)原理 在這篇文章中,作者有條不紊的將 ja...
摘要:與最大線程池比較。如果加入成功,需要二次檢查線程池的狀態(tài)如果線程池沒(méi)有處于,則從移除任務(wù),啟動(dòng)拒絕策略。如果線程池處于狀態(tài),則檢查工作線程是否為。線程池將如何工作這個(gè)問(wèn)題應(yīng)該就不難回答了。 原文地址:https://www.xilidou.com/2018/02/09/thread-corepoolsize/ 最近在看《Java并發(fā)編程的藝術(shù)》回顧線程池的原理和參數(shù)的時(shí)候發(fā)現(xiàn)一個(gè)問(wèn)題,...
摘要:和方法會(huì)一直阻塞調(diào)用線程,直到線程被中斷或隊(duì)列狀態(tài)可用和方法會(huì)限時(shí)阻塞調(diào)用線程,直到超時(shí)或線程被中斷或隊(duì)列狀態(tài)可用。 showImg(https://segmentfault.com/img/bVbgyPy?w=1191&h=670); 本文首發(fā)于一世流云專欄:https://segmentfault.com/blog... 一、引言 從本節(jié)開(kāi)始,我們將介紹juc-collectio...
摘要:我們將使用單個(gè)線程管理任務(wù)放入隊(duì)列的操作以及從隊(duì)列中取出的操作。同時(shí)這個(gè)線程會(huì)持續(xù)的管理隊(duì)列。另一個(gè)線程將用來(lái)創(chuàng)建,它將一直運(yùn)行知道服務(wù)器終止。此線程永遠(yuǎn)不會(huì)過(guò)期,有助于實(shí)現(xiàn)持續(xù)監(jiān)控。這些請(qǐng)求將會(huì)自動(dòng)的被獲取,并在線程中繼續(xù)處理。 在Java中,BlockingQueue接口位于java.util.concurrent包下。阻塞隊(duì)列主要用來(lái)線程安全的實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模型。他們可以使用...
閱讀 1984·2021-11-25 09:43
閱讀 665·2021-10-11 10:58
閱讀 1742·2019-08-30 15:55
閱讀 1737·2019-08-30 13:13
閱讀 746·2019-08-29 17:01
閱讀 1851·2019-08-29 15:30
閱讀 806·2019-08-29 13:49
閱讀 2182·2019-08-29 12:13