摘要:此時(shí),會(huì)新建一個(gè)新的工作者線程用于對(duì)這個(gè)入隊(duì)列失敗的任務(wù)進(jìn)行處理假設(shè)此時(shí)線程池的大小還未達(dá)到其最大線程池大小。但此時(shí)需要限定線程池的最大大小為一個(gè)合理的有限值,而不是,否則可能導(dǎo)致線程池中的工作者線程的數(shù)量一直增加到系統(tǒng)資源所無(wú)法承受為止。
序
本文主要講一下SynchronousQueue。
定義SynchronousQueue,實(shí)際上它不是一個(gè)真正的隊(duì)列,因?yàn)樗粫?huì)為隊(duì)列中元素維護(hù)存儲(chǔ)空間。與其他隊(duì)列不同的是,它維護(hù)一組線程,這些線程在等待著把元素加入或移出隊(duì)列。
如果以洗盤子的比喻為例,那么這就相當(dāng)于沒有盤架,而是將洗好的盤子直接放入下一個(gè)空閑的烘干機(jī)中。這種實(shí)現(xiàn)隊(duì)列的方式看似很奇怪,但由于可以直接交付工作,從而降低了將數(shù)據(jù)從生產(chǎn)者移動(dòng)到消費(fèi)者的延遲。(在傳統(tǒng)的隊(duì)列中,在一個(gè)工作單元可以交付之前,必須通過(guò)串行方式首先完成入列[Enqueue]或者出列[Dequeue]等操作。)
直接交付方式還會(huì)將更多關(guān)于任務(wù)狀態(tài)的信息反饋給生產(chǎn)者。當(dāng)交付被接受時(shí),它就知道消費(fèi)者已經(jīng)得到了任務(wù),而不是簡(jiǎn)單地把任務(wù)放入一個(gè)隊(duì)列——這種區(qū)別就好比將文件直接交給同事,還是將文件放到她的郵箱中并希望她能盡快拿到文件。
因?yàn)镾ynchronousQueue沒有存儲(chǔ)功能,因此put和take會(huì)一直阻塞,直到有另一個(gè)線程已經(jīng)準(zhǔn)備好參與到交付過(guò)程中。僅當(dāng)有足夠多的消費(fèi)者,并且總是有一個(gè)消費(fèi)者準(zhǔn)備好獲取交付的工作時(shí),才適合使用同步隊(duì)列。
實(shí)例public class SynchronousQueueExample { static class SynchronousQueueProducer implements Runnable { protected BlockingQueueblockingQueue; final Random random = new Random(); public SynchronousQueueProducer(BlockingQueue queue) { this.blockingQueue = queue; } @Override public void run() { while (true) { try { String data = UUID.randomUUID().toString(); System.out.println("Put: " + data); blockingQueue.put(data); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class SynchronousQueueConsumer implements Runnable { protected BlockingQueue blockingQueue; public SynchronousQueueConsumer(BlockingQueue queue) { this.blockingQueue = queue; } @Override public void run() { while (true) { try { String data = blockingQueue.take(); System.out.println(Thread.currentThread().getName() + " take(): " + data); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { final BlockingQueue synchronousQueue = new SynchronousQueue (); SynchronousQueueProducer queueProducer = new SynchronousQueueProducer( synchronousQueue); new Thread(queueProducer).start(); SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer( synchronousQueue); new Thread(queueConsumer1).start(); SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer( synchronousQueue); new Thread(queueConsumer2).start(); } }
應(yīng)用場(chǎng)景插入數(shù)據(jù)的線程和獲取數(shù)據(jù)的線程,交替執(zhí)行
Executors.newCachedThreadPool()
/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available, and uses the provided * ThreadFactory to create new threads when needed. * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null */ public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory); }
由于ThreadPoolExecutor內(nèi)部實(shí)現(xiàn)任務(wù)提交的時(shí)候調(diào)用的是工作隊(duì)列(BlockingQueue接口的實(shí)現(xiàn)類)的非阻塞式入隊(duì)列方法(offer方法),因此,在使用SynchronousQueue作為工作隊(duì)列的前提下,客戶端代碼向線程池提交任務(wù)時(shí),而線程池中又沒有空閑的線程能夠從SynchronousQueue隊(duì)列實(shí)例中取一個(gè)任務(wù),那么相應(yīng)的offer方法調(diào)用就會(huì)失?。慈蝿?wù)沒有被存入工作隊(duì)列)。此時(shí),ThreadPoolExecutor會(huì)新建一個(gè)新的工作者線程用于對(duì)這個(gè)入隊(duì)列失敗的任務(wù)進(jìn)行處理(假設(shè)此時(shí)線程池的大小還未達(dá)到其最大線程池大小)。
所以,使用SynchronousQueue作為工作隊(duì)列,工作隊(duì)列本身并不限制待執(zhí)行的任務(wù)的數(shù)量。但此時(shí)需要限定線程池的最大大小為一個(gè)合理的有限值,而不是Integer.MAX_VALUE,否則可能導(dǎo)致線程池中的工作者線程的數(shù)量一直增加到系統(tǒng)資源所無(wú)法承受為止。
doc如果應(yīng)用程序確實(shí)需要比較大的工作隊(duì)列容量,而又想避免無(wú)界工作隊(duì)列可能導(dǎo)致的問(wèn)題,不妨考慮SynchronousQueue。SynchronousQueue實(shí)現(xiàn)上并不使用緩存空間。
使用SynchronousQueue的目的就是保證“對(duì)于提交的任務(wù),如果有空閑線程,則使用空閑線程來(lái)處理;否則新建一個(gè)線程來(lái)處理任務(wù)”。
A Guide to Java SynchronousQueue
SynchronousQueue Example in Java - Produer Consumer Solution
Java SynchronousQueue
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/70453.html
摘要:如果節(jié)點(diǎn)不為說(shuō)明已經(jīng)有其他線程進(jìn)行操作將節(jié)點(diǎn)替換為節(jié)點(diǎn)等待有消費(fèi)者消費(fèi)線程。如果頭節(jié)點(diǎn)下一個(gè)節(jié)點(diǎn)是當(dāng)前節(jié)點(diǎn)以防止其他線程已經(jīng)修改了節(jié)點(diǎn)則運(yùn)算,否則直接返回。 一、介紹 SynchronousQueue是一個(gè)雙棧雙隊(duì)列算法,無(wú)空間的隊(duì)列或棧,任何一個(gè)對(duì)SynchronousQueue寫需要等到一個(gè)對(duì)SynchronousQueue的讀操作,反之亦然。一個(gè)讀操作需要等待一個(gè)寫操作,相當(dāng)于是...
摘要:實(shí)際上是公平模式和的超集。而使用操作實(shí)現(xiàn)一個(gè)非阻塞的方法,這是避免序列化處理任務(wù)的關(guān)鍵。在這樣的設(shè)計(jì)中,消費(fèi)者的消費(fèi)能力將決定生產(chǎn)者產(chǎn)生消息的速度。實(shí)例輸出中的模式手記之似懂非懂的和長(zhǎng)度為的 序 本文主要簡(jiǎn)介一下TransferQueue。 TransferQueue TransferQueue(java7引入)繼承了BlockingQueue(BlockingQueue又繼承了Que...
摘要:這些工具包括名稱主要作用顯示指定系統(tǒng)內(nèi)所有的虛擬機(jī)進(jìn)程。虛擬機(jī)堆轉(zhuǎn)存快照分析工具命令用于與搭配使用,用來(lái)分析生成的文件。命令格式命令樣例線程堆棧跟蹤工具用于生成虛擬機(jī)當(dāng)前時(shí)刻的線程快照。 概述 給系統(tǒng)定位問(wèn)題的時(shí)候,知識(shí)、經(jīng)驗(yàn)是關(guān)鍵基礎(chǔ),數(shù)據(jù)是依據(jù),工具是運(yùn)用知識(shí)處理數(shù)據(jù)的手段。 java開發(fā)人員可以在jdk安裝的bin目錄下找到除了java,javac以外的其他命令。這些命令主要是一...
摘要:引言在包中,很好的解決了在多線程中,如何高效安全傳輸數(shù)據(jù)的問(wèn)題。同時(shí),也用于自帶線程池的緩沖隊(duì)列中,了解也有助于理解線程池的工作模型。 引言 在java.util.Concurrent包中,BlockingQueue很好的解決了在多線程中,如何高效安全傳輸數(shù)據(jù)的問(wèn)題。通過(guò)這些高效并且線程安全的隊(duì)列類,為我們快速搭建高質(zhì)量的多線程程序帶來(lái)極大的便利。同時(shí),BlockingQueue也用于...
摘要:三總結(jié)主要用于線程之間的數(shù)據(jù)交換,由于采用無(wú)鎖算法,其性能一般比單純的其它阻塞隊(duì)列要高。它的最大特點(diǎn)時(shí)不存儲(chǔ)實(shí)際元素,而是在內(nèi)部通過(guò)?;蜿?duì)列結(jié)構(gòu)保存阻塞線程。 showImg(https://segmentfault.com/img/bVbgOsh?w=900&h=900); 本文首發(fā)于一世流云專欄:https://segmentfault.com/blog... 一、Synchro...
閱讀 4189·2023-04-26 02:40
閱讀 2667·2023-04-26 02:31
閱讀 2762·2021-11-15 18:08
閱讀 580·2021-11-12 10:36
閱讀 1442·2021-09-30 09:57
閱讀 5215·2021-09-22 15:31
閱讀 2641·2019-08-30 14:17
閱讀 1290·2019-08-30 12:58