摘要:框架使用的是工作竊取算法。由于此時它們訪問同一個隊列,為了減小競爭,通常會使用雙端隊列。方法返回對象,如果任務被取消了則返回,如果任務沒有完成或者沒有拋出異常則返回。
概述
Fork 就是把一個大任務切分為若干個子任務并行地執(zhí)行,Join 就是合并這些子任務的執(zhí)行結(jié)果,最后得到這個大任務的結(jié)果。Fork/Join 框架使用的是工作竊取算法。
工作竊取算法工作竊取算法是指某個線程從其他隊列里竊取任務來執(zhí)行。對于一個比較大的任務,可以把它分割為若干個互不依賴的子任務,為了減少線程間的競爭,把這些子任務分別放到不同的隊列里,并為每個隊列創(chuàng)建一個多帶帶的線程來執(zhí)行隊列里的任務,線程和隊列一一對應。但是,有的線程會先把自己隊列里的任務干完,而其他線程對應的隊列里還有任務需要處理,于是它就去其他線程的隊列里竊取一個任務來執(zhí)行。由于此時它們訪問同一個隊列,為了減小競爭,通常會使用雙端隊列。被竊取任務的線程永遠從雙端隊列的頭部獲取任務,竊取任務的線程永遠從雙端隊列的尾部獲取任務。
工作竊取算法的優(yōu)缺點優(yōu)點:充分利用線程進行并行計算,減少了線程間的競爭。
缺點:雙端隊列只存在一個任務時會導致競爭,會消耗更多的系統(tǒng)資源,因為需要創(chuàng)建多個線程和多個雙端隊列。
ForkJoinTask 在執(zhí)行的時候可能拋出異常,但沒有辦法在主線程中直接捕獲異常,所以 ForkJoinTask 提供了 isCompletedAbnormally() 方法檢查任務是否已經(jīng)拋出異?;蛞呀?jīng)被取消。getException() 方法返回 Throwable 對象,如果任務被取消了則返回 CancellationException,如果任務沒有完成或者沒有拋出異常則返回 null。
Fork/Join 框架的實現(xiàn)原理 fork() 方法的實現(xiàn)原理當調(diào)用 ForkJoinTask 的 fork() 方法時,程序會調(diào)用 ForkJoinPool.WorkQueue 的 push() 方法異步地執(zhí)行這個任務,然后立即返回結(jié)果。代碼如下:
public final ForkJoinTaskfork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; }
push() 方法把當前任務存放在一個 ForkJoinTask 數(shù)組隊列里,然后再調(diào)用 ForkJoinPool 的 signalWork() 方法喚醒或創(chuàng)建一個工作線程來執(zhí)行任務。代碼如下:
final void push(ForkJoinTask> task) { ForkJoinTask>[] a; ForkJoinPool p; int b = base, s = top, n; if ((a = array) != null) { // ignore if queue removed int m = a.length - 1; // fenced write for task visibility U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { if ((p = pool) != null) p.signalWork(p.workQueues, this); } else if (n >= m) growArray(); } }join() 方法的實現(xiàn)原理
當調(diào)用 ForkJoinTask 的 join() 方法時,程序會調(diào)用 doJoin() 方法,通過 doJoin() 方法來判斷返回什么結(jié)果
public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); } private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); } private void reportException(int s) { if (s == CANCELLED) throw new CancellationException(); if (s == EXCEPTIONAL) rethrow(getThrowableException()); } public abstract V getRawResult();
實例代碼:
import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; public class CountTask extends RecursiveTask{ private static final int THRESHOLD = 2; private int start; private int end; public CountTask(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; boolean canCompute = (end - start) <= THRESHOLD; if (canCompute) { // 如果任務足夠小,就計算任務 for (int i = start; i <= end; i++) { sum += i; } } else { // 如果任務大于閾值,分裂成兩個子任務執(zhí)行 int middle = (start + end) / 2; CountTask leftTask = new CountTask(start, middle); CountTask rightTask = new CountTask(middle + 1, end); // 執(zhí)行子任務 leftTask.fork(); rightTask.fork(); // 等待子任務執(zhí)行完,并得到其結(jié)果 int leftResult = leftTask.join(); int rightResult = rightTask.join(); // 合并子任務 sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask countTask = new CountTask(1, 100); peekNextLocalTask(); Future result = forkJoinPool.submit(countTask); try { if (countTask.isCompletedAbnormally()) { System.out.println(countTask.getException()); } System.out.println(result.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
參考資料
Java 并發(fā)編程的藝術
文章版權歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/75169.html
以下是Java技術棧微信公眾號發(fā)布的關于 Java 的技術干貨,從以下幾個方面匯總。 Java 基礎篇 Java 集合篇 Java 多線程篇 Java JVM篇 Java 進階篇 Java 新特性篇 Java 工具篇 Java 書籍篇 Java基礎篇 8張圖帶你輕松溫習 Java 知識 Java父類強制轉(zhuǎn)換子類原則 一張圖搞清楚 Java 異常機制 通用唯一標識碼UUID的介紹及使用 字符串...
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設計模式,設計了并發(fā)包,其中包下提供了一系列基礎的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:這減輕了手動重復執(zhí)行相同基準測試的痛苦,并簡化了獲取結(jié)果的流程。處理項目的代碼并從標有注釋的方法處生成基準測試程序。用和運行該基準測試得到以下結(jié)果。同時,和的基線測試結(jié)果也有略微的不同。 Java 8 已經(jīng)發(fā)布一段時間了,許多開發(fā)者已經(jīng)開始使用 Java 8。本文也將討論最新發(fā)布在 JDK 中的并發(fā)功能更新。事實上,JDK 中已經(jīng)有多處java.util.concurrent 改動,但...
摘要:同時,它會通過的方法將自己注冊到線程池中。線程池中的每個工作線程都有一個自己的任務隊列,工作線程優(yōu)先處理自身隊列中的任務或順序,由線程池構(gòu)造時的參數(shù)決定,自身隊列為空時,以的順序隨機竊取其它隊列中的任務。 showImg(https://segmentfault.com/img/bVbizJb?w=1802&h=762); 本文首發(fā)于一世流云的專欄:https://segmentfau...
摘要:方法返回對象,如果任務被取消了則返回。如果任務沒有完成或者沒有拋出異常則返回。 一. Fork/Join 1 . 簡單介紹 a . Fork/Join為JKD1.7引入,適用于對大量數(shù)據(jù)進行拆分成多個小任務進行計算的框架,最后把所有小任務的結(jié)果匯總合并得到最終的結(jié)果 b . 相關類 public abstract class RecursiveTask extends Fork...
閱讀 3056·2023-04-25 20:09
閱讀 3328·2021-11-23 09:51
閱讀 1981·2021-11-22 15:25
閱讀 3362·2021-11-18 10:02
閱讀 2761·2021-09-27 13:56
閱讀 1317·2019-08-30 15:44
閱讀 1158·2019-08-30 13:21
閱讀 3332·2019-08-30 11:05