摘要:時(shí),標(biāo)準(zhǔn)類庫添加了,作為對(duì)型線程池的實(shí)現(xiàn)。類圖用來專門定義型任務(wù)完成將大任務(wù)分割為小任務(wù)以及合并結(jié)果的工作。
JDK 1.7 時(shí),標(biāo)準(zhǔn)類庫添加了 ForkJoinPool,作為對(duì) Fork/Join 型線程池的實(shí)現(xiàn)。Fork 在英文中有 分叉 的意思,而 Join 有 合并 的意思。ForkJoinPool 的功能也是如此:Fork 將大任務(wù)分叉為多個(gè)小任務(wù),然后讓小任務(wù)執(zhí)行,Join 是獲得小任務(wù)的結(jié)果,然后進(jìn)行合并,將合并的結(jié)果作為大任務(wù)的結(jié)果 —— 并且這會(huì)是一個(gè)遞歸的過程 —— 因?yàn)槿蝿?wù)如果足夠大,可以將任務(wù)多級(jí)分叉直到任務(wù)足夠小。
由此可見,ForkJoinPool 可以滿足 并行 地實(shí)現(xiàn) 分治算法(Divide-and-Conquer) 的需要。
ForkJoinPool 的類圖如下:
可以看到 ForkJoinPool 實(shí)現(xiàn)了 ExecutorService 接口,所以首先 ForkJoinPool 也是一個(gè) ExecutorService (線程池)。因而 Runnable 和 Callable 類型的任務(wù),ForkJoinPool 也可以通過 submit、invokeAll 和 invokeAny 等方法來執(zhí)行。但是標(biāo)準(zhǔn)類庫還為 ForkJoinPool 定義了一種新的任務(wù),它就是 ForkJoinTask
ForkJoinTask 類圖:
ForkJoinTask
ForkJoinPool 可以使用三種方法用來執(zhí)行 ForkJoinTask:
invoke 方法:
invoke 方法用來執(zhí)行一個(gè)帶返回值的任務(wù)(通常繼承自RecursiveTask),并且該方法是阻塞的,直到任務(wù)執(zhí)行完畢,該方法才會(huì)停止阻塞并返回任務(wù)的執(zhí)行結(jié)果。
submit 方法:
除了從 ExecutorService 繼承的 submit 方法外,ForkJoinPool 還定義了用來執(zhí)行 ForkJoinTask 的 submit 方法 —— 一般該 submit 方法用來執(zhí)行帶返回值的ForkJoinTask(通常繼承自RecursiveTask)。該方法是非阻塞的,調(diào)用之后將任務(wù)提交給 ForkJoinPool 去執(zhí)行便立即返回,返回的便是已經(jīng)提交到 ForkJoinPool 去執(zhí)行的 task —— 由類圖可知 ForkJoinTask 實(shí)現(xiàn)了 Future 接口,所以可以直接通過 task 來和已經(jīng)提交的任務(wù)進(jìn)行交互。
execute 方法:
除了從 Executor 獲得的 execute 方法外,ForkJoinPool 也定義了用來執(zhí)行ForkJoinTask 的 execute 方法 —— 一般該 execute 方法用來執(zhí)行不帶返回值的ForkJoinTask(通常繼承自RecursiveAction) ,該方法同樣是非阻塞的。
現(xiàn)在讓我們來實(shí)踐下 ForkJoinPool 的功能:計(jì)算 π 的值。計(jì)算 π 的值有一個(gè)通過多項(xiàng)式方法,即:π = 4 * (1 - 1/3 + 1/5 - 1/7 + 1/9 - ……),而且多項(xiàng)式的項(xiàng)數(shù)越多,計(jì)算出的 π 的值越精確。
首先我們定義用來估算 π 的 PiEstimateTask:
class PiEstimateTask extends RecursiveTask{ private final long begin; private final long end; private final long threshold; // 分割任務(wù)的臨界值 public PiEstimateTask(long begin, long end, long threshold) { this.begin = begin; this.end = end; this.threshold = threshold; } @Override protected Double compute() { // 實(shí)現(xiàn) compute 方法 if (end - begin <= threshold) { // 臨界值之下,不再分割,直接計(jì)算 int sign; // 符號(hào),多項(xiàng)式中偶數(shù)位取 1,奇數(shù)位取 -1(位置從 0 開始) double result = 0.0; for (long i = begin; i < end; i++) { sign = (i & 1) == 0 ? 1 : -1; result += sign / (i * 2.0 + 1); } return result * 4; } // 分割任務(wù) long middle = (begin + end) / 2; PiEstimateTask leftTask = new PiEstimateTask(begin, middle, threshold); PiEstimateTask rightTask = new PiEstimateTask(middle, end, threshold); leftTask.fork(); // 異步執(zhí)行 leftTask rightTask.fork(); // 異步執(zhí)行 rightTask double leftResult = leftTask.join(); // 阻塞,直到 leftTask 執(zhí)行完畢返回結(jié)果 double rightResult = rightTask.join(); // 阻塞,直到 rightTask 執(zhí)行完畢返回結(jié)果 return leftResult + rightResult; // 合并結(jié)果 } }
然后我們使用 ForkJoinPool 的 invoke 執(zhí)行 PiEstimateTask:
public class ForkJoinPoolTest { public static void main(String[] args) throws Exception { ForkJoinPool forkJoinPool = new ForkJoinPool(4); // 計(jì)算 10 億項(xiàng),分割任務(wù)的臨界值為 1 千萬 PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000); double pi = forkJoinPool.invoke(task); // 阻塞,直到任務(wù)執(zhí)行完畢返回結(jié)果 System.out.println("π 的值:" + pi); forkJoinPool.shutdown(); // 向線程池發(fā)送關(guān)閉的指令 } }
運(yùn)行結(jié)果:
我們也可以使用 submit 方法異步的執(zhí)行任務(wù)(此處 submit 方法返回的 future 指向的對(duì)象即提交任務(wù)時(shí)的 task):
public static void main(String[] args) throws Exception { ForkJoinPool forkJoinPool = new ForkJoinPool(4); PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000); Futurefuture = forkJoinPool.submit(task); // 不阻塞 double pi = future.get(); System.out.println("π 的值:" + pi); System.out.println("future 指向的對(duì)象是 task 嗎:" + (future == task)); forkJoinPool.shutdown(); // 向線程池發(fā)送關(guān)閉的指令 }
運(yùn)行結(jié)果:
值得注意的是,選取一個(gè)合適的分割任務(wù)的臨界值,對(duì) ForkJoinPool 執(zhí)行任務(wù)的效率有著至關(guān)重要的影響。臨界值選取過大,任務(wù)分割的不夠細(xì),則不能充分利用 CPU;臨界值選取過小,則任務(wù)分割過多,可能產(chǎn)生過多的子任務(wù),導(dǎo)致過多的線程間的切換和加重 GC 的負(fù)擔(dān)從而影響了效率。所以,需要根據(jù)實(shí)際的應(yīng)用場(chǎng)景選擇一個(gè)合適的分割任務(wù)的臨界值。
ForkJoinPool 相比于 ThreadPoolExecutor,還有一個(gè)非常重要的特點(diǎn)(優(yōu)點(diǎn))在于,ForkJoinPool具有 Work-Stealing (工作竊取)的能力。所謂 Work-Stealing,在 ForkJoinPool 中的實(shí)現(xiàn)為:線程池中每個(gè)線程都有一個(gè)互不影響的任務(wù)隊(duì)列(雙端隊(duì)列),線程每次都從自己的任務(wù)隊(duì)列的隊(duì)頭中取出一個(gè)任務(wù)來運(yùn)行;如果某個(gè)線程對(duì)應(yīng)的隊(duì)列已空并且處于空閑狀態(tài),而其他線程的隊(duì)列中還有任務(wù)需要處理但是該線程處于工作狀態(tài),那么空閑的線程可以從其他線程的隊(duì)列的隊(duì)尾取一個(gè)任務(wù)來幫忙運(yùn)行 —— 感覺就像是空閑的線程去偷人家的任務(wù)來運(yùn)行一樣,所以叫 “工作竊取”。
Work-Stealing 的適用場(chǎng)景是不同的任務(wù)的耗時(shí)相差比較大,即某些任務(wù)需要運(yùn)行較長(zhǎng)時(shí)間,而某些任務(wù)會(huì)很快的運(yùn)行完成,這種情況下用 Work-Stealing 很合適;但是如果任務(wù)的耗時(shí)很平均,則此時(shí) Work-Stealing 并不適合,因?yàn)楦`取任務(wù)時(shí)不同線程需要搶占鎖,這可能會(huì)造成額外的時(shí)間消耗,而且每個(gè)線程維護(hù)雙端隊(duì)列也會(huì)造成更大的內(nèi)存消耗。所以 ForkJoinPool 并不是 ThreadPoolExecutor 的替代品,而是作為對(duì) ThreadPoolExecutor 的補(bǔ)充。
總結(jié):
ForkJoinPool 和 ThreadPoolExecutor 都是 ExecutorService(線程池),但ForkJoinPool 的獨(dú)特點(diǎn)在于:
ThreadPoolExecutor 只能執(zhí)行 Runnable 和 Callable 任務(wù),而 ForkJoinPool 不僅可以執(zhí)行 Runnable 和 Callable 任務(wù),還可以執(zhí)行 Fork/Join 型任務(wù) —— ForkJoinTask —— 從而滿足并行地實(shí)現(xiàn)分治算法的需要;
ThreadPoolExecutor 中任務(wù)的執(zhí)行順序是按照其在共享隊(duì)列中的順序來執(zhí)行的,所以后面的任務(wù)需要等待前面任務(wù)執(zhí)行完畢后才能執(zhí)行,而 ForkJoinPool 每個(gè)線程有自己的任務(wù)隊(duì)列,并在此基礎(chǔ)上實(shí)現(xiàn)了 Work-Stealing 的功能,使得在某些情況下 ForkJoinPool 能更大程度的提高并發(fā)效率。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/66494.html
摘要:第二步執(zhí)行任務(wù)并合并結(jié)果。使用兩個(gè)類來完成以上兩件事情我們要使用框架,必須首先創(chuàng)建一個(gè)任務(wù)。用于有返回結(jié)果的任務(wù)。如果任務(wù)順利執(zhí)行完成了,則設(shè)置任務(wù)狀態(tài)為,如果出現(xiàn)異常,則紀(jì)錄異常,并將任務(wù)狀態(tài)設(shè)置為。 1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一個(gè)用于并行執(zhí)行任務(wù)的框架, 是一個(gè)把大任務(wù)分割成若干個(gè)小任務(wù),最終匯總每個(gè)小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的...
摘要:對(duì)于任務(wù)的分割,要求各個(gè)子任務(wù)之間相互獨(dú)立,能夠并行獨(dú)立地執(zhí)行任務(wù),互相之間不影響。是叉子分叉的意思,即將大任務(wù)分解成并行的小任務(wù),是連接結(jié)合的意思,即將所有并行的小任務(wù)的執(zhí)行結(jié)果匯總起來。使用方法會(huì)阻塞并等待子任務(wù)執(zhí)行完并得到其結(jié)果。 Fork/Join是什么? Fork/Join框架是Java7提供的并行執(zhí)行任務(wù)框架,思想是將大任務(wù)分解成小任務(wù),然后小任務(wù)又可以繼續(xù)分解,然后每個(gè)小...
摘要:這減輕了手動(dòng)重復(fù)執(zhí)行相同基準(zhǔn)測(cè)試的痛苦,并簡(jiǎn)化了獲取結(jié)果的流程。處理項(xiàng)目的代碼并從標(biāo)有注釋的方法處生成基準(zhǔn)測(cè)試程序。用和運(yùn)行該基準(zhǔn)測(cè)試得到以下結(jié)果。同時(shí),和的基線測(cè)試結(jié)果也有略微的不同。 Java 8 已經(jīng)發(fā)布一段時(shí)間了,許多開發(fā)者已經(jīng)開始使用 Java 8。本文也將討論最新發(fā)布在 JDK 中的并發(fā)功能更新。事實(shí)上,JDK 中已經(jīng)有多處java.util.concurrent 改動(dòng),但...
摘要:分區(qū)函數(shù)返回一個(gè)布爾值,這意味著得到的分組的鍵類型是,于是它最多可以分為兩組是一組,是一組。當(dāng)遍歷到流中第個(gè)元素時(shí),這個(gè)函數(shù)執(zhí)行時(shí)會(huì)有兩個(gè)參數(shù)保存歸約結(jié)果的累加器已收集了流中的前個(gè)項(xiàng)目,還有第個(gè)元素本身。 一、收集器簡(jiǎn)介 把列表中的交易按貨幣分組: Map transactionsByCurrencies = transactions.stream().collect(groupi...
摘要:概述簡(jiǎn)介并行流就是把一個(gè)內(nèi)容分成多個(gè)數(shù)據(jù)塊,并用不同的線程分別處理每個(gè)數(shù)據(jù)塊的流中將并行進(jìn)行了優(yōu)化,我們可以很容易的對(duì)數(shù)據(jù)進(jìn)行并行操作,可以聲明性地通過與在并行流與順序流之間進(jìn)行切換。 1. 概述 1.1 簡(jiǎn)介 并行流就是把一個(gè)內(nèi)容分成多個(gè)數(shù)據(jù)塊,并用不同的線程分別處理每個(gè)數(shù)據(jù)塊的流 Java 8 中將并行進(jìn)行了優(yōu)化,我們可以很容易的對(duì)數(shù)據(jù)進(jìn)行并行操作,Stream API 可以聲明性...
閱讀 1094·2023-04-26 02:02
閱讀 2438·2021-09-26 10:11
閱讀 3585·2019-08-30 13:10
閱讀 3780·2019-08-29 17:12
閱讀 750·2019-08-29 14:20
閱讀 2216·2019-08-28 18:19
閱讀 2262·2019-08-26 13:52
閱讀 983·2019-08-26 13:43