成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

FutureTask源碼分析

luqiuwen / 1390人閱讀

摘要:從而可以啟動(dòng)和取消異步計(jì)算任務(wù)查詢異步計(jì)算任務(wù)是否完成和獲取異步計(jì)算任務(wù)的返回結(jié)果。原理分析在分析中我們沒有看它的父類,其中有一個(gè)方法,返回一個(gè),說明該方法可以獲取異步任務(wù)的返回結(jié)果。

FutureTask介紹

FutureTask是一種可取消的異步計(jì)算任務(wù)。它實(shí)現(xiàn)了Future接口,代表了異步任務(wù)的返回結(jié)果。從而FutureTask可以啟動(dòng)和取消異步計(jì)算任務(wù)、查詢異步計(jì)算任務(wù)是否完成和獲取異步計(jì)算任務(wù)的返回結(jié)果。只有到異步計(jì)算任務(wù)結(jié)束時(shí)才能獲取返回結(jié)果,當(dāng)異步計(jì)算任務(wù)還未結(jié)束時(shí)調(diào)用get方法會(huì)使線程阻塞。一旦異步計(jì)算任務(wù)完成,計(jì)算任務(wù)不能重新啟動(dòng)或者取消,除非調(diào)用了runAndReset。

FutureTask實(shí)現(xiàn)了RunnableFuture,RunnableFuture結(jié)合了Future和Runnable。

FutureTask原理分析

在ThreadPoolExecutor分析中我們沒有看它的父類AbstractExecutorService,其中有一個(gè)方法submit,返回一個(gè)Future,說明該方法可以獲取異步任務(wù)的返回結(jié)果。該方法有三個(gè)重載,可以接收Runnable和Callable,Callable是可以返回結(jié)果的一個(gè)Runnable,而Callable就是FutureTask的一個(gè)重要的變量。

@FunctionalInterface
public interface Callable {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
FutureTask的一些變量和狀態(tài)
/**
 * The run state of this task, initially NEW.  The run state
 * transitions to a terminal state only in methods set,
 * setException, and cancel.  During completion, state may take on
 * transient values of COMPLETING (while outcome is being set) or
 * INTERRUPTING (only while interrupting the runner to satisfy a
 * cancel(true)). Transitions from these intermediate to final
 * states use cheaper ordered/lazy writes because values are unique
 * and cannot be further modified.
 *
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

/** The underlying callable; nulled out after running */
//一個(gè)可以返回結(jié)果的任務(wù)
private Callable callable;
/** The result to return or exception to throw from get() */
//包裝返回結(jié)果或者異常,沒有被volatile修飾,狀態(tài)保護(hù)讀寫安全
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
//運(yùn)行線程
private volatile Thread runner;
/** Treiber stack of waiting threads */
//單鏈表,是一個(gè)線程的棧的結(jié)構(gòu)
private volatile WaitNode waiters;

FutureTask有7中狀態(tài),介紹一下狀態(tài)之間的轉(zhuǎn)換:
NEW -> COMPLETING -> NORMAL:任務(wù)正常執(zhí)行;
NEW -> COMPLETING -> EXCEPTIONAL:任務(wù)發(fā)生異常;
NEW -> CANCELLED:任務(wù)被取消;
NEW -> INTERRUPTING -> INTERRUPTED:任務(wù)被中斷;

run方法
public void run() {
    //如果state不為NEW,說明任務(wù)已經(jīng)在執(zhí)行或者取消
    //如果設(shè)置運(yùn)行線程失敗,說明任務(wù)已經(jīng)有運(yùn)行線程搶在前面
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable c = callable;
        //NEW狀態(tài)才可以執(zhí)行
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //執(zhí)行任務(wù)
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                //設(shè)置異常信息
                setException(ex);
            }
            if (ran)
                //設(shè)置任務(wù)運(yùn)行結(jié)果
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        //將運(yùn)行線程清空,在state被更改之前要保證runner非空,這樣能包裝run方法不被多次執(zhí)行
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        //中斷處理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

set、setException和handlePossibleCancellationInterrupt
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

當(dāng)執(zhí)行時(shí)發(fā)生異常,調(diào)用setException,首先將state設(shè)置為COMPLETING,設(shè)置成功后將outcome設(shè)置為異常,然后將state設(shè)置為EXCEPTIONAL。

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

當(dāng)callable執(zhí)行成功并返回,調(diào)用set,首先將state設(shè)置為COMPLETING,設(shè)置成功后將結(jié)果設(shè)置為outcome,然后設(shè)置state為NORMAL。

finally中如果state為中斷,調(diào)用handlePossibleCancellationInterrupt:

private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let"s spin-wait patiently.
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt

    // assert state == INTERRUPTED;

    // We want to clear any interrupt we may have received from
    // cancel(true).  However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
}

如果狀態(tài)一直是INTERRUPTING,稍稍等待。

finishCompletion和get

在上面set和setException中最后都調(diào)用了finishCompletion方法:

private void finishCompletion() {
    // assert state > COMPLETING;
    //該方法必須在state > COMPLETING時(shí)調(diào)用
    //從頭到尾喚醒WaitNode中阻塞的線程
    for (WaitNode q; (q = waiters) != null;) {
        //設(shè)置棧頂為空
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                //喚醒線程
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                //如果next為空,break
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

在調(diào)用get方法時(shí),如果任務(wù)還在執(zhí)行,線程會(huì)阻塞,F(xiàn)utureTask會(huì)將阻塞的線程放入waiters單鏈表。等待任務(wù)結(jié)束時(shí)被喚醒,我們繼續(xù)看get方法:

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        //如果任務(wù)還在執(zhí)行,阻塞當(dāng)前線程,放入waiters單鏈表
        s = awaitDone(false, 0L);
    return report(s);
}
awaitDone
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        //如果線程被中斷,移除當(dāng)前node,拋出異常
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        //如果任務(wù)完成或者被取消,直接返回
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        //如果任務(wù)正在執(zhí)行,線程等待一下
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        //如果q為空,新建一個(gè)node
        else if (q == null)
            q = new WaitNode();
        //如果還未入列,嘗試將新建的node放入鏈表
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        //如果設(shè)置了超時(shí)且超時(shí)了
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                //超時(shí),移除node
                removeWaiter(q);
                return state;
            }
            //阻塞線程
            LockSupport.parkNanos(this, nanos);
        }
        //阻塞當(dāng)前線程
        else
            LockSupport.park(this);
    }
}
removeWaiter
private void removeWaiter(WaitNode node) {
    if (node != null) {
        //設(shè)置節(jié)點(diǎn)的線程為空,做刪除標(biāo)記
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                //thread不為空,continue
                if (q.thread != null)
                    pred = q;
                //thread為空且pred不為空
                else if (pred != null) {
                    //刪除q
                    pred.next = s;
                    //檢查一下pred的thread,如果被其他線程修改,retry outer loop
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                //thread為空且pred為空說明q為棧頂,將q.next設(shè)置為棧頂,失敗則retry
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    continue retry;
            }
            break;
        }
    }
}

report方法

get方法最后調(diào)用了report方法:

private V report(int s) throws ExecutionException {
    Object x = outcome;
    //NORMAL表示任務(wù)執(zhí)行正常,返回結(jié)果
    if (s == NORMAL)
        return (V)x;
    //任務(wù)被取消,拋出異常
    if (s >= CANCELLED)
        throw new CancellationException();
    //其他情況只有可能發(fā)生異常,拋出該異常
    throw new ExecutionException((Throwable)x);
}

cancel方法

最后看一下cancel方法:

public boolean cancel(boolean mayInterruptIfRunning) {
    //當(dāng)state不為NEW說明任務(wù)已經(jīng)開始,不能被取消,返回false
    //當(dāng)設(shè)置state失敗時(shí),返回false
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                //中斷線程
                if (t != null)
                    t.interrupt();
            } finally { // final state
                //設(shè)置任務(wù)為INTERRUPTED
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/70968.html

相關(guān)文章

  • FutureTask源碼解析(2)——深入理解FutureTask

    摘要:本文的源碼基于。人如其名,包含了和兩部分。而將一個(gè)任務(wù)的狀態(tài)設(shè)置成終止態(tài)只有三種方法我們將在下文的源碼解析中分析這三個(gè)方法。將棧中所有掛起的線程都喚醒后,下面就是執(zhí)行方法這個(gè)方法是一個(gè)空方 前言 系列文章目錄 有了上一篇對(duì)預(yù)備知識(shí)的了解之后,分析源碼就容易多了,本篇我們就直接來看看FutureTask的源碼。 本文的源碼基于JDK1.8。 Future和Task 在深入分析源碼之前,我...

    Harpsichord1207 評(píng)論0 收藏0
  • FutureTask源碼分析筆記

    摘要:主要的實(shí)現(xiàn)實(shí)際上運(yùn)行還是一個(gè),它對(duì)做了一個(gè)封裝,讓開發(fā)人員可以從其中獲取返回值是有狀態(tài)的共種狀態(tài),四種狀態(tài)變換的可能和的區(qū)別通過方法調(diào)用有返回值可以拋異常結(jié)果的實(shí)現(xiàn)原理判斷狀態(tài)非狀態(tài)則直接進(jìn)入返回結(jié)果處于狀態(tài),則進(jìn)入等待流程獲 主要的實(shí)現(xiàn)FutureTask # FutureTask實(shí)際上運(yùn)行還是一個(gè)runnable,它對(duì)callable做了一個(gè)封裝,讓開發(fā)人員可以從其中獲取返回值; ...

    PascalXie 評(píng)論0 收藏0
  • 系列文章目錄

    摘要:為了避免一篇文章的篇幅過長(zhǎng),于是一些比較大的主題就都分成幾篇來講了,這篇文章是筆者所有文章的目錄,將會(huì)持續(xù)更新,以給大家一個(gè)查看系列文章的入口。 前言 大家好,筆者是今年才開始寫博客的,寫作的初衷主要是想記錄和分享自己的學(xué)習(xí)經(jīng)歷。因?yàn)閷懽鞯臅r(shí)候發(fā)現(xiàn),為了弄懂一個(gè)知識(shí),不得不先去了解另外一些知識(shí),這樣以來,為了說明一個(gè)問題,就要把一系列知識(shí)都了解一遍,寫出來的文章就特別長(zhǎng)。 為了避免一篇...

    lijy91 評(píng)論0 收藏0
  • 系列文章目錄

    摘要:為了避免一篇文章的篇幅過長(zhǎng),于是一些比較大的主題就都分成幾篇來講了,這篇文章是筆者所有文章的目錄,將會(huì)持續(xù)更新,以給大家一個(gè)查看系列文章的入口。 前言 大家好,筆者是今年才開始寫博客的,寫作的初衷主要是想記錄和分享自己的學(xué)習(xí)經(jīng)歷。因?yàn)閷懽鞯臅r(shí)候發(fā)現(xiàn),為了弄懂一個(gè)知識(shí),不得不先去了解另外一些知識(shí),這樣以來,為了說明一個(gè)問題,就要把一系列知識(shí)都了解一遍,寫出來的文章就特別長(zhǎng)。 為了避免一篇...

    Yumenokanata 評(píng)論0 收藏0
  • FutureTask源碼解析(1)——預(yù)備知識(shí)

    摘要:在分析它的源碼之前我們需要先了解一些預(yù)備知識(shí)。因?yàn)榻涌跊]有返回值所以為了與兼容我們額外傳入了一個(gè)參數(shù)使得返回的對(duì)象的方法直接執(zhí)行的方法然后返回傳入的參數(shù)。 前言 系列文章目錄 FutureTask 是一個(gè)同步工具類,它實(shí)現(xiàn)了Future語(yǔ)義,表示了一種抽象的可生成結(jié)果的計(jì)算。在包括線程池在內(nèi)的許多工具類中都會(huì)用到,弄懂它的實(shí)現(xiàn)將有利于我們更加深入地理解Java異步操作實(shí)現(xiàn)。 在分析...

    mmy123456 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<