摘要:創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。創(chuàng)建一個定長線程池,支持定時及周期性任務執(zhí)行。
轉載請注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/...
線程池Wiki 上是這樣解釋的:Thread Pool
作用:利用線程池可以大大減少在創(chuàng)建和銷毀線程上所花的時間以及系統(tǒng)資源的開銷!
下面主要講下線程池中最重要的一個類 ThreadPoolExecutor 。
ThreadPoolExecutor 構造器:
有四個構造器的,挑了參數(shù)最長的一個進行講解。
七個參數(shù):
corePoolSize:核心池的大小,在創(chuàng)建了線程池后,默認情況下,線程池中并沒有任何線程,而是等待有任務到來才創(chuàng)建線程去執(zhí)行任務,當有任務來之后,就會創(chuàng)建一個線程去執(zhí)行任務,當線程池中的線程數(shù)目達到corePoolSize后,就會把到達的任務放到緩存隊列當中;
maximumPoolSize:線程池最大線程數(shù);
keepAliveTime:表示線程沒有任務執(zhí)行時最多保持多久時間會終止;
unit:參數(shù)keepAliveTime的時間單位(DAYS、HOURS、MINUTES、SECONDS 等);
workQueue:阻塞隊列,用來存儲等待執(zhí)行的任務;
ArrayBlockingQueue (有界隊列)
LinkedBlockingQueue (無界隊列)
SynchronousQueue
threadFactory:線程工廠,主要用來創(chuàng)建線程
handler:拒絕處理任務的策略
AbortPolicy:丟棄任務并拋出 RejectedExecutionException 異常。(默認這種)
DiscardPolicy:也是丟棄任務,但是不拋出異常
DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執(zhí)行任務(重復此過程)
CallerRunsPolicy:由調用線程處理該任務
重要方法:
execute():通過這個方法可以向線程池提交一個任務,交由線程池去執(zhí)行;
shutdown():關閉線程池;
execute() 方法:
注:JDK 1.7 和 1.8 這個方法有點區(qū)別,下面代碼是 1.8 中的。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //1、如果當前的線程數(shù)小于核心線程池的大小,根據(jù)現(xiàn)有的線程作為第一個 Worker 運行的線程,新建一個 Worker,addWorker 自動的檢查當前線程池的狀態(tài)和 Worker 的數(shù)量,防止線程池在不能添加線程的狀態(tài)下添加線程 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //2、如果線程入隊成功,然后還是要進行 double-check 的,因為線程在入隊之后狀態(tài)是可能會發(fā)生變化的 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // recheck 防止線程池狀態(tài)的突變,如果突變,那么將 reject 線程,防止 workQueue 中增加新線程 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0)//上下兩個操作都有 addWorker 的操作,但是如果在workQueue.offer 的時候 Worker 變?yōu)?0,那么將沒有 Worker 執(zhí)行新的 task,所以增加一個 Worker. addWorker(null, false); } //3、如果 task 不能入隊(隊列滿了),這時候嘗試增加一個新線程,如果增加失敗那么當前的線程池狀態(tài)變化了或者線程池已經(jīng)滿了然后拒絕task else if (!addWorker(command, false)) reject(command); }
其中調用了 addWorker() 方法:
private boolean addWorker(Runnable firstTask, boolean core) {// firstTask: 新增一個線程并執(zhí)行這個任務,可空,增加的線程從隊列獲取任務;core:是否使用 corePoolSize 作為上限,否則使用 maxmunPoolSize retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /** * rs!=Shutdown || fistTask!=null || workQueue.isEmpty * 如果當前的線程池的狀態(tài) > SHUTDOWN 那么拒絕 Worker 的 add 如果 =SHUTDOWN * 那么此時不能新加入不為 null 的 Task,如果在 workQueue 為 empty 的時候不能加入任何類型的 Worker, * 如果不為 empty 可以加入 task 為 null 的 Worker, 增加消費的 Worker */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //如果當前的數(shù)量超過了 CAPACITY,或者超過了 corePoolSize 和 maximumPoolSize(試 core 而定) if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //CAS 嘗試增加線程數(shù),如果失敗,證明有競爭,那么重新到 retry。 if (compareAndIncrementWorkerCount(c))// AtomicInteger 的 CAS 操作; break retry; c = ctl.get(); // Re-read ctl //判斷當前線程池的運行狀態(tài),狀態(tài)發(fā)生改變,重試 retry; if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask);// Worker 為內部類,封裝了線程和任務,通過 ThreadFactory 創(chuàng)建線程,可能失敗拋異常或者返回 null final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable // SHUTDOWN 以后的狀態(tài)和 SHUTDOWN 狀態(tài)下 firstTask 為 null,不可新增線程 throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s;//記錄最大線程數(shù) workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w);//失敗回退,從 wokers 移除 w, 線程數(shù)減一,嘗試結束線程池(調用tryTerminate 方法) } return workerStarted; }
示意圖:
執(zhí)行流程:
1、當有任務進入時,線程池創(chuàng)建線程去執(zhí)行任務,直到核心線程數(shù)滿為止
2、核心線程數(shù)量滿了之后,任務就會進入一個緩沖的任務隊列中
當任務隊列為無界隊列時,任務就會一直放入緩沖的任務隊列中,不會和最大線程數(shù)量進行比較
當任務隊列為有界隊列時,任務先放入緩沖的任務隊列中,當任務隊列滿了之后,才會將任務放入線程池,此時會與線程池中最大的線程數(shù)量進行比較,如果超出了,則默認會拋出異常。然后線程池才會執(zhí)行任務,當任務執(zhí)行完,又會將緩沖隊列中的任務放入線程池中,然后重復此操作。
shutdown() 方法:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //判斷是否可以操作目標線程 checkShutdownAccess(); //設置線程池狀態(tài)為 SHUTDOWN, 此處之后,線程池中不會增加新 Task advanceRunState(SHUTDOWN); //中斷所有的空閑線程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } //轉到 Terminate tryTerminate(); }
參考資料:深入理解java線程池—ThreadPoolExecutor
JDK 自帶四種線程池分析與比較 1、newFixedThreadPool創(chuàng)建一個定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會在隊列中等待。
2、newSingleThreadExecutor創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務,保證所有任務按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行。
3、newCachedThreadPool創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
4、newScheduledThreadPool創(chuàng)建一個定長線程池,支持定時及周期性任務執(zhí)行。
四種線程池其實內部方法都是調用的 ThreadPoolExecutor 類,只不過利用了其不同的構造器方法而已(傳入自己需要傳入的參數(shù)),那么利用這個特性,我們自己也是可以實現(xiàn)自己定義的線程池的。
自定義線程池1、創(chuàng)建任務類
package com.zhisheng.thread.threadpool.demo; /** * Created by 10412 on 2017/7/24. * 任務 */ public class MyTask implements Runnable { private int taskId; //任務 id private String taskName; //任務名字 public int getTaskId() { return taskId; } public void setTaskId(int taskId) { this.taskId = taskId; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public MyTask(int taskId, String taskName) { this.taskId = taskId; this.taskName = taskName; } @Override public void run() { System.out.println("當前正在執(zhí)行 ****** 線程Id-->" + taskId + ",任務名稱-->" + taskName); try { Thread.currentThread().sleep(5 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("線程Id-->" + taskId + ",任務名稱-->" + taskName + " ----------- 執(zhí)行完畢!"); } }
2、自定義拒絕策略,實現(xiàn) RejectedExecutionHandler 接口,重寫 rejectedExecution 方法
package com.zhisheng.thread.threadpool.demo; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; /** * Created by 10412 on 2017/7/24. * 自定義拒絕策略,實現(xiàn) RejectedExecutionHandler 接口 */ public class RejectedThreadPoolHandler implements RejectedExecutionHandler { public RejectedThreadPoolHandler() { } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("WARNING 自定義拒絕策略: Task " + r.toString() + " rejected from " + executor.toString()); } }
3、創(chuàng)建線程池
package com.zhisheng.thread.threadpool.demo; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * Created by 10412 on 2017/7/24. */ public class ThreadPool { public static void main(String[] args) { //核心線程數(shù)量為 2,最大線程數(shù)量 4,空閑線程存活的時間 60s,有界隊列長度為 3, //ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3)); //核心線程數(shù)量為 2,最大線程數(shù)量 4,空閑線程存活的時間 60s, 無界隊列, //ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>()); //核心線程數(shù)量為 2,最大線程數(shù)量 4,空閑線程存活的時間 60s,有界隊列長度為 3, 使用自定義拒絕策略 ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue(3), new RejectedThreadPoolHandler()); for (int i = 1; i <= 10; i++) { //創(chuàng)建 10 個任務 MyTask task = new MyTask(i, "任務" + i); //運行 pool.execute(task); System.out.println("活躍的線程數(shù):"+pool.getActiveCount() + ",核心線程數(shù):" + pool.getCorePoolSize() + ",線程池大?。? + pool.getPoolSize() + ",隊列的大小" + pool.getQueue().size()); } //關閉線程池 pool.shutdown(); } }
這里運行結果就不截圖了,我在本地測試了代碼是沒問題的,感興趣的建議還是自己跑一下,然后分析下結果是不是和前面分析的一樣,如有問題,請在我博客下面評論!
總結本文一開始講了線程池的介紹和好處,然后分析了線程池中最核心的 ThreadPoolExecutor 類中構造器的七個參數(shù)的作用、類中兩個重要的方法,然后在對比研究了下 JDK 中自帶的四種線程池的用法和內部代碼細節(jié),最后寫了一個自定義的線程池。
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://systransis.cn/yun/67500.html
摘要:當活動線程核心線程非核心線程達到這個數(shù)值后,后續(xù)任務將會根據(jù)來進行拒絕策略處理。線程池工作原則當線程池中線程數(shù)量小于則創(chuàng)建線程,并處理請求。當線程池中的數(shù)量等于最大線程數(shù)時默默丟棄不能執(zhí)行的新加任務,不報任何異常。 spring-cache使用記錄 spring-cache的使用記錄,坑點記錄以及采用的解決方案 深入分析 java 線程池的實現(xiàn)原理 在這篇文章中,作者有條不紊的將 ja...
摘要:線程的啟動與銷毀都與本地線程同步。操作系統(tǒng)會調度所有線程并將它們分配給可用的??蚣艿某蓡T主要成員線程池接口接口接口以及工具類。創(chuàng)建單個線程的接口與其實現(xiàn)類用于表示異步計算的結果。參考書籍并發(fā)編程的藝術方騰飛魏鵬程曉明著 在java中,直接使用線程來異步的執(zhí)行任務,線程的每次創(chuàng)建與銷毀需要一定的計算機資源開銷。每個任務創(chuàng)建一個線程的話,當任務數(shù)量多的時候,則對應的創(chuàng)建銷毀開銷會消耗大量...
摘要:開頭正式開啟我入職的里程,現(xiàn)在已是工作了一個星期了,這個星期算是我入職的過渡期,算是知道了學校生活和工作的差距了,總之,盡快習慣這種生活吧。當時是看的廖雪峰的博客自己也用做爬蟲寫過幾篇博客,不過有些是在前人的基礎上寫的。 showImg(https://segmentfault.com/img/remote/1460000010867984); 開頭 2017.08.21 正式開啟我...
閱讀 1353·2021-11-11 16:54
閱讀 2396·2021-09-22 10:51
閱讀 2662·2019-08-30 15:44
閱讀 3213·2019-08-29 17:05
閱讀 1458·2019-08-29 17:01
閱讀 2915·2019-08-29 12:28
閱讀 2480·2019-08-26 13:50
閱讀 1737·2019-08-23 16:47