摘要:的多線程機(jī)制可彌補(bǔ)拋出未檢查的異常,將終止線程執(zhí)行,此時(shí)會(huì)錯(cuò)誤的認(rèn)為任務(wù)都取消了。如果想要不保留,則需要設(shè)置,此時(shí)最小的就是線程池最大的線程數(shù)。
提供Executor的工廠類
忽略了自定義的ThreadFactory、callable和unconfigurable相關(guān)的方法
newFixedxxx:在任意時(shí)刻,最多有nThreads個(gè)線程在處理task;如果所有線程都在運(yùn)行時(shí)來了新的任務(wù),它會(huì)被扔入隊(duì)列;如果有線程在執(zhí)行期間因某種原因終止了運(yùn)行,如果需要執(zhí)行后續(xù)任務(wù),新的線程將取代它
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
newCachedxxx:新任務(wù)到來如果線程池中有空閑的線程就復(fù)用,否則新建一個(gè)線程。如果一個(gè)線程超過60秒沒有使用,它就會(huì)被關(guān)閉移除線程池
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
newSingleThreadExecutor:僅使用一個(gè)線程來處理任務(wù),如果這線程掛了,會(huì)產(chǎn)生一個(gè)新的線程來代替它。每一個(gè)任務(wù)被保證按照順序執(zhí)行,而且一次只執(zhí)行一個(gè)
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }
使用newFixedxxx方法也能實(shí)現(xiàn)類似的作用,但是ThreadPoolExecutor會(huì)提供修改線程數(shù)的方法,F(xiàn)inalizableDelegatedExecutorService則沒有修改的途徑,它在DelegatedExecutorService的基礎(chǔ)
上僅提供了執(zhí)行finalize時(shí)候去關(guān)閉線程,而DelegatedExecutorService僅暴漏ExecutorService自身的方法
newScheduledThreadPool:提供一個(gè)線程池來延遲或者定期執(zhí)行任務(wù)
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue()); }
newSingleThreadScheduledExecutor:提供單個(gè)線程來延遲或者定期執(zhí)行任務(wù),如果執(zhí)行的線程掛了,會(huì)生成新的。
return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1));
同樣,它保證返回的Executor自身的線程數(shù)不可修改
從上述的實(shí)現(xiàn)可以看出,核心在于三個(gè)部分
ThreadPoolExecutor:提供線程數(shù)相關(guān)的控制
DelegatedExecutorService:僅暴露ExecutorService自身的方法,保證線程數(shù)不變來實(shí)現(xiàn)語義場(chǎng)景
ScheduledExecutorService:提供延遲或者定期執(zhí)行的功能
對(duì)應(yīng)的,相應(yīng)也有不同的隊(duì)列去實(shí)現(xiàn)不同的場(chǎng)景
LinkedBlockingQueue:無界阻塞隊(duì)列
SynchronousQueue:沒有消費(fèi)者消費(fèi)時(shí),新的任務(wù)就會(huì)被阻塞
DelayQueue:隊(duì)列中的任務(wù)過期之后才可以執(zhí)行,否則無法查詢到隊(duì)列中的元素
DelegatedExecutorService它僅僅是包裝了ExecutorService的方法,交由傳入的ExecutorService來執(zhí)行,所謂的UnConfigurable實(shí)際也就是它沒有暴漏配置各種參數(shù)調(diào)整的方法
static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public ListScheduledExecutorServiceshutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future> submit(Runnable task) { return e.submit(task); } public Future submit(Callable task) { return e.submit(task); } public Future submit(Runnable task, T result) { return e.submit(task, result); } public List > invokeAll(Collection extends Callable > tasks) throws InterruptedException { return e.invokeAll(tasks); } public List > invokeAll(Collection extends Callable > tasks, long timeout, TimeUnit unit) throws InterruptedException { return e.invokeAll(tasks, timeout, unit); } public T invokeAny(Collection extends Callable > tasks) throws InterruptedException, ExecutionException { return e.invokeAny(tasks); } public T invokeAny(Collection extends Callable > tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); } }
提供一系列的schedule方法,使得任務(wù)可以延遲或者周期性的執(zhí)行,對(duì)應(yīng)schedule方法會(huì)返回ScheduledFuture以供確認(rèn)是否執(zhí)行以及是否要取消。它的實(shí)現(xiàn)ScheduledThreadPoolExecutor也支持立即執(zhí)行由submit提交的任務(wù)
ThreadPoolExecutor僅支持相對(duì)延遲時(shí)間,比如距離現(xiàn)在5分鐘后執(zhí)行。類似Timer也可以管理延遲任務(wù)和周期任務(wù),但是存在一些缺陷:
所有的定時(shí)任務(wù)只有一個(gè)線程,如果某個(gè)任務(wù)執(zhí)行時(shí)間長(zhǎng),將影響其它TimerTask的精確性。ScheduledExecutorService的多線程機(jī)制可彌補(bǔ)
TimerTask拋出未檢查的異常,將終止線程執(zhí)行,此時(shí)會(huì)錯(cuò)誤的認(rèn)為任務(wù)都取消了。1:可以使用try-catch-finally對(duì)相應(yīng)執(zhí)行快處理;2:通過execute執(zhí)行的方法可以設(shè)置UncaughtExceptionHandler來接收未捕獲的異常,并作出處理;3:通過submit執(zhí)行的,將被封裝層ExecutionException重新拋出
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
corePoolSize、maximumPoolSize:ThreadPoolExecutor會(huì)根據(jù)這兩自動(dòng)調(diào)整線程池的大小,當(dāng)一個(gè)新任務(wù)通過execute提交的時(shí)候:
如果當(dāng)前運(yùn)行的線程數(shù)小于corePoolSize就新建線程;
如果當(dāng)前線程數(shù)在corePoolSize與maximumPoolSize之間,則只有在隊(duì)列滿的時(shí)候才會(huì)創(chuàng)建新的線程;
如果已經(jīng)達(dá)到最大線程數(shù),并且隊(duì)列都滿了,在這種飽和狀態(tài)下就會(huì)執(zhí)行拒絕策略
默認(rèn)情況下,只有新任務(wù)到達(dá)的時(shí)候才會(huì)啟動(dòng)線程,可通過prestartCoreThread方法實(shí)現(xiàn)事先啟動(dòng)
corePoolSize:默認(rèn)線程池所需要維護(hù)的最小的worker的數(shù)量,就算是worker過期了也會(huì)保留。如果想要不保留,則需要設(shè)置allowCoreThreadTimeOut,此時(shí)最小的就是0
maximumPoolSize:線程池最大的線程數(shù)。java限制最多為 2^29-1,大約5億個(gè)
keepAliveTime、unit:如果當(dāng)前線程池有超過corePoolSize的線程數(shù),只要有線程空閑時(shí)間超過keepAliveTime的設(shè)定,就會(huì)被終止;unit則是它的時(shí)間單位
workQueue:任何BlockingQueue都可以使用,基本上有三種
Direct handoffs,直接交付任務(wù)。比如 SynchronousQueue,如果沒有線程消費(fèi),提交任務(wù)會(huì)失敗,當(dāng)然可以新建一個(gè)線程來處理。它適合處理有依賴關(guān)系的任務(wù),一般它的maximumPoolSizes會(huì)被設(shè)置成最大的
Unbounded queues,無界隊(duì)列。比如LinkedBlockingQueue,這意味著如果有corePoolSize個(gè)線程在執(zhí)行,那么其他的任務(wù)都只能等待。它適合于處理任務(wù)都是互相獨(dú)立的,
Bounded queues,有界隊(duì)列。比如ArrayBlockingQueue,需要考慮隊(duì)列大小和最大線程數(shù)之間的關(guān)系,來達(dá)到更好的資源利用率和吞吐量
threadFactory:沒有指定的時(shí)候,使用Executors.defaultThreadFactory
RejectedExecutionHandler:通過execute添加的任務(wù),如果Executor已經(jīng)關(guān)閉或者已經(jīng)飽和了(線程數(shù)達(dá)到了maximumPoolSize,并且隊(duì)列滿了),就會(huì)執(zhí)行,java提供了4種策略:
AbortPolicy,拒絕的時(shí)候拋出運(yùn)行時(shí)異常RejectedExecutionException;
CallerRunsPolicy,如果executor沒有關(guān)閉,那么由調(diào)用execute的線程來執(zhí)行它;
DiscardPolicy,直接扔掉新的任務(wù);
DiscardOldestPolicy,如果executor沒有關(guān)閉,那么扔掉隊(duì)列頭部的任務(wù),再次嘗試;
ThreadPoolExecutor可自定義beforeExecutor、afterExecutor可以用來添加日志統(tǒng)計(jì)、計(jì)時(shí)、件事或統(tǒng)計(jì)信息收集功能,無論run是正常返回還是拋出異常,afterExecutor都會(huì)被執(zhí)行。如果beforeExecutor拋出RuntimeException,任務(wù)和afterExecutor都不會(huì)被執(zhí)行。terminated在所有任務(wù)都已經(jīng)完成,并且所有工作者線程關(guān)閉后會(huì)調(diào)用,此時(shí)也可以用來執(zhí)行發(fā)送通知、記錄日志等等。如何估算線程池的大小
計(jì)算密集型,通常在擁有$N_{cpu}$個(gè)處理器的系統(tǒng)上,線程池大小設(shè)置為$N_{cpu}+1$能夠?qū)崿F(xiàn)最優(yōu)的利用率;
$N_{cpu}$ cpu的個(gè)數(shù)
I/O密集型或者其它阻塞型的任務(wù),定義 $N_{cpu}$為CPU的個(gè)數(shù),$U_{cpu}$為CPU的利用率,$W/C$為等待時(shí)間與計(jì)算時(shí)間的比率,此時(shí)線程池的最優(yōu)大小為
$$N_{threads}=N_{cpu}*U_{cpu}*(1+W/C)$$
場(chǎng)景說明將一個(gè)網(wǎng)站的業(yè)務(wù)抽象成如下幾塊
接收客戶端請(qǐng)求與處理請(qǐng)求
頁面渲染返回的文本和圖片
獲取頁面的廣告
接收請(qǐng)求與處理請(qǐng)求 理論模型理論上,服務(wù)端通過實(shí)現(xiàn)約定的接口就可以實(shí)現(xiàn)接收請(qǐng)求和處理連續(xù)不斷的請(qǐng)求過來
ServerSocket socket = new ServerSocket(80); while(true){ Socket conn = socket.accept(); handleRequest(conn) }
缺點(diǎn):每次只能處理一個(gè)請(qǐng)求,新請(qǐng)求到來時(shí),必須等到正在處理的請(qǐng)求處理完成,才能接收新的請(qǐng)求
顯示的創(chuàng)建多線程為每個(gè)請(qǐng)求創(chuàng)建新的線程提供服務(wù)
ServerSocket socket = new ServerSocket(80); while(true){ final Socket conn = socket.accept(); Runnable task = new Runnable(){ public void run(){ handleRequest(conn); } } new Thread(task).start(); }
缺點(diǎn):
線程的創(chuàng)建和銷毀都有一定的開銷,延遲對(duì)請(qǐng)求的處理;
創(chuàng)建后的線程多于可用處理器的數(shù)量,造成線程閑置,這會(huì)給垃圾回收帶來壓力
存活的大量線程競(jìng)爭(zhēng)CPU資源會(huì)產(chǎn)生很多性能開銷
系統(tǒng)上對(duì)可創(chuàng)建的線程數(shù)存在限制
使用線程池使用java自帶的Executor框架。
private static final Executor exec = Executors.newFixedThreadPool(100); ... ServerSocket socket = new ServerSocket(80); while(true){ final Socket conn = socket.accept(); Runnable task = new Runnable(){ public void run(){ handleRequest(conn); } } exec.execute(task); } ...
線程池策略通過實(shí)現(xiàn)預(yù)估好的線程需求,限制并發(fā)任務(wù)的數(shù)量,重用現(xiàn)有的線程,解決每次創(chuàng)建線程的資源耗盡、競(jìng)爭(zhēng)過于激烈和頻繁創(chuàng)建的問題,也囊括了線程的優(yōu)勢(shì),解耦了任務(wù)提交和任務(wù)執(zhí)行。
頁面渲染返回的文本和圖片 串行渲染renderText(source); ListimageData = new ArrayList (); for(ImageInfo info:scaForImageInfo(source)){ imageData.add(info.downloadImage()); } for(ImageData data:imageData){ renderImage(data); }
缺點(diǎn):圖像的下載大部分時(shí)間在等待I/O操作執(zhí)行完成,這期間CPU幾乎不做任何工作,使得用戶看到最終頁面之前要等待過長(zhǎng)的時(shí)間
并行化渲染過程可以分成兩個(gè)部分,1是渲染文本,1是下載圖像
private static final ExecutorService exec = Executors.newFixedThreadPool(100); ... final Listinfos=scaForImageInfo(source); Callable > task=new Callable
>(){ public List
call(){ List r = new ArrayList (); for(ImageInfo info:infos){ r.add(info.downloadImage()); } return r; } }; Future > future = exec.submit(task); renderText(source); try{ List
imageData = future.get(); for(ImageData data:imageData){ renderImage(data); } }catch(InterruptedException e){ Thread.currentThread().interrupt(); future.cancel(true); }catche(ExecutionException e){ throw launderThrowable(e.getCause()); }
使用Callable來返回下載的圖片結(jié)果,使用future來獲得下載的圖片,這樣將減少用戶所需要的等待時(shí)間。
缺點(diǎn):圖片的下載很明顯時(shí)間要比文本要慢,這樣的并行化很可能速度可能只提升了1%
使用CompletionService。
private static final ExecutorService exec; ... final Listinfos=scaForImageInfo(source); CompletionService cService = new ExecutorCompletionService (exec); for(final ImageInfo info:infos){ cService.submit(new Callable (){ public ImageData call(){ return info.downloadImage(); } }); } renderText(source); try{ for(int i=0,n=info.size();t f = cService.take(); ImageData imageData=f.get(); renderImage(imageData) } }catch(InterruptedException e){ Thread.currentThread().interrupt(); }catche(ExecutionException e){ throw launderThrowable(e.getCause()); }
核心思路為為每一幅圖像下載都創(chuàng)建一個(gè)獨(dú)立的任務(wù),并在線程池中執(zhí)行他們,從而將串行的下載過程轉(zhuǎn)換為并行的過程
獲取頁面的廣告廣告展示如果在一定的時(shí)間以內(nèi)沒有獲取,可以不再展示,并取消超時(shí)的任務(wù)。
ExecutorService exe = Executors.newFixedThreadPool(3); ListmyTasks = new ArrayList<>(); for (int i=0;i<3;i++){ myTasks.add(new MyTask(3-i)); } try { List > futures = exe.invokeAll(myTasks, 1, TimeUnit.SECONDS); for (int i=0;i invokeAll方法對(duì)于沒有完成的任務(wù)會(huì)被取消,通過CancellationException可以捕獲,invokeAll返回的序列順序和傳入的task保持一致。結(jié)果如下:
task sleep 3 not execute ,because java.util.concurrent.CancellationException task sleep 2 not execute ,because java.util.concurrent.CancellationException task execut 1 s
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/72394.html
摘要:目標(biāo)線程由運(yùn)行狀態(tài)轉(zhuǎn)換為就緒狀態(tài),也就是讓出執(zhí)行權(quán)限,讓其他線程得以優(yōu)先執(zhí)行,但其他線程能否優(yōu)先執(zhí)行時(shí)未知的。函數(shù)的官方解釋是意思是使調(diào)用該函數(shù)的線程讓出執(zhí)行時(shí)間給其他已就緒狀態(tài)的線程。 線程允許在同一個(gè)進(jìn)程中同時(shí)存在多個(gè)程序控制流,即通過線程可以實(shí)現(xiàn)同時(shí)處理多個(gè)任務(wù)的功能。線程會(huì)共享進(jìn)程范圍內(nèi)的資源,例如內(nèi)存句柄和文件句柄,但每個(gè)線程都有各自的程序計(jì)數(shù)器、棧以及局部變量。 多線程的實(shí)...
摘要:中的線程池是運(yùn)用場(chǎng)景最多的并發(fā)框架。才是真正的線程池。存放任務(wù)的隊(duì)列存放需要被線程池執(zhí)行的線程隊(duì)列。所以線程池的所有任務(wù)完成后,它最終會(huì)收縮到的大小。飽和策略一般情況下,線程池采用的是,表示無法處理新任務(wù)時(shí)拋出異常。 Java線程池 1. 簡(jiǎn)介 系統(tǒng)啟動(dòng)一個(gè)新線程的成本是比較高的,因?yàn)樗婕芭c操作系統(tǒng)的交互,這個(gè)時(shí)候使用線程池可以提升性能,尤其是需要?jiǎng)?chuàng)建大量聲明周期很短暫的線程時(shí)。Ja...
摘要:面向?qū)ο竺嫦驅(qū)ο蟮娜N基本特征繼承封裝多態(tài)結(jié)構(gòu)化程序設(shè)計(jì)簡(jiǎn)介主要原則自頂向下逐步求精模塊化。在面向?qū)ο蠓椒ㄖ?,類之間共享屬性和操作的機(jī)制稱為繼承。 面向?qū)ο?面向?qū)ο蟮娜N基本特征:繼承、封裝、多態(tài) 結(jié)構(gòu)化程序設(shè)計(jì)簡(jiǎn)介 主要原則:自頂向下、逐步求精、模塊化。 結(jié)構(gòu)化分析SA方法對(duì)系統(tǒng)進(jìn)行需求分析;結(jié)構(gòu)化設(shè)計(jì)SD方法對(duì)系統(tǒng)進(jìn)行概要設(shè)計(jì)、詳細(xì)設(shè)計(jì);結(jié)構(gòu)化編程SP方法來實(shí)現(xiàn)系統(tǒng)。 結(jié)構(gòu)化程序...
摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢娝氖褂?,在開始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購(gòu),是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個(gè)名詞,今天我們首先來說說分布式。 探究...
閱讀 3226·2021-11-10 11:35
閱讀 1322·2019-08-30 13:20
閱讀 1146·2019-08-29 16:18
閱讀 2161·2019-08-26 13:54
閱讀 2185·2019-08-26 13:50
閱讀 983·2019-08-26 13:39
閱讀 2509·2019-08-26 12:08
閱讀 1974·2019-08-26 10:37