摘要:當面試官問線程池時,你應該知道些什么一執(zhí)行流程與不同,向中提交任務的時候,任務被包裝成對象加入延遲隊列并啟動一個線程。當我們創(chuàng)建出一個調度線程池以后,就可以開始提交任務了。
最近新接手的項目里大量使用了ScheduledThreadPoolExecutor類去執(zhí)行一些定時任務,之前一直沒有機會研究這個類的源碼,這次趁著機會好好研讀一下。
原文地址:http://www.jianshu.com/p/18f4...
該類主要還是基于ThreadPoolExecutor類進行二次開發(fā),所以對Java線程池執(zhí)行過程還不了解的同學建議先看看我之前的文章。
當面試官問線程池時,你應該知道些什么?
與ThreadPoolExecutor不同,向ScheduledThreadPoolExecutor中提交任務的時候,任務被包裝成ScheduledFutureTask對象加入延遲隊列并啟動一個woker線程。
用戶提交的任務加入延遲隊列時,會按照執(zhí)行時間進行排列,也就是說隊列頭的任務是需要最早執(zhí)行的。而woker線程會從延遲隊列中獲取任務,如果已經(jīng)到了任務的執(zhí)行時間,則開始執(zhí)行。否則阻塞等待剩余延遲時間后再嘗試獲取任務。
任務執(zhí)行完成以后,如果該任務是一個需要周期性反復執(zhí)行的任務,則計算好下次執(zhí)行的時間后會重新加入到延遲隊列中。
二、源碼深入分析首先看下ScheduledThreadPoolExecutor類的幾個構造函數(shù):
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }
注:這里構造函數(shù)都是使用super,其實就是ThreadPoolExecutor的構造函數(shù)
這里有三點需要注意:
使用DelayedWorkQueue作為阻塞隊列,并沒有像ThreadPoolExecutor類一樣開放給用戶進行自定義設置。該隊列是ScheduledThreadPoolExecutor類的核心組件,后面詳細介紹。
這里沒有向用戶開放maximumPoolSize的設置,原因是DelayedWorkQueue中的元素在大于初始容量16時,會進行擴容,也就是說隊列不會裝滿,maximumPoolSize參數(shù)即使設置了也不會生效。
worker線程沒有回收時間,原因跟第2點一樣,因為不會觸發(fā)回收操作。所以這里的線程存活時間都設置為0。
再次說明:上面三點的理解需要先了解ThreadPoolExecutor的知識點。
當我們創(chuàng)建出一個調度線程池以后,就可以開始提交任務了。這里依次分析一下三個常用API的源碼:
首先是schedule方法,該方法是指任務在指定延遲時間到達后觸發(fā),只會執(zhí)行一次。
public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) { //參數(shù)校驗 if (command == null || unit == null) throw new NullPointerException(); //這里是一個嵌套結構,首先把用戶提交的任務包裝成ScheduledFutureTask //然后在調用decorateTask進行包裝,該方法是留給用戶去擴展的,默認是個空方法 RunnableScheduledFuture> t = decorateTask(command, new ScheduledFutureTask(command, null, triggerTime(delay, unit))); //包裝好任務以后,就進行提交了 delayedExecute(t); return t; }
重點看一下提交任務的源碼:
private void delayedExecute(RunnableScheduledFuture> task) { //如果線程池已經(jīng)關閉,則使用拒絕策略把提交任務拒絕掉 if (isShutdown()) reject(task); else { //與ThreadPoolExecutor不同,這里直接把任務加入延遲隊列 super.getQueue().add(task); //如果當前狀態(tài)無法執(zhí)行任務,則取消 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else //這里是增加一個worker線程,避免提交的任務沒有worker去執(zhí)行 //原因就是該類沒有像ThreadPoolExecutor一樣,woker滿了才放入隊列 ensurePrestart(); } }
這里的關鍵點其實就是super.getQueue().add(task)行代碼,ScheduledThreadPoolExecutor類在內部自己實現(xiàn)了一個基于堆數(shù)據(jù)結構的延遲隊列。add方法最終會落到offer方法中,一起看下:
public boolean offer(Runnable x) { //參數(shù)校驗 if (x == null) throw new NullPointerException(); RunnableScheduledFuture> e = (RunnableScheduledFuture>)x; final ReentrantLock lock = this.lock; lock.lock(); try { //查看當前元素數(shù)量,如果大于隊列長度則進行擴容 int i = size; if (i >= queue.length) grow(); //元素數(shù)量加1 size = i + 1; //如果當前隊列還沒有元素,則直接加入頭部 if (i == 0) { queue[0] = e; //記錄索引 setIndex(e, 0); } else { //把任務加入堆中,并調整堆結構,這里就會根據(jù)任務的觸發(fā)時間排列 //把需要最早執(zhí)行的任務放在前面 siftUp(i, e); } //如果新加入的元素就是隊列頭,這里有兩種情況 //1.這是用戶提交的第一個任務 //2.新任務進行堆調整以后,排在隊列頭 if (queue[0] == e) { //這個變量起優(yōu)化作用,后面說 leader = null; //加入元素以后,喚醒worker線程 available.signal(); } } finally { lock.unlock(); } return true; }
通過上面的邏輯,我們把提交的任務成功加入到了延遲隊列中,前面說了加入任務以后會開啟一個woker線程,該線程的任務就是從延遲隊列中不斷取出任務執(zhí)行。這些都是跟ThreadPoolExecutor相同的,我們看下從該延遲隊列中獲取元素的源碼:
public RunnableScheduledFuture> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { //取出隊列中第一個元素,即最早需要執(zhí)行的任務 RunnableScheduledFuture> first = queue[0]; //如果隊列為空,則阻塞等待加入元素時喚醒 if (first == null) available.await(); else { //計算任務執(zhí)行時間,這個delay是當前時間減去任務觸發(fā)時間 long delay = first.getDelay(NANOSECONDS); //如果到了觸發(fā)時間,則執(zhí)行出隊操作 if (delay <= 0) return finishPoll(first); first = null; //這里表示該任務已經(jīng)分配給了其他線程,當前線程等待喚醒就可以 if (leader != null) available.await(); else { //否則把給任務分配給當前線程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { //當前線程等待任務剩余延遲時間 available.awaitNanos(delay); } finally { //這里線程醒來以后,什么時候leader會發(fā)生變化呢? //就是上面的添加任務的時候 if (leader == thisThread) leader = null; } } } } } finally { //如果隊列不為空,則喚醒其他woker線程 if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
這里為什么會加入一個leader變量來分配阻塞隊列中的任務呢?原因是要減少不必要的時間等待。比如說現(xiàn)在隊列中的第一個任務1分鐘后執(zhí)行,那么用戶提交新的任務時會不斷的加入woker線程,如果新提交的任務都排在隊列后面,也就是說新的woker現(xiàn)在都會取出這第一個任務進行執(zhí)行延遲時間的等待,當該任務到觸發(fā)時間時,會喚醒很多woker線程,這顯然是沒有必要的。
當任務被woker線程取出以后,會執(zhí)行run方法,由于此時任務已經(jīng)被包裝成了ScheduledFutureTask對象,那我們來看下該類的run方法:
public void run() { boolean periodic = isPeriodic(); //如果當前線程池已經(jīng)不支持執(zhí)行任務,則取消 if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) //如果不需要周期性執(zhí)行,則直接執(zhí)行run方法然后結束 ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { //如果需要周期執(zhí)行,則在執(zhí)行完任務以后,設置下一次執(zhí)行時間 setNextRunTime(); //把任務重新加入延遲隊列 reExecutePeriodic(outerTask); } }
上面就是schedule方法完整的執(zhí)行過程。
ScheduledThreadPoolExecutor類中關于周期性執(zhí)行的任務提供了兩個方法scheduleAtFixedRate跟scheduleWithFixedDelay,一起看下區(qū)別。
public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { //刪除不必要的邏輯,重點看區(qū)別 ScheduledFutureTasksft = new ScheduledFutureTask (command, null, triggerTime(initialDelay, unit), //二者唯一區(qū)別 unit.toNanos(period)); //... } public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { //... ScheduledFutureTask sft = new ScheduledFutureTask (command, null, triggerTime(initialDelay, unit), //二者唯一區(qū)別 unit.toNanos(-delay)); //.. }
前者把周期延遲時間傳入ScheduledFutureTask中,而后者卻設置成負數(shù)傳入,區(qū)別在哪里呢?看下當任務執(zhí)行完成以后的收尾工作中設置任務下次執(zhí)行時間的方法setNextRunTime源碼:
private void setNextRunTime() { long p = period; //大于0是scheduleAtFixedRate方法,表示執(zhí)行時間是根據(jù)初始化參數(shù)計算的 if (p > 0) time += p; else //小于0是scheduleWithFixedDelay方法,表示執(zhí)行時間是根據(jù)當前時間重新計算的 time = triggerTime(-p); }
也就是說當使用scheduleAtFixedRate方法提交任務時,任務后續(xù)執(zhí)行的延遲時間都已經(jīng)確定好了,分別是initialDelay,initialDelay + period,initialDelay + 2 * period以此類推。
而調用scheduleWithFixedDelay方法提交任務時,第一次執(zhí)行的延遲時間為initialDelay,后面的每次執(zhí)行時間都是在前一次任務執(zhí)行完成以后的時間點上面加上period延遲執(zhí)行。
ScheduledThreadPoolExecutor可以說是在ThreadPoolExecutor上面進行了一些擴展操作,它只是重新包裝了任務以及阻塞隊列。該類的阻塞隊列DelayedWorkQueue是基于堆去實現(xiàn)的,本文沒有太詳細介紹堆結構插入跟刪除數(shù)據(jù)的調整工作,感興趣的同學可以私信或者評論交流。
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://systransis.cn/yun/67849.html
摘要:當面試官問線程池時,你應該知道些什么一執(zhí)行流程與不同,向中提交任務的時候,任務被包裝成對象加入延遲隊列并啟動一個線程。當我們創(chuàng)建出一個調度線程池以后,就可以開始提交任務了。 最近新接手的項目里大量使用了ScheduledThreadPoolExecutor類去執(zhí)行一些定時任務,之前一直沒有機會研究這個類的源碼,這次趁著機會好好研讀一下。 原文地址:http://www.jianshu....
摘要:當面試官問線程池時,你應該知道些什么一執(zhí)行流程與不同,向中提交任務的時候,任務被包裝成對象加入延遲隊列并啟動一個線程。當我們創(chuàng)建出一個調度線程池以后,就可以開始提交任務了。 最近新接手的項目里大量使用了ScheduledThreadPoolExecutor類去執(zhí)行一些定時任務,之前一直沒有機會研究這個類的源碼,這次趁著機會好好研讀一下。 原文地址:http://www.jianshu....
摘要:在前面介紹了的多線程的基本原理信息線程池架構原理和源碼解析,本文對這個本身的線程池的調度器做一個簡單擴展,如果還沒讀過上一篇文章,建議讀一下,因為這是調度器的核心組件部分。 在前面介紹了java的多線程的基本原理信息:《Java線程池架構原理和源碼解析》,本文對這個java本身的線程池的調度器做一個簡單擴展,如果還沒讀過上一篇文章,建議讀一下,因為這是調度器的核心組件部分。 我們如果...
摘要:深入理解線程池線程池初探所謂線程池,就是將多個線程放在一個池子里面所謂池化技術,然后需要線程的時候不是創(chuàng)建一個線程,而是從線程池里面獲取一個可用的線程,然后執(zhí)行我們的任務。最后的的意思是需要確保線程池已經(jīng)被啟動起來了。 深入理解Java線程池 線程池初探 ?所謂線程池,就是將多個線程放在一個池子里面(所謂池化技術),然后需要線程的時候不是創(chuàng)建一個線程,而是從線程池里面獲取一個可用的線程...
摘要:也是自帶的一個基于線程池設計的定時任務類。其每個調度任務都會分配到線程池中的一個線程執(zhí)行,所以其任務是并發(fā)執(zhí)行的,互不影響。 原創(chuàng)不易,如需轉載,請注明出處https://www.cnblogs.com/baixianlong/p/10659045.html,否則將追究法律責任?。?! 一、在JAVA開發(fā)領域,目前可以通過以下幾種方式進行定時任務 1、單機部署模式 Timer:jdk中...
閱讀 856·2023-04-25 21:21
閱讀 3237·2021-11-24 09:39
閱讀 3079·2021-09-02 15:41
閱讀 2009·2021-08-26 14:13
閱讀 1839·2019-08-30 11:18
閱讀 2786·2019-08-29 16:25
閱讀 517·2019-08-28 18:27
閱讀 1590·2019-08-28 18:17