摘要:源碼分析創(chuàng)建可緩沖的線程池。源碼分析使用創(chuàng)建線程池源碼分析的構(gòu)造函數(shù)構(gòu)造函數(shù)參數(shù)核心線程數(shù)大小,當(dāng)線程數(shù),會(huì)創(chuàng)建線程執(zhí)行最大線程數(shù),當(dāng)線程數(shù)的時(shí)候,會(huì)把放入中保持存活時(shí)間,當(dāng)線程數(shù)大于的空閑線程能保持的最大時(shí)間。
之前創(chuàng)建線程的時(shí)候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor 這四個(gè)方法。
當(dāng)然 Executors 也是用不同的參數(shù)去 new ThreadPoolExecutor 實(shí)現(xiàn)的,本文先分析前四種線程創(chuàng)建方式,后在分析 new ThreadPoolExecutor 創(chuàng)建方式
由于使用了LinkedBlockingQueue所以maximumPoolSize沒用,當(dāng)corePoolSize滿了之后就加入到LinkedBlockingQueue隊(duì)列中。
每當(dāng)某個(gè)線程執(zhí)行完成之后就從LinkedBlockingQueue隊(duì)列中取一個(gè)。
所以這個(gè)是創(chuàng)建固定大小的線程池。
源碼分析
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor( nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue2.newSingleThreadPool()()); }
創(chuàng)建線程數(shù)為1的線程池,由于使用了LinkedBlockingQueue所以maximumPoolSize 沒用,corePoolSize為1表示線程數(shù)大小為1,滿了就放入隊(duì)列中,執(zhí)行完了就從隊(duì)列取一個(gè)。
源碼分析
public static ExecutorService newSingleThreadExecutor() { return new Executors.FinalizableDelegatedExecutorService ( new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue3.newCachedThreadPool()()) ); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
創(chuàng)建可緩沖的線程池。沒有大小限制。由于corePoolSize為0所以任務(wù)會(huì)放入SynchronousQueue隊(duì)列中,SynchronousQueue只能存放大小為1,所以會(huì)立刻新起線程,由于maxumumPoolSize為Integer.MAX_VALUE所以可以認(rèn)為大小為2147483647。受內(nèi)存大小限制。
源碼分析
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue使用 ThreadPoolExecutor 創(chuàng)建線程池()); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
源碼分析 ,ThreadPoolExecutor 的構(gòu)造函數(shù)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue構(gòu)造函數(shù)參數(shù)workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
1、corePoolSize 核心線程數(shù)大小,當(dāng)線程數(shù) < corePoolSize ,會(huì)創(chuàng)建線程執(zhí)行 runnable
2、maximumPoolSize 最大線程數(shù), 當(dāng)線程數(shù) >= corePoolSize的時(shí)候,會(huì)把 runnable 放入 workQueue中
3、keepAliveTime 保持存活時(shí)間,當(dāng)線程數(shù)大于corePoolSize的空閑線程能保持的最大時(shí)間。
4、unit 時(shí)間單位
5、workQueue 保存任務(wù)的阻塞隊(duì)列
6、threadFactory 創(chuàng)建線程的工廠
7、handler 拒絕策略
任務(wù)執(zhí)行順序1、當(dāng)線程數(shù)小于 corePoolSize時(shí),創(chuàng)建線程執(zhí)行任務(wù)。
2、當(dāng)線程數(shù)大于等于 corePoolSize并且 workQueue 沒有滿時(shí),放入workQueue中
3、線程數(shù)大于等于 corePoolSize并且當(dāng) workQueue 滿時(shí),新任務(wù)新建線程運(yùn)行,線程總數(shù)要小于 maximumPoolSize
4、當(dāng)線程總數(shù)等于 maximumPoolSize 并且 workQueue 滿了的時(shí)候執(zhí)行 handler 的 rejectedExecution。也就是拒絕策略。
JDK7提供了7個(gè)阻塞隊(duì)列。(也屬于并發(fā)容器)
ArrayBlockingQueue :一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列。
LinkedBlockingQueue :一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列。
PriorityBlockingQueue :一個(gè)支持優(yōu)先級(jí)排序的無界阻塞隊(duì)列。
DelayQueue:一個(gè)使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的無界阻塞隊(duì)列。
SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。
LinkedTransferQueue:一個(gè)由鏈表結(jié)構(gòu)組成的無界阻塞隊(duì)列。
LinkedBlockingDeque:一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。
什么是阻塞隊(duì)列?阻塞隊(duì)列是一個(gè)在隊(duì)列基礎(chǔ)上又支持了兩個(gè)附加操作的隊(duì)列。
2個(gè)附加操作:
支持阻塞的插入方法:隊(duì)列滿時(shí),隊(duì)列會(huì)阻塞插入元素的線程,直到隊(duì)列不滿。
支持阻塞的移除方法:隊(duì)列空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强铡?/p>
阻塞隊(duì)列的應(yīng)用場景
阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場景,生產(chǎn)者是向隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里取元素的線程。簡而言之,阻塞隊(duì)列是生產(chǎn)者用來存放元素、消費(fèi)者獲取元素的容器。
幾個(gè)方法在阻塞隊(duì)列不可用的時(shí)候,上述2個(gè)附加操作提供了四種處理方法
方法處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時(shí)退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
檢查方法 | element() | peek() | 不可用 | 不可用 |
JDK 7 提供了7個(gè)阻塞隊(duì)列,如下
1、ArrayBlockingQueue 數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列。
此隊(duì)列按照先進(jìn)先出(FIFO)的原則對元素進(jìn)行排序,但是默認(rèn)情況下不保證線程公平的訪問隊(duì)列,即如果隊(duì)列滿了,那么被阻塞在外面的線程對隊(duì)列訪問的順序是不能保證線程公平(即先阻塞,先插入)的。
2、LinkedBlockingQueue一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列
此隊(duì)列按照先出先進(jìn)的原則對元素進(jìn)行排序
3、PriorityBlockingQueue支持優(yōu)先級(jí)的無界阻塞隊(duì)列
4、DelayQueue支持延時(shí)獲取元素的無界阻塞隊(duì)列,即可以指定多久才能從隊(duì)列中獲取當(dāng)前元素
5、SynchronousQueue不存儲(chǔ)元素的阻塞隊(duì)列,每一個(gè)put必須等待一個(gè)take操作,否則不能繼續(xù)添加元素。并且他支持公平訪問隊(duì)列。
6、LinkedTransferQueue由鏈表結(jié)構(gòu)組成的無界阻塞TransferQueue隊(duì)列。相對于其他阻塞隊(duì)列,多了tryTransfer和transfer方法
transfer方法
如果當(dāng)前有消費(fèi)者正在等待接收元素(take或者待時(shí)間限制的poll方法),transfer可以把生產(chǎn)者傳入的元素立刻傳給消費(fèi)者。如果沒有消費(fèi)者等待接收元素,則將元素放在隊(duì)列的tail節(jié)點(diǎn),并等到該元素被消費(fèi)者消費(fèi)了才返回。
tryTransfer方法
用來試探生產(chǎn)者傳入的元素能否直接傳給消費(fèi)者。,如果沒有消費(fèi)者在等待,則返回false。和上述方法的區(qū)別是該方法無論消費(fèi)者是否接收,方法立即返回。而transfer方法是必須等到消費(fèi)者消費(fèi)了才返回。
7、LinkedBlockingDeque鏈表結(jié)構(gòu)的雙向阻塞隊(duì)列,優(yōu)勢在于多線程入隊(duì)時(shí),減少一半的競爭。
四個(gè)拒絕策略ThreadPoolExecutor默認(rèn)有四個(gè)拒絕策略:
1、ThreadPoolExecutor.AbortPolicy() 直接拋出異常RejectedExecutionException
2、ThreadPoolExecutor.CallerRunsPolicy() 直接調(diào)用run方法并且阻塞執(zhí)行
3、ThreadPoolExecutor.DiscardPolicy() 直接丟棄后來的任務(wù)
4、ThreadPoolExecutor.DiscardOldestPolicy() 丟棄在隊(duì)列中隊(duì)首的任務(wù)
當(dāng)然可以自己繼承RejectedExecutionHandler來寫拒絕策略.
TestThreadPoolExecutor 示例 TestThreadPoolExecutor.javapackage io.ymq.thread.TestThreadPoolExecutor; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 描述: * * @author yanpenglei * @create 2017-10-12 15:39 **/ public class TestThreadPoolExecutor { public static void main(String[] args) { long currentTimeMillis = System.currentTimeMillis(); // 構(gòu)造一個(gè)線程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 6, 3, TimeUnit.SECONDS, new ArrayBlockingQueueThreadPoolTask.java(3) ); for (int i = 1; i <= 10; i++) { try { String task = "task=" + i; System.out.println("創(chuàng)建任務(wù)并提交到線程池中:" + task); threadPool.execute(new ThreadPoolTask(task)); Thread.sleep(100); } catch (Exception e) { e.printStackTrace(); } } try { //等待所有線程執(zhí)行完畢當(dāng)前任務(wù)。 threadPool.shutdown(); boolean loop = true; do { //等待所有線程執(zhí)行完畢當(dāng)前任務(wù)結(jié)束 loop = !threadPool.awaitTermination(2, TimeUnit.SECONDS);//等待2秒 } while (loop); if (loop != true) { System.out.println("所有線程執(zhí)行完畢"); } } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("耗時(shí):" + (System.currentTimeMillis() - currentTimeMillis)); } } }
package io.ymq.thread.TestThreadPoolExecutor; import java.io.Serializable; /** * 描述: * * @author yanpenglei * @create 2017-10-12 15:40 **/ public class ThreadPoolTask implements Runnable, Serializable { private Object attachData; ThreadPoolTask(Object tasks) { this.attachData = tasks; } public void run() { try { System.out.println("開始執(zhí)行任務(wù):" + attachData + "任務(wù),使用的線程池,線程名稱:" + Thread.currentThread().getName()); System.out.println(); } catch (Exception e) { e.printStackTrace(); } attachData = null; } }
遇到j(luò)ava.util.concurrent.RejectedExecutionException
第一
你的線程池 ThreadPoolExecutor 顯示的 shutdown() 之后,再向線程池提交任務(wù)的時(shí)候。 如果你配置的拒絕策略是 AbortPolicy 的話,這個(gè)異常就會(huì)拋出來。
第二
當(dāng)你設(shè)置的任務(wù)緩存隊(duì)列過小的時(shí)候,或者說, 你的線程池里面所有的線程都在干活(線程數(shù)== maxPoolSize),并且你的任務(wù)緩存隊(duì)列也已經(jīng)充滿了等待的隊(duì)列, 這個(gè)時(shí)候,你再向它提交任務(wù),則會(huì)拋出這個(gè)異常。
響應(yīng)
可以看到線程 pool-1-thread-1 到5 循環(huán)使用
創(chuàng)建任務(wù)并提交到線程池中:task=1 開始執(zhí)行任務(wù):task=1任務(wù),使用的線程池,線程名稱:pool-1-thread-1 創(chuàng)建任務(wù)并提交到線程池中:task=2 開始執(zhí)行任務(wù):task=2任務(wù),使用的線程池,線程名稱:pool-1-thread-2 創(chuàng)建任務(wù)并提交到線程池中:task=3 開始執(zhí)行任務(wù):task=3任務(wù),使用的線程池,線程名稱:pool-1-thread-3 創(chuàng)建任務(wù)并提交到線程池中:task=4 開始執(zhí)行任務(wù):task=4任務(wù),使用的線程池,線程名稱:pool-1-thread-4 創(chuàng)建任務(wù)并提交到線程池中:task=5 開始執(zhí)行任務(wù):task=5任務(wù),使用的線程池,線程名稱:pool-1-thread-5 創(chuàng)建任務(wù)并提交到線程池中:task=6 開始執(zhí)行任務(wù):task=6任務(wù),使用的線程池,線程名稱:pool-1-thread-1 創(chuàng)建任務(wù)并提交到線程池中:task=7 開始執(zhí)行任務(wù):task=7任務(wù),使用的線程池,線程名稱:pool-1-thread-2 創(chuàng)建任務(wù)并提交到線程池中:task=8 開始執(zhí)行任務(wù):task=8任務(wù),使用的線程池,線程名稱:pool-1-thread-3 創(chuàng)建任務(wù)并提交到線程池中:task=9 開始執(zhí)行任務(wù):task=9任務(wù),使用的線程池,線程名稱:pool-1-thread-4 創(chuàng)建任務(wù)并提交到線程池中:task=10 開始執(zhí)行任務(wù):task=10任務(wù),使用的線程池,線程名稱:pool-1-thread-5 所有線程執(zhí)行完畢 耗時(shí):1015測試代碼
github https://github.com/souyunku/ymq-example/tree/master/ymq-thread
Contact作者:鵬磊
出處:http://www.ymq.io
Email:[email protected]
版權(quán)歸作者所有,轉(zhuǎn)載請注明出處
Wechat:關(guān)注公眾號(hào),搜云庫,專注于開發(fā)技術(shù)的研究與知識(shí)分享
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/67766.html
摘要:線程池的作用線程池能有效的處理多個(gè)線程的并發(fā)問題,避免大量的線程因?yàn)榛ハ鄰?qiáng)占系統(tǒng)資源導(dǎo)致阻塞現(xiàn)象,能夠有效的降低頻繁創(chuàng)建和銷毀線程對性能所帶來的開銷。固定的線程數(shù)由系統(tǒng)資源設(shè)置。線程池的排隊(duì)策略與有關(guān)。線程池的狀態(tài)值分別是。 線程池的作用 線程池能有效的處理多個(gè)線程的并發(fā)問題,避免大量的線程因?yàn)榛ハ鄰?qiáng)占系統(tǒng)資源導(dǎo)致阻塞現(xiàn)象,能夠有效的降低頻繁創(chuàng)建和銷毀線程對性能所帶來的開銷。 線程池的...
摘要:創(chuàng)建方法最大線程數(shù)即源碼單線程化的線程池有且僅有一個(gè)工作線程執(zhí)行任務(wù)所有任務(wù)按照指定順序執(zhí)行,即遵循隊(duì)列的入隊(duì)出隊(duì)規(guī)則創(chuàng)建方法源碼還有一個(gè)結(jié)合了和,就不介紹了,基本不用。 *本篇文章已授權(quán)微信公眾號(hào) guolin_blog (郭霖)獨(dú)家發(fā)布 為什么用線程池 創(chuàng)建/銷毀線程伴隨著系統(tǒng)開銷,過于頻繁的創(chuàng)建/銷毀線程,會(huì)很大程度上影響處理效率 >例如: > >記創(chuàng)建線程消耗時(shí)間T1,執(zhí)行...
摘要:本文主要內(nèi)容為簡單總結(jié)中線程池的相關(guān)信息。方法簇方法簇用于創(chuàng)建固定線程數(shù)的線程池。三種常見線程池的對比上文總結(jié)了工具類創(chuàng)建常見線程池的方法,現(xiàn)對三種線程池區(qū)別進(jìn)行比較。 概述 線程可認(rèn)為是操作系統(tǒng)可調(diào)度的最小的程序執(zhí)行序列,一般作為進(jìn)程的組成部分,同一進(jìn)程中多個(gè)線程可共享該進(jìn)程的資源(如內(nèi)存等)。在單核處理器架構(gòu)下,操作系統(tǒng)一般使用分時(shí)的方式實(shí)現(xiàn)多線程;在多核處理器架構(gòu)下,多個(gè)線程能夠...
摘要:線程池常見實(shí)現(xiàn)線程池一般包含三個(gè)主要部分調(diào)度器決定由哪個(gè)線程來執(zhí)行任務(wù)執(zhí)行任務(wù)所能夠的最大耗時(shí)等線程隊(duì)列存放并管理著一系列線程這些線程都處于阻塞狀態(tài)或休眠狀態(tài)任務(wù)隊(duì)列存放著用戶提交的需要被執(zhí)行的任務(wù)一般任務(wù)的執(zhí)行的即先提交的任務(wù)先被執(zhí)行調(diào)度 線程池常見實(shí)現(xiàn) 線程池一般包含三個(gè)主要部分: 調(diào)度器: 決定由哪個(gè)線程來執(zhí)行任務(wù), 執(zhí)行任務(wù)所能夠的最大耗時(shí)等 線程隊(duì)列: 存放并管理著一系列線...
摘要:提交任務(wù)當(dāng)創(chuàng)建了一個(gè)線程池之后我們就可以將任務(wù)提交到線程池中執(zhí)行了。提交任務(wù)到線程池中相當(dāng)簡單,我們只要把原來傳入類構(gòu)造器的對象傳入線程池的方法或者方法就可以了。 我們一般不會(huì)選擇直接使用線程類Thread進(jìn)行多線程編程,而是使用更方便的線程池來進(jìn)行任務(wù)的調(diào)度和管理。線程池就像共享單車,我們只要在我們有需要的時(shí)候去獲取就可以了。甚至可以說線程池更棒,我們只需要把任務(wù)提交給它,它就會(huì)在合...
閱讀 2957·2023-04-26 01:32
閱讀 1552·2021-09-13 10:37
閱讀 2288·2019-08-30 15:56
閱讀 1681·2019-08-30 14:00
閱讀 3057·2019-08-30 12:44
閱讀 1972·2019-08-26 12:20
閱讀 1070·2019-08-23 16:29
閱讀 3236·2019-08-23 14:44