摘要:關(guān)于線程池中的疑問(wèn)對(duì)于線程池,相信大家都或多或少使用過(guò)。所以,阻塞隊(duì)列的作用是控制線程池中線程的生命周期。也就是說(shuō),在的線程池,只有消費(fèi)者使用了阻塞的方法,生產(chǎn)者并沒(méi)有。
關(guān)于線程池中BlockingQueue的疑問(wèn)
對(duì)于Java線程池,相信大家都或多或少使用過(guò)。關(guān)于其用法和原理介紹,網(wǎng)上已經(jīng)有很多非常精彩的文章,珠玉在前,我就不獻(xiàn)丑了。不了解的,可以參考這篇文章。今天我想講的,是關(guān)于我對(duì)Java線程次的兩個(gè)疑問(wèn),當(dāng)然高手可以略過(guò)了。
為什么使用阻塞隊(duì)列?1.為什么線程池要使用BlockingQueue,而不是ArrayList或別的什么列表?
2.既然使用了BlockingQueue,為什么還要設(shè)置拒絕策略,隊(duì)列滿的時(shí)候不是阻塞嗎?
要回答這個(gè)問(wèn)答,首先來(lái)看看不用線程池的時(shí)候怎么執(zhí)行異步任務(wù)
new Thread(() -> { // do something }).start();
也就是說(shuō),每次需要執(zhí)行異步任務(wù)的時(shí)候,新建一個(gè)線程去執(zhí)行,執(zhí)行完就回收了。這會(huì)導(dǎo)致什么問(wèn)題呢,首先,是對(duì)資源的浪費(fèi),線程的創(chuàng)建需要陷入內(nèi)核,需要分配??臻g,需要執(zhí)行調(diào)度,等等,只使用一次就回收太浪費(fèi)資源。其次,當(dāng)異步任務(wù)比較多的時(shí)候,這種方式要?jiǎng)?chuàng)建大量的線程,這對(duì)于內(nèi)存資源也是一個(gè)很大的開(kāi)銷。我們知道,在jvm啟動(dòng)的時(shí)候可以設(shè)置線程棧大小的參數(shù)-Xss,默認(rèn)的大小是1M,如果同時(shí)啟動(dòng)1000個(gè)線程,就要占用1G的內(nèi)存,可想而知,這對(duì)內(nèi)存是一個(gè)多大的開(kāi)銷。而且,線程數(shù)太多,對(duì)于內(nèi)核的調(diào)度壓力也是相當(dāng)大的,而且,因?yàn)轭l繁的上下文切換而使程序的局部性喪失,也是一種消耗。線程池的作用,就是線程的復(fù)用,那么,怎么復(fù)用呢,來(lái)看一段代碼:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
在ThreadPoolExecutor中,線程封裝在Worker中,Worker實(shí)現(xiàn)了Runnable,同時(shí)在run()方法中調(diào)用上面的runWorker()方法,只要runWorker()方法沒(méi)有執(zhí)行完,這個(gè)線程就不會(huì)被回收。而runWorker()方法要執(zhí)行下去,就要保證while (task != null || (task = getTask()) != null)的條件為真,第一次判斷時(shí)task為firstTask,即執(zhí)行的第一個(gè)任務(wù),那么要點(diǎn)就成了getTask()必須不能為空,來(lái)看看getTask()的實(shí)現(xiàn):
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
核心邏輯是:
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
這里的workQueue就是阻塞隊(duì)列,timed表示是否會(huì)超時(shí)釋放,keepAliveTime是非核心線程允許的空閑時(shí)間;如果不超時(shí),則調(diào)用BlockingQueue.take(),如果取不到值,就會(huì)一直阻塞直到程序提交了一個(gè)任務(wù)。所以,阻塞隊(duì)列的作用是控制線程池中線程的生命周期。
那么,如果不用阻塞隊(duì)列,有沒(méi)有別的方式可以實(shí)現(xiàn)線程池的功能?答案是,有,但是沒(méi)必要。比如我們可以使用wait/notify來(lái)控制線程的執(zhí)行和阻塞,但這里使用生產(chǎn)者/消費(fèi)者模式來(lái)實(shí)現(xiàn)是一種更優(yōu)雅的方式。
為什么需要拒絕策略既然使用了阻塞隊(duì)列,那添加任務(wù)的時(shí)候如果隊(duì)列滿了不就阻塞了嗎,拒絕策略是干嘛用的?答案是添加任務(wù)調(diào)用的并不是阻塞的put()方法,而是非阻塞的offer()方法,看一下ThreadPoolExecutor的execute()方法就知道了
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn"t, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
至于為什么這么實(shí)現(xiàn),應(yīng)該是不希望阻塞用戶進(jìn)程吧。
也就是說(shuō),在Java的線程池,只有消費(fèi)者使用了阻塞的方法,生產(chǎn)者并沒(méi)有。
SynchronousQueue不過(guò)也有例外,調(diào)用ExecutorService executorService = Executors.newCachedThreadPool();
時(shí),BlockingQueue的實(shí)現(xiàn)類是SynchronousQueue,顧名思義,這是一個(gè)同步隊(duì)列,其內(nèi)部沒(méi)有容量,使用SynchronousQueue,消費(fèi)者線程和生產(chǎn)者線程必須交替執(zhí)行,也就是說(shuō),生產(chǎn)者和消費(fèi)者都必須等待對(duì)方就緒。這樣的話,不就阻塞用戶進(jìn)程了嗎。確實(shí)會(huì),但是這個(gè)時(shí)間非常短,因?yàn)槭褂眠@種方式,每次通過(guò)execute()提交任務(wù)的時(shí)候,要么復(fù)用現(xiàn)有空閑的線程,要么新建一個(gè)線程,也就是說(shuō)線程數(shù)理論上沒(méi)有上界,所以可以當(dāng)作不會(huì)阻塞
https://stackoverflow.com/questions/7556465/why-threadpoolexecutor-has-blockingqueue-as-its-argument?utm_medium=organic&utm_source=google_rich_qa&utm_campaign=google_rich_qa
http://www.geek-programmer.com/java-blocking-queues-explained/
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/69078.html
摘要:當(dāng)活動(dòng)線程核心線程非核心線程達(dá)到這個(gè)數(shù)值后,后續(xù)任務(wù)將會(huì)根據(jù)來(lái)進(jìn)行拒絕策略處理。線程池工作原則當(dāng)線程池中線程數(shù)量小于則創(chuàng)建線程,并處理請(qǐng)求。當(dāng)線程池中的數(shù)量等于最大線程數(shù)時(shí)默默丟棄不能執(zhí)行的新加任務(wù),不報(bào)任何異常。 spring-cache使用記錄 spring-cache的使用記錄,坑點(diǎn)記錄以及采用的解決方案 深入分析 java 線程池的實(shí)現(xiàn)原理 在這篇文章中,作者有條不紊的將 ja...
摘要:任務(wù)性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開(kāi)處理。線程池在運(yùn)行過(guò)程中已完成的任務(wù)數(shù)量。如等于線程池的最大大小,則表示線程池曾經(jīng)滿了。線程池的線程數(shù)量。獲取活動(dòng)的線程數(shù)。通過(guò)擴(kuò)展線程池進(jìn)行監(jiān)控??蚣馨ň€程池,,,,,,等。 Java線程池 [toc] 什么是線程池 線程池就是有N個(gè)子線程共同在運(yùn)行的線程組合。 舉個(gè)容易理解的例子:有個(gè)線程組合(即線程池,咱可以比喻為一個(gè)公司),里面有3...
摘要:一個(gè)線程池包含很多準(zhǔn)備運(yùn)行的空閑線程,每當(dāng)執(zhí)行完畢后,線程不會(huì)死亡而是回到線程池準(zhǔn)備為下一個(gè)請(qǐng)求提供服務(wù)。另一個(gè)使用線程池的理由是減少并發(fā)線程數(shù)。創(chuàng)建大量線程會(huì)大大降低性能甚至拖垮虛擬機(jī)。 【Future的概念 interface Future ,表示異步計(jì)算的結(jié)果,F(xiàn)uture有個(gè)get方法而獲取結(jié)果只有在計(jì)算完成時(shí)獲取,否則會(huì)一直阻塞直到任務(wù)轉(zhuǎn)入完成狀態(tài),然后會(huì)返回結(jié)果或者拋出異常...
摘要:去美團(tuán)面試,問(wèn)到了什么是線程池,如何使用,為什么要用以下做個(gè)總結(jié)。二線程池線程池的作用線程池作用就是限制系統(tǒng)中執(zhí)行線程的數(shù)量。真正的線程池接口是。創(chuàng)建固定大小的線程池。此線程池支持定時(shí)以及周期性執(zhí)行任務(wù)的需求。 去美團(tuán)面試,問(wèn)到了什么是線程池,如何使用,為什么要用,以下做個(gè)總結(jié)。關(guān)于線程之前也寫過(guò)一篇文章《高級(jí)面試題總結(jié)—線程池還能這么玩?》 1、什么是線程池:? java.util...
摘要:去美團(tuán)面試,問(wèn)到了什么是線程池,如何使用,為什么要用以下做個(gè)總結(jié)。二線程池線程池的作用線程池作用就是限制系統(tǒng)中執(zhí)行線程的數(shù)量。真正的線程池接口是。創(chuàng)建固定大小的線程池。此線程池支持定時(shí)以及周期性執(zhí)行任務(wù)的需求。 去美團(tuán)面試,問(wèn)到了什么是線程池,如何使用,為什么要用,以下做個(gè)總結(jié)。關(guān)于線程之前也寫過(guò)一篇文章《高級(jí)面試題總結(jié)—線程池還能這么玩?》 1、什么是線程池:? java.util...
閱讀 2122·2023-04-26 00:41
閱讀 1155·2021-09-24 10:34
閱讀 3585·2021-09-23 11:21
閱讀 4117·2021-09-22 15:06
閱讀 1567·2019-08-30 15:55
閱讀 908·2019-08-30 15:54
閱讀 1837·2019-08-30 15:48
閱讀 561·2019-08-29 13:58