摘要:線程池一種線程使用模式。線程池不僅能夠保證內核的充分利用,還能防止過分調度。相關起提供了線程池相關頂級接口,及子接口和工具類。線程池的最大線程數(shù),要大于。可擴容創(chuàng)建一個可根據(jù)需要線程數(shù),創(chuàng)建新的線程的線程池。
在使用線程時,需要new一個,用完了又要銷毀,這樣頻繁的創(chuàng)建和銷毀很耗資源,所以就提供了線程池。道理和連接池差不多,連接池是為了避免頻繁的創(chuàng)建和釋放連接,所以在連
接池中就有一定數(shù)量的連接,要用時從連接池拿出,用完歸還給連接池,線程池也一樣。
線程池:一種線程使用模式。線程過多會帶來調度開銷,進而影響緩存局部性和整體性能。而線程池維護著多
個線程,等待著監(jiān)督管理者分配可并發(fā)執(zhí)行的任務。這避免了在處理短時間任務時創(chuàng)建與銷毀線程的代價。線程池不僅能夠保證內核的充分利用,還能防止過分調度。
腦圖:https://www.processon.com/view/link/61849ba4f346fb2ecc4546e5
線程池用法很簡單,?分為三步。首先用工具類Executors創(chuàng)建線程池,然后給線程池分配任務,最后關閉線程池就行了。
1 public class ThreadPoolTest { 2 public static void main(String[] args) throws Exception { 3 4 // 1.創(chuàng)建一個 10 個線程數(shù)的線程池 5 ExecutorService service = Executors.newFixedThreadPool(10); 6 7 // 2.執(zhí)行一個Runnable 8 service.execute(new Number1()); 9 10 // 2.提交一個Callable11 Futurefuture = service.submit(new Number2());12 Integer integer = future.get();13 System.out.println("result = " + integer);14 15 // 關閉線程池16 service.shutdown();17 }18 }19 20 class Number1 implements Runnable {21 22 @Override23 public void run() {24 System.out.println("----打印Runnable----");25 }26 }27 28 // 10以內數(shù)求和29 class Number2 implements Callable {30 private int sum = 0;31 32 @Override33 public Integer call() {34 for (int i = 0; i <= 10; i++) {35 if (i % 2 == 0) {36 sum += i;37 }38 }39 return sum;40 }41 }
注意:線程用完,要關閉線程池,否則程序依然在運行中。
JDK 5.0 起提供了線程池相關API:頂級接口Executor,及子接口 ExecutorService 和工具類Executors。
JUC包描述:圖片來源API文檔
Executors:工具類,線程池的工廠類,用于創(chuàng)建并返回不同類型的線程池。
1 // 一池N線程:創(chuàng)建一個固定(可重用)線程數(shù)的線程池。 2 ExecutorService Executors.newFixedThreadPool(int nThreads) 3 4 // 一池一線程:創(chuàng)建一個只有一個線程的線程池。 5 ExecutorService Executors.newSingleThreadExecutor() 6 7 // 可擴容:創(chuàng)建一個可根據(jù)需要線程數(shù),創(chuàng)建新的線程的線程池。 8 ExecutorService Executors.newCachedThreadPool() 9 10 // 可用于調度:創(chuàng)建一個線程池,它可安排在給定延遲后運行命令或者定期的執(zhí)行。11 ScheduledExecutorService Executors.newScheduledThreadPool(int corePoolSize)
ExecutorService:
1 // 執(zhí)行任務/命令,沒有返回值,一般用來執(zhí)行Runnable 2 void execute(Runnable command) 3 4 // 可用于提交一個Runnable,但沒有返回值 5 Future> submit(Runnable task) 6 7 // 執(zhí)行任務,有返回值,一般用來執(zhí)行Callable 8Future submit(Callable task) 9 10 // 關閉連接池11 void shutdown()
代碼示例:創(chuàng)建固定 5 個線程的線程池為 10 個任務服務。
1 public class ThreadPoolTest { 2 public static void main(String[] args) { 3 // 1.創(chuàng)建一個 10 個線程數(shù)的線程池 4 ExecutorService service = Executors.newFixedThreadPool(5); 5 6 try { 7 for (int i = 0; i < 10; i++) { 8 int finalI = i; 9 service.execute(() -> {10 11 System.out.println(Thread.currentThread().getName() + " 為客戶 " + finalI + " 辦理業(yè)務~");12 13 // try {14 // Thread.sleep(1000_000);15 // } catch (InterruptedException e) {16 // e.printStackTrace();17 // }18 19 });20 }21 } finally {22 service.shutdown();23 }24 }25 }26 27 // 可能的一種結果28 pool-1-thread-1 為客戶 0 辦理業(yè)務~29 pool-1-thread-2 為客戶 1 辦理業(yè)務~30 pool-1-thread-2 為客戶 6 辦理業(yè)務~31 pool-1-thread-2 為客戶 7 辦理業(yè)務~32 pool-1-thread-2 為客戶 8 辦理業(yè)務~33 pool-1-thread-1 為客戶 5 辦理業(yè)務~34 pool-1-thread-2 為客戶 9 辦理業(yè)務~35 pool-1-thread-3 為客戶 2 辦理業(yè)務~36 pool-1-thread-4 為客戶 3 辦理業(yè)務~37 pool-1-thread-5 為客戶 4 辦理業(yè)務~
可以看到,銀行 5 個窗口為 10 個客戶相繼服務。若前面服務時間長(打開注釋),線程池便沒有新的線程來執(zhí)行任務了。程序會陷入等待中。
代碼示例:創(chuàng)建單個線程的線程池為 10 個線程服務。代碼同上,只修改:
1 ExecutorService service = Executors.newSingleThreadExecutor();
代碼示例:創(chuàng)建可擴容線程的線程池為 10 個線程服務。代碼同上,只修改:
1 ExecutorService service = Executors.newCachedThreadPool();
為什么要用線程池管理線程呢?當然是為了線程復用。
背景:經常創(chuàng)建和銷毀、使用量特別大的資源,比如并發(fā)情況下的線程,對性能影響很大。
思路:提前創(chuàng)建好多個線程,放入線程池中,使用時直接獲取,使用完放回池中??梢员苊忸l繁的創(chuàng)建和銷毀,實現(xiàn)重復利用。類似生活中的公共交通工具。
好處:提高響應速度(減少了創(chuàng)建新線程的時間);降低資源消耗(重復利用線程池中線程,不需要每次都創(chuàng)建);便于線程管理。
前面介紹了三種(固定數(shù)、單一的、可變的)創(chuàng)建線程池的方式,實際工作用哪一個呢?都不使用!為什么呢?
《阿里巴巴Java開發(fā)手冊》明確規(guī)定:線程池不允許使用Executors創(chuàng)建,而是通過ThreadPoolExecutor的方式,規(guī)避資源耗盡風險。
查看源碼,可以看到,用Executors創(chuàng)建線程池的三種方式中,都 new 了一個 ThreadPoolExecutor。所以,實際生產一般通過 ThreadPoolExecutor 的 7 個參數(shù),自定義線程池。
源碼示例:7 個參數(shù)的構造器。
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueueworkQueue, 6 ThreadFactory threadFactory, 7 RejectedExecutionHandler handler) { 8 if (corePoolSize < 0 || 9 maximumPoolSize <= 0 ||10 maximumPoolSize < corePoolSize ||11 keepAliveTime < 0)12 throw new IllegalArgumentException();13 if (workQueue == null || threadFactory == null || handler == null)14 throw new NullPointerException();15 this.corePoolSize = corePoolSize;16 this.maximumPoolSize = maximumPoolSize;17 this.workQueue = workQueue;18 this.keepAliveTime = unit.toNanos(keepAliveTime);19 this.threadFactory = threadFactory;20 this.handler = handler;21 }
介紹線程池之前,先來看一個生活中的案例。銀行業(yè)務辦理流程,如圖:
某銀行一共有 5 個服務窗口,但平時一般只開放兩個,另外三個不開放。大廳中還有 10 個等待服務的座位。某天:
(1)客人1(用Thread1表示)來辦理業(yè)務,他就直接去開放的窗口1辦理(假設他需要服務的時間很長,一直在服務中,后面的也一樣)。
?。?)Thread2來辦理業(yè)務,由于窗口1在服務中,所以他去了開放的窗口2辦理。
?。?)Thread3來辦理業(yè)務,由于窗口1和窗口2都在服務中,所以他去了大廳的等待服務座位上排隊等待。
(4)Thread4~Thread12 同理Thread3。
?。?)Thread13來辦理業(yè)務,由于窗口1和窗口2都在服務中,且此時大廳的等待座位上也已滿。銀行經理便將關閉的窗口3打開來為Thread13服務。注意:這里并不是Thread13去大廳排隊,然后隊列中隊頭元素Thread3出隊接受服務。而是直接為Thread13服務。
?。?)Thread14,Thread15來辦理業(yè)務,會開放窗口4為Thread14服務,開放窗口5為Thread15服務。
?。?)Thread16來辦理業(yè)務,此時,已無可用窗口,且大廳的等待座位上也已滿。銀行便拒絕再為 Thread16 服務。
說明:若 Thread13、Thread14、Thread15 業(yè)務辦理完畢后,沒有新的客人來銀行辦理業(yè)務。那么窗口3、窗口4、窗口5會在一定時間后又關閉起來。
下面介紹 ThreadPoolExecutor 構造器中的 7 個核心參數(shù)。
corePoolSize:線程池的核心線程數(shù)。
maximumPoolSize:線程池的最大線程數(shù),要大于corePoolSize。
keepAliveTime:非核心線程閑置下來最多存活的時間。
unit:線程池中非核心線程保持存活的時間單位,與keepAliveTime一起使用。
workQueue:用來保存提交后,等待執(zhí)行任務的阻塞隊列。
threadFactory:創(chuàng)建線程的工廠類。
handler:拒絕策略。
在理解上一節(jié)"銀行服務"的過程后,就不難理解上面 7 個參數(shù)的含義。
corePoolSize = 2:窗口1 + 窗口2。
maximumPoolSize = 5:窗口1 + 窗口2 + 窗口3 + 窗口4 + 窗口5。
workQueue = 10:銀行大廳排隊隊列的大小。關于阻塞隊列 BlockingQueue
keepAliveTime + unit:"窗口3、窗口4、窗口5會在一定時間后又關閉起來"的時間。
handler:"銀行便拒絕再為 Thread16 服務"的拒絕方式。
在了解 ThreadPoolExecutor 7個核心參數(shù)的作用后,再看Executors創(chuàng)建的三種線程池的源碼,就不難理解他們的作用。也就明白為什么《阿里巴巴Java開發(fā)手冊》中禁止使用Executors創(chuàng)建,而要使用ThreadPoolExecutor自定義線程池。
源碼示例:
一池N線程:創(chuàng)建一個固定(可重用)線程數(shù)的線程池。
1 ExecutorService Executors.newFixedThreadPool(int nThreads); 2 3 public static ExecutorService newFixedThreadPool(int nThreads) { 4 return new ThreadPoolExecutor(nThreads, nThreads, 5 0L, TimeUnit.MILLISECONDS, 6 new LinkedBlockingQueue()); 7 } 8 9 public LinkedBlockingQueue() {10 this(Integer.MAX_VALUE);11 }
一池一線程:創(chuàng)建一個只有一個線程的線程池。
1 ExecutorService Executors.newSingleThreadExecutor(); 2 3 public static ExecutorService newSingleThreadExecutor() { 4 return new FinalizableDelegatedExecutorService 5 (new ThreadPoolExecutor(1, 1, 6 0L, TimeUnit.MILLISECONDS, 7 new LinkedBlockingQueue())); 8 } 9 10 public LinkedBlockingQueue() {11 this(Integer.MAX_VALUE);12 }
可擴容:創(chuàng)建一個可根據(jù)需要線程數(shù),創(chuàng)建新的線程的線程池。
1 ExecutorService Executors.newCachedThreadPool();2 3 public static ExecutorService newCachedThreadPool() {4 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,5 60L, TimeUnit.SECONDS,6 new SynchronousQueue());7 }
在理解"銀行服務"的過程后,其實也就說清楚了線程池的工作流程。只是一些細節(jié)沒有說,比如:
?。?)窗口3為Thread13服務完成后,Thread14才來,情況如何?
?。?)……
代碼示例:銀行 2+3 個窗口為陸續(xù)來的 16 個客戶服務。
1 public class ThreadPoolDemo { 2 public static void main(String[] args) throws Exception { 3 // 模擬上圖中 2 + 3 + 10(大廳排隊長度) 的線程池 4 ThreadPoolExecutor executor = new ThreadPoolExecutor( 5 2, 5 6 , 6, TimeUnit.SECONDS 7 , new ArrayBlockingQueue<>(10) 8 , Executors.defaultThreadFactory() 9 , new ThreadPoolExecutor.AbortPolicy());10 11 // 創(chuàng)建16個線程模擬16個客戶12 final int num = 16;13 for (int i = 1; i <= num; i++) {14 int finalI = i;15 16 // 這里主要為了給線程起名字.17 Thread thread = new Thread(() -> {18 System.out.println(Thread.currentThread().getName() + "=====" + finalI + " 號客人開始服務");19 20 // 假設 13 和 16 服務很快.21 if (finalI != 16 && finalI != 13) {22 try {23 // 正在為 finalI 號客戶服務中24 Thread.sleep(1000_000);25 } catch (InterruptedException e) {26 e.printStackTrace();27 }28 }29 30 System.out.println(Thread.currentThread().getName() + "=====" + finalI + " 號客人服務結束");31 }, "------" + i);32 33 System.out.println(thread.getName() + " 號客人來了");34 executor.execute(thread);35 36 // 讓主線程休息一下,保證上面開啟的線程先執(zhí)行.37 Thread.sleep(200);38 }39 40 // 保證上面的線程執(zhí)行完41 Thread.sleep(1_000);42 43 System.out.println("===核心線程數(shù)===" + executor.getCorePoolSize());44 System.out.println("===總任務數(shù)=====" + executor.getTaskCount());45 46 final BlockingQueuequeue = executor.getQueue();47 System.out.println("===正在排隊====" + queue.size());48 49 System.out.println("===最大線程數(shù)===" + executor.getMaximumPoolSize());50 System.out.println("==============" + executor.getPoolSize());51 System.out.println("===完成任務數(shù)===" + executor.getCompletedTaskCount());52 System.out.println("==============" + executor.getLargestPoolSize());53 54 executor.shutdown();55 }56 }57 58 // 結果(程序未停止)59 ------1 號客人來了60 pool-1-thread-1=====1 號客人開始服務61 ------2 號客人來了62 pool-1-thread-2=====2 號客人開始服務63 ------3 號客人來了64 ------4 號客人來了65 ------5 號客人來了66 ------6 號客人來了67 ------7 號客人來了68 ------8 號客人來了69 ------9 號客人來了70 ------10 號客人來了71 ------11 號客人來了72 ------12 號客人來了 // 到這里都不難理解73 ------13 號客人來了74 pool-1-thread-3=====13 號客人開始服務75 pool-1-thread-3=====13 號客人服務結束 // 開放窗口3為客戶13立刻服務完畢.76 pool-1-thread-3=====3 號客人開始服務 // 阻塞隊列,隊頭 客戶3 出隊接受窗口3的服務77 ------14 號客人來了 // 加入阻塞隊列隊尾78 ------15 號客人來了79 pool-1-thread-4=====15 號客人開始服務80 ------16 號客人來了81 pool-1-thread-5=====16 號客人開始服務82 pool-1-thread-5=====16 號客人服務結束83 pool-1-thread-5=====4 號客人開始服務84 ===核心線程數(shù)===285 ===總任務數(shù)=====1686 ===正在排隊====987 ===最大線程數(shù)===588 ==============589 ===完成任務數(shù)===290 ==============5
其他的情況,可通過修改代碼示例中相關參數(shù)進行測試,自然就理解。
線程在Java中屬于稀缺資源,線程池不是越大越好,也不是越小越好。那么,線程池的參數(shù)要如何設置才合理呢?
任務分為CPU密集型、IO密集型、混合型。
CPU密集型:大部分都在用CPU跟內存,加密,邏輯操作,業(yè)務處理等。
IO密集型:數(shù)據(jù)庫鏈接,網絡通訊傳輸?shù)取?br> CPU密集型:一般推薦線程池不要過大,一般是CPU數(shù) + 1,+1是因為可能存在頁缺失(就是可能存在有些數(shù)據(jù)在硬盤中需要多來一個線程將數(shù)據(jù)讀入內存)。如果線程池數(shù)太大,可能會頻繁的進行線程上下文切換跟任務調度。
獲得當前CPU核心數(shù)代碼如下:
1 Runtime.getRuntime().availableProcessors();
IO密集型:線程數(shù)適當大一點,機器的CPU核心數(shù)*2。
混合型:可以考慮根絕情況將它拆分成CPU密集型和IO密集型任務,如果執(zhí)行時間相差不大,拆分可以提升吞吐量,反之沒有必要。
當線程池的任務緩存隊列已滿并且線程池中的線程數(shù)目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕策略,就會調用這個接口里的這個方法。也就是"銀行便拒絕再為 Thread16 服務"的拒絕方式。
1 public interface RejectedExecutionHandler {2 void rejectedExecution(Runnable r, ThreadPoolExecutor executor);3 }
ThreadPoolExecutor 提供了四種拒絕策略,分別是
AbortPolicy:直接拋出異常,這也是默認策略。
CallerRunsPolicy:返回給調用者處理。用調用者所在線程來運行任務。
DiscardOldestPolicy:拋棄隊列中等待最久的任務,然后把當前任務加入隊列中嘗試再次提交當前任務。
DiscardPolicy:不處理,直接丟棄當前任務。
代碼示例:4種拒絕策略,代碼同上,只需修改:
1 // 1.去掉這個if條件 2 if (finalI != 16 && finalI != 13) {} 3 4 5 6 // 2.1 線程池創(chuàng)建時拒絕策略為: 7 new ThreadPoolExecutor.AbortPolicy() 8 // 2.1 結果,直接拋出了異常.(只截取了最后一點) 9 ------16 號客人來了10 Exception in thread "main" java.util.concurrent.RejectedExecutionException11 12 13 14 // 2.2 線程池創(chuàng)建時拒絕策略為:15 new ThreadPoolExecutor.CallerRunsPolicy()16 // 2.2 結果,返回給調用者main處理了.(只截取了最后一點)17 ------16 號客人來了18 main=====16 號客人開始服務19 20 21 22 // 2.3 線程池創(chuàng)建時拒絕策略為:23 new ThreadPoolExecutor.DiscardOldestPolicy()24 // 2.3 結果,見下圖25 26 27 28 // 2.4 線程池創(chuàng)建時拒絕策略為:29 new ThreadPoolExecutor.DiscardPolicy()30 // 2.4 結果(丟棄了任務,沒有任何處理)
?
通過debug斷點的方式,可以查看到:DiscardOldestPolicy策略中,此時阻塞隊列中是客戶4~客戶16。也就是客戶3 出隊,被拋棄,客戶16入隊等待。
如果不使用線程池提供的4種拒絕策略,也可以自己實現(xiàn)拒絕策略的接口,實現(xiàn)對這些超出數(shù)量的任務的處理。比如:為被拒絕的任務開啟一個新的線程執(zhí)行,如下。
1 // 線程池創(chuàng)建時拒絕策略為: 2 new MyRejectedExecutionHandler() 3 // 結果 4 ------16 號客人來了 5 ---開啟新線程處理任務---=====16 號客人開始服務 6 7 8 // 自定義的策略拒絕 9 class MyRejectedExecutionHandler implements RejectedExecutionHandler {10 11 @Override12 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {13 new Thread(r, "---開啟新線程處理任務---").start();14 }15 }
參考文檔:https://www.matools.com/api/java8
《阿里巴巴Java開發(fā)手冊》百度網盤:https://pan.baidu.com/s/1FZ9DNr0sF1mAc6Nq_6QZmg? ? 密碼: 4sw1
作者:Craftsman-L
本博客所有文章僅用于學習、研究和交流目的,版權歸作者所有,歡迎非商業(yè)性質轉載。
如果本篇博客給您帶來幫助,請作者喝杯咖啡吧!點擊下面打賞,您的支持是我最大的動力!
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://systransis.cn/yun/123754.html
摘要:這里呢,我直接給出高并發(fā)場景通常都會考慮的一些解決思路和手段結尾如何有效的準備面試中并發(fā)類問題,我已經給出我的理解。 showImg(https://segmentfault.com/img/bV7Viy?w=550&h=405); 主題 又到面試季了,從群里,看到許多同學分享了自己的面試題目,我也抽空在網上搜索了一些許多公司使用的面試題,目前校招和社招的面試題基本都集中在幾個大方向上...
摘要:在中一般來說通過來創(chuàng)建所需要的線程池,如高并發(fā)原理初探后端掘金閱前熱身為了更加形象的說明同步異步阻塞非阻塞,我們以小明去買奶茶為例。 AbstractQueuedSynchronizer 超詳細原理解析 - 后端 - 掘金今天我們來研究學習一下AbstractQueuedSynchronizer類的相關原理,java.util.concurrent包中很多類都依賴于這個類所提供的隊列式...
摘要:在中一般來說通過來創(chuàng)建所需要的線程池,如高并發(fā)原理初探后端掘金閱前熱身為了更加形象的說明同步異步阻塞非阻塞,我們以小明去買奶茶為例。 AbstractQueuedSynchronizer 超詳細原理解析 - 后端 - 掘金今天我們來研究學習一下AbstractQueuedSynchronizer類的相關原理,java.util.concurrent包中很多類都依賴于這個類所提供的隊列式...
摘要:當活動線程核心線程非核心線程達到這個數(shù)值后,后續(xù)任務將會根據(jù)來進行拒絕策略處理。線程池工作原則當線程池中線程數(shù)量小于則創(chuàng)建線程,并處理請求。當線程池中的數(shù)量等于最大線程數(shù)時默默丟棄不能執(zhí)行的新加任務,不報任何異常。 spring-cache使用記錄 spring-cache的使用記錄,坑點記錄以及采用的解決方案 深入分析 java 線程池的實現(xiàn)原理 在這篇文章中,作者有條不紊的將 ja...
摘要:你僅僅需要一個大小為數(shù)據(jù)庫連接池,然后讓剩下的業(yè)務線程都在隊列里等待就可以了。你應該經常會看到一些用戶量不是很大的應用中,為應付大約十來個的并發(fā),卻將數(shù)據(jù)庫連接池設置成,的情況。請不要過度配置您的數(shù)據(jù)庫連接池的大小。 文章翻譯整理自: https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing歡迎關注個人微信公眾...
閱讀 758·2023-04-26 01:30
閱讀 3309·2021-11-24 10:32
閱讀 2196·2021-11-22 14:56
閱讀 1993·2021-11-18 10:07
閱讀 563·2019-08-29 17:14
閱讀 636·2019-08-26 12:21
閱讀 3115·2019-08-26 10:55
閱讀 2950·2019-08-23 18:09