摘要:實際上,在并行流上使用新的方法。此外,我們了解到所有并行流操作共享相同的范圍。因此,您可能希望避免實施慢速阻塞流操作,因為這可能會減慢嚴(yán)重依賴并行流的應(yīng)用程序的其他部分。
流可以并行執(zhí)行,以增加大量輸入元素的運行時性能。并行流ForkJoinPool通過靜態(tài)ForkJoinPool.commonPool()方法使用公共可用的流。底層線程池的大小最多使用五個線程 - 具體取決于可用物理CPU核心的數(shù)量:
ForkJoinPool commonPool = ForkJoinPool.commonPool(); System.out.println(commonPool.getParallelism()); // 3
在我的機器上,公共池初始化為默認(rèn)值為3的并行度。通過設(shè)置以下JVM參數(shù)可以減小或增加此值:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
集合支持創(chuàng)建并行元素流的方法parallelStream()?;蛘撸梢栽诮o定流上調(diào)用中間方法parallel(),以將順序流轉(zhuǎn)換為并行流。
為了評估并行流的并行執(zhí)行行為,下一個示例將有關(guān)當(dāng)前線程的信息打印出來:
Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s] ", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s] ", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .forEach(s -> System.out.format("forEach: %s [%s] ", s, Thread.currentThread().getName()));
通過調(diào)查調(diào)試輸出,我們應(yīng)該更好地理解哪些線程實際用于執(zhí)行流操作:
filter: b1 [main] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: c2 [ForkJoinPool.commonPool-worker-3] map: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: A2 [ForkJoinPool.commonPool-worker-1] map: b1 [main] forEach: B1 [main] filter: a1 [ForkJoinPool.commonPool-worker-3] map: a1 [ForkJoinPool.commonPool-worker-3] forEach: A1 [ForkJoinPool.commonPool-worker-3] forEach: C1 [ForkJoinPool.commonPool-worker-2]
如您所見,并行流利用公共中的所有可用線程ForkJoinPool來執(zhí)行流操作。輸出在連續(xù)運行中可能不同,因為實際使用的特定線程的行為是非確定性的。
讓我們通過一個額外的流操作來擴展該示例:
Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s] ", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s] ", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .sorted((s1, s2) -> { System.out.format("sort: %s <> %s [%s] ", s1, s2, Thread.currentThread().getName()); return s1.compareTo(s2); }) .forEach(s -> System.out.format("forEach: %s [%s] ", s, Thread.currentThread().getName()));
結(jié)果可能最初看起來很奇怪:
filter: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: b1 [main] map: b1 [main] filter: a1 [ForkJoinPool.commonPool-worker-2] map: a1 [ForkJoinPool.commonPool-worker-2] map: c2 [ForkJoinPool.commonPool-worker-3] sort: A2 <> A1 [main] sort: B1 <> A2 [main] sort: C2 <> B1 [main] sort: C1 <> C2 [main] sort: C1 <> B1 [main] sort: C1 <> C2 [main] forEach: A1 [ForkJoinPool.commonPool-worker-1] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: B1 [main] forEach: A2 [ForkJoinPool.commonPool-worker-2] forEach: C1 [ForkJoinPool.commonPool-worker-1]
似乎sort只在主線程上順序執(zhí)行。實際上,sort在并行流上使用新的Java 8方法Arrays.parallelSort()。如Javadoc中所述,如果排序?qū)错樞蚧虿⑿袌?zhí)行,則此方法決定數(shù)組的長度:
如果指定數(shù)組的長度小于最小粒度,則使用適當(dāng)?shù)腁rrays.sort方法對其進行排序。
回到reduce一節(jié)的例子。我們已經(jīng)發(fā)現(xiàn)組合器函數(shù)只是并行調(diào)用,而不是順序流調(diào)用。讓我們看看實際涉及哪些線程:
Listpersons = Arrays.asList( new Person("Max", 18), new Person("Peter", 23), new Person("Pamela", 23), new Person("David", 12)); persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s [%s] ", sum, p, Thread.currentThread().getName()); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s [%s] ", sum1, sum2, Thread.currentThread().getName()); return sum1 + sum2; });
控制臺輸出顯示累加器和組合器函數(shù)在所有可用線程上并行執(zhí)行:
accumulator: sum=0; person=Pamela; [main] accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3] accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2] accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1] combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1] combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2] combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2]
總之,并行流可以為具有大量輸入元素的流帶來良好的性能提升。但請記住,某些并行流操作reduce,collect需要額外的計算(組合操作),這在順序執(zhí)行時是不需要的。
此外,我們了解到所有并行流操作共享相同的JVM范圍ForkJoinPool。因此,您可能希望避免實施慢速阻塞流操作,因為這可能會減慢嚴(yán)重依賴并行流的應(yīng)用程序的其他部分。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/72912.html
摘要:需要注意的是很多流操作本身就會返回一個流,所以多個操作可以直接連接起來,如下圖這樣,操作可以進行鏈?zhǔn)秸{(diào)用,并且并行流還可以實現(xiàn)數(shù)據(jù)流并行處理操作。為集合創(chuàng)建并行流。 上一篇文章,小樂給大家介紹了《Java8新特性之方法引用》,下面接下來小樂將會給大家介紹Java8新特性之Stream,稱之為流,本篇文章為上半部分。 1、什么是流? Java Se中對于流的操作有輸入輸出IO流,而Jav...
摘要:新特性總覽標(biāo)簽本文主要介紹的新特性,包括表達式方法引用流默認(rèn)方法組合式異步編程新的時間,等等各個方面。還有對應(yīng)的和類型的函數(shù)連接字符串廣義的歸約匯總起始值,映射方法,二元結(jié)合二元結(jié)合。使用并行流時要注意避免共享可變狀態(tài)。 Java8新特性總覽 標(biāo)簽: java [TOC] 本文主要介紹 Java 8 的新特性,包括 Lambda 表達式、方法引用、流(Stream API)、默認(rèn)方...
摘要:限制編寫并行流,存在一些與非并行流不一樣的約定。底層框架并行流在底層沿用的框架,遞歸式的分解問題,然后每段并行執(zhí)行,最終由合并結(jié)果,返回最后的值。 本書第六章的讀書筆記,也是我這個系列的最后一篇讀書筆記。后面7、8、9章分別講的測試、調(diào)試與重構(gòu)、設(shè)計和架構(gòu)的原則以及使用Lambda表達式編寫并發(fā)程序,因為筆記不好整理,就不寫了,感興趣的同學(xué)自己買書來看吧。 并行化流操作 關(guān)于并行與并發(fā)...
摘要:串行與并行可以分為串行與并行兩種,串行流和并行流差別就是單線程和多線程的執(zhí)行。返回串行流返回并行流和方法返回的都是類型的對象,說明它們在功能的使用上是沒差別的。唯一的差別就是單線程和多線程的執(zhí)行。 Stream是什么 Stream是Java8中新加入的api,更準(zhǔn)確的說: Java 8 中的 Stream 是對集合(Collection)對象功能的增強,它專注于對集合對象進行各種非常便...
閱讀 1361·2023-04-26 00:35
閱讀 2731·2023-04-25 18:32
閱讀 3384·2021-11-24 11:14
閱讀 786·2021-11-22 15:24
閱讀 1435·2021-11-18 10:07
閱讀 6586·2021-09-22 10:57
閱讀 2789·2021-09-07 09:58
閱讀 3577·2019-08-30 15:54