摘要:中的線程池運用場景非常廣泛,幾乎所有的一步或者并發(fā)執(zhí)行程序都可以使用。代碼中如果執(zhí)行了方法,線程池會提前創(chuàng)建并啟動所有核心線程。線程池最大數(shù)量線程池允許創(chuàng)建的線程最大數(shù)量。被稱為是可重用固定線程數(shù)的線程池。
Java中的線程池運用場景非常廣泛,幾乎所有的一步或者并發(fā)執(zhí)行程序都可以使用。那么線程池有什么好處呢,以及他的實現(xiàn)原理是怎么樣的呢?
在開發(fā)過程中,合理的使用線程池能夠帶來以下的一些優(yōu)勢,這些優(yōu)勢同時也是一些其他池化的優(yōu)勢,例如數(shù)據(jù)庫連接池,http連接池等。
降低資源消耗,通過重復(fù)利用已經(jīng)創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
提高響應(yīng)速度,當(dāng)任務(wù)到達時,任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
提高線程的可管理性 線程的比較稀缺的資源,如果無限的創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性。
線程池實現(xiàn)原理我們創(chuàng)建線程池完成之后,當(dāng)把一個任務(wù)提交給線程池處理的時候,線程池的處理流程如下:
1)線程池判斷核心線程池的任務(wù)是否都在執(zhí)行任務(wù),如果不是,則創(chuàng)建一個新的線程來執(zhí)行任務(wù),如何核心線程池的線程都在執(zhí)行任務(wù),則進入下一個流程
2)線程池判斷工作隊列是否已滿,如果工作隊列沒有滿,那么新提交的任務(wù)將會存儲在工作隊列中,如果工作隊列滿了,那會進入到下一個流程
3)線程池判斷線程池中的線程是否都處于工作狀態(tài),如果沒有,則創(chuàng)建一個新的工作線程來執(zhí)行任務(wù)。如果已經(jīng)滿了,那么使用飽和策略來處理這個任務(wù)
創(chuàng)建線程池可以使用java中已經(jīng)內(nèi)置的一些默認(rèn)配置的線程池,也可以使用ThreadPoolExecutor來自己配置參數(shù)來創(chuàng)建線程池,阿里代碼規(guī)約中推薦后者來創(chuàng)建線程池,這樣創(chuàng)建的線程池更加符合業(yè)務(wù)的實際需求。
下面是我創(chuàng)建線程池的一個例子:
ExecutorService executor = new ThreadPoolExecutor(2, 5, 30 , TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.AbortPolicy()); public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
在上面的這段代碼里new ThreadPoolExecutor中一共有6個參數(shù),我們來解析一下這些參數(shù)的意義,這樣下次創(chuàng)建線程池的時候能夠更加靈活的使用。
corePoolSize (核心線程的數(shù)量):任務(wù)提交到線程池時,線程池會創(chuàng)建一個線程來執(zhí)行任務(wù),即使其他空閑的核心線程能夠執(zhí)行新任務(wù)也會創(chuàng)建線程,等到需要執(zhí)行的任務(wù)數(shù)大于線程池核心數(shù)大小時就不再創(chuàng)建。代碼中如果執(zhí)行了prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動所有核心線程。
maximumPoolSize (線程池最大數(shù)量):線程池允許創(chuàng)建的線程最大數(shù)量。如果隊列滿了,并且創(chuàng)建的線程數(shù)小于最大線程數(shù),那么線程池會再創(chuàng)建新的線程執(zhí)行任務(wù),如果使用無界隊列,那么這個參數(shù)設(shè)置了意義不大。
keepAliveTime (線程保持活動時間):線程池的工作線程空閑后,保持存活的時間,如果任務(wù)很多,切任務(wù)執(zhí)行時間較長,這個值可以設(shè)置的大一點。
TimeUnit (上面的那個時間的單位)
BlockingQueue
ArrayBlockingQueue:基于數(shù)組的有界阻塞隊列 FIFO
LinkedBlockingQueue:基于鏈表的有界阻塞隊列 FIFO 吞吐量高于ArrayBlockingQueue, FixedThreadPool基于這個實現(xiàn)
SynchronousQueue: 不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入處于阻塞狀態(tài),吞吐量高于LinkedBlockingQueue,靜態(tài)方法:Executors.newCachedThreadPool使用了這個隊列
PriorityBlockingQueue 具有優(yōu)先級的無界阻塞隊列
handler (飽和策略):當(dāng)隊列和線程池都滿了,說明線程池已經(jīng)處于飽和狀態(tài),那么必須采取一種策略處理新提交的任務(wù),目前提供四種策略
AbortPolicy:直接拋出異常
CallerRunsPolicy: 只用調(diào)用者所在線程來運行任務(wù)
DiscardOldestPolicy :丟棄隊列里最近的一個任務(wù),執(zhí)行當(dāng)前任務(wù)
DiscardPolicy : 不處理丟棄掉
也可以實現(xiàn)RejectedExecutionHandler接口自定義處理方式。
線程池創(chuàng)建完成之后,就可以處理提交的任務(wù),任務(wù)如何提交到線程池呢,線程池提供了兩個方法:submit()和execute()方法。
public static void main(String[] args) { //提交沒有返回結(jié)果的任務(wù) EXECUTOR_SERVICE.execute(new Runnable() { @Override public void run() { System.out.println("hello world"); } }); Future future = EXECUTOR_SERVICE.submit(new Runnable() { @Override public void run() { System.out.println("hello world1"); } }); //提交有返回結(jié)果的任務(wù),返回一個future的對象,調(diào)用get方法獲取返回值,阻塞直到返回 Future future1 = EXECUTOR_SERVICE.submit(new Callable
Java對ExecutorService關(guān)閉方式有兩種,一是調(diào)用shutdown()方法,二是調(diào)用shutdownNow()方法。
這兩個方法雖然都可以關(guān)閉線程池,但是有一些區(qū)別
shutdown()
1、調(diào)用之后不允許繼續(xù)往線程池內(nèi)繼續(xù)添加線程,如果繼續(xù)添加會出現(xiàn)RejectedExecutionException異常;
2、線程池的狀態(tài)變?yōu)镾HUTDOWN狀態(tài);
3、所有在調(diào)用shutdown()方法之前提交到ExecutorSrvice的任務(wù)都會執(zhí)行;
4、一旦所有線程結(jié)束執(zhí)行當(dāng)前任務(wù),ExecutorService才會真正關(guān)閉。
shutdownNow()
1、該方法返回尚未執(zhí)行的 task 的 List;
2、線程池的狀態(tài)變?yōu)镾TOP狀態(tài);
3、阻止所有正在等待啟動的任務(wù), 并且停止當(dāng)前正在執(zhí)行的任務(wù);
最好的關(guān)閉線程池的方法:
package multithread.threadpool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * @author pangjianfei * @Date 2019/6/21 * @desc 測試線程池 */ public class TestThreadPoolExecutor { static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10); public static void main(String[] args) { EXECUTOR_SERVICE.execute( ()-> System.out.println("hello world")); Future future = EXECUTOR_SERVICE.submit(() -> System.out.println("hello world1")); Future future1 = EXECUTOR_SERVICE.submit(() -> {System.out.println("hello world1");return "succ";}); Future future2 = EXECUTOR_SERVICE.submit(() -> {Thread.sleep(5000);System.out.println("hello world2");return "線程執(zhí)行成功了";}); try { System.out.println(future.get()); System.out.println(future1.get()); System.out.println(future2.get()); } catch (Exception e) { e.printStackTrace(); } Runnable run = () -> { long sum = 0; boolean flag = true; while (flag && !Thread.currentThread().isInterrupted()) { sum += 1; if (sum == Long.MAX_VALUE) { flag = false; } } }; EXECUTOR_SERVICE.execute(run); //先調(diào)用shutdown()使線程池狀態(tài)改變?yōu)镾HUTDOWN EXECUTOR_SERVICE.shutdown(); try { //2s后檢測線程池內(nèi)的線程是否執(zhí)行完畢 if (!EXECUTOR_SERVICE.awaitTermination(2, TimeUnit.SECONDS)) { //內(nèi)部實際通過interrupt來終止線程,所以當(dāng)調(diào)用shutdownNow()時,isInterrupted()會返回true。 EXECUTOR_SERVICE.shutdownNow(); } } catch (InterruptedException e) { EXECUTOR_SERVICE.shutdownNow(); } } }
一般說來,大家認(rèn)為線程池的大小經(jīng)驗值應(yīng)該這樣設(shè)置:(其中N為CPU的個數(shù))
如果是CPU密集型應(yīng)用,則線程池大小設(shè)置為N+1
如果是IO密集型應(yīng)用,則線程池大小設(shè)置為2N+1
IO優(yōu)化中,更加合理的方式是:最佳線程數(shù)目 = (線程等待時間與線程CPU時間之比 + 1)* CPU數(shù)目
linux查看CPU的數(shù)量:
# 查看物理CPU個數(shù) cat /proc/cpuinfo| grep "physical id"| sort| uniq| wc -l # 查看每個物理CPU中core的個數(shù)(即核數(shù)) cat /proc/cpuinfo| grep "cpu cores"| uniq # 查看邏輯CPU的個數(shù) cat /proc/cpuinfo| grep "processor"| wc -l
基于下面的方法可以對線程池中的一些數(shù)據(jù)進行監(jiān)控:
//當(dāng)前排隊線程數(shù) ((ThreadPoolExecutor)EXECUTOR_SERVICE).getQueue().size(); //當(dāng)前活動的線程數(shù) ((ThreadPoolExecutor)EXECUTOR_SERVICE).getActiveCount(); //執(zhí)行完成的線程數(shù) ((ThreadPoolExecutor)EXECUTOR_SERVICE).getCompletedTaskCount(); //總線程數(shù) ((ThreadPoolExecutor)EXECUTOR_SERVICE).getTaskCount();Executor框架
Executor框架的調(diào)度模型是一種二級調(diào)度模型。
在HotSpot VM 的線程模型中,Java線程會被一對一的映射為本地操作系統(tǒng)的線程。Java線程的啟動與銷毀都與本地線程同步。操作系統(tǒng)會調(diào)度所有線程并將它們分配給可用的CPU。
在上層,我們通常會創(chuàng)建任務(wù),然后將任務(wù)提交到調(diào)度器,調(diào)度器(Executor框架)將這些任務(wù)映射為對應(yīng)數(shù)量的線程;底層,操作系統(tǒng)會將這些線程映射到硬件處理器上,這里不是由應(yīng)用程序控制。模型圖如下:
Executor框架主要包括三部分:
任務(wù): 定義的被執(zhí)行的任務(wù)需要實現(xiàn)兩個接口之一:Runable、Callable;
任務(wù)的執(zhí)行: 包括任務(wù)執(zhí)行機制的核心接口Executor,以及繼承自Executor的ExecutorService接口。Executor框架有兩個關(guān)鍵類實現(xiàn)了ExecutorService接口:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor、ForkJoinPool;
任務(wù)的異步計算結(jié)果: 包括Future接口和實現(xiàn)Future接口的FutureTask類、ForkJoinTask類。
Executor主要包含的類和接口如下:
Executor是一個接口,他是Executor框架的基礎(chǔ),他將任務(wù)的提交和執(zhí)行分離。
ThreadPoolExecutor是線程池的核心類,用來執(zhí)行被提交的任務(wù)
ScheduleThreadPoolExecutor是一個實現(xiàn)類,可以在給定的延遲后執(zhí)行命令,或者定期執(zhí)行命令,比Timer更加靈活。
Future接口和實現(xiàn)Future接口的FutureTask類,代表異步計算的結(jié)果
Executor框架最核心的類就是ThreadPoolExecutor,他是線程池的實現(xiàn)類,我們在上面看到過創(chuàng)建線程池的例子
ExecutorService executor = new ThreadPoolExecutor(2, 5, 30 , TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.AbortPolicy()); 里面包含了幾個核心的參數(shù),通過Executor框架的工具類Executors,可以直接創(chuàng)建3中類型的線程池。
FixedThreadPool被稱為是可重用固定線程數(shù)的線程池。為什么會這么說,我們看一下他的實現(xiàn)源碼:
public static ExecutorService newFixedThreadPool(int nThreads) { /** * 核心線程數(shù)和最大線程數(shù)相同,線程等待時間為0表示多余的空閑線程會被立刻終止,LinkedBlockingQueue默認(rèn)容量是Integer.MAX_VALUE,基本類似于無界隊列 */ return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
SingleThreadExecutor是單個Worker線程的Executor.他的實現(xiàn)源碼如下:
/** * 可以看到核心線程數(shù)和最大線程數(shù)都是1,相當(dāng)于是創(chuàng)建一個單線程順序的執(zhí)行隊列中的任務(wù) */ public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }
CachedThreadPool是一個會根據(jù)需要創(chuàng)建新線程的線程池,源碼如下:
/** * 核心線程數(shù)為0,最大線程數(shù)為Integer.MAX_VALUE,線程存活時間為1分鐘,而且是使用SynchronousQueue沒有容量的隊列,這種情況下任務(wù)提交快的是時候,會創(chuàng)建大量的線程,耗盡CPU的資源 */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
ScheduledThreadPoolExecutor用來在給定的延遲之后運行任務(wù),或者定期執(zhí)行任務(wù),他比Timer更加靈活,Timer是創(chuàng)建一個后臺線程,而這個線程池可以在構(gòu)造函數(shù)中指定多個對應(yīng)的后臺線程。源碼如下:
/** * 這里使用的是無界的隊列,如果核心線程池已滿,那么任務(wù)會一直提交到隊列 */ public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
ScheduledThreadPoolExecutor的執(zhí)行主要包含兩部分
/**參數(shù)的作用:1:執(zhí)行的線程 2、初始化延時 3、兩次開始執(zhí)行最小時間間隔 4、計時單位*/ /**scheduleAtFixedRate()是按照指定頻率執(zhí)行一個任務(wù),就是前一個任務(wù)沒有執(zhí)行完成,但是下次執(zhí)行時間到,仍然會啟動線程執(zhí)行新的任務(wù)*/ scheduledExecutorService.scheduleAtFixedRate(()-> System.out.println("hello"), 10, 10, TimeUnit.SECONDS); /**參數(shù)作用同上*/ /**scheduleWithFixedDelay()是按照指定頻率間隔執(zhí)行,本次執(zhí)行結(jié)果之后,在延時10s執(zhí)行下一次的任務(wù)*/ scheduledExecutorService.scheduleWithFixedDelay(()-> System.out.println("hello"), 10, 10, TimeUnit.SECONDS); 上面的兩種方法都是向ScheduledThreadPoolExecutor的DelayQueue中添加了一個實現(xiàn)了RunnableScheduleFuture接口的ScheduledTask. public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); //這里對commond進行了封裝 ScheduledFutureTasksft = new ScheduledFutureTask (command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
DelayQueue是一個支持優(yōu)先級的隊列,這里默認(rèn)會按照執(zhí)行時間進行排序,time小的排在前面。線程在獲取到期需要執(zhí)行的ScheduledTutureTask之后,執(zhí)行這個任務(wù),執(zhí)行完成之后會修改time,將其變?yōu)橄麓我獔?zhí)行的時間,修改之后放回到DelayQueue中。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/77839.html
摘要:當(dāng)活動線程核心線程非核心線程達到這個數(shù)值后,后續(xù)任務(wù)將會根據(jù)來進行拒絕策略處理。線程池工作原則當(dāng)線程池中線程數(shù)量小于則創(chuàng)建線程,并處理請求。當(dāng)線程池中的數(shù)量等于最大線程數(shù)時默默丟棄不能執(zhí)行的新加任務(wù),不報任何異常。 spring-cache使用記錄 spring-cache的使用記錄,坑點記錄以及采用的解決方案 深入分析 java 線程池的實現(xiàn)原理 在這篇文章中,作者有條不紊的將 ja...
摘要:任務(wù)性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。線程池在運行過程中已完成的任務(wù)數(shù)量。如等于線程池的最大大小,則表示線程池曾經(jīng)滿了。線程池的線程數(shù)量。獲取活動的線程數(shù)。通過擴展線程池進行監(jiān)控??蚣馨ň€程池,,,,,,等。 Java線程池 [toc] 什么是線程池 線程池就是有N個子線程共同在運行的線程組合。 舉個容易理解的例子:有個線程組合(即線程池,咱可以比喻為一個公司),里面有3...
摘要:線程池的工作原理一個線程池管理了一組工作線程,同時它還包括了一個用于放置等待執(zhí)行任務(wù)的任務(wù)隊列阻塞隊列。使用線程池可以對線程進行統(tǒng)一的分配和監(jiān)控。線程池的注意事項雖然線程池是構(gòu)建多線程應(yīng)用程序的強大機制,但使用它并不是沒有風(fēng)險的。 線程池的工作原理一個線程池管理了一組工作線程, 同時它還包括了一個用于放置等待執(zhí)行 任務(wù)的任務(wù)隊列(阻塞隊列) 。 一個線程池管理了一組工作線程, 同時它還...
摘要:高并發(fā)系列第篇文章。簡單的說,在使用了線程池之后,創(chuàng)建線程變成了從線程池中獲取一個空閑的線程,然后使用,關(guān)閉線程變成了將線程歸還到線程池。如果調(diào)用了線程池的方法,線程池會提前把核心線程都創(chuàng)造好,并啟動線程池允許創(chuàng)建的最大線程數(shù)。 java高并發(fā)系列第18篇文章。 本文主要內(nèi)容 什么是線程池 線程池實現(xiàn)原理 線程池中常見的各種隊列 自定義線程創(chuàng)建的工廠 常見的飽和策略 自定義飽和策略 ...
閱讀 1734·2021-11-12 10:36
閱讀 1641·2021-11-12 10:36
閱讀 3469·2021-11-02 14:46
閱讀 3857·2019-08-30 15:56
閱讀 3646·2019-08-30 15:55
閱讀 1495·2019-08-30 15:44
閱讀 1077·2019-08-30 14:00
閱讀 2758·2019-08-29 18:41