成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Java調(diào)度線程池ScheduledThreadPoolExecutor源碼分析

kohoh_ / 2024人閱讀

摘要:當(dāng)面試官問線程池時(shí),你應(yīng)該知道些什么一執(zhí)行流程與不同,向中提交任務(wù)的時(shí)候,任務(wù)被包裝成對(duì)象加入延遲隊(duì)列并啟動(dòng)一個(gè)線程。當(dāng)我們創(chuàng)建出一個(gè)調(diào)度線程池以后,就可以開始提交任務(wù)了。

最近新接手的項(xiàng)目里大量使用了ScheduledThreadPoolExecutor類去執(zhí)行一些定時(shí)任務(wù),之前一直沒有機(jī)會(huì)研究這個(gè)類的源碼,這次趁著機(jī)會(huì)好好研讀一下。

原文地址:http://www.jianshu.com/p/18f4...

該類主要還是基于ThreadPoolExecutor類進(jìn)行二次開發(fā),所以對(duì)Java線程池執(zhí)行過程還不了解的同學(xué)建議先看看我之前的文章。
當(dāng)面試官問線程池時(shí),你應(yīng)該知道些什么?

一、執(zhí)行流程

與ThreadPoolExecutor不同,向ScheduledThreadPoolExecutor中提交任務(wù)的時(shí)候,任務(wù)被包裝成ScheduledFutureTask對(duì)象加入延遲隊(duì)列并啟動(dòng)一個(gè)woker線程。

用戶提交的任務(wù)加入延遲隊(duì)列時(shí),會(huì)按照?qǐng)?zhí)行時(shí)間進(jìn)行排列,也就是說隊(duì)列頭的任務(wù)是需要最早執(zhí)行的。而woker線程會(huì)從延遲隊(duì)列中獲取任務(wù),如果已經(jīng)到了任務(wù)的執(zhí)行時(shí)間,則開始執(zhí)行。否則阻塞等待剩余延遲時(shí)間后再嘗試獲取任務(wù)。

任務(wù)執(zhí)行完成以后,如果該任務(wù)是一個(gè)需要周期性反復(fù)執(zhí)行的任務(wù),則計(jì)算好下次執(zhí)行的時(shí)間后會(huì)重新加入到延遲隊(duì)列中。

二、源碼深入分析

首先看下ScheduledThreadPoolExecutor類的幾個(gè)構(gòu)造函數(shù):

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }

注:這里構(gòu)造函數(shù)都是使用super,其實(shí)就是ThreadPoolExecutor的構(gòu)造函數(shù)
這里有三點(diǎn)需要注意:

使用DelayedWorkQueue作為阻塞隊(duì)列,并沒有像ThreadPoolExecutor類一樣開放給用戶進(jìn)行自定義設(shè)置。該隊(duì)列是ScheduledThreadPoolExecutor類的核心組件,后面詳細(xì)介紹。

這里沒有向用戶開放maximumPoolSize的設(shè)置,原因是DelayedWorkQueue中的元素在大于初始容量16時(shí),會(huì)進(jìn)行擴(kuò)容,也就是說隊(duì)列不會(huì)裝滿,maximumPoolSize參數(shù)即使設(shè)置了也不會(huì)生效。

worker線程沒有回收時(shí)間,原因跟第2點(diǎn)一樣,因?yàn)椴粫?huì)觸發(fā)回收操作。所以這里的線程存活時(shí)間都設(shè)置為0。

再次說明:上面三點(diǎn)的理解需要先了解ThreadPoolExecutor的知識(shí)點(diǎn)。

當(dāng)我們創(chuàng)建出一個(gè)調(diào)度線程池以后,就可以開始提交任務(wù)了。這里依次分析一下三個(gè)常用API的源碼:

首先是schedule方法,該方法是指任務(wù)在指定延遲時(shí)間到達(dá)后觸發(fā),只會(huì)執(zhí)行一次。

    public ScheduledFuture schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        //參數(shù)校驗(yàn)
        if (command == null || unit == null)
            throw new NullPointerException();
        //這里是一個(gè)嵌套結(jié)構(gòu),首先把用戶提交的任務(wù)包裝成ScheduledFutureTask
        //然后在調(diào)用decorateTask進(jìn)行包裝,該方法是留給用戶去擴(kuò)展的,默認(rèn)是個(gè)空方法
        RunnableScheduledFuture t = decorateTask(command,
            new ScheduledFutureTask(command, null,
                                          triggerTime(delay, unit)));
        //包裝好任務(wù)以后,就進(jìn)行提交了
        delayedExecute(t);
        return t;
    }

重點(diǎn)看一下提交任務(wù)的源碼:

    private void delayedExecute(RunnableScheduledFuture task) {
        //如果線程池已經(jīng)關(guān)閉,則使用拒絕策略把提交任務(wù)拒絕掉
        if (isShutdown())
            reject(task);
        else {
            //與ThreadPoolExecutor不同,這里直接把任務(wù)加入延遲隊(duì)列
            super.getQueue().add(task);
            //如果當(dāng)前狀態(tài)無法執(zhí)行任務(wù),則取消
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                //這里是增加一個(gè)worker線程,避免提交的任務(wù)沒有worker去執(zhí)行
                //原因就是該類沒有像ThreadPoolExecutor一樣,woker滿了才放入隊(duì)列
                ensurePrestart();
        }
    }

這里的關(guān)鍵點(diǎn)其實(shí)就是super.getQueue().add(task)行代碼,ScheduledThreadPoolExecutor類在內(nèi)部自己實(shí)現(xiàn)了一個(gè)基于堆數(shù)據(jù)結(jié)構(gòu)的延遲隊(duì)列。add方法最終會(huì)落到offer方法中,一起看下:

        public boolean offer(Runnable x) {
            //參數(shù)校驗(yàn)
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture e = (RunnableScheduledFuture)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                //查看當(dāng)前元素?cái)?shù)量,如果大于隊(duì)列長(zhǎng)度則進(jìn)行擴(kuò)容
                int i = size;
                if (i >= queue.length)
                    grow();
                //元素?cái)?shù)量加1
                size = i + 1;
                //如果當(dāng)前隊(duì)列還沒有元素,則直接加入頭部
                if (i == 0) {
                    queue[0] = e;
                    //記錄索引
                    setIndex(e, 0);
                } else {
                    //把任務(wù)加入堆中,并調(diào)整堆結(jié)構(gòu),這里就會(huì)根據(jù)任務(wù)的觸發(fā)時(shí)間排列
                    //把需要最早執(zhí)行的任務(wù)放在前面
                    siftUp(i, e);
                }
                //如果新加入的元素就是隊(duì)列頭,這里有兩種情況
                //1.這是用戶提交的第一個(gè)任務(wù)
                //2.新任務(wù)進(jìn)行堆調(diào)整以后,排在隊(duì)列頭
                if (queue[0] == e) {
                    //這個(gè)變量起優(yōu)化作用,后面說
                    leader = null;
                    //加入元素以后,喚醒worker線程
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

通過上面的邏輯,我們把提交的任務(wù)成功加入到了延遲隊(duì)列中,前面說了加入任務(wù)以后會(huì)開啟一個(gè)woker線程,該線程的任務(wù)就是從延遲隊(duì)列中不斷取出任務(wù)執(zhí)行。這些都是跟ThreadPoolExecutor相同的,我們看下從該延遲隊(duì)列中獲取元素的源碼:

        public RunnableScheduledFuture take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    //取出隊(duì)列中第一個(gè)元素,即最早需要執(zhí)行的任務(wù)
                    RunnableScheduledFuture first = queue[0];
                    //如果隊(duì)列為空,則阻塞等待加入元素時(shí)喚醒
                    if (first == null)
                        available.await();
                    else {
                        //計(jì)算任務(wù)執(zhí)行時(shí)間,這個(gè)delay是當(dāng)前時(shí)間減去任務(wù)觸發(fā)時(shí)間
                        long delay = first.getDelay(NANOSECONDS);
                        //如果到了觸發(fā)時(shí)間,則執(zhí)行出隊(duì)操作
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; 
                        //這里表示該任務(wù)已經(jīng)分配給了其他線程,當(dāng)前線程等待喚醒就可以
                        if (leader != null)
                            available.await();
                        else {
                            //否則把給任務(wù)分配給當(dāng)前線程
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                //當(dāng)前線程等待任務(wù)剩余延遲時(shí)間
                                available.awaitNanos(delay);
                            } finally {
                                //這里線程醒來以后,什么時(shí)候leader會(huì)發(fā)生變化呢?
                                //就是上面的添加任務(wù)的時(shí)候
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                //如果隊(duì)列不為空,則喚醒其他woker線程
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

這里為什么會(huì)加入一個(gè)leader變量來分配阻塞隊(duì)列中的任務(wù)呢?原因是要減少不必要的時(shí)間等待。比如說現(xiàn)在隊(duì)列中的第一個(gè)任務(wù)1分鐘后執(zhí)行,那么用戶提交新的任務(wù)時(shí)會(huì)不斷的加入woker線程,如果新提交的任務(wù)都排在隊(duì)列后面,也就是說新的woker現(xiàn)在都會(huì)取出這第一個(gè)任務(wù)進(jìn)行執(zhí)行延遲時(shí)間的等待,當(dāng)該任務(wù)到觸發(fā)時(shí)間時(shí),會(huì)喚醒很多woker線程,這顯然是沒有必要的。

當(dāng)任務(wù)被woker線程取出以后,會(huì)執(zhí)行run方法,由于此時(shí)任務(wù)已經(jīng)被包裝成了ScheduledFutureTask對(duì)象,那我們來看下該類的run方法:

        public void run() {
            boolean periodic = isPeriodic();
            //如果當(dāng)前線程池已經(jīng)不支持執(zhí)行任務(wù),則取消
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                //如果不需要周期性執(zhí)行,則直接執(zhí)行run方法然后結(jié)束
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                //如果需要周期執(zhí)行,則在執(zhí)行完任務(wù)以后,設(shè)置下一次執(zhí)行時(shí)間
                setNextRunTime();
                //把任務(wù)重新加入延遲隊(duì)列
                reExecutePeriodic(outerTask);
            }
        }

上面就是schedule方法完整的執(zhí)行過程。

ScheduledThreadPoolExecutor類中關(guān)于周期性執(zhí)行的任務(wù)提供了兩個(gè)方法scheduleAtFixedRate跟scheduleWithFixedDelay,一起看下區(qū)別。

    public ScheduledFuture scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        //刪除不必要的邏輯,重點(diǎn)看區(qū)別
        ScheduledFutureTask sft =
            new ScheduledFutureTask(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          //二者唯一區(qū)別
                                          unit.toNanos(period));
        //...
    }

    public ScheduledFuture scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        //...
        ScheduledFutureTask sft =
            new ScheduledFutureTask(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          //二者唯一區(qū)別
                                          unit.toNanos(-delay));
       //..
    }

前者把周期延遲時(shí)間傳入ScheduledFutureTask中,而后者卻設(shè)置成負(fù)數(shù)傳入,區(qū)別在哪里呢?看下當(dāng)任務(wù)執(zhí)行完成以后的收尾工作中設(shè)置任務(wù)下次執(zhí)行時(shí)間的方法setNextRunTime源碼:

        private void setNextRunTime() {
            long p = period;
            //大于0是scheduleAtFixedRate方法,表示執(zhí)行時(shí)間是根據(jù)初始化參數(shù)計(jì)算的
            if (p > 0)
                time += p;
            else
            //小于0是scheduleWithFixedDelay方法,表示執(zhí)行時(shí)間是根據(jù)當(dāng)前時(shí)間重新計(jì)算的
                time = triggerTime(-p);
        }

也就是說當(dāng)使用scheduleAtFixedRate方法提交任務(wù)時(shí),任務(wù)后續(xù)執(zhí)行的延遲時(shí)間都已經(jīng)確定好了,分別是initialDelay,initialDelay + period,initialDelay + 2 * period以此類推。
而調(diào)用scheduleWithFixedDelay方法提交任務(wù)時(shí),第一次執(zhí)行的延遲時(shí)間為initialDelay,后面的每次執(zhí)行時(shí)間都是在前一次任務(wù)執(zhí)行完成以后的時(shí)間點(diǎn)上面加上period延遲執(zhí)行。

三、總結(jié)

ScheduledThreadPoolExecutor可以說是在ThreadPoolExecutor上面進(jìn)行了一些擴(kuò)展操作,它只是重新包裝了任務(wù)以及阻塞隊(duì)列。該類的阻塞隊(duì)列DelayedWorkQueue是基于堆去實(shí)現(xiàn)的,本文沒有太詳細(xì)介紹堆結(jié)構(gòu)插入跟刪除數(shù)據(jù)的調(diào)整工作,感興趣的同學(xué)可以私信或者評(píng)論交流。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/70622.html

相關(guān)文章

  • Java調(diào)度線程ScheduledThreadPoolExecutor源碼分析

    摘要:當(dāng)面試官問線程池時(shí),你應(yīng)該知道些什么一執(zhí)行流程與不同,向中提交任務(wù)的時(shí)候,任務(wù)被包裝成對(duì)象加入延遲隊(duì)列并啟動(dòng)一個(gè)線程。當(dāng)我們創(chuàng)建出一個(gè)調(diào)度線程池以后,就可以開始提交任務(wù)了。 最近新接手的項(xiàng)目里大量使用了ScheduledThreadPoolExecutor類去執(zhí)行一些定時(shí)任務(wù),之前一直沒有機(jī)會(huì)研究這個(gè)類的源碼,這次趁著機(jī)會(huì)好好研讀一下。 原文地址:http://www.jianshu....

    cheukyin 評(píng)論0 收藏0
  • Java調(diào)度線程ScheduledThreadPoolExecutor源碼分析

    摘要:當(dāng)面試官問線程池時(shí),你應(yīng)該知道些什么一執(zhí)行流程與不同,向中提交任務(wù)的時(shí)候,任務(wù)被包裝成對(duì)象加入延遲隊(duì)列并啟動(dòng)一個(gè)線程。當(dāng)我們創(chuàng)建出一個(gè)調(diào)度線程池以后,就可以開始提交任務(wù)了。 最近新接手的項(xiàng)目里大量使用了ScheduledThreadPoolExecutor類去執(zhí)行一些定時(shí)任務(wù),之前一直沒有機(jī)會(huì)研究這個(gè)類的源碼,這次趁著機(jī)會(huì)好好研讀一下。 原文地址:http://www.jianshu....

    myshell 評(píng)論0 收藏0
  • Java線程架構(gòu)(二)多線程調(diào)度

    摘要:在前面介紹了的多線程的基本原理信息線程池架構(gòu)原理和源碼解析,本文對(duì)這個(gè)本身的線程池的調(diào)度器做一個(gè)簡(jiǎn)單擴(kuò)展,如果還沒讀過上一篇文章,建議讀一下,因?yàn)檫@是調(diào)度器的核心組件部分。 在前面介紹了java的多線程的基本原理信息:《Java線程池架構(gòu)原理和源碼解析》,本文對(duì)這個(gè)java本身的線程池的調(diào)度器做一個(gè)簡(jiǎn)單擴(kuò)展,如果還沒讀過上一篇文章,建議讀一下,因?yàn)檫@是調(diào)度器的核心組件部分。 我們?nèi)绻?..

    SmallBoyO 評(píng)論0 收藏0
  • 深入理解Java線程

    摘要:深入理解線程池線程池初探所謂線程池,就是將多個(gè)線程放在一個(gè)池子里面所謂池化技術(shù),然后需要線程的時(shí)候不是創(chuàng)建一個(gè)線程,而是從線程池里面獲取一個(gè)可用的線程,然后執(zhí)行我們的任務(wù)。最后的的意思是需要確保線程池已經(jīng)被啟動(dòng)起來了。 深入理解Java線程池 線程池初探 ?所謂線程池,就是將多個(gè)線程放在一個(gè)池子里面(所謂池化技術(shù)),然后需要線程的時(shí)候不是創(chuàng)建一個(gè)線程,而是從線程池里面獲取一個(gè)可用的線程...

    fredshare 評(píng)論0 收藏0
  • SpringBoot中并發(fā)定時(shí)任務(wù)的實(shí)現(xiàn)、動(dòng)態(tài)定時(shí)任務(wù)的實(shí)現(xiàn)(看這一篇就夠了)

    摘要:也是自帶的一個(gè)基于線程池設(shè)計(jì)的定時(shí)任務(wù)類。其每個(gè)調(diào)度任務(wù)都會(huì)分配到線程池中的一個(gè)線程執(zhí)行,所以其任務(wù)是并發(fā)執(zhí)行的,互不影響。 原創(chuàng)不易,如需轉(zhuǎn)載,請(qǐng)注明出處https://www.cnblogs.com/baixianlong/p/10659045.html,否則將追究法律責(zé)任?。?! 一、在JAVA開發(fā)領(lǐng)域,目前可以通過以下幾種方式進(jìn)行定時(shí)任務(wù) 1、單機(jī)部署模式 Timer:jdk中...

    BWrong 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<