成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

【源起Netty 外傳】ScheduledThreadPoolExecutor源碼解讀

funnyZhang / 2002人閱讀

引言

本文是源起netty專欄的第4篇文章,很明顯前3篇文章已經(jīng)在偏離主題的道路上越來(lái)越遠(yuǎn)。于是乎,我決定:繼續(xù)保持……

使用

首先看看源碼類注釋中的示例(未改變官方示例邏輯,只是增加了print輸出和注釋)

import java.time.LocalTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduleExecutorServiceDemo {
    private final static ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(5);

    public static void main(String args[]){
        final Runnable beeper = new Runnable() {
            public void run() {
                System.out.println(Thread.currentThread().getName()+" >>> "+LocalTime.now().toString()+" >>> beep");
                
                //TODO 沉睡吧,少年
                //try {
                //    TimeUnit.SECONDS.sleep(3L);
                //} catch (InterruptedException e) {
                //    e.printStackTrace();
                //}
            }
        };

        //從0s開始輸出beep,間隔1s
        final ScheduledFuture beeperHandle =
                scheduler.scheduleAtFixedRate(beeper, 0, 1, TimeUnit.SECONDS);

        //10s之后停止beeperHandle的瘋狂輸出行為
        scheduler.schedule(new Runnable() {
            public void run() {
                System.out.println("覺悟吧,beeperHandle!I will kill you!");
                beeperHandle.cancel(true);
            }
        }, 10, TimeUnit.SECONDS);
    }
}

scheduleAtFixedRate也是該類常用的打開方式之一,網(wǎng)上很多文章會(huì)拿該方法與scheduleWithFixedDelay進(jìn)行對(duì)比,對(duì)比結(jié)果其實(shí)和方法名一致:

scheduleAtFixedRate    //以固定頻率執(zhí)行
scheduleWithFixedDelay    //延遲方式執(zhí)行,間隔時(shí)間=間隔時(shí)間入?yún)?任務(wù)執(zhí)行時(shí)間

ScheduleExecutorService實(shí)則是Timer的進(jìn)化版,主要改進(jìn)了Timer單線程方面的弊端,改進(jìn)方式自然是線程池,ScheduleExecutorService的好基友ScheduledThreadPoolExecutor華麗麗登場(chǎng)。其實(shí)ScheduledThreadPoolExecutor才是主角,ScheduleExecutorService扮演的是拋磚引玉中的磚……

先看下ScheduledThreadPoolExecutor類的江湖地位:

既然繼承自ThreadPoolExecutor,確乃線程池?zé)o疑。

疑問(wèn)

本文以如下方法作為切入點(diǎn):
public ScheduledFuture scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

方法入?yún)?b>period(譯:周期)就是scheduleAtFixedRate所指的固定頻率嗎?
這個(gè)問(wèn)題很好驗(yàn)證,把示例中這部分代碼的注釋去掉就能得到答案。

final Runnable beeper = new Runnable() {
    public void run() {
        System.out.println(Thread.currentThread().getName()+" >>> "+LocalTime.now().toString()+" >>> beep");
                
        //TODO 沉睡吧,少年
        //try {
        //    TimeUnit.SECONDS.sleep(3L);
        //} catch (InterruptedException e) {
        //    e.printStackTrace();
        //}
    }
};

答案就是,如果方法執(zhí)行時(shí)間大于間隔周期period,則任務(wù)的下次執(zhí)行時(shí)間將超過(guò)period的設(shè)定!

執(zhí)行結(jié)果如下,可以看出任務(wù)間隔為3s,而不是period設(shè)置的1s

不禁好奇,ScheduleExecutorService是怎么實(shí)現(xiàn)的多長(zhǎng)時(shí)間之后執(zhí)行下一個(gè)任務(wù)?有句話叫源碼之下無(wú)秘密,so..let"s do this !

源碼分析 1.初始化

從ScheduleExecutorService的初始化開始:

private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

追隨調(diào)用鏈Executors.newScheduledThreadPool -> new ScheduledThreadPoolExecutor(corePoolSize),進(jìn)入如下方法:

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());  //注意最后一個(gè)參數(shù)
}

線程池中的任務(wù)隊(duì)列用的new DelayedWorkQueue(),而DelayedWorkQueue是ScheduledThreadPoolExecutor的內(nèi)部類
初始化部分關(guān)注到這一點(diǎn)即可,之后會(huì)是一些成員變量的賦值,不作解釋。

2.任務(wù)封裝

接下來(lái)從scheduleAtFixedRate方法開始,進(jìn)入它的實(shí)現(xiàn)方法:

public ScheduledFuture scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask sft = new ScheduledFutureTask(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
    RunnableScheduledFuture t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

Runnable command被封裝成了ScheduledFutureTask類,無(wú)獨(dú)有偶,ScheduledFutureTask是ScheduledThreadPoolExecutor的另外一個(gè)內(nèi)部類。看下它的類關(guān)系圖:

有沒有發(fā)現(xiàn)ScheduledFutureTask實(shí)現(xiàn)了Comparable接口?眾所周知這個(gè)接口是以某種規(guī)則用來(lái)比較大小的,這里的規(guī)則就是任務(wù)的開始執(zhí)行時(shí)間——ScheduledFutureTask的一個(gè)屬性:

/** The time the task is enabled to execute in nanoTime units */
private long time;

compareTo方法就是明證:

public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask x = (ScheduledFutureTask)other;
        long diff = time - x.time;    //focus這里啊喂?。?!
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

一般來(lái)說(shuō),這些比較(compare)放在集合中才有意義,那ScheduledFutureTask之后會(huì)放在哪個(gè)集合中嗎?有些朋友可能已經(jīng)猜到了,沒錯(cuò),ScheduledFutureTask后續(xù)會(huì)置于前文提到的DelayedWorkQueue中。

3.延時(shí)執(zhí)行

繼續(xù)ScheduledThreadPoolExecutor.scheduleAtFixedRate方法:

ScheduledFutureTask sft = new ScheduledFutureTask(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
RunnableScheduledFuture t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);    //醒醒,該你出場(chǎng)了

進(jìn)入delayedExecute方法:

private void delayedExecute(RunnableScheduledFuture task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);    //代碼一 - 任務(wù)加入DelayedWorkQueue
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();    //代碼二 - 任務(wù)開始
    }
}

追蹤 代碼一 位置的調(diào)用鏈:
-> DelayedWorkQueue.add -> offer -> siftUp(int k, RunnableScheduledFuture key)

private void siftUp(int k, RunnableScheduledFuture key) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        RunnableScheduledFuture e = queue[parent];
        if (key.compareTo(e) >= 0)
            break;
        queue[k] = e;
        setIndex(e, k);
        k = parent;
    }
    queue[k] = key;
    setIndex(key, k);
}

可以看到,siftUp方法實(shí)現(xiàn)了向DelayedWorkQueue添加任務(wù)時(shí)(add),開始時(shí)間靠后的任務(wù)(ScheduledFutureTask)會(huì)放在后面。

ok,回到 代碼二 位置的ensurePrestart方法,接著追:
ensurePrestart -> addWorker(Runnable firstTask, boolean core)

濃縮版addWorker方法如下:

private boolean addWorker(Runnable firstTask, boolean core){
    ...    //省略很多的驗(yàn)證邏輯

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try{
        w = new Worker(firstTask);    //代碼三 - 封裝成worker,new Worker會(huì)從線程池中獲取線程
        final Thread t = w.thread;
        if (t != null){
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            
            ...    //省略部分狀態(tài)控制邏輯
            
            if (workerAdded){
                t.start();    //代碼四 - 執(zhí)行Worker的run方法
                workerStarted = true;
            }
        }
    }finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

這里發(fā)現(xiàn)firstTask(ScheduledFutureTask)再次被封裝成了Worker(代碼三),那么t.start()(代碼四),自然會(huì)執(zhí)行Worker的run方法,跟下Worker.run方法:Worker.run -> runWorker(Worker w)

濃縮后的runWorker

final void runWorker(Worker w){
    
    ...    //省略部分代碼
    
    try{
        while (task != null || (task = getTask()) != null){    //代碼五 -  getTask()獲取任務(wù)
            
            ...    //省略部分代碼
                          
            task.run();    //代碼六 - 任務(wù)執(zhí)行
            
            ...    //省略部分代碼
        }
        completedAbruptly = false;
    }finally{
        processWorkerExit(w, completedAbruptly);
    }
}

老規(guī)矩,五、六兩處關(guān)鍵代碼分別看一下:

代碼五 getTask最終定位到DelayedWorkQueue.take方法,這里只分析延時(shí)任務(wù)的執(zhí)行情況

public RunnableScheduledFuture take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture first = queue[0];
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);
                first = null; // don"t retain ref while waiting
                if (leader != null)    //代碼八 - leader線程就是下一次的工作線程
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();    //代碼七 - 指定leader線程
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);    //等待
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

對(duì)于延時(shí)任務(wù)來(lái)說(shuō),線程池中第一個(gè)調(diào)用take的線程進(jìn)來(lái)會(huì)作為leader線程(代碼七),然后等待。結(jié)束等待的位置在哪?在ScheduledFutureTask.run的調(diào)用中?。ㄎ易鲾帱c(diǎn)調(diào)試的時(shí)候,這個(gè)等待時(shí)間總是很大,一般兩個(gè)小時(shí)以上,似乎直接用await就成?這一點(diǎn)確有疑問(wèn))。
而線程池中的其它線程調(diào)用take時(shí),發(fā)現(xiàn)leader已經(jīng)被第一個(gè)線程搶了,只能等著(代碼八)

回到 代碼六 位置,task.run()也就是ScheduledFutureTask.run

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {    //對(duì)于延時(shí)任務(wù),會(huì)進(jìn)入這個(gè)分支
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

對(duì)于延時(shí)任務(wù),會(huì)執(zhí)行ScheduledFutureTask.super.runAndReset()

protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable c = callable;
        if (c != null && s == NEW) {
            try {
                //代碼九 - 阻塞式等待beeper完成
                c.call(); // don"t set result
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}

runAndReset方法會(huì)等待最初定義的beeper邏輯執(zhí)行完成(代碼九),這也解釋了為什么scheduleAtFixedRate的下次任務(wù)執(zhí)行時(shí)間會(huì)有可能超過(guò)參數(shù)period的設(shè)定!

然后調(diào)用reExecutePeriodic

void reExecutePeriodic(RunnableScheduledFuture task) {
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);    //隊(duì)列中再次加入任務(wù)
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();    //再次回到ensurePrestart方法
    }
}

reExecutePeriodic方法看上去是不是似曾相識(shí),與本小節(jié)(3.延時(shí)執(zhí)行)開端的delayedExecute方法對(duì)比下:

private void delayedExecute(RunnableScheduledFuture task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);    //任務(wù)加入DelayedWorkQueue
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();    //任務(wù)開始
    }
}

都是加入隊(duì)列,然后任務(wù)開始!

DelayedWorkQueue.add中到底做了什么?之前沒有分析,在這里看一下:DelayedWorkQueue.add -> offer

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture e = (RunnableScheduledFuture)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        if (i >= queue.length)
            grow();
        size = i + 1;
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
            siftUp(i, e);
        }
        if (queue[0] == e) {
            leader = null;    //將leader賦值清除
            available.signal();    //代碼十 - 通知線程
        }
    } finally {
        lock.unlock();
    }
    return true;
}

可以看到,就是在offer方法(代碼十),將DelayedWorkQueue.take中的available.awaitNanos(delay)喚醒了!

總結(jié)

是不是已經(jīng)繞暈了?很正常,因?yàn)樵创a終歸是需要自己去讀個(gè)幾遍才能理清整個(gè)脈絡(luò)。所以老鐵們,加油!

最后的總結(jié)還是不能缺少的,一個(gè)定時(shí)任務(wù)的執(zhí)行流程是這樣的:

1.任務(wù)開始時(shí),將任務(wù)ScheduledFutureTask放入隊(duì)列DelayedWorkQueue。任務(wù)放入過(guò)程會(huì)計(jì)算該任務(wù)的開始執(zhí)行時(shí)間,執(zhí)行時(shí)間靠前的放入隊(duì)列的前端,執(zhí)行時(shí)間靠后的放入隊(duì)列的后端。

2.之后的ensurePrestart方法,先從線程池中獲取線程,該線程會(huì)從隊(duì)列DelayedWorkQueue中獲取ScheduledFutureTask

獲取過(guò)程DelayedWorkQueue.take先計(jì)算任務(wù)的延時(shí)時(shí)間delay ,有兩種情況:

delay<=0 已不需要延時(shí),立即獲取任務(wù)

delay>0 需要延時(shí),出現(xiàn)如下局面:

第一個(gè)進(jìn)入的線程成為leader

其它線程等待

long delay = first.getDelay(NANOSECONDS);    //計(jì)算延時(shí)時(shí)間delay 

//已不需要延時(shí),立即獲取任務(wù)
if (delay <= 0)
    return finishPoll(first);    
first = null; // don"t retain ref while waiting

//需要延時(shí)的任務(wù)(與此同時(shí)有任務(wù)正在執(zhí)行)
if (leader != null)    //其它線程進(jìn)來(lái)時(shí),有l(wèi)eader線程存在了,等待
    available.await();
else {
    Thread thisThread = Thread.currentThread();    //第一個(gè)進(jìn)入這里的線程會(huì)成為leader
    leader = thisThread;
    try {
        available.awaitNanos(delay);    //等待
    } finally {
        if (leader == thisThread)
            leader = null;
    }
}

3.獲取任務(wù)后,進(jìn)入執(zhí)行環(huán)節(jié)Worker.run -> ScheduledFutureTask.run。執(zhí)行過(guò)程會(huì)阻塞式等待任務(wù)完成,這也是任務(wù)執(zhí)行時(shí)間可能會(huì)超過(guò)period的原因!任務(wù)執(zhí)行結(jié)束會(huì)再次放入任務(wù),這樣又回到步驟1,反復(fù)執(zhí)行。

感謝

分析Java延遲與周期任務(wù)的實(shí)現(xiàn)原理描述

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/70914.html

相關(guān)文章

  • 源起Netty 外傳ScheduledThreadPoolExecutor源碼解讀

    引言 本文是源起netty專欄的第4篇文章,很明顯前3篇文章已經(jīng)在偏離主題的道路上越來(lái)越遠(yuǎn)。于是乎,我決定:繼續(xù)保持…… 使用 首先看看源碼類注釋中的示例(未改變官方示例邏輯,只是增加了print輸出和注釋) import java.time.LocalTime; import java.util.concurrent.Executors; import java.util.concurrent....

    Eastboat 評(píng)論0 收藏0
  • 源起Netty 外傳ScheduledThreadPoolExecutor源碼解讀

    引言 本文是源起netty專欄的第4篇文章,很明顯前3篇文章已經(jīng)在偏離主題的道路上越來(lái)越遠(yuǎn)。于是乎,我決定:繼續(xù)保持…… 使用 首先看看源碼類注釋中的示例(未改變官方示例邏輯,只是增加了print輸出和注釋) import java.time.LocalTime; import java.util.concurrent.Executors; import java.util.concurrent....

    Martin91 評(píng)論0 收藏0
  • 源起Netty 外傳】FastThreadLocal怎么Fast?

    摘要:實(shí)現(xiàn)原理淺談幫助理解的示意圖中有一屬性,類型是的靜態(tài)內(nèi)部類。剛剛說(shuō)過(guò),是一個(gè)中的靜態(tài)內(nèi)部類,則是的內(nèi)部節(jié)點(diǎn)。這個(gè)會(huì)在線程中,作為其屬性初始是一個(gè)數(shù)組的索引,達(dá)成與類似的效果。的方法被調(diào)用時(shí),會(huì)根據(jù)記錄的槽位信息進(jìn)行大掃除。 概述 FastThreadLocal的類名本身就充滿了對(duì)ThreadLocal的挑釁,快男FastThreadLocal是怎么快的?源碼中類注釋坦白如下: /** ...

    gxyz 評(píng)論0 收藏0
  • 源起Netty 外傳】ServiceLoader詳解

    摘要:答曰摸索直譯為服務(wù)加載器,最終目的是獲取的實(shí)現(xiàn)類。代碼走起首先,要有一個(gè)接口形狀接口介紹然后,要有該接口的實(shí)現(xiàn)類。期具體實(shí)現(xiàn)依靠的內(nèi)部類,感性趣的朋友可以自己看一下。總結(jié)重點(diǎn)在于可跨越包獲取,這一點(diǎn)筆者通過(guò)多模塊項(xiàng)目親測(cè)延時(shí)加載特性 前戲 netty源碼注釋有云: ... If a provider class has been installed in a jar file tha...

    MoAir 評(píng)論0 收藏0
  • 源起Netty 外傳】System.getPropert()詳解

    摘要:閱讀源碼時(shí),發(fā)現(xiàn)很多,理所當(dāng)然會(huì)想翻閱資料后,該技能,姿勢(shì)如下環(huán)境中的全部屬性全部屬性注意如果將本行代碼放在自定義屬性之后,會(huì)不會(huì)打出把自定義屬性也給獲取到可以結(jié)論會(huì)獲取目前環(huán)境中全部的屬性值,無(wú)論系統(tǒng)提供還是個(gè)人定義系統(tǒng)提供屬性代碼中定義 閱讀源碼時(shí),發(fā)現(xiàn)很多System.getProperty(xxx),理所當(dāng)然會(huì)想:whats fucking this? 翻閱資料后,Get該技能...

    lixiang 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<