摘要:正確使用并行流錯(cuò)用并行流而產(chǎn)生錯(cuò)誤的首要原因,就是使用的算法改變了某些共享狀態(tài)。高效使用并行流留意裝箱有些操作本身在并行流上的性能就比順序流差還要考慮流的操作流水線的總計(jì)算成本。
一、并行流 1.將順序流轉(zhuǎn)換為并行流
對順序流調(diào)用parallel方法:
public static long parallelSum(long n) { return Stream.iterate(1L, i -> i + 1) .limit(n) .parallel() .reduce(0L, Long::sum); }
它在內(nèi)部實(shí)際上就是設(shè)了一個(gè)boolean標(biāo)志,表示你想讓調(diào)用parallel之后進(jìn)行的所有操作都并行執(zhí)行。類似地,你只需要對并行流調(diào)用sequential方法就可以把它變成順序流。但最后一次parallel或sequential調(diào)用會影響整個(gè)流水線。
2.測量流性能iterate生成的是裝箱的對象,必須拆箱成數(shù)字才能求和;
我們很難把iterate分成多個(gè)獨(dú)立塊來并行執(zhí)行。
iterate很難分割成能夠獨(dú)立執(zhí)行的小塊,因?yàn)槊看螒?yīng)用這個(gè)函數(shù)都要依賴前一次應(yīng)用的結(jié)果,整張數(shù)字列表在歸納過程開始時(shí)沒有準(zhǔn)備好,因而無法有效地把流劃分為小塊來并行處理。把流標(biāo)記成并行,你其實(shí)是給順序處理增加了開銷,它還要把每次求和操作分到一個(gè)不同的線程上。
3.正確使用并行流錯(cuò)用并行流而產(chǎn)生錯(cuò)誤的首要原因,就是使用的算法改變了某些共享狀態(tài)。
public class Accumulator { public long total = 0; public void add(long value) { total += value; } } public static long sideEffectParallelSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add); return accumulator.total; }
上面的示例在本質(zhì)上就是順序的,每次訪問total都會出現(xiàn)數(shù)據(jù)競爭.由于多個(gè)線程在同時(shí)訪問累加器,執(zhí)行total += value,而這一句雖然看似簡單,卻不是一個(gè)原子操作。所得的結(jié)果也是不可控的(錯(cuò)誤的)。4.高效使用并行流
留意裝箱
有些操作本身在并行流上的性能就比順序流差
還要考慮流的操作流水線的總計(jì)算成本。設(shè)N是要處理的元素的總數(shù),Q是一個(gè)元素通過流水線的大致處理成本,則N*Q就是這個(gè)對成本的一個(gè)粗略的定性估計(jì)。Q值較高就意味著使用并行流時(shí)性能好的可能性比較大
對于較小的數(shù)據(jù)量,選擇并行流幾乎從來都不是一個(gè)好的決定
要考慮流背后的數(shù)據(jù)結(jié)構(gòu)是否易于分解
流自身的特點(diǎn),以及流水線中的中間操作修改流的方式,都可能會改變分解過程的性能。
還要考慮終端操作中合并步驟的代價(jià)是大是小
二、分支/合并框架(Fork/Join)詳見第六章相關(guān)內(nèi)容
注意:不應(yīng)該在RecursiveTask內(nèi)部使用ForkJoinPool的invoke方法。相反,你應(yīng)該始終直接調(diào)用compute或fork方法,只有順序代碼才應(yīng)該用invoke來啟動(dòng)并行計(jì)算。
Spliterator是Java 8中加入的另一個(gè)新接口;這個(gè)名字代表“可分迭代器”(splitable iterator)。和Iterator一樣,Spliterator也用于遍歷數(shù)據(jù)源中的元素,但它是為了并行執(zhí)行而設(shè)計(jì)的。
Spliterator接口
public interface Spliterator{ boolean tryAdvance(Consumer super T> action); Spliterator trySplit(); long estimateSize(); int characteristics(); }
與往常一樣,T是Spliterator遍歷的元素的類型。tryAdvance方法的行為類似于普通的Iterator,因?yàn)樗鼤错樞蛞粋€(gè)一個(gè)使用Spliterator中的元素,并且如果還有其他元素要遍歷就返回true。但trySplit是專為Spliterator接口設(shè)計(jì)的,因?yàn)樗梢园岩恍┰貏澇鋈シ纸o第二個(gè)Spliterator(由該方法返回),讓它們兩個(gè)并行處理。Spliterator還可通過estimateSize方法估計(jì)還剩下多少元素要遍歷,因?yàn)榧词共荒敲创_切,能快速算出來是一個(gè)值也有助于讓拆分均勻一點(diǎn).
1.拆分過程將Stream拆分成多個(gè)部分的算法是一個(gè)遞歸過程,如圖所示。第一步是對第一個(gè)Spliterator調(diào)用trySplit,生成第二個(gè)Spliterator。第二步對這兩個(gè)Spliterator調(diào)用trysplit,這樣總共就有了四個(gè)Spliterator。這個(gè)框架不斷對Spliterator調(diào)用trySplit直到它返回null,表明它處理的數(shù)據(jù)結(jié)構(gòu)不能再分割,如第三步所示。最后,這個(gè)遞歸拆分過程到第四步就終止了,這時(shí)所有的Spliterator在調(diào)用trySplit時(shí)都返回了null。
文中提到了reduce的三參數(shù)重載方法
U reduce(U identity,BiFunction accumulator,BinaryOperator combiner)它的三個(gè)參數(shù):
identity: 一個(gè)初始化的值;這個(gè)初始化的值其類型是泛型U,與Reduce方法返回的類型一致;注意此時(shí)Stream中元素的類型是T,與U可以不一樣也可以一樣,這樣的話操作空間就大了;不管Stream中存儲的元素是什么類型,U都可以是任何類型,如U可以是一些基本數(shù)據(jù)類型的包裝類型Integer、Long等;或者是String,又或者是一些集合類型ArrayList等;后面會說到這些用法。
accumulator: 其類型是BiFunction,輸入是U與T兩個(gè)類型的數(shù)據(jù),而返回的是U類型;也就是說返回的類型與輸入的第一個(gè)參數(shù)類型是一樣的,而輸入的第二個(gè)參數(shù)類型與Stream中元素類型是一樣的。
combiner: 其類型是BinaryOperator,支持的是對U類型的對象進(jìn)行操作;
第三個(gè)參數(shù)combiner主要是使用在并行計(jì)算的場景下;如果Stream是非并行時(shí),第三個(gè)參數(shù)實(shí)際上是不生效的。
代碼實(shí)現(xiàn):
class WordCounter { private final int counter; private final boolean lastSpace; public WordCounter(int counter, boolean lastSpace) { this.counter = counter; this.lastSpace = lastSpace; } public WordCounter accumulate(Character c) { if (Character.isWhitespace(c)) { return lastSpace ? this : new WordCounter(counter, true); } else { return lastSpace ? new WordCounter(counter + 1, false) : this; } } public WordCounter combine(WordCounter wordCounter) { return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace); } public int getCounter() { return counter; } }
class WordCounterSpliterator implements Spliterator{ private final String string; private int currentChar = 0; public WordCounterSpliterator(String string) { this.string = string; } @Override public boolean tryAdvance(Consumer action) { action.accept(string.charAt(currentChar++)); return currentChar < string.length(); } @Override public Spliterator trySplit() { int currentSize = string.length() - currentChar; if (currentSize < 10) { return null; } for (int splitPos = (currentSize / 2) + currentChar; splitPos < string.length(); splitPos++) { if (Character.isWhitespace(string.charAt(splitPos))) { Spliterator spliterator = new WordCounterSpliterator(string.substring( currentChar, splitPos)); currentChar = splitPos; return spliterator; } } return null; } @Override public long estimateSize() { return string.length() - currentChar; } @Override public int characteristics() { return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE; } }
final String SENTENCE = " Nel mezzo del cammin di nostra vita " + "mi ritrovai in una selva oscura" + " ché la dritta via era smarrita "; private int countWords(Streamstream) { WordCounter wordCounter = stream.reduce(new WordCounter(0, true), WordCounter::accumulate, WordCounter::combine); return wordCounter.getCounter(); } Spliterator spliterator = new WordCounterSpliterator(SENTENCE); Stream stream = StreamSupport.stream(spliterator, true); System.out.println("Found " + countWords(stream) + " words");
最后打印顯示
Found 19 words
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/74284.html
摘要:實(shí)戰(zhàn)讀書筆記第一章從方法傳遞到接著上次的,繼續(xù)來了解一下,如果繼續(xù)簡化代碼。去掉并且生成的數(shù)字是萬,所消耗的時(shí)間循序流并行流至于為什么有時(shí)候并行流效率比循序流還低,這個(gè)以后的文章會解釋。 《Java8實(shí)戰(zhàn)》-讀書筆記第一章(02) 從方法傳遞到Lambda 接著上次的Predicate,繼續(xù)來了解一下,如果繼續(xù)簡化代碼。 把方法作為值來傳遞雖然很有用,但是要是有很多類似與isHeavy...
摘要:限制編寫并行流,存在一些與非并行流不一樣的約定。底層框架并行流在底層沿用的框架,遞歸式的分解問題,然后每段并行執(zhí)行,最終由合并結(jié)果,返回最后的值。 本書第六章的讀書筆記,也是我這個(gè)系列的最后一篇讀書筆記。后面7、8、9章分別講的測試、調(diào)試與重構(gòu)、設(shè)計(jì)和架構(gòu)的原則以及使用Lambda表達(dá)式編寫并發(fā)程序,因?yàn)楣P記不好整理,就不寫了,感興趣的同學(xué)自己買書來看吧。 并行化流操作 關(guān)于并行與并發(fā)...
摘要:第四章引入流一什么是流流是的新成員,它允許你以聲明性方式處理數(shù)據(jù)集合通過查詢語句來表達(dá),而不是臨時(shí)編寫一個(gè)實(shí)現(xiàn)。 第四章 引入流 一、什么是流 流是Java API的新成員,它允許你以聲明性方式處理數(shù)據(jù)集合(通過查詢語句來表達(dá),而不是臨時(shí)編寫一個(gè)實(shí)現(xiàn))。就現(xiàn)在來說,你可以把它們看成遍歷數(shù)據(jù)集的高級迭代器。此外,流還可以透明地并行處理,你無需寫任何多線程代碼。 下面兩段代碼都是用來返回低...
摘要:分區(qū)函數(shù)返回一個(gè)布爾值,這意味著得到的分組的鍵類型是,于是它最多可以分為兩組是一組,是一組。當(dāng)遍歷到流中第個(gè)元素時(shí),這個(gè)函數(shù)執(zhí)行時(shí)會有兩個(gè)參數(shù)保存歸約結(jié)果的累加器已收集了流中的前個(gè)項(xiàng)目,還有第個(gè)元素本身。 一、收集器簡介 把列表中的交易按貨幣分組: Map transactionsByCurrencies = transactions.stream().collect(groupi...
摘要:內(nèi)部迭代與使用迭代器顯式迭代的集合不同,流的迭代操作是在背后進(jìn)行的。流只能遍歷一次請注意,和迭代器類似,流只能遍歷一次。 流(Stream) 流是什么 流是Java API的新成員,它允許你以聲明性方式處理數(shù)據(jù)集合(通過查詢語句來表達(dá),而不是臨時(shí)編寫一個(gè)實(shí)現(xiàn))。就現(xiàn)在來說,你可以把它們看成遍歷數(shù)據(jù)集的高級迭代器。此外,流還可以透明地并行處理,你無需寫任何多線程代碼了!我會在后面的筆記中...
摘要:方法接受一個(gè)生產(chǎn)者作為參數(shù),返回一個(gè)對象,該對象完成異步執(zhí)行后會讀取調(diào)用生產(chǎn)者方法的返回值。該方法接收一個(gè)對象構(gòu)成的數(shù)組,返回由第一個(gè)執(zhí)行完畢的對象的返回值構(gòu)成的。 一、Future 接口 在Future中觸發(fā)那些潛在耗時(shí)的操作把調(diào)用線程解放出來,讓它能繼續(xù)執(zhí)行其他有價(jià)值的工作,不再需要呆呆等待耗時(shí)的操作完成。打個(gè)比方,你可以把它想象成這樣的場景:你拿了一袋子衣服到你中意的干洗店去洗。...
閱讀 2910·2021-11-11 10:58
閱讀 1958·2021-10-11 10:59
閱讀 3520·2019-08-29 16:23
閱讀 2363·2019-08-29 11:11
閱讀 2812·2019-08-28 17:59
閱讀 3882·2019-08-27 10:56
閱讀 2114·2019-08-23 18:37
閱讀 3139·2019-08-23 16:53