成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

ScheduledThreadPoolExecutor詳解

546669204 / 3036人閱讀

摘要:該方法傳入表示如果當(dāng)前任務(wù)正在執(zhí)行,那么立即終止其執(zhí)行傳入表示如果當(dāng)前方法正在執(zhí)行,那么等待其執(zhí)行完成之后再取消當(dāng)前任務(wù)。

???????本文主要分為兩個(gè)部分,第一部分首先會(huì)對ScheduledThreadPoolExecutor進(jìn)行簡單的介紹,并且會(huì)介紹其主要API的使用方式,然后介紹了其使用時(shí)的注意點(diǎn),第二部分則主要對ScheduledThreadPoolExecutor的實(shí)現(xiàn)細(xì)節(jié)進(jìn)行介紹。

1. 使用簡介

???????ScheduledThreadPoolExecutor是一個(gè)使用線程池執(zhí)行定時(shí)任務(wù)的類,相較于Java中提供的另一個(gè)執(zhí)行定時(shí)任務(wù)的類Timer,其主要有如下兩個(gè)優(yōu)點(diǎn):

使用多線程執(zhí)行任務(wù),不用擔(dān)心任務(wù)執(zhí)行時(shí)間過長而導(dǎo)致任務(wù)相互阻塞的情況,Timer是單線程執(zhí)行的,因而會(huì)出現(xiàn)這個(gè)問題;

不用擔(dān)心任務(wù)執(zhí)行過程中,如果線程失活,其會(huì)新建線程執(zhí)行任務(wù),Timer類的單線程掛掉之后是不會(huì)重新創(chuàng)建線程執(zhí)行后續(xù)任務(wù)的。

???????除去上述兩個(gè)優(yōu)點(diǎn)外,ScheduledThreadPoolExecutor還提供了非常靈活的API,用于執(zhí)行任務(wù)。其任務(wù)的執(zhí)行策略主要分為兩大類:①在一定延遲之后只執(zhí)行一次某個(gè)任務(wù);②在一定延遲之后周期性的執(zhí)行某個(gè)任務(wù)。如下是其主要API:

public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit);
public  ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit);
public ScheduledFuture scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay, long delay, TimeUnit unit);
public ScheduledFuture scheduleAtFixedRate(Runnable command,
                                                  long initialDelay, long period, TimeUnit unit);

???????上述四個(gè)方法中,第一個(gè)和第二個(gè)方法屬于第一類,即在delay指定的延遲之后執(zhí)行第一個(gè)參數(shù)所指定的任務(wù),區(qū)別在于,第二個(gè)方法執(zhí)行之后會(huì)有返回值,而第一個(gè)方法執(zhí)行之后是沒有返回值的。第三個(gè)和第四個(gè)方法則屬于第二類,即在第二個(gè)參數(shù)(initialDelay)指定的時(shí)間之后開始周期性的執(zhí)行任務(wù),執(zhí)行周期間隔為第三個(gè)參數(shù)指定的時(shí)間,但是這兩個(gè)方法的區(qū)別在于第三個(gè)方法執(zhí)行任務(wù)的間隔是固定的,無論上一個(gè)任務(wù)是否執(zhí)行完成,而第四個(gè)方法的執(zhí)行時(shí)間間隔是不固定的,其會(huì)在周期任務(wù)的上一個(gè)任務(wù)執(zhí)行完成之后才開始計(jì)時(shí),并在指定時(shí)間間隔之后才開始執(zhí)行任務(wù)。如下是使用scheduleWithFixedDelay()和scheduleAtFixedRate()方法編寫的測試用例:

public class ScheduledThreadPoolExecutorTest {
  private ScheduledThreadPoolExecutor executor;
  private Runnable task;
  
  @Before
  public void before() {
    executor = initExecutor();
    task = initTask();
  }
  
  private ScheduledThreadPoolExecutor initExecutor() {
    return new ScheduledThreadPoolExecutor(2);;
  }
  
  private Runnable initTask() {
    long start = System.currentTimeMillis();
    return () -> {
      print("start task: " + getPeriod(start, System.currentTimeMillis()));
      sleep(SECONDS, 10);
      print("end task: " + getPeriod(start, System.currentTimeMillis()));
    };
  }
  
  @Test
  public void testFixedTask() {
    print("start main thread");
    executor.scheduleAtFixedRate(task, 15, 30, SECONDS);
    sleep(SECONDS, 120);
    print("end main thread");
  }
  
  @Test
  public void testDelayedTask() {
    print("start main thread");
    executor.scheduleWithFixedDelay(task, 15, 30, SECONDS);
    sleep(SECONDS, 120);
    print("end main thread");
  }

  private void sleep(TimeUnit unit, long time) {
    try {
      unit.sleep(time);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  private int getPeriod(long start, long end) {
    return (int)(end - start) / 1000;
  }

  private void print(String msg) {
    System.out.println(msg);
  }
}

???????可以看到,上述兩個(gè)測試用例代碼塊基本是一致的,區(qū)別在于第一個(gè)用例調(diào)用的是scheduleAtFixedRate()方法,而第二個(gè)用例調(diào)用的是scheduleWithFixedDelay()。這里兩個(gè)用例都是設(shè)置的在延遲15s后每個(gè)30s執(zhí)行一次指定的任務(wù),而該任務(wù)執(zhí)行時(shí)長為10s。如下分別是這兩個(gè)測試用例的執(zhí)行結(jié)果:

start main thread
start task: 15
end task: 25
start task: 45
end task: 55
start task: 75
end task: 85
start task: 105
end task: 115
end main thread
start main thread
start task: 15
end task: 25
start task: 55
end task: 65
start task: 95
end task: 105
end main thread

??????對比上述執(zhí)行結(jié)果可以看出,對于scheduleAtFixedRate()方法,其每次執(zhí)行任務(wù)的開始時(shí)間間隔都為固定不變的30s,與任務(wù)執(zhí)行時(shí)長無關(guān),而對于scheduleWithFixedDelay()方法,其每次執(zhí)行任務(wù)的開始時(shí)間間隔都為上次任務(wù)執(zhí)行時(shí)間加上指定的時(shí)間間隔。

???????這里關(guān)于ScheduledThreadPoolExecutor的使用有三點(diǎn)需要說明如下:

ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor(ThreadPoolExecutor詳解),因而也有繼承而來的execute()和submit()方法,但是ScheduledThreadPoolExecutor重寫了這兩個(gè)方法,重寫的方式是直接創(chuàng)建兩個(gè)立即執(zhí)行并且只執(zhí)行一次的任務(wù);

ScheduledThreadPoolExecutor使用ScheduledFutureTask封裝每個(gè)需要執(zhí)行的任務(wù),而任務(wù)都是放入DelayedWorkQueue隊(duì)列中的,該隊(duì)列是一個(gè)使用數(shù)組實(shí)現(xiàn)的優(yōu)先隊(duì)列,在調(diào)用ScheduledFutureTask::cancel()方法時(shí),其會(huì)根據(jù)removeOnCancel變量的設(shè)置來確認(rèn)是否需要將當(dāng)前任務(wù)真正的從隊(duì)列中移除,而不只是標(biāo)識(shí)其為已刪除狀態(tài);

ScheduledThreadPoolExecutor提供了一個(gè)鉤子方法decorateTask(Runnable, RunnableScheduledFuture)用于對執(zhí)行的任務(wù)進(jìn)行裝飾,該方法第一個(gè)參數(shù)是調(diào)用方傳入的任務(wù)實(shí)例,第二個(gè)參數(shù)則是使用ScheduledFutureTask對用戶傳入任務(wù)實(shí)例進(jìn)行封裝之后的實(shí)例。這里需要注意的是,在ScheduledFutureTask對象中有一個(gè)heapIndex變量,該變量用于記錄當(dāng)前實(shí)例處于隊(duì)列數(shù)組中的下標(biāo)位置,該變量可以將諸如contains(),remove()等方法的時(shí)間復(fù)雜度從O(N)降低到O(logN),因而效率提升是比較高的,但是如果這里用戶重寫decorateTask()方法封裝了隊(duì)列中的任務(wù)實(shí)例,那么heapIndex的優(yōu)化就不存在了,因而這里強(qiáng)烈建議是盡量不要重寫該方法,或者重寫時(shí)也還是復(fù)用ScheduledFutureTask類。

2. 源碼詳解 2.1 主要屬性

???????ScheduledThreadPoolExecutor主要有四個(gè)屬性,分別如下:

private volatile boolean continueExistingPeriodicTasksAfterShutdown;

private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

private volatile boolean removeOnCancel = false;

private static final AtomicLong sequencer = new AtomicLong();

continueExistingPeriodicTasksAfterShutdown:用于標(biāo)識(shí)當(dāng)前Executor對象shutdown時(shí),是否繼續(xù)執(zhí)行已經(jīng)存在于任務(wù)隊(duì)列中的定時(shí)任務(wù)(調(diào)用scheduleAtFixedRate()方法生成的任務(wù));

executeExistingDelayedTasksAfterShutdown:用于標(biāo)識(shí)當(dāng)前Executor對象shutdown時(shí),是否繼續(xù)執(zhí)行已經(jīng)存在于任務(wù)隊(duì)列中的定時(shí)任務(wù)(調(diào)用scheduleWithFixedDelay()方法生成的任務(wù));

removeOnCancel:用于標(biāo)識(shí)如果當(dāng)前任務(wù)已經(jīng)取消了,是否將其從任務(wù)隊(duì)列中真正的移除,而不只是標(biāo)識(shí)其為刪除狀態(tài);

sequencer:其為一個(gè)AtomicLong類型的變量,該變量記錄了當(dāng)前任務(wù)被創(chuàng)建時(shí)是第幾個(gè)任務(wù)的一個(gè)序號(hào),這個(gè)序號(hào)的主要用于確認(rèn)當(dāng)兩個(gè)任務(wù)開始執(zhí)行時(shí)間相同時(shí)具體哪個(gè)任務(wù)先執(zhí)行,比如兩個(gè)任務(wù)的開始執(zhí)行時(shí)間都為1515847881158,那么序號(hào)小的任務(wù)將先執(zhí)行。

2.2 ScheduledFutureTask

???????在ScheduledThreadPoolExecutor中,主要使用ScheduledFutureTask封裝需要執(zhí)行的任務(wù),該類的主要聲明如下:

private class ScheduledFutureTask extends FutureTask implements RunnableScheduledFuture {

  private final long sequenceNumber;    // 記錄當(dāng)前實(shí)例的序列號(hào)
  private long time;    // 記錄當(dāng)前任務(wù)下次開始執(zhí)行的時(shí)間
  
  // 記錄當(dāng)前任務(wù)執(zhí)行時(shí)間間隔,等于0則表示當(dāng)前任務(wù)只執(zhí)行一次,大于0表示當(dāng)前任務(wù)為fixedRate類型的任務(wù),
  // 小于0則表示其為fixedDelay類型的任務(wù)
  private final long period;

  RunnableScheduledFuture outerTask = this;    // 記錄需要周期性執(zhí)行的任務(wù)的實(shí)例
  int heapIndex;    // 記錄當(dāng)前任務(wù)在隊(duì)列數(shù)組中位置的下標(biāo)

  ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();    // 序號(hào)在創(chuàng)建任務(wù)實(shí)例時(shí)指定,且后續(xù)不會(huì)變化
  }

  public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), NANOSECONDS);
  }

  // 各個(gè)任務(wù)在隊(duì)列中的存儲(chǔ)方式是一個(gè)基于時(shí)間和序號(hào)進(jìn)行比較的優(yōu)先隊(duì)列,當(dāng)前方法定義了優(yōu)先隊(duì)列中兩個(gè)
  // 任務(wù)執(zhí)行的先后順序。這里先對兩個(gè)任務(wù)開始執(zhí)行時(shí)間進(jìn)行比較,時(shí)間較小者優(yōu)先執(zhí)行,若開始時(shí)間相同,
  // 則比較兩個(gè)任務(wù)的序號(hào),序號(hào)小的任務(wù)先執(zhí)行
  public int compareTo(Delayed other) {
    if (other == this)
      return 0;
    if (other instanceof ScheduledFutureTask) {
      ScheduledFutureTask x = (ScheduledFutureTask)other;
      long diff = time - x.time;
      if (diff < 0)
        return -1;
      else if (diff > 0)
        return 1;
      else if (sequenceNumber < x.sequenceNumber)
        return -1;
      else
        return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
  }

  public boolean isPeriodic() {    // 判斷是否為周期性任務(wù)
    return period != 0;
  }

  // 當(dāng)前任務(wù)執(zhí)行之后,會(huì)判斷當(dāng)前任務(wù)是否為周期性任務(wù),如果為周期性任務(wù),那么就調(diào)用當(dāng)前方法計(jì)算
  // 當(dāng)前任務(wù)下次開始執(zhí)行的時(shí)間。這里如果當(dāng)前任務(wù)是fixedRate類型的任務(wù)(p > 0),那么下次執(zhí)行時(shí)間
  // 就是此次執(zhí)行的開始時(shí)間加上時(shí)間間隔,如果當(dāng)前任務(wù)是fixedDelay類型的任務(wù)(p < 0),那么下次執(zhí)行
  // 時(shí)間就是當(dāng)前時(shí)間(triggerTime()方法會(huì)獲取系統(tǒng)當(dāng)前時(shí)間)加上任務(wù)執(zhí)行時(shí)間間隔。可以看到,定頻率
  // 和定延遲的任務(wù)的執(zhí)行時(shí)間區(qū)別就在當(dāng)前方法中進(jìn)行了指定,因?yàn)檎{(diào)用當(dāng)前方法時(shí)任務(wù)已經(jīng)執(zhí)行完成了,
  // 因而triggerTime()方法中獲取的時(shí)間就是任務(wù)執(zhí)行完成之后的時(shí)間點(diǎn)
  private void setNextRunTime() {
    long p = period;
    if (p > 0)
      time += p;
    else
      time = triggerTime(-p);
  }

  // 取消當(dāng)前任務(wù)的執(zhí)行,super.cancel(boolean)方法也即FutureTask.cancel(boolean)方法。該方法傳入
  // true表示如果當(dāng)前任務(wù)正在執(zhí)行,那么立即終止其執(zhí)行;傳入false表示如果當(dāng)前方法正在執(zhí)行,那么等待其
  // 執(zhí)行完成之后再取消當(dāng)前任務(wù)。
  public boolean cancel(boolean mayInterruptIfRunning) {
    boolean cancelled = super.cancel(mayInterruptIfRunning);
    // 判斷是否設(shè)置了取消后移除隊(duì)列中當(dāng)前任務(wù),是則移除當(dāng)前任務(wù)
    if (cancelled && removeOnCancel && heapIndex >= 0)    
      remove(this);
    return cancelled;
  }

  public void run() {
    boolean periodic = isPeriodic();    // 判斷是否為周期性任務(wù)
    if (!canRunInCurrentRunState(periodic))    // 判斷是否能夠在當(dāng)前狀態(tài)下執(zhí)行該任務(wù)
      cancel(false);
    else if (!periodic)    // 如果能執(zhí)行當(dāng)前任務(wù),但是任務(wù)不是周期性的,那么就立即執(zhí)行該任務(wù)一次
      ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {    // 是周期性任務(wù),則立即執(zhí)行當(dāng)前任務(wù)并且重置
      setNextRunTime();    // 在當(dāng)前任務(wù)執(zhí)行完成后調(diào)用該方法計(jì)算當(dāng)前任務(wù)下次執(zhí)行的時(shí)間
      reExecutePeriodic(outerTask);    // 將當(dāng)前任務(wù)放入任務(wù)隊(duì)列中以便下次執(zhí)行
    }
  }
}

???????在ScheduledFutureTask中,主要有三個(gè)點(diǎn)需要強(qiáng)調(diào):

對于run()方法的第一個(gè)分支,canRunInCurrentRunState()方法的聲明如下所示,可以看到,該方法是用于判斷當(dāng)前任務(wù)如果為周期性任務(wù),那么其是否允許在shutdown狀態(tài)下繼續(xù)執(zhí)行已經(jīng)存在的周期性任務(wù),是則表示當(dāng)前狀態(tài)下是可以執(zhí)行當(dāng)前任務(wù)的,這里isRunningOrShutdown()方法繼承自ThreadPoolExecutor;

boolean canRunInCurrentRunState(boolean periodic) {
  return isRunningOrShutdown(periodic ?
                             continueExistingPeriodicTasksAfterShutdown :
                             executeExistingDelayedTasksAfterShutdown);
}

在run()方法的最后一個(gè)if分支中,其首先會(huì)執(zhí)行當(dāng)前任務(wù),在執(zhí)行完成時(shí)才會(huì)調(diào)用setNextRunTime()方法設(shè)置下次任務(wù)執(zhí)行時(shí)間,也就是說對于fixedRate和fixedDelay類型的任務(wù)都是在這個(gè)時(shí)間點(diǎn)才設(shè)置的,因而雖然fixedRate類型的任務(wù),即使該任務(wù)下次執(zhí)行時(shí)間比當(dāng)前時(shí)間要早,其也只會(huì)在當(dāng)前任務(wù)執(zhí)行完成后立即執(zhí)行,而不會(huì)與當(dāng)前任務(wù)還未執(zhí)行完時(shí)就執(zhí)行;對于fixedDelay任務(wù)則不會(huì)存在該問題,因?yàn)槠涫且匀蝿?wù)完成后的時(shí)間點(diǎn)為基礎(chǔ)計(jì)算下次執(zhí)行的時(shí)間點(diǎn);

對于run()方法的最后一個(gè)分支中的reExecutePeriodic()方法,其會(huì)將當(dāng)前任務(wù)加入到任務(wù)隊(duì)列中,并且調(diào)用父類的ensurePrestart()方法確保有可用的線程來執(zhí)行當(dāng)前任務(wù),如下是該方法的具體實(shí)現(xiàn):

void reExecutePeriodic(RunnableScheduledFuture task) {
  if (canRunInCurrentRunState(true)) {    // 判斷當(dāng)前任務(wù)是否可以繼續(xù)執(zhí)行
    super.getQueue().add(task);    // 將當(dāng)前任務(wù)加入到任務(wù)隊(duì)列中
    if (!canRunInCurrentRunState(true) && remove(task))    // 雙檢查法判斷任務(wù)在加入過程中是否取消了
      task.cancel(false);
    else
      ensurePrestart();    // 初始化核心線程等確保任務(wù)可以被執(zhí)行
  }
}

???????從ScheduledFutureTask的實(shí)現(xiàn)總結(jié)來看,當(dāng)每創(chuàng)建一個(gè)該類實(shí)例時(shí),會(huì)初始化該類的一些主要屬性,如下次開始執(zhí)行的時(shí)間和執(zhí)行的周期。當(dāng)某個(gè)線程調(diào)用該任務(wù),即執(zhí)行該任務(wù)的run()方法時(shí),如果該任務(wù)不為周期性任務(wù),那么執(zhí)行該任務(wù)之后就不會(huì)有其余的動(dòng)作,如果該任務(wù)為周期性任務(wù),那么在將當(dāng)前任務(wù)執(zhí)行完畢之后,還會(huì)重置當(dāng)前任務(wù)的狀態(tài),并且計(jì)算下次執(zhí)行當(dāng)前任務(wù)的時(shí)間,然后將其放入隊(duì)列中以便下次執(zhí)行。

2.3 DelayedWorkQueue

???????DelayedWorkQueue的實(shí)現(xiàn)與DelayQueue以及PriorityQueue的實(shí)現(xiàn)基本相似,形式都為一個(gè)優(yōu)先隊(duì)列,并且底層是使用堆結(jié)構(gòu)來實(shí)現(xiàn)優(yōu)先隊(duì)列的功能,在數(shù)據(jù)存儲(chǔ)方式上,其使用的是數(shù)組來實(shí)現(xiàn)。這里DelayedWorkQueue與DelayQueue以及PriorityQueue不同的點(diǎn)在于DelayedWorkQueue中主要存儲(chǔ)ScheduledFutureTask類型的任務(wù),該任務(wù)中有一個(gè)heapIndex屬性保存了當(dāng)前任務(wù)在當(dāng)前隊(duì)列數(shù)組中的位置下標(biāo),其主要提升的是對隊(duì)列的諸如contains()和remove()等需要定位當(dāng)前任務(wù)位置的方法的效率,時(shí)間復(fù)雜度可以從O(N)提升到O(logN)。如下是DelayedWorkQueue的實(shí)現(xiàn)代碼(這里只列出了該類的主要屬性和與實(shí)現(xiàn)ScheduledThreadPoolExecutor功能相關(guān)的方法,關(guān)于如何使用數(shù)組實(shí)現(xiàn)優(yōu)先隊(duì)列請讀者查閱相關(guān)文檔):

static class DelayedWorkQueue extends AbstractQueue implements BlockingQueue {

  private static final int INITIAL_CAPACITY = 16;    // 數(shù)組初始化大小
  private RunnableScheduledFuture[] queue = new RunnableScheduledFuture[INITIAL_CAPACITY];
  private final ReentrantLock lock = new ReentrantLock();    // 對添加和刪除元素所使用的鎖
  private int size = 0;    // 當(dāng)前隊(duì)列中有效任務(wù)的個(gè)數(shù)

  private Thread leader = null;    // 執(zhí)行隊(duì)列頭部任務(wù)的線程
  private final Condition available = lock.newCondition();    // 除leader線程外其余線程的等待隊(duì)列

  // 在對任務(wù)進(jìn)行移動(dòng)時(shí),判斷其是否為ScheduledFutureTask實(shí)例,如果是則維護(hù)其heapIndex屬性
  private void setIndex(RunnableScheduledFuture f, int idx) {
    if (f instanceof ScheduledFutureTask)
      ((ScheduledFutureTask)f).heapIndex = idx;
  }

  private void siftUp(int k, RunnableScheduledFuture key) {/* 省略 */}

  private void siftDown(int k, RunnableScheduledFuture key) {/* 省略 */}

  private int indexOf(Object x) {
    if (x != null) {
      if (x instanceof ScheduledFutureTask) {    // 如果為ScheduledFutureTask則可返回其heapIndex屬性
        int i = ((ScheduledFutureTask) x).heapIndex;
        if (i >= 0 && i < size && queue[i] == x)
          return i;
      } else {    // 如果不為ScheduledFutureTask實(shí)例,則需要遍歷隊(duì)列查詢當(dāng)前元素的位置
        for (int i = 0; i < size; i++)
          if (x.equals(queue[i]))
            return i;
      }
    }
    return -1;
  }

  public boolean offer(Runnable x) {
    if (x == null)
      throw new NullPointerException();
    RunnableScheduledFuture e = (RunnableScheduledFuture)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      int i = size;
      if (i >= queue.length)
        grow();    // 隊(duì)列容量不足,對其進(jìn)行擴(kuò)容
      size = i + 1;
      if (i == 0) {    // 如果其為隊(duì)列第一個(gè)元素,則將其放入隊(duì)列頭部
        queue[0] = e;
        setIndex(e, 0);
      } else {    //如果不為第一個(gè)元素,則通過堆的上移元素操作移動(dòng)當(dāng)前元素至合適的位置
        siftUp(i, e);
      }
      if (queue[0] == e) {    // 如果被更新的是隊(duì)列頭部元素,則更新記錄的執(zhí)行頭部任務(wù)的線程
        leader = null;
        available.signal();
      }
    } finally {
      lock.unlock();
    }
    return true;
  }

  // 完成從隊(duì)列拉取元素操作,并且將其從隊(duì)列中移除
  private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
    int s = --size;
    RunnableScheduledFuture x = queue[s];
    queue[s] = null;    // 將隊(duì)列最尾部的元素置空
    if (s != 0)    // 將最后一個(gè)元素放入第一個(gè)位置,并且將其下推至合適的位置
      siftDown(0, x);    // 這里idx置為0是因?yàn)楫?dāng)前方法的入?yún)都為隊(duì)列的第一個(gè)元素
    setIndex(f, -1);
    return f;
  }

  // 嘗試從隊(duì)列(堆)中獲取元素,如果沒有元素或者元素的延遲時(shí)間還未到則返回空
  public RunnableScheduledFuture poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      RunnableScheduledFuture first = queue[0];
      // 在此處代碼控制了當(dāng)從堆頂拉取元素時(shí),如果元素的延遲時(shí)間還未達(dá)到,則不返回當(dāng)前元素
      if (first == null || first.getDelay(NANOSECONDS) > 0)
        return null;
      else
        return finishPoll(first);    // 返回堆頂元素
    } finally {
      lock.unlock();
    }
  }

  // 通過無限for循環(huán)獲取堆頂?shù)脑兀@里take()方法會(huì)阻塞當(dāng)前線程,直至獲取到了可執(zhí)行的任務(wù)。
  // 可以看到,在第一次for循環(huán)中,如果堆頂不存在任務(wù),則其會(huì)加入阻塞隊(duì)列中,如果存在任務(wù),但是
  // 其延遲時(shí)間還未到,那么當(dāng)前線程會(huì)等待該延遲時(shí)間長的時(shí)間,然后查看任務(wù)是否可用,當(dāng)獲取到任務(wù)
  // 之后,其會(huì)將其從隊(duì)列中移除,并且喚醒等待隊(duì)列中其余等待的線程執(zhí)行下一個(gè)任務(wù)
  public RunnableScheduledFuture take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
      for (;;) {
        RunnableScheduledFuture first = queue[0];
        if (first == null)
          available.await();    // 堆內(nèi)沒有元素,當(dāng)前線程進(jìn)入等待隊(duì)列中
        else {
          long delay = first.getDelay(NANOSECONDS);
          if (delay <= 0)    // 堆頂元素延遲時(shí)間小于0,可立即獲取任務(wù)
            return finishPoll(first);
          first = null;
          if (leader != null)
            available.await();    // 已經(jīng)有線程在等待堆頂元素,則當(dāng)前線程進(jìn)入等待隊(duì)列中
          else {
            Thread thisThread = Thread.currentThread();
            leader = thisThread;
            try {
              available.awaitNanos(delay);    // 當(dāng)前線程等待一定時(shí)長后獲取任務(wù)并執(zhí)行
            } finally {
              if (leader == thisThread)
                leader = null;
            }
          }
        }
      }
    } finally {
      if (leader == null && queue[0] != null)
        available.signal();    // 當(dāng)前線程獲取完任務(wù)之后喚醒等待隊(duì)列中的下一個(gè)線程執(zhí)行下一個(gè)任務(wù)
      lock.unlock();
    }
  }
}

???????從DelayedWorkQueue的take()和poll()方法可以看出來,對于隊(duì)列中任務(wù)的等待時(shí)間的限制主要是在這兩個(gè)方法中實(shí)現(xiàn)的,如果任務(wù)的等待時(shí)間還未到,那么該方法就會(huì)阻塞線程池中的線程,直至任務(wù)可以執(zhí)行。

2.4 scheduleAtFixedRate()和scheduleWithFixedDelay()方法

???????前面我們對ScheduledThreadPoolExecutor的主要屬性和主要內(nèi)部類都進(jìn)行了詳細(xì)的講解,基本上已經(jīng)可以看出其是如何實(shí)現(xiàn)定時(shí)執(zhí)行任務(wù)的功能的,接下來我們主要對客戶端可以調(diào)用的主要方法進(jìn)行簡要介紹,這里scheduleAtFixedRate()和scheduleWithFixedDelay()方法的實(shí)現(xiàn)基本是一致的,兩個(gè)方法最細(xì)微的區(qū)別在于ScheduledFutureTask的setNextRunTime()方法的實(shí)現(xiàn),該方法的實(shí)現(xiàn)前面已經(jīng)進(jìn)行了講解,我們這里則以scheduleAtFixedRate()方法的實(shí)現(xiàn)為例對該方法進(jìn)行講解。如下是該方法的具體實(shí)現(xiàn):

public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, 
                                              long period, TimeUnit unit) {
  if (command == null || unit == null)
    throw new NullPointerException();
  if (period <= 0)
    throw new IllegalArgumentException();
  ScheduledFutureTask sft =    // 封裝客戶端的任務(wù)實(shí)例
    new ScheduledFutureTask(command, null, 
                                  triggerTime(initialDelay, unit),unit.toNanos(period));
  RunnableScheduledFuture t = decorateTask(command, sft);    // 對客戶端任務(wù)實(shí)例進(jìn)行裝飾
  sft.outerTask = t;    // 初始化周期任務(wù)屬性outerTask
  delayedExecute(t);    // 執(zhí)行該任務(wù)
  return t;
}

???????從上述代碼可以看出來,scheduleAtFixedRate()首先對客戶端任務(wù)實(shí)例進(jìn)行了封裝,裝飾,并且初始化了封裝后的任務(wù)實(shí)例的outerTask屬性,最后調(diào)用delayedExecute()方法執(zhí)行任務(wù)。如下是delayedExecute()方法的實(shí)現(xiàn):

private void delayedExecute(RunnableScheduledFuture task) {
  if (isShutdown())
    reject(task);
  else {
    super.getQueue().add(task);    // 添加當(dāng)前任務(wù)到任務(wù)隊(duì)列中
    if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
      task.cancel(false);    // 雙檢查法再次判斷當(dāng)前線程池是否處于可用狀態(tài),不是則移除當(dāng)前任務(wù)
    else
      ensurePrestart();    // 若線程池沒有初始化,則進(jìn)行一些初始化工作
  }
}

???????上述方法為主要的執(zhí)行任務(wù)的方法,該方法首先會(huì)將任務(wù)加入到任務(wù)隊(duì)列中,如果線程池已經(jīng)初始化過,那么該任務(wù)就會(huì)有等待的線程執(zhí)行該任務(wù)。在加入到任務(wù)隊(duì)列之后通過雙檢查法檢查線程池是否已經(jīng)shutdown了,如果是則將該任務(wù)從任務(wù)隊(duì)列中移除。如果當(dāng)前線程池沒有shutdown,就調(diào)用繼承自ThreadPoolExecutor的ensurePrestart()方法,該方法會(huì)對線程池進(jìn)行一些初始化工作,如初始化核心線程,然后各個(gè)線程會(huì)調(diào)用上述等待隊(duì)列的take()方法獲取任務(wù)執(zhí)行。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/76304.html

相關(guān)文章

  • ScheduledThreadPoolExecutor詳解

    摘要:該方法傳入表示如果當(dāng)前任務(wù)正在執(zhí)行,那么立即終止其執(zhí)行傳入表示如果當(dāng)前方法正在執(zhí)行,那么等待其執(zhí)行完成之后再取消當(dāng)前任務(wù)。 ???????本文主要分為兩個(gè)部分,第一部分首先會(huì)對ScheduledThreadPoolExecutor進(jìn)行簡單的介紹,并且會(huì)介紹其主要API的使用方式,然后介紹了其使用時(shí)的注意點(diǎn),第二部分則主要對ScheduledThreadPoolExecutor的實(shí)現(xiàn)細(xì)節(jié)...

    wangshijun 評(píng)論0 收藏0
  • ScheduledThreadPoolExecutor詳解

    摘要:該方法傳入表示如果當(dāng)前任務(wù)正在執(zhí)行,那么立即終止其執(zhí)行傳入表示如果當(dāng)前方法正在執(zhí)行,那么等待其執(zhí)行完成之后再取消當(dāng)前任務(wù)。 ???????本文主要分為兩個(gè)部分,第一部分首先會(huì)對ScheduledThreadPoolExecutor進(jìn)行簡單的介紹,并且會(huì)介紹其主要API的使用方式,然后介紹了其使用時(shí)的注意點(diǎn),第二部分則主要對ScheduledThreadPoolExecutor的實(shí)現(xiàn)細(xì)節(jié)...

    gself 評(píng)論0 收藏0
  • Java多線程學(xué)習(xí)(八)線程池與Executor 框架

    摘要:一使用線程池的好處線程池提供了一種限制和管理資源包括執(zhí)行一個(gè)任務(wù)。每個(gè)線程池還維護(hù)一些基本統(tǒng)計(jì)信息,例如已完成任務(wù)的數(shù)量。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。使用無界隊(duì)列作為線程池的工作隊(duì)列會(huì)對線程池帶來的影響與相同。 歷史優(yōu)質(zhì)文章推薦: Java并發(fā)編程指南專欄 分布式系統(tǒng)的經(jīng)典基礎(chǔ)理論 可能是最漂亮的Spring事務(wù)管理詳解 面試中關(guān)于Java虛擬機(jī)(jvm)的問...

    cheng10 評(píng)論0 收藏0
  • Java中的線程池

    摘要:中的線程池運(yùn)用場景非常廣泛,幾乎所有的一步或者并發(fā)執(zhí)行程序都可以使用。代碼中如果執(zhí)行了方法,線程池會(huì)提前創(chuàng)建并啟動(dòng)所有核心線程。線程池最大數(shù)量線程池允許創(chuàng)建的線程最大數(shù)量。被稱為是可重用固定線程數(shù)的線程池。 Java中的線程池運(yùn)用場景非常廣泛,幾乎所有的一步或者并發(fā)執(zhí)行程序都可以使用。那么線程池有什么好處呢,以及他的實(shí)現(xiàn)原理是怎么樣的呢? 使用線程池的好處 在開發(fā)過程中,合理的使用線程...

    tomato 評(píng)論0 收藏0
  • 美團(tuán)面試題:Java-線程池 ThreadPool 專題詳解

    摘要:去美團(tuán)面試,問到了什么是線程池,如何使用,為什么要用以下做個(gè)總結(jié)。二線程池線程池的作用線程池作用就是限制系統(tǒng)中執(zhí)行線程的數(shù)量。真正的線程池接口是。創(chuàng)建固定大小的線程池。此線程池支持定時(shí)以及周期性執(zhí)行任務(wù)的需求。 去美團(tuán)面試,問到了什么是線程池,如何使用,為什么要用,以下做個(gè)總結(jié)。關(guān)于線程之前也寫過一篇文章《高級(jí)面試題總結(jié)—線程池還能這么玩?》 1、什么是線程池:? java.util...

    enrecul101 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<