摘要:對(duì)于任務(wù)的分割,要求各個(gè)子任務(wù)之間相互獨(dú)立,能夠并行獨(dú)立地執(zhí)行任務(wù),互相之間不影響。是叉子分叉的意思,即將大任務(wù)分解成并行的小任務(wù),是連接結(jié)合的意思,即將所有并行的小任務(wù)的執(zhí)行結(jié)果匯總起來(lái)。使用方法會(huì)阻塞并等待子任務(wù)執(zhí)行完并得到其結(jié)果。
Fork/Join是什么?
Fork/Join框架是Java7提供的并行執(zhí)行任務(wù)框架,思想是將大任務(wù)分解成小任務(wù),然后小任務(wù)又可以繼續(xù)分解,然后每個(gè)小任務(wù)分別計(jì)算出結(jié)果再合并起來(lái),最后將匯總的結(jié)果作為大任務(wù)結(jié)果。其思想和MapReduce的思想非常類似。對(duì)于任務(wù)的分割,要求各個(gè)子任務(wù)之間相互獨(dú)立,能夠并行獨(dú)立地執(zhí)行任務(wù),互相之間不影響。
Fork/Join的運(yùn)行流程圖如下:
[站外圖片上傳中...(image-938a4-1529976385943)]
我們可以通過(guò)Fork/Join單詞字面上的意思去理解這個(gè)框架。Fork是叉子分叉的意思,即將大任務(wù)分解成并行的小任務(wù),Join是連接結(jié)合的意思,即將所有并行的小任務(wù)的執(zhí)行結(jié)果匯總起來(lái)。
工作竊取算法ForkJoin采用了工作竊?。╳ork-stealing)算法,若一個(gè)工作線程的任務(wù)隊(duì)列為空沒有任務(wù)執(zhí)行時(shí),便從其他工作線程中獲取任務(wù)主動(dòng)執(zhí)行。為了實(shí)現(xiàn)工作竊取,在工作線程中維護(hù)了雙端隊(duì)列,竊取任務(wù)線程從隊(duì)尾獲取任務(wù),被竊取任務(wù)線程從隊(duì)頭獲取任務(wù)。這種機(jī)制充分利用線程進(jìn)行并行計(jì)算,減少了線程競(jìng)爭(zhēng)。但是當(dāng)隊(duì)列中只存在一個(gè)任務(wù)了時(shí),兩個(gè)線程去取反而會(huì)造成資源浪費(fèi)。
工作竊取的運(yùn)行流程圖如下:
[站外圖片上傳中...(image-17ddfc-1529976385943)]
Fork/Join核心類Fork/Join框架主要由子任務(wù)、任務(wù)調(diào)度兩部分組成,類層次圖如下。
ForkJoinPool
ForkJoinPool是ForkJoin框架中的任務(wù)調(diào)度器,和ThreadPoolExecutor一樣實(shí)現(xiàn)了自己的線程池,提供了三種調(diào)度子任務(wù)的方法:
execute:異步執(zhí)行指定任務(wù),無(wú)返回結(jié)果;
invoke、invokeAll:異步執(zhí)行指定任務(wù),等待完成才返回結(jié)果;
submit:異步執(zhí)行指定任務(wù),并立即返回一個(gè)Future對(duì)象;
ForkJoinTask
Fork/Join框架中的實(shí)際的執(zhí)行任務(wù)類,有以下兩種實(shí)現(xiàn),一般繼承這兩種實(shí)現(xiàn)類即可。
RecursiveAction:用于無(wú)結(jié)果返回的子任務(wù);
RecursiveTask:用于有結(jié)果返回的子任務(wù);
Fork/Join框架實(shí)戰(zhàn)下面實(shí)現(xiàn)一個(gè)Fork/Join小例子,從1+2+...10億,每個(gè)任務(wù)只能處理1000個(gè)數(shù)相加,超過(guò)1000個(gè)的自動(dòng)分解成小任務(wù)并行處理;并展示了通過(guò)不使用Fork/Join和使用時(shí)的時(shí)間損耗對(duì)比。
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class ForkJoinTask extends RecursiveTask{ private static final long MAX = 1000000000L; private static final long THRESHOLD = 1000L; private long start; private long end; public ForkJoinTask(long start, long end) { this.start = start; this.end = end; } public static void main(String[] args) { test(); System.out.println("--------------------"); testForkJoin(); } private static void test() { System.out.println("test"); long start = System.currentTimeMillis(); Long sum = 0L; for (long i = 0L; i <= MAX; i++) { sum += i; } System.out.println(sum); System.out.println(System.currentTimeMillis() - start + "ms"); } private static void testForkJoin() { System.out.println("testForkJoin"); long start = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); Long sum = forkJoinPool.invoke(new ForkJoinTask(1, MAX)); System.out.println(sum); System.out.println(System.currentTimeMillis() - start + "ms"); } @Override protected Long compute() { long sum = 0; if (end - start <= THRESHOLD) { for (long i = start; i <= end; i++) { sum += i; } return sum; } else { long mid = (start + end) / 2; ForkJoinTask task1 = new ForkJoinTask(start, mid); task1.fork(); ForkJoinTask task2 = new ForkJoinTask(mid + 1, end); task2.fork(); return task1.join() + task2.join(); } } }
這里需要計(jì)算結(jié)果,所以任務(wù)繼承的是RecursiveTask類。ForkJoinTask需要實(shí)現(xiàn)compute方法,在這個(gè)方法里首先需要判斷任務(wù)是否小于等于閾值1000,如果是就直接執(zhí)行任務(wù)。否則分割成兩個(gè)子任務(wù),每個(gè)子任務(wù)在調(diào)用fork方法時(shí),又會(huì)進(jìn)入compute方法,看看當(dāng)前子任務(wù)是否需要繼續(xù)分割成孫任務(wù),如果不需要繼續(xù)分割,則執(zhí)行當(dāng)前子任務(wù)并返回結(jié)果。使用join方法會(huì)阻塞并等待子任務(wù)執(zhí)行完并得到其結(jié)果。
程序輸出:
test 500000000500000000 4992ms -------------------- testForkJoin 500000000500000000 508ms
從結(jié)果看出,并行的時(shí)間損耗明顯要少于串行的,這就是并行任務(wù)的好處。
盡管如此,在使用Fork/Join時(shí)也得注意,不要盲目使用。
如果任務(wù)拆解的很深,系統(tǒng)內(nèi)的線程數(shù)量堆積,導(dǎo)致系統(tǒng)性能性能嚴(yán)重下降;
如果函數(shù)的調(diào)用棧很深,會(huì)導(dǎo)致棧內(nèi)存溢出;
推薦閱讀干貨:Spring Boot & Cloud 最強(qiáng)技術(shù)教程
工具:推薦一款在線創(chuàng)作流程圖、思維導(dǎo)圖軟件
分享Java干貨,高并發(fā)編程,熱門技術(shù)教程,微服務(wù)及分布式技術(shù),架構(gòu)設(shè)計(jì),區(qū)塊鏈技術(shù),人工智能,大數(shù)據(jù),Java面試題,以及前沿?zé)衢T資訊等。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/71384.html
摘要:第二步執(zhí)行任務(wù)并合并結(jié)果。使用兩個(gè)類來(lái)完成以上兩件事情我們要使用框架,必須首先創(chuàng)建一個(gè)任務(wù)。用于有返回結(jié)果的任務(wù)。如果任務(wù)順利執(zhí)行完成了,則設(shè)置任務(wù)狀態(tài)為,如果出現(xiàn)異常,則紀(jì)錄異常,并將任務(wù)狀態(tài)設(shè)置為。 1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一個(gè)用于并行執(zhí)行任務(wù)的框架, 是一個(gè)把大任務(wù)分割成若干個(gè)小任務(wù),最終匯總每個(gè)小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的...
摘要:框架框架簡(jiǎn)介框架是提供的一個(gè)用于并行執(zhí)行任務(wù)的框架,是一個(gè)把大任務(wù)分割成若干個(gè)小任務(wù),最終匯總每個(gè)小任務(wù)結(jié)果得到大任務(wù)結(jié)果的框架。框架實(shí)例需求計(jì)算的結(jié)果。 Fork/Join框架 1. Fork/Join框架簡(jiǎn)介 Fork/Join框架是java7提供的一個(gè)用于并行執(zhí)行任務(wù)的框架,是一個(gè)把大任務(wù)分割成若干個(gè)小任務(wù),最終匯總每個(gè)小任務(wù)結(jié)果得到大任務(wù)結(jié)果的框架。Fork指的就是把一個(gè)大任務(wù)...
摘要:分區(qū)函數(shù)返回一個(gè)布爾值,這意味著得到的分組的鍵類型是,于是它最多可以分為兩組是一組,是一組。當(dāng)遍歷到流中第個(gè)元素時(shí),這個(gè)函數(shù)執(zhí)行時(shí)會(huì)有兩個(gè)參數(shù)保存歸約結(jié)果的累加器已收集了流中的前個(gè)項(xiàng)目,還有第個(gè)元素本身。 一、收集器簡(jiǎn)介 把列表中的交易按貨幣分組: Map transactionsByCurrencies = transactions.stream().collect(groupi...
摘要:使用方式要把任務(wù)提交到線程池,必須創(chuàng)建的一個(gè)子類,其中是并行化任務(wù)產(chǎn)生的結(jié)果如果沒有結(jié)果使用類型。對(duì)一個(gè)子任務(wù)調(diào)用的話,可以使一個(gè)子任務(wù)重用當(dāng)前線程,避免線程池中多分配一個(gè)任務(wù)帶來(lái)的開銷。 【概念 分支和并框架的目的是以遞歸的方式將可以并行的任務(wù)拆分成更小的任務(wù),然后將每個(gè)子任務(wù)的結(jié)果合并起來(lái)生成整體的結(jié)果,它是ExecutorService的一個(gè)實(shí)現(xiàn),它把子任務(wù)分配給線程池(Fork...
摘要:概述簡(jiǎn)介并行流就是把一個(gè)內(nèi)容分成多個(gè)數(shù)據(jù)塊,并用不同的線程分別處理每個(gè)數(shù)據(jù)塊的流中將并行進(jìn)行了優(yōu)化,我們可以很容易的對(duì)數(shù)據(jù)進(jìn)行并行操作,可以聲明性地通過(guò)與在并行流與順序流之間進(jìn)行切換。 1. 概述 1.1 簡(jiǎn)介 并行流就是把一個(gè)內(nèi)容分成多個(gè)數(shù)據(jù)塊,并用不同的線程分別處理每個(gè)數(shù)據(jù)塊的流 Java 8 中將并行進(jìn)行了優(yōu)化,我們可以很容易的對(duì)數(shù)據(jù)進(jìn)行并行操作,Stream API 可以聲明性...
閱讀 2942·2021-08-20 09:37
閱讀 1630·2019-08-30 12:47
閱讀 1154·2019-08-29 13:27
閱讀 1712·2019-08-28 18:02
閱讀 776·2019-08-23 18:15
閱讀 3115·2019-08-23 16:51
閱讀 956·2019-08-23 14:13
閱讀 2183·2019-08-23 13:05