成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Java 多線程(5):Fork/Join 型線程池與 Work-Stealing 算法

IamDLY / 3250人閱讀

摘要:時(shí),標(biāo)準(zhǔn)類庫添加了,作為對(duì)型線程池的實(shí)現(xiàn)。類圖用來專門定義型任務(wù)完成將大任務(wù)分割為小任務(wù)以及合并結(jié)果的工作。

JDK 1.7 時(shí),標(biāo)準(zhǔn)類庫添加了 ForkJoinPool,作為對(duì) Fork/Join 型線程池的實(shí)現(xiàn)。Fork 在英文中有 分叉 的意思,而 Join合并 的意思。ForkJoinPool 的功能也是如此:Fork 將大任務(wù)分叉為多個(gè)小任務(wù),然后讓小任務(wù)執(zhí)行,Join 是獲得小任務(wù)的結(jié)果,然后進(jìn)行合并,將合并的結(jié)果作為大任務(wù)的結(jié)果 —— 并且這會(huì)是一個(gè)遞歸的過程 —— 因?yàn)槿蝿?wù)如果足夠大,可以將任務(wù)多級(jí)分叉直到任務(wù)足夠小。

由此可見,ForkJoinPool 可以滿足 并行 地實(shí)現(xiàn) 分治算法(Divide-and-Conquer) 的需要。

ForkJoinPool 的類圖如下:

可以看到 ForkJoinPool 實(shí)現(xiàn)了 ExecutorService 接口,所以首先 ForkJoinPool 也是一個(gè) ExecutorService線程池)。因而 RunnableCallable 類型的任務(wù),ForkJoinPool 也可以通過 submit、invokeAllinvokeAny 等方法來執(zhí)行。但是標(biāo)準(zhǔn)類庫還為 ForkJoinPool 定義了一種新的任務(wù),它就是 ForkJoinTask。

ForkJoinTask 類圖:

ForkJoinTask 用來專門定義 Fork/Join 型任務(wù) —— 完成將大任務(wù)分割為小任務(wù)以及合并結(jié)果的工作。一般我們不需要直接使用 ForkJoinTask,而是通過繼承它的子類 RecursiveActionRecursiveTask 并實(shí)現(xiàn)對(duì)應(yīng)的抽象方法 —— compute ,來定義我們自己的任務(wù)。其中,RecursiveAction 是不帶返回值的 Fork/Join 型任務(wù),所以使用此類任務(wù)并不產(chǎn)生結(jié)果,也就不涉及到結(jié)果的合并;而 RecursiveTask 是帶返回值的 Fork/Join 型任務(wù),使用此類任務(wù)的話,在任務(wù)結(jié)束前,我們需要進(jìn)行結(jié)果的合并。其中,通過 ForkJoinTaskfork 方法,我們可以產(chǎn)生子任務(wù)并執(zhí)行;通過 join 方法,我們可以獲得子任務(wù)的結(jié)果。

ForkJoinPool 可以使用三種方法用來執(zhí)行 ForkJoinTask

invoke 方法:

invoke 方法用來執(zhí)行一個(gè)帶返回值的任務(wù)(通常繼承自RecursiveTask),并且該方法是阻塞的,直到任務(wù)執(zhí)行完畢,該方法才會(huì)停止阻塞并返回任務(wù)的執(zhí)行結(jié)果。

submit 方法:

除了從 ExecutorService 繼承的 submit 方法外,ForkJoinPool 還定義了用來執(zhí)行 ForkJoinTasksubmit 方法 —— 一般該 submit 方法用來執(zhí)行帶返回值的ForkJoinTask(通常繼承自RecursiveTask)。該方法是非阻塞的,調(diào)用之后將任務(wù)提交給 ForkJoinPool 去執(zhí)行便立即返回,返回的便是已經(jīng)提交到 ForkJoinPool 去執(zhí)行的 task —— 由類圖可知 ForkJoinTask 實(shí)現(xiàn)了 Future 接口,所以可以直接通過 task 來和已經(jīng)提交的任務(wù)進(jìn)行交互。

execute 方法:

除了從 Executor 獲得的 execute 方法外,ForkJoinPool 也定義了用來執(zhí)行ForkJoinTaskexecute 方法 —— 一般該 execute 方法用來執(zhí)行不帶返回值的ForkJoinTask(通常繼承自RecursiveAction) ,該方法同樣是非阻塞的。

現(xiàn)在讓我們來實(shí)踐下 ForkJoinPool 的功能:計(jì)算 π 的值。計(jì)算 π 的值有一個(gè)通過多項(xiàng)式方法,即:π = 4 * (1 - 1/3 + 1/5 - 1/7 + 1/9 - ……),而且多項(xiàng)式的項(xiàng)數(shù)越多,計(jì)算出的 π 的值越精確。

首先我們定義用來估算 π 的 PiEstimateTask

class PiEstimateTask extends RecursiveTask {

    private final long begin;
    private final long end;
    private final long threshold; // 分割任務(wù)的臨界值

    public PiEstimateTask(long begin, long end, long threshold) {
        this.begin = begin;
        this.end = end;
        this.threshold = threshold;
    }

    @Override
    protected Double compute() {  // 實(shí)現(xiàn) compute 方法
        if (end - begin <= threshold) {  // 臨界值之下,不再分割,直接計(jì)算

            int sign; // 符號(hào),多項(xiàng)式中偶數(shù)位取 1,奇數(shù)位取 -1(位置從 0 開始)
            double result = 0.0;
            
            for (long i = begin; i < end; i++) {
                sign = (i & 1) == 0 ? 1 : -1;
                result += sign / (i * 2.0 + 1);
            }

            return result * 4;
        }

        // 分割任務(wù)
        long middle = (begin + end) / 2;
        PiEstimateTask leftTask = new PiEstimateTask(begin, middle, threshold);
        PiEstimateTask rightTask = new PiEstimateTask(middle, end, threshold);

        leftTask.fork();  // 異步執(zhí)行 leftTask
        rightTask.fork(); // 異步執(zhí)行 rightTask

        double leftResult = leftTask.join();   // 阻塞,直到 leftTask 執(zhí)行完畢返回結(jié)果
        double rightResult = rightTask.join(); // 阻塞,直到 rightTask 執(zhí)行完畢返回結(jié)果

        return leftResult + rightResult; // 合并結(jié)果
    }

}

然后我們使用 ForkJoinPoolinvoke 執(zhí)行 PiEstimateTask

public class ForkJoinPoolTest {

    public static void main(String[] args) throws Exception {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
    
        // 計(jì)算 10 億項(xiàng),分割任務(wù)的臨界值為 1 千萬
        PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000);
    
        double pi = forkJoinPool.invoke(task); // 阻塞,直到任務(wù)執(zhí)行完畢返回結(jié)果
    
        System.out.println("π 的值:" + pi);
        
        forkJoinPool.shutdown(); // 向線程池發(fā)送關(guān)閉的指令
    }
}

運(yùn)行結(jié)果:

我們也可以使用 submit 方法異步的執(zhí)行任務(wù)(此處 submit 方法返回的 future 指向的對(duì)象即提交任務(wù)時(shí)的 task):

public static void main(String[] args) throws Exception {
    ForkJoinPool forkJoinPool = new ForkJoinPool(4);

    PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000);
    Future future = forkJoinPool.submit(task); // 不阻塞
    
    double pi = future.get();
    System.out.println("π 的值:" + pi);
    System.out.println("future 指向的對(duì)象是 task 嗎:" + (future == task));
    
    forkJoinPool.shutdown(); // 向線程池發(fā)送關(guān)閉的指令
}

運(yùn)行結(jié)果:

值得注意的是,選取一個(gè)合適的分割任務(wù)的臨界值,對(duì) ForkJoinPool 執(zhí)行任務(wù)的效率有著至關(guān)重要的影響。臨界值選取過大,任務(wù)分割的不夠細(xì),則不能充分利用 CPU;臨界值選取過小,則任務(wù)分割過多,可能產(chǎn)生過多的子任務(wù),導(dǎo)致過多的線程間的切換和加重 GC 的負(fù)擔(dān)從而影響了效率。所以,需要根據(jù)實(shí)際的應(yīng)用場(chǎng)景選擇一個(gè)合適的分割任務(wù)的臨界值。

ForkJoinPool 相比于 ThreadPoolExecutor,還有一個(gè)非常重要的特點(diǎn)(優(yōu)點(diǎn))在于,ForkJoinPool具有 Work-Stealing (工作竊取)的能力。所謂 Work-Stealing,在 ForkJoinPool 中的實(shí)現(xiàn)為:線程池中每個(gè)線程都有一個(gè)互不影響的任務(wù)隊(duì)列(雙端隊(duì)列),線程每次都從自己的任務(wù)隊(duì)列的隊(duì)頭中取出一個(gè)任務(wù)來運(yùn)行;如果某個(gè)線程對(duì)應(yīng)的隊(duì)列已空并且處于空閑狀態(tài),而其他線程的隊(duì)列中還有任務(wù)需要處理但是該線程處于工作狀態(tài),那么空閑的線程可以從其他線程的隊(duì)列的隊(duì)尾取一個(gè)任務(wù)來幫忙運(yùn)行 —— 感覺就像是空閑的線程去偷人家的任務(wù)來運(yùn)行一樣,所以叫 “工作竊取”。

Work-Stealing 的適用場(chǎng)景是不同的任務(wù)的耗時(shí)相差比較大,即某些任務(wù)需要運(yùn)行較長(zhǎng)時(shí)間,而某些任務(wù)會(huì)很快的運(yùn)行完成,這種情況下用 Work-Stealing 很合適;但是如果任務(wù)的耗時(shí)很平均,則此時(shí) Work-Stealing 并不適合,因?yàn)楦`取任務(wù)時(shí)不同線程需要搶占鎖,這可能會(huì)造成額外的時(shí)間消耗,而且每個(gè)線程維護(hù)雙端隊(duì)列也會(huì)造成更大的內(nèi)存消耗。所以 ForkJoinPool 并不是 ThreadPoolExecutor 的替代品,而是作為對(duì) ThreadPoolExecutor 的補(bǔ)充。

總結(jié):
ForkJoinPoolThreadPoolExecutor 都是 ExecutorService(線程池),但ForkJoinPool 的獨(dú)特點(diǎn)在于:

ThreadPoolExecutor 只能執(zhí)行 RunnableCallable 任務(wù),而 ForkJoinPool 不僅可以執(zhí)行 RunnableCallable 任務(wù),還可以執(zhí)行 Fork/Join 型任務(wù) —— ForkJoinTask —— 從而滿足并行地實(shí)現(xiàn)分治算法的需要;

ThreadPoolExecutor 中任務(wù)的執(zhí)行順序是按照其在共享隊(duì)列中的順序來執(zhí)行的,所以后面的任務(wù)需要等待前面任務(wù)執(zhí)行完畢后才能執(zhí)行,而 ForkJoinPool 每個(gè)線程有自己的任務(wù)隊(duì)列,并在此基礎(chǔ)上實(shí)現(xiàn)了 Work-Stealing 的功能,使得在某些情況下 ForkJoinPool 能更大程度的提高并發(fā)效率。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/66494.html

相關(guān)文章

  • Fork/Join框架簡(jiǎn)介

    摘要:第二步執(zhí)行任務(wù)并合并結(jié)果。使用兩個(gè)類來完成以上兩件事情我們要使用框架,必須首先創(chuàng)建一個(gè)任務(wù)。用于有返回結(jié)果的任務(wù)。如果任務(wù)順利執(zhí)行完成了,則設(shè)置任務(wù)狀態(tài)為,如果出現(xiàn)異常,則紀(jì)錄異常,并將任務(wù)狀態(tài)設(shè)置為。 1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一個(gè)用于并行執(zhí)行任務(wù)的框架, 是一個(gè)把大任務(wù)分割成若干個(gè)小任務(wù),最終匯總每個(gè)小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的...

    W_BinaryTree 評(píng)論0 收藏0
  • Java7任務(wù)并行執(zhí)行神器:Fork&Join框架

    摘要:對(duì)于任務(wù)的分割,要求各個(gè)子任務(wù)之間相互獨(dú)立,能夠并行獨(dú)立地執(zhí)行任務(wù),互相之間不影響。是叉子分叉的意思,即將大任務(wù)分解成并行的小任務(wù),是連接結(jié)合的意思,即將所有并行的小任務(wù)的執(zhí)行結(jié)果匯總起來。使用方法會(huì)阻塞并等待子任務(wù)執(zhí)行完并得到其結(jié)果。 Fork/Join是什么? Fork/Join框架是Java7提供的并行執(zhí)行任務(wù)框架,思想是將大任務(wù)分解成小任務(wù),然后小任務(wù)又可以繼續(xù)分解,然后每個(gè)小...

    Luosunce 評(píng)論0 收藏0
  • Java 8 的 JVM 有快?Fork-Join 性能基準(zhǔn)測(cè)試

    摘要:這減輕了手動(dòng)重復(fù)執(zhí)行相同基準(zhǔn)測(cè)試的痛苦,并簡(jiǎn)化了獲取結(jié)果的流程。處理項(xiàng)目的代碼并從標(biāo)有注釋的方法處生成基準(zhǔn)測(cè)試程序。用和運(yùn)行該基準(zhǔn)測(cè)試得到以下結(jié)果。同時(shí),和的基線測(cè)試結(jié)果也有略微的不同。 Java 8 已經(jīng)發(fā)布一段時(shí)間了,許多開發(fā)者已經(jīng)開始使用 Java 8。本文也將討論最新發(fā)布在 JDK 中的并發(fā)功能更新。事實(shí)上,JDK 中已經(jīng)有多處java.util.concurrent 改動(dòng),但...

    Euphoria 評(píng)論0 收藏0
  • java 8 實(shí)戰(zhàn)》讀書筆記 -第六章 用流收集數(shù)據(jù)

    摘要:分區(qū)函數(shù)返回一個(gè)布爾值,這意味著得到的分組的鍵類型是,于是它最多可以分為兩組是一組,是一組。當(dāng)遍歷到流中第個(gè)元素時(shí),這個(gè)函數(shù)執(zhí)行時(shí)會(huì)有兩個(gè)參數(shù)保存歸約結(jié)果的累加器已收集了流中的前個(gè)項(xiàng)目,還有第個(gè)元素本身。 一、收集器簡(jiǎn)介 把列表中的交易按貨幣分組: Map transactionsByCurrencies = transactions.stream().collect(groupi...

    Airy 評(píng)論0 收藏0
  • Java 8 新特性之并行流與串行流

    摘要:概述簡(jiǎn)介并行流就是把一個(gè)內(nèi)容分成多個(gè)數(shù)據(jù)塊,并用不同的線程分別處理每個(gè)數(shù)據(jù)塊的流中將并行進(jìn)行了優(yōu)化,我們可以很容易的對(duì)數(shù)據(jù)進(jìn)行并行操作,可以聲明性地通過與在并行流與順序流之間進(jìn)行切換。 1. 概述 1.1 簡(jiǎn)介 并行流就是把一個(gè)內(nèi)容分成多個(gè)數(shù)據(jù)塊,并用不同的線程分別處理每個(gè)數(shù)據(jù)塊的流 Java 8 中將并行進(jìn)行了優(yōu)化,我們可以很容易的對(duì)數(shù)據(jù)進(jìn)行并行操作,Stream API 可以聲明性...

    2json 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<