成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Java調度線程池ScheduledThreadPoolExecutor源碼分析

cheukyin / 1195人閱讀

摘要:當面試官問線程池時,你應該知道些什么一執(zhí)行流程與不同,向中提交任務的時候,任務被包裝成對象加入延遲隊列并啟動一個線程。當我們創(chuàng)建出一個調度線程池以后,就可以開始提交任務了。

最近新接手的項目里大量使用了ScheduledThreadPoolExecutor類去執(zhí)行一些定時任務,之前一直沒有機會研究這個類的源碼,這次趁著機會好好研讀一下。

原文地址:http://www.jianshu.com/p/18f4...

該類主要還是基于ThreadPoolExecutor類進行二次開發(fā),所以對Java線程池執(zhí)行過程還不了解的同學建議先看看我之前的文章。
當面試官問線程池時,你應該知道些什么?

一、執(zhí)行流程

與ThreadPoolExecutor不同,向ScheduledThreadPoolExecutor中提交任務的時候,任務被包裝成ScheduledFutureTask對象加入延遲隊列并啟動一個woker線程。

用戶提交的任務加入延遲隊列時,會按照執(zhí)行時間進行排列,也就是說隊列頭的任務是需要最早執(zhí)行的。而woker線程會從延遲隊列中獲取任務,如果已經(jīng)到了任務的執(zhí)行時間,則開始執(zhí)行。否則阻塞等待剩余延遲時間后再嘗試獲取任務。

任務執(zhí)行完成以后,如果該任務是一個需要周期性反復執(zhí)行的任務,則計算好下次執(zhí)行的時間后會重新加入到延遲隊列中。

二、源碼深入分析

首先看下ScheduledThreadPoolExecutor類的幾個構造函數(shù):

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }

注:這里構造函數(shù)都是使用super,其實就是ThreadPoolExecutor的構造函數(shù)
這里有三點需要注意:

使用DelayedWorkQueue作為阻塞隊列,并沒有像ThreadPoolExecutor類一樣開放給用戶進行自定義設置。該隊列是ScheduledThreadPoolExecutor類的核心組件,后面詳細介紹。

這里沒有向用戶開放maximumPoolSize的設置,原因是DelayedWorkQueue中的元素在大于初始容量16時,會進行擴容,也就是說隊列不會裝滿,maximumPoolSize參數(shù)即使設置了也不會生效。

worker線程沒有回收時間,原因跟第2點一樣,因為不會觸發(fā)回收操作。所以這里的線程存活時間都設置為0。

再次說明:上面三點的理解需要先了解ThreadPoolExecutor的知識點。

當我們創(chuàng)建出一個調度線程池以后,就可以開始提交任務了。這里依次分析一下三個常用API的源碼:

首先是schedule方法,該方法是指任務在指定延遲時間到達后觸發(fā),只會執(zhí)行一次。

    public ScheduledFuture schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        //參數(shù)校驗
        if (command == null || unit == null)
            throw new NullPointerException();
        //這里是一個嵌套結構,首先把用戶提交的任務包裝成ScheduledFutureTask
        //然后在調用decorateTask進行包裝,該方法是留給用戶去擴展的,默認是個空方法
        RunnableScheduledFuture t = decorateTask(command,
            new ScheduledFutureTask(command, null,
                                          triggerTime(delay, unit)));
        //包裝好任務以后,就進行提交了
        delayedExecute(t);
        return t;
    }

重點看一下提交任務的源碼:

    private void delayedExecute(RunnableScheduledFuture task) {
        //如果線程池已經(jīng)關閉,則使用拒絕策略把提交任務拒絕掉
        if (isShutdown())
            reject(task);
        else {
            //與ThreadPoolExecutor不同,這里直接把任務加入延遲隊列
            super.getQueue().add(task);
            //如果當前狀態(tài)無法執(zhí)行任務,則取消
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                //這里是增加一個worker線程,避免提交的任務沒有worker去執(zhí)行
                //原因就是該類沒有像ThreadPoolExecutor一樣,woker滿了才放入隊列
                ensurePrestart();
        }
    }

這里的關鍵點其實就是super.getQueue().add(task)行代碼,ScheduledThreadPoolExecutor類在內部自己實現(xiàn)了一個基于堆數(shù)據(jù)結構的延遲隊列。add方法最終會落到offer方法中,一起看下:

        public boolean offer(Runnable x) {
            //參數(shù)校驗
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture e = (RunnableScheduledFuture)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                //查看當前元素數(shù)量,如果大于隊列長度則進行擴容
                int i = size;
                if (i >= queue.length)
                    grow();
                //元素數(shù)量加1
                size = i + 1;
                //如果當前隊列還沒有元素,則直接加入頭部
                if (i == 0) {
                    queue[0] = e;
                    //記錄索引
                    setIndex(e, 0);
                } else {
                    //把任務加入堆中,并調整堆結構,這里就會根據(jù)任務的觸發(fā)時間排列
                    //把需要最早執(zhí)行的任務放在前面
                    siftUp(i, e);
                }
                //如果新加入的元素就是隊列頭,這里有兩種情況
                //1.這是用戶提交的第一個任務
                //2.新任務進行堆調整以后,排在隊列頭
                if (queue[0] == e) {
                    //這個變量起優(yōu)化作用,后面說
                    leader = null;
                    //加入元素以后,喚醒worker線程
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

通過上面的邏輯,我們把提交的任務成功加入到了延遲隊列中,前面說了加入任務以后會開啟一個woker線程,該線程的任務就是從延遲隊列中不斷取出任務執(zhí)行。這些都是跟ThreadPoolExecutor相同的,我們看下從該延遲隊列中獲取元素的源碼:

        public RunnableScheduledFuture take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    //取出隊列中第一個元素,即最早需要執(zhí)行的任務
                    RunnableScheduledFuture first = queue[0];
                    //如果隊列為空,則阻塞等待加入元素時喚醒
                    if (first == null)
                        available.await();
                    else {
                        //計算任務執(zhí)行時間,這個delay是當前時間減去任務觸發(fā)時間
                        long delay = first.getDelay(NANOSECONDS);
                        //如果到了觸發(fā)時間,則執(zhí)行出隊操作
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; 
                        //這里表示該任務已經(jīng)分配給了其他線程,當前線程等待喚醒就可以
                        if (leader != null)
                            available.await();
                        else {
                            //否則把給任務分配給當前線程
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                //當前線程等待任務剩余延遲時間
                                available.awaitNanos(delay);
                            } finally {
                                //這里線程醒來以后,什么時候leader會發(fā)生變化呢?
                                //就是上面的添加任務的時候
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                //如果隊列不為空,則喚醒其他woker線程
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

這里為什么會加入一個leader變量來分配阻塞隊列中的任務呢?原因是要減少不必要的時間等待。比如說現(xiàn)在隊列中的第一個任務1分鐘后執(zhí)行,那么用戶提交新的任務時會不斷的加入woker線程,如果新提交的任務都排在隊列后面,也就是說新的woker現(xiàn)在都會取出這第一個任務進行執(zhí)行延遲時間的等待,當該任務到觸發(fā)時間時,會喚醒很多woker線程,這顯然是沒有必要的。

當任務被woker線程取出以后,會執(zhí)行run方法,由于此時任務已經(jīng)被包裝成了ScheduledFutureTask對象,那我們來看下該類的run方法:

        public void run() {
            boolean periodic = isPeriodic();
            //如果當前線程池已經(jīng)不支持執(zhí)行任務,則取消
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                //如果不需要周期性執(zhí)行,則直接執(zhí)行run方法然后結束
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                //如果需要周期執(zhí)行,則在執(zhí)行完任務以后,設置下一次執(zhí)行時間
                setNextRunTime();
                //把任務重新加入延遲隊列
                reExecutePeriodic(outerTask);
            }
        }

上面就是schedule方法完整的執(zhí)行過程。

ScheduledThreadPoolExecutor類中關于周期性執(zhí)行的任務提供了兩個方法scheduleAtFixedRate跟scheduleWithFixedDelay,一起看下區(qū)別。

    public ScheduledFuture scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        //刪除不必要的邏輯,重點看區(qū)別
        ScheduledFutureTask sft =
            new ScheduledFutureTask(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          //二者唯一區(qū)別
                                          unit.toNanos(period));
        //...
    }

    public ScheduledFuture scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        //...
        ScheduledFutureTask sft =
            new ScheduledFutureTask(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          //二者唯一區(qū)別
                                          unit.toNanos(-delay));
       //..
    }

前者把周期延遲時間傳入ScheduledFutureTask中,而后者卻設置成負數(shù)傳入,區(qū)別在哪里呢?看下當任務執(zhí)行完成以后的收尾工作中設置任務下次執(zhí)行時間的方法setNextRunTime源碼:

        private void setNextRunTime() {
            long p = period;
            //大于0是scheduleAtFixedRate方法,表示執(zhí)行時間是根據(jù)初始化參數(shù)計算的
            if (p > 0)
                time += p;
            else
            //小于0是scheduleWithFixedDelay方法,表示執(zhí)行時間是根據(jù)當前時間重新計算的
                time = triggerTime(-p);
        }

也就是說當使用scheduleAtFixedRate方法提交任務時,任務后續(xù)執(zhí)行的延遲時間都已經(jīng)確定好了,分別是initialDelay,initialDelay + period,initialDelay + 2 * period以此類推。
而調用scheduleWithFixedDelay方法提交任務時,第一次執(zhí)行的延遲時間為initialDelay,后面的每次執(zhí)行時間都是在前一次任務執(zhí)行完成以后的時間點上面加上period延遲執(zhí)行。

三、總結

ScheduledThreadPoolExecutor可以說是在ThreadPoolExecutor上面進行了一些擴展操作,它只是重新包裝了任務以及阻塞隊列。該類的阻塞隊列DelayedWorkQueue是基于堆去實現(xiàn)的,本文沒有太詳細介紹堆結構插入跟刪除數(shù)據(jù)的調整工作,感興趣的同學可以私信或者評論交流。

文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉載請注明本文地址:http://systransis.cn/yun/67849.html

相關文章

  • Java調度線程ScheduledThreadPoolExecutor源碼分析

    摘要:當面試官問線程池時,你應該知道些什么一執(zhí)行流程與不同,向中提交任務的時候,任務被包裝成對象加入延遲隊列并啟動一個線程。當我們創(chuàng)建出一個調度線程池以后,就可以開始提交任務了。 最近新接手的項目里大量使用了ScheduledThreadPoolExecutor類去執(zhí)行一些定時任務,之前一直沒有機會研究這個類的源碼,這次趁著機會好好研讀一下。 原文地址:http://www.jianshu....

    kohoh_ 評論0 收藏0
  • Java調度線程ScheduledThreadPoolExecutor源碼分析

    摘要:當面試官問線程池時,你應該知道些什么一執(zhí)行流程與不同,向中提交任務的時候,任務被包裝成對象加入延遲隊列并啟動一個線程。當我們創(chuàng)建出一個調度線程池以后,就可以開始提交任務了。 最近新接手的項目里大量使用了ScheduledThreadPoolExecutor類去執(zhí)行一些定時任務,之前一直沒有機會研究這個類的源碼,這次趁著機會好好研讀一下。 原文地址:http://www.jianshu....

    myshell 評論0 收藏0
  • Java線程架構(二)多線程調度

    摘要:在前面介紹了的多線程的基本原理信息線程池架構原理和源碼解析,本文對這個本身的線程池的調度器做一個簡單擴展,如果還沒讀過上一篇文章,建議讀一下,因為這是調度器的核心組件部分。 在前面介紹了java的多線程的基本原理信息:《Java線程池架構原理和源碼解析》,本文對這個java本身的線程池的調度器做一個簡單擴展,如果還沒讀過上一篇文章,建議讀一下,因為這是調度器的核心組件部分。 我們如果...

    SmallBoyO 評論0 收藏0
  • 深入理解Java線程

    摘要:深入理解線程池線程池初探所謂線程池,就是將多個線程放在一個池子里面所謂池化技術,然后需要線程的時候不是創(chuàng)建一個線程,而是從線程池里面獲取一個可用的線程,然后執(zhí)行我們的任務。最后的的意思是需要確保線程池已經(jīng)被啟動起來了。 深入理解Java線程池 線程池初探 ?所謂線程池,就是將多個線程放在一個池子里面(所謂池化技術),然后需要線程的時候不是創(chuàng)建一個線程,而是從線程池里面獲取一個可用的線程...

    fredshare 評論0 收藏0
  • SpringBoot中并發(fā)定時任務的實現(xiàn)、動態(tài)定時任務的實現(xiàn)(看這一篇就夠了)

    摘要:也是自帶的一個基于線程池設計的定時任務類。其每個調度任務都會分配到線程池中的一個線程執(zhí)行,所以其任務是并發(fā)執(zhí)行的,互不影響。 原創(chuàng)不易,如需轉載,請注明出處https://www.cnblogs.com/baixianlong/p/10659045.html,否則將追究法律責任?。?! 一、在JAVA開發(fā)領域,目前可以通過以下幾種方式進行定時任務 1、單機部署模式 Timer:jdk中...

    BWrong 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<