摘要:當(dāng)面試官問線程池時(shí),你應(yīng)該知道些什么一執(zhí)行流程與不同,向中提交任務(wù)的時(shí)候,任務(wù)被包裝成對(duì)象加入延遲隊(duì)列并啟動(dòng)一個(gè)線程。當(dāng)我們創(chuàng)建出一個(gè)調(diào)度線程池以后,就可以開始提交任務(wù)了。
最近新接手的項(xiàng)目里大量使用了ScheduledThreadPoolExecutor類去執(zhí)行一些定時(shí)任務(wù),之前一直沒有機(jī)會(huì)研究這個(gè)類的源碼,這次趁著機(jī)會(huì)好好研讀一下。
原文地址:http://www.jianshu.com/p/18f4...
該類主要還是基于ThreadPoolExecutor類進(jìn)行二次開發(fā),所以對(duì)Java線程池執(zhí)行過程還不了解的同學(xué)建議先看看我之前的文章。
當(dāng)面試官問線程池時(shí),你應(yīng)該知道些什么?
與ThreadPoolExecutor不同,向ScheduledThreadPoolExecutor中提交任務(wù)的時(shí)候,任務(wù)被包裝成ScheduledFutureTask對(duì)象加入延遲隊(duì)列并啟動(dòng)一個(gè)woker線程。
用戶提交的任務(wù)加入延遲隊(duì)列時(shí),會(huì)按照?qǐng)?zhí)行時(shí)間進(jìn)行排列,也就是說隊(duì)列頭的任務(wù)是需要最早執(zhí)行的。而woker線程會(huì)從延遲隊(duì)列中獲取任務(wù),如果已經(jīng)到了任務(wù)的執(zhí)行時(shí)間,則開始執(zhí)行。否則阻塞等待剩余延遲時(shí)間后再嘗試獲取任務(wù)。
任務(wù)執(zhí)行完成以后,如果該任務(wù)是一個(gè)需要周期性反復(fù)執(zhí)行的任務(wù),則計(jì)算好下次執(zhí)行的時(shí)間后會(huì)重新加入到延遲隊(duì)列中。
二、源碼深入分析首先看下ScheduledThreadPoolExecutor類的幾個(gè)構(gòu)造函數(shù):
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }
注:這里構(gòu)造函數(shù)都是使用super,其實(shí)就是ThreadPoolExecutor的構(gòu)造函數(shù)
這里有三點(diǎn)需要注意:
使用DelayedWorkQueue作為阻塞隊(duì)列,并沒有像ThreadPoolExecutor類一樣開放給用戶進(jìn)行自定義設(shè)置。該隊(duì)列是ScheduledThreadPoolExecutor類的核心組件,后面詳細(xì)介紹。
這里沒有向用戶開放maximumPoolSize的設(shè)置,原因是DelayedWorkQueue中的元素在大于初始容量16時(shí),會(huì)進(jìn)行擴(kuò)容,也就是說隊(duì)列不會(huì)裝滿,maximumPoolSize參數(shù)即使設(shè)置了也不會(huì)生效。
worker線程沒有回收時(shí)間,原因跟第2點(diǎn)一樣,因?yàn)椴粫?huì)觸發(fā)回收操作。所以這里的線程存活時(shí)間都設(shè)置為0。
再次說明:上面三點(diǎn)的理解需要先了解ThreadPoolExecutor的知識(shí)點(diǎn)。
當(dāng)我們創(chuàng)建出一個(gè)調(diào)度線程池以后,就可以開始提交任務(wù)了。這里依次分析一下三個(gè)常用API的源碼:
首先是schedule方法,該方法是指任務(wù)在指定延遲時(shí)間到達(dá)后觸發(fā),只會(huì)執(zhí)行一次。
public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) { //參數(shù)校驗(yàn) if (command == null || unit == null) throw new NullPointerException(); //這里是一個(gè)嵌套結(jié)構(gòu),首先把用戶提交的任務(wù)包裝成ScheduledFutureTask //然后在調(diào)用decorateTask進(jìn)行包裝,該方法是留給用戶去擴(kuò)展的,默認(rèn)是個(gè)空方法 RunnableScheduledFuture> t = decorateTask(command, new ScheduledFutureTask(command, null, triggerTime(delay, unit))); //包裝好任務(wù)以后,就進(jìn)行提交了 delayedExecute(t); return t; }
重點(diǎn)看一下提交任務(wù)的源碼:
private void delayedExecute(RunnableScheduledFuture> task) { //如果線程池已經(jīng)關(guān)閉,則使用拒絕策略把提交任務(wù)拒絕掉 if (isShutdown()) reject(task); else { //與ThreadPoolExecutor不同,這里直接把任務(wù)加入延遲隊(duì)列 super.getQueue().add(task); //如果當(dāng)前狀態(tài)無法執(zhí)行任務(wù),則取消 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else //這里是增加一個(gè)worker線程,避免提交的任務(wù)沒有worker去執(zhí)行 //原因就是該類沒有像ThreadPoolExecutor一樣,woker滿了才放入隊(duì)列 ensurePrestart(); } }
這里的關(guān)鍵點(diǎn)其實(shí)就是super.getQueue().add(task)行代碼,ScheduledThreadPoolExecutor類在內(nèi)部自己實(shí)現(xiàn)了一個(gè)基于堆數(shù)據(jù)結(jié)構(gòu)的延遲隊(duì)列。add方法最終會(huì)落到offer方法中,一起看下:
public boolean offer(Runnable x) { //參數(shù)校驗(yàn) if (x == null) throw new NullPointerException(); RunnableScheduledFuture> e = (RunnableScheduledFuture>)x; final ReentrantLock lock = this.lock; lock.lock(); try { //查看當(dāng)前元素?cái)?shù)量,如果大于隊(duì)列長(zhǎng)度則進(jìn)行擴(kuò)容 int i = size; if (i >= queue.length) grow(); //元素?cái)?shù)量加1 size = i + 1; //如果當(dāng)前隊(duì)列還沒有元素,則直接加入頭部 if (i == 0) { queue[0] = e; //記錄索引 setIndex(e, 0); } else { //把任務(wù)加入堆中,并調(diào)整堆結(jié)構(gòu),這里就會(huì)根據(jù)任務(wù)的觸發(fā)時(shí)間排列 //把需要最早執(zhí)行的任務(wù)放在前面 siftUp(i, e); } //如果新加入的元素就是隊(duì)列頭,這里有兩種情況 //1.這是用戶提交的第一個(gè)任務(wù) //2.新任務(wù)進(jìn)行堆調(diào)整以后,排在隊(duì)列頭 if (queue[0] == e) { //這個(gè)變量起優(yōu)化作用,后面說 leader = null; //加入元素以后,喚醒worker線程 available.signal(); } } finally { lock.unlock(); } return true; }
通過上面的邏輯,我們把提交的任務(wù)成功加入到了延遲隊(duì)列中,前面說了加入任務(wù)以后會(huì)開啟一個(gè)woker線程,該線程的任務(wù)就是從延遲隊(duì)列中不斷取出任務(wù)執(zhí)行。這些都是跟ThreadPoolExecutor相同的,我們看下從該延遲隊(duì)列中獲取元素的源碼:
public RunnableScheduledFuture> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { //取出隊(duì)列中第一個(gè)元素,即最早需要執(zhí)行的任務(wù) RunnableScheduledFuture> first = queue[0]; //如果隊(duì)列為空,則阻塞等待加入元素時(shí)喚醒 if (first == null) available.await(); else { //計(jì)算任務(wù)執(zhí)行時(shí)間,這個(gè)delay是當(dāng)前時(shí)間減去任務(wù)觸發(fā)時(shí)間 long delay = first.getDelay(NANOSECONDS); //如果到了觸發(fā)時(shí)間,則執(zhí)行出隊(duì)操作 if (delay <= 0) return finishPoll(first); first = null; //這里表示該任務(wù)已經(jīng)分配給了其他線程,當(dāng)前線程等待喚醒就可以 if (leader != null) available.await(); else { //否則把給任務(wù)分配給當(dāng)前線程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { //當(dāng)前線程等待任務(wù)剩余延遲時(shí)間 available.awaitNanos(delay); } finally { //這里線程醒來以后,什么時(shí)候leader會(huì)發(fā)生變化呢? //就是上面的添加任務(wù)的時(shí)候 if (leader == thisThread) leader = null; } } } } } finally { //如果隊(duì)列不為空,則喚醒其他woker線程 if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
這里為什么會(huì)加入一個(gè)leader變量來分配阻塞隊(duì)列中的任務(wù)呢?原因是要減少不必要的時(shí)間等待。比如說現(xiàn)在隊(duì)列中的第一個(gè)任務(wù)1分鐘后執(zhí)行,那么用戶提交新的任務(wù)時(shí)會(huì)不斷的加入woker線程,如果新提交的任務(wù)都排在隊(duì)列后面,也就是說新的woker現(xiàn)在都會(huì)取出這第一個(gè)任務(wù)進(jìn)行執(zhí)行延遲時(shí)間的等待,當(dāng)該任務(wù)到觸發(fā)時(shí)間時(shí),會(huì)喚醒很多woker線程,這顯然是沒有必要的。
當(dāng)任務(wù)被woker線程取出以后,會(huì)執(zhí)行run方法,由于此時(shí)任務(wù)已經(jīng)被包裝成了ScheduledFutureTask對(duì)象,那我們來看下該類的run方法:
public void run() { boolean periodic = isPeriodic(); //如果當(dāng)前線程池已經(jīng)不支持執(zhí)行任務(wù),則取消 if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) //如果不需要周期性執(zhí)行,則直接執(zhí)行run方法然后結(jié)束 ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { //如果需要周期執(zhí)行,則在執(zhí)行完任務(wù)以后,設(shè)置下一次執(zhí)行時(shí)間 setNextRunTime(); //把任務(wù)重新加入延遲隊(duì)列 reExecutePeriodic(outerTask); } }
上面就是schedule方法完整的執(zhí)行過程。
ScheduledThreadPoolExecutor類中關(guān)于周期性執(zhí)行的任務(wù)提供了兩個(gè)方法scheduleAtFixedRate跟scheduleWithFixedDelay,一起看下區(qū)別。
public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { //刪除不必要的邏輯,重點(diǎn)看區(qū)別 ScheduledFutureTasksft = new ScheduledFutureTask (command, null, triggerTime(initialDelay, unit), //二者唯一區(qū)別 unit.toNanos(period)); //... } public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { //... ScheduledFutureTask sft = new ScheduledFutureTask (command, null, triggerTime(initialDelay, unit), //二者唯一區(qū)別 unit.toNanos(-delay)); //.. }
前者把周期延遲時(shí)間傳入ScheduledFutureTask中,而后者卻設(shè)置成負(fù)數(shù)傳入,區(qū)別在哪里呢?看下當(dāng)任務(wù)執(zhí)行完成以后的收尾工作中設(shè)置任務(wù)下次執(zhí)行時(shí)間的方法setNextRunTime源碼:
private void setNextRunTime() { long p = period; //大于0是scheduleAtFixedRate方法,表示執(zhí)行時(shí)間是根據(jù)初始化參數(shù)計(jì)算的 if (p > 0) time += p; else //小于0是scheduleWithFixedDelay方法,表示執(zhí)行時(shí)間是根據(jù)當(dāng)前時(shí)間重新計(jì)算的 time = triggerTime(-p); }
也就是說當(dāng)使用scheduleAtFixedRate方法提交任務(wù)時(shí),任務(wù)后續(xù)執(zhí)行的延遲時(shí)間都已經(jīng)確定好了,分別是initialDelay,initialDelay + period,initialDelay + 2 * period以此類推。
而調(diào)用scheduleWithFixedDelay方法提交任務(wù)時(shí),第一次執(zhí)行的延遲時(shí)間為initialDelay,后面的每次執(zhí)行時(shí)間都是在前一次任務(wù)執(zhí)行完成以后的時(shí)間點(diǎn)上面加上period延遲執(zhí)行。
ScheduledThreadPoolExecutor可以說是在ThreadPoolExecutor上面進(jìn)行了一些擴(kuò)展操作,它只是重新包裝了任務(wù)以及阻塞隊(duì)列。該類的阻塞隊(duì)列DelayedWorkQueue是基于堆去實(shí)現(xiàn)的,本文沒有太詳細(xì)介紹堆結(jié)構(gòu)插入跟刪除數(shù)據(jù)的調(diào)整工作,感興趣的同學(xué)可以私信或者評(píng)論交流。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/70622.html
摘要:當(dāng)面試官問線程池時(shí),你應(yīng)該知道些什么一執(zhí)行流程與不同,向中提交任務(wù)的時(shí)候,任務(wù)被包裝成對(duì)象加入延遲隊(duì)列并啟動(dòng)一個(gè)線程。當(dāng)我們創(chuàng)建出一個(gè)調(diào)度線程池以后,就可以開始提交任務(wù)了。 最近新接手的項(xiàng)目里大量使用了ScheduledThreadPoolExecutor類去執(zhí)行一些定時(shí)任務(wù),之前一直沒有機(jī)會(huì)研究這個(gè)類的源碼,這次趁著機(jī)會(huì)好好研讀一下。 原文地址:http://www.jianshu....
摘要:當(dāng)面試官問線程池時(shí),你應(yīng)該知道些什么一執(zhí)行流程與不同,向中提交任務(wù)的時(shí)候,任務(wù)被包裝成對(duì)象加入延遲隊(duì)列并啟動(dòng)一個(gè)線程。當(dāng)我們創(chuàng)建出一個(gè)調(diào)度線程池以后,就可以開始提交任務(wù)了。 最近新接手的項(xiàng)目里大量使用了ScheduledThreadPoolExecutor類去執(zhí)行一些定時(shí)任務(wù),之前一直沒有機(jī)會(huì)研究這個(gè)類的源碼,這次趁著機(jī)會(huì)好好研讀一下。 原文地址:http://www.jianshu....
摘要:在前面介紹了的多線程的基本原理信息線程池架構(gòu)原理和源碼解析,本文對(duì)這個(gè)本身的線程池的調(diào)度器做一個(gè)簡(jiǎn)單擴(kuò)展,如果還沒讀過上一篇文章,建議讀一下,因?yàn)檫@是調(diào)度器的核心組件部分。 在前面介紹了java的多線程的基本原理信息:《Java線程池架構(gòu)原理和源碼解析》,本文對(duì)這個(gè)java本身的線程池的調(diào)度器做一個(gè)簡(jiǎn)單擴(kuò)展,如果還沒讀過上一篇文章,建議讀一下,因?yàn)檫@是調(diào)度器的核心組件部分。 我們?nèi)绻?..
摘要:深入理解線程池線程池初探所謂線程池,就是將多個(gè)線程放在一個(gè)池子里面所謂池化技術(shù),然后需要線程的時(shí)候不是創(chuàng)建一個(gè)線程,而是從線程池里面獲取一個(gè)可用的線程,然后執(zhí)行我們的任務(wù)。最后的的意思是需要確保線程池已經(jīng)被啟動(dòng)起來了。 深入理解Java線程池 線程池初探 ?所謂線程池,就是將多個(gè)線程放在一個(gè)池子里面(所謂池化技術(shù)),然后需要線程的時(shí)候不是創(chuàng)建一個(gè)線程,而是從線程池里面獲取一個(gè)可用的線程...
摘要:也是自帶的一個(gè)基于線程池設(shè)計(jì)的定時(shí)任務(wù)類。其每個(gè)調(diào)度任務(wù)都會(huì)分配到線程池中的一個(gè)線程執(zhí)行,所以其任務(wù)是并發(fā)執(zhí)行的,互不影響。 原創(chuàng)不易,如需轉(zhuǎn)載,請(qǐng)注明出處https://www.cnblogs.com/baixianlong/p/10659045.html,否則將追究法律責(zé)任?。?! 一、在JAVA開發(fā)領(lǐng)域,目前可以通過以下幾種方式進(jìn)行定時(shí)任務(wù) 1、單機(jī)部署模式 Timer:jdk中...
閱讀 3730·2021-11-17 09:33
閱讀 2756·2021-09-22 15:12
閱讀 3356·2021-08-12 13:24
閱讀 2451·2019-08-30 11:14
閱讀 1742·2019-08-29 14:09
閱讀 1334·2019-08-26 14:01
閱讀 3074·2019-08-26 13:49
閱讀 1786·2019-08-26 12:16