引言
本文是源起netty專欄的第4篇文章,很明顯前3篇文章已經(jīng)在偏離主題的道路上越來越遠(yuǎn)。于是乎,我決定:繼續(xù)保持……
使用首先看看源碼類注釋中的示例(未改變官方示例邏輯,只是增加了print輸出和注釋)
import java.time.LocalTime; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; public class ScheduleExecutorServiceDemo { private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5); public static void main(String args[]){ final Runnable beeper = new Runnable() { public void run() { System.out.println(Thread.currentThread().getName()+" >>> "+LocalTime.now().toString()+" >>> beep"); //TODO 沉睡吧,少年 //try { // TimeUnit.SECONDS.sleep(3L); //} catch (InterruptedException e) { // e.printStackTrace(); //} } }; //從0s開始輸出beep,間隔1s final ScheduledFuture> beeperHandle = scheduler.scheduleAtFixedRate(beeper, 0, 1, TimeUnit.SECONDS); //10s之后停止beeperHandle的瘋狂輸出行為 scheduler.schedule(new Runnable() { public void run() { System.out.println("覺悟吧,beeperHandle!I will kill you!"); beeperHandle.cancel(true); } }, 10, TimeUnit.SECONDS); } }
scheduleAtFixedRate也是該類常用的打開方式之一,網(wǎng)上很多文章會拿該方法與scheduleWithFixedDelay進(jìn)行對比,對比結(jié)果其實和方法名一致:
scheduleAtFixedRate //以固定頻率執(zhí)行 scheduleWithFixedDelay //延遲方式執(zhí)行,間隔時間=間隔時間入?yún)?任務(wù)執(zhí)行時間
ScheduleExecutorService實則是Timer的進(jìn)化版,主要改進(jìn)了Timer單線程方面的弊端,改進(jìn)方式自然是線程池,ScheduleExecutorService的好基友ScheduledThreadPoolExecutor華麗麗登場。其實ScheduledThreadPoolExecutor才是主角,ScheduleExecutorService扮演的是拋磚引玉中的磚……
先看下ScheduledThreadPoolExecutor類的江湖地位:
既然繼承自ThreadPoolExecutor,確乃線程池?zé)o疑。
疑問本文以如下方法作為切入點:
public ScheduledFuture> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
方法入?yún)?b>period(譯:周期)就是scheduleAtFixedRate所指的固定頻率嗎?
這個問題很好驗證,把示例中這部分代碼的注釋去掉就能得到答案。
final Runnable beeper = new Runnable() { public void run() { System.out.println(Thread.currentThread().getName()+" >>> "+LocalTime.now().toString()+" >>> beep"); //TODO 沉睡吧,少年 //try { // TimeUnit.SECONDS.sleep(3L); //} catch (InterruptedException e) { // e.printStackTrace(); //} } };
答案就是,如果方法執(zhí)行時間大于間隔周期period,則任務(wù)的下次執(zhí)行時間將超過period的設(shè)定!
執(zhí)行結(jié)果如下,可以看出任務(wù)間隔為3s,而不是period設(shè)置的1s
不禁好奇,ScheduleExecutorService是怎么實現(xiàn)的多長時間之后執(zhí)行下一個任務(wù)?有句話叫源碼之下無秘密,so..let"s do this !
源碼分析 1.初始化從ScheduleExecutorService的初始化開始:
private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
追隨調(diào)用鏈Executors.newScheduledThreadPool -> new ScheduledThreadPoolExecutor(corePoolSize),進(jìn)入如下方法:
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue()); //注意最后一個參數(shù) }
線程池中的任務(wù)隊列用的new DelayedWorkQueue(),而DelayedWorkQueue是ScheduledThreadPoolExecutor的內(nèi)部類。
初始化部分關(guān)注到這一點即可,之后會是一些成員變量的賦值,不作解釋。
接下來從scheduleAtFixedRate方法開始,進(jìn)入它的實現(xiàn)方法:
public ScheduledFuture> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTasksft = new ScheduledFutureTask (command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
Runnable command被封裝成了ScheduledFutureTask類,無獨有偶,ScheduledFutureTask是ScheduledThreadPoolExecutor的另外一個內(nèi)部類??聪滤念愱P(guān)系圖:
有沒有發(fā)現(xiàn)ScheduledFutureTask實現(xiàn)了Comparable接口?眾所周知這個接口是以某種規(guī)則用來比較大小的,這里的規(guī)則就是任務(wù)的開始執(zhí)行時間——ScheduledFutureTask的一個屬性:
/** The time the task is enabled to execute in nanoTime units */ private long time;
compareTo方法就是明證:
public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask> x = (ScheduledFutureTask>)other; long diff = time - x.time; //focus這里啊喂?。?! if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
一般來說,這些比較(compare)放在集合中才有意義,那ScheduledFutureTask之后會放在哪個集合中嗎?有些朋友可能已經(jīng)猜到了,沒錯,ScheduledFutureTask后續(xù)會置于前文提到的DelayedWorkQueue中。
3.延時執(zhí)行繼續(xù)ScheduledThreadPoolExecutor.scheduleAtFixedRate方法:
ScheduledFutureTasksft = new ScheduledFutureTask (command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); //醒醒,該你出場了
進(jìn)入delayedExecute方法:
private void delayedExecute(RunnableScheduledFuture> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); //代碼一 - 任務(wù)加入DelayedWorkQueue if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); //代碼二 - 任務(wù)開始 } }
追蹤 代碼一 位置的調(diào)用鏈:
-> DelayedWorkQueue.add -> offer -> siftUp(int k, RunnableScheduledFuture> key)
private void siftUp(int k, RunnableScheduledFuture> key) { while (k > 0) { int parent = (k - 1) >>> 1; RunnableScheduledFuture> e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); }
可以看到,siftUp方法實現(xiàn)了向DelayedWorkQueue添加任務(wù)時(add),開始時間靠后的任務(wù)(ScheduledFutureTask)會放在后面。
ok,回到 代碼二 位置的ensurePrestart方法,接著追:
ensurePrestart -> addWorker(Runnable firstTask, boolean core)
濃縮版addWorker方法如下:
private boolean addWorker(Runnable firstTask, boolean core){ ... //省略很多的驗證邏輯 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try{ w = new Worker(firstTask); //代碼三 - 封裝成worker,new Worker會從線程池中獲取線程 final Thread t = w.thread; if (t != null){ final ReentrantLock mainLock = this.mainLock; mainLock.lock(); ... //省略部分狀態(tài)控制邏輯 if (workerAdded){ t.start(); //代碼四 - 執(zhí)行Worker的run方法 workerStarted = true; } } }finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
這里發(fā)現(xiàn)firstTask(ScheduledFutureTask)再次被封裝成了Worker(代碼三),那么t.start()(代碼四),自然會執(zhí)行Worker的run方法,跟下Worker.run方法:Worker.run -> runWorker(Worker w)
濃縮后的runWorker:
final void runWorker(Worker w){ ... //省略部分代碼 try{ while (task != null || (task = getTask()) != null){ //代碼五 - getTask()獲取任務(wù) ... //省略部分代碼 task.run(); //代碼六 - 任務(wù)執(zhí)行 ... //省略部分代碼 } completedAbruptly = false; }finally{ processWorkerExit(w, completedAbruptly); } }
老規(guī)矩,五、六兩處關(guān)鍵代碼分別看一下:
代碼五 getTask最終定位到DelayedWorkQueue.take方法,這里只分析延時任務(wù)的執(zhí)行情況
public RunnableScheduledFuture> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don"t retain ref while waiting if (leader != null) //代碼八 - leader線程就是下一次的工作線程 available.await(); else { Thread thisThread = Thread.currentThread(); //代碼七 - 指定leader線程 leader = thisThread; try { available.awaitNanos(delay); //等待 } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
對于延時任務(wù)來說,線程池中第一個調(diào)用take的線程進(jìn)來會作為leader線程(代碼七),然后等待。結(jié)束等待的位置在哪?在ScheduledFutureTask.run的調(diào)用中?。ㄎ易鲾帱c調(diào)試的時候,這個等待時間總是很大,一般兩個小時以上,似乎直接用await就成?這一點確有疑問)。
而線程池中的其它線程調(diào)用take時,發(fā)現(xiàn)leader已經(jīng)被第一個線程搶了,只能等著(代碼八)
回到 代碼六 位置,task.run()也就是ScheduledFutureTask.run
public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { //對于延時任務(wù),會進(jìn)入這個分支 setNextRunTime(); reExecutePeriodic(outerTask); } }
對于延時任務(wù),會執(zhí)行ScheduledFutureTask.super.runAndReset():
protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callablec = callable; if (c != null && s == NEW) { try { //代碼九 - 阻塞式等待beeper完成 c.call(); // don"t set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; }
runAndReset方法會等待最初定義的beeper邏輯執(zhí)行完成(代碼九),這也解釋了為什么scheduleAtFixedRate的下次任務(wù)執(zhí)行時間會有可能超過參數(shù)period的設(shè)定!
然后調(diào)用reExecutePeriodic:
void reExecutePeriodic(RunnableScheduledFuture> task) { if (canRunInCurrentRunState(true)) { super.getQueue().add(task); //隊列中再次加入任務(wù) if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); //再次回到ensurePrestart方法 } }
reExecutePeriodic方法看上去是不是似曾相識,與本小節(jié)(3.延時執(zhí)行)開端的delayedExecute方法對比下:
private void delayedExecute(RunnableScheduledFuture> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); //任務(wù)加入DelayedWorkQueue if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); //任務(wù)開始 } }
都是加入隊列,然后任務(wù)開始!
而DelayedWorkQueue.add中到底做了什么?之前沒有分析,在這里看一下:DelayedWorkQueue.add -> offer
public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture> e = (RunnableScheduledFuture>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e); } if (queue[0] == e) { leader = null; //將leader賦值清除 available.signal(); //代碼十 - 通知線程 } } finally { lock.unlock(); } return true; }
可以看到,就是在offer方法(代碼十),將DelayedWorkQueue.take中的available.awaitNanos(delay)喚醒了!
總結(jié)是不是已經(jīng)繞暈了?很正常,因為源碼終歸是需要自己去讀個幾遍才能理清整個脈絡(luò)。所以老鐵們,加油!
最后的總結(jié)還是不能缺少的,一個定時任務(wù)的執(zhí)行流程是這樣的:
1.任務(wù)開始時,將任務(wù)ScheduledFutureTask放入隊列DelayedWorkQueue。任務(wù)放入過程會計算該任務(wù)的開始執(zhí)行時間,執(zhí)行時間靠前的放入隊列的前端,執(zhí)行時間靠后的放入隊列的后端。
2.之后的ensurePrestart方法,先從線程池中獲取線程,該線程會從隊列DelayedWorkQueue中獲取ScheduledFutureTask。
獲取過程DelayedWorkQueue.take先計算任務(wù)的延時時間delay ,有兩種情況:
delay<=0 已不需要延時,立即獲取任務(wù)
delay>0 需要延時,出現(xiàn)如下局面:
第一個進(jìn)入的線程成為leader
其它線程等待
long delay = first.getDelay(NANOSECONDS); //計算延時時間delay //已不需要延時,立即獲取任務(wù) if (delay <= 0) return finishPoll(first); first = null; // don"t retain ref while waiting //需要延時的任務(wù)(與此同時有任務(wù)正在執(zhí)行) if (leader != null) //其它線程進(jìn)來時,有l(wèi)eader線程存在了,等待 available.await(); else { Thread thisThread = Thread.currentThread(); //第一個進(jìn)入這里的線程會成為leader leader = thisThread; try { available.awaitNanos(delay); //等待 } finally { if (leader == thisThread) leader = null; } }
3.獲取任務(wù)后,進(jìn)入執(zhí)行環(huán)節(jié)Worker.run -> ScheduledFutureTask.run。執(zhí)行過程會阻塞式等待任務(wù)完成,這也是任務(wù)執(zhí)行時間可能會超過period的原因!任務(wù)執(zhí)行結(jié)束會再次放入任務(wù),這樣又回到步驟1,反復(fù)執(zhí)行。
感謝分析Java延遲與周期任務(wù)的實現(xiàn)原理描述
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/76305.html
引言 本文是源起netty專欄的第4篇文章,很明顯前3篇文章已經(jīng)在偏離主題的道路上越來越遠(yuǎn)。于是乎,我決定:繼續(xù)保持…… 使用 首先看看源碼類注釋中的示例(未改變官方示例邏輯,只是增加了print輸出和注釋) import java.time.LocalTime; import java.util.concurrent.Executors; import java.util.concurrent....
引言 本文是源起netty專欄的第4篇文章,很明顯前3篇文章已經(jīng)在偏離主題的道路上越來越遠(yuǎn)。于是乎,我決定:繼續(xù)保持…… 使用 首先看看源碼類注釋中的示例(未改變官方示例邏輯,只是增加了print輸出和注釋) import java.time.LocalTime; import java.util.concurrent.Executors; import java.util.concurrent....
摘要:實現(xiàn)原理淺談幫助理解的示意圖中有一屬性,類型是的靜態(tài)內(nèi)部類。剛剛說過,是一個中的靜態(tài)內(nèi)部類,則是的內(nèi)部節(jié)點。這個會在線程中,作為其屬性初始是一個數(shù)組的索引,達(dá)成與類似的效果。的方法被調(diào)用時,會根據(jù)記錄的槽位信息進(jìn)行大掃除。 概述 FastThreadLocal的類名本身就充滿了對ThreadLocal的挑釁,快男FastThreadLocal是怎么快的?源碼中類注釋坦白如下: /** ...
摘要:答曰摸索直譯為服務(wù)加載器,最終目的是獲取的實現(xiàn)類。代碼走起首先,要有一個接口形狀接口介紹然后,要有該接口的實現(xiàn)類。期具體實現(xiàn)依靠的內(nèi)部類,感性趣的朋友可以自己看一下。總結(jié)重點在于可跨越包獲取,這一點筆者通過多模塊項目親測延時加載特性 前戲 netty源碼注釋有云: ... If a provider class has been installed in a jar file tha...
摘要:閱讀源碼時,發(fā)現(xiàn)很多,理所當(dāng)然會想翻閱資料后,該技能,姿勢如下環(huán)境中的全部屬性全部屬性注意如果將本行代碼放在自定義屬性之后,會不會打出把自定義屬性也給獲取到可以結(jié)論會獲取目前環(huán)境中全部的屬性值,無論系統(tǒng)提供還是個人定義系統(tǒng)提供屬性代碼中定義 閱讀源碼時,發(fā)現(xiàn)很多System.getProperty(xxx),理所當(dāng)然會想:whats fucking this? 翻閱資料后,Get該技能...
閱讀 3163·2021-09-30 09:47
閱讀 2022·2021-09-22 16:04
閱讀 2290·2021-09-22 15:44
閱讀 2550·2021-08-25 09:38
閱讀 550·2019-08-26 13:23
閱讀 1238·2019-08-26 12:20
閱讀 2819·2019-08-26 11:59
閱讀 1087·2019-08-23 18:40