摘要:類(lèi)似的你可以用將并行流變?yōu)轫樞蛄?。中的使用順序求和并行求和將流轉(zhuǎn)為并行流配置并行流線(xiàn)程池并行流內(nèi)部使用了默認(rèn)的,默認(rèn)的線(xiàn)程數(shù)量就是處理器的數(shù)量包括虛擬內(nèi)核通過(guò)得到。
【概念
并行流就是一個(gè)把內(nèi)容分成多個(gè)數(shù)據(jù)塊,并用不同的線(xiàn)程分別處理每一個(gè)數(shù)據(jù)塊的流。在java7之前,并行處理數(shù)據(jù)很麻煩,第一,需要明確的把包含數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu)分成若干子部分。第二,給每一個(gè)子部分分配一個(gè)獨(dú)立的線(xiàn)程。第三,適當(dāng)?shù)臅r(shí)候進(jìn)行同步,避免出現(xiàn)數(shù)據(jù)競(jìng)爭(zhēng)帶來(lái)的問(wèn)題,最后將每一個(gè)子部分的結(jié)果合并。在java7中引入了forkjoin框架來(lái)完成這些步驟,而java8中的stream接口可以讓你不費(fèi)吹灰之力就對(duì)數(shù)據(jù)執(zhí)行并行處理,而stream接口幕后正是使用的forkjoin框架。不過(guò),對(duì)順序流調(diào)用parallel()并不意味著流本身有任何的變化。它在內(nèi)部實(shí)際上就是設(shè)了一個(gè)boolean標(biāo)志,表示你想讓parallel()之后的操作都并行執(zhí)行。類(lèi)似的你可以用sequential()將并行流變?yōu)轫樞蛄?。這兩個(gè)方法可以讓我們更細(xì)化的控制流。
eg.java8中stream的使用:
//順序求和 public static long sum(long n){ return Stream.iterate(1l,i -> i + 1) .limit(n) .reduce(0l,Long::sum); } //并行求和 public static long parallelSum(long n){ return Stream.iterate(1l,i -> i + 1) .limit(n) //將流轉(zhuǎn)為并行流 .parallel() .reduce(0l,Long::sum); }【配置并行流線(xiàn)程池
并行流內(nèi)部使用了默認(rèn)的forkjoinPool,默認(rèn)的線(xiàn)程數(shù)量就是處理器的數(shù)量(包括虛擬內(nèi)核),
通過(guò):Runtime.getRuntime().availableProcessors() 得到。
通過(guò):System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12")來(lái)改變線(xiàn)程池大小。
我們不應(yīng)該理所當(dāng)然的任認(rèn)為多線(xiàn)程比順序執(zhí)行的效率更高,來(lái)看下面的例子:
public class Exercise { public static void main(String[] args) { long num = 1000_000_0; long st = System.currentTimeMillis(); System.out.println("iterate順序" + sum(num) + ":" +(System.currentTimeMillis() - st)); st = System.currentTimeMillis(); System.out.println("iterate并行" + parallelSum(num) + ":" +(System.currentTimeMillis() - st)); st = System.currentTimeMillis(); System.out.println("迭代" + forSum(num) + ":" +(System.currentTimeMillis() - st)); st = System.currentTimeMillis(); System.out.println("LongStream并行" + longStreamParallelSum(num) + ":" +(System.currentTimeMillis() - st)); st = System.currentTimeMillis(); System.out.println("LongStream順序" + longStreamSum(num) + ":" +(System.currentTimeMillis() - st)); } //順序求和 public static long sum(long n){ return Stream.iterate(1l,i -> i + 1) .limit(n) .reduce(0l,Long::sum); } //并行求和 public static long parallelSum(long n){ return Stream.iterate(1l,i -> i + 1) .limit(n) //將流轉(zhuǎn)為并行流 .parallel() .reduce(0l,Long::sum); } //迭代求和 public static long forSum(long n){ long result = 0; for(long i = 0 ;i <= n ; i++){ result += i; } return result; } //longStream并行 public static long longStreamParallelSum(long n){ return LongStream.rangeClosed(1,n) .parallel() .reduce(0l,Long::sum); } //longStream順序執(zhí)行 public static long longStreamSum(long n){ return LongStream.rangeClosed(1,n) .reduce(0l,Long::sum); } }
并行流執(zhí)行的時(shí)間比順序流和迭代執(zhí)行的要長(zhǎng)很多,兩個(gè)原因:
iterate()生成的是裝箱對(duì)象,必須要拆箱才能求和;
iterate()很難分成多個(gè)獨(dú)立的塊并行運(yùn)行,因?yàn)槊看螒?yīng)用這個(gè)函數(shù)都要依賴(lài)前一次的應(yīng)用的結(jié)果。數(shù)字列表在歸納的過(guò)程開(kāi)始時(shí)沒(méi)有準(zhǔn)備好,因而無(wú)法有效的把流劃分成小塊來(lái)并行處理。但是我們又標(biāo)記流為并行執(zhí)行,這就給順序執(zhí)行增加了開(kāi)銷(xiāo),每一次的求和操作都新開(kāi)啟了一個(gè)線(xiàn)程。
【使用更有針對(duì)性的的方法LongStream.rangeClosed():
1. 直接產(chǎn)生long類(lèi)型數(shù)據(jù),沒(méi)有開(kāi)箱操作 2. 生成數(shù)字范圍,容易拆分成獨(dú)立的小塊
由此可見(jiàn),選擇適當(dāng)?shù)臄?shù)據(jù)結(jié)構(gòu)往往比并行化算法更重要。并行是有代價(jià)的。并行過(guò)程需要對(duì)流做遞歸劃分,把每個(gè)子流的操作分配到不同的線(xiàn)程,然后把這些操作的結(jié)果合并成一個(gè)值。但是多核之間移動(dòng)數(shù)據(jù)的代價(jià)比我們想象的要大,所以很重要的一點(diǎn)是保證再內(nèi)核中并行執(zhí)行的工作時(shí)間比內(nèi)核之間傳輸數(shù)據(jù)的時(shí)間要長(zhǎng)。
【正確的使用并行流錯(cuò)誤使用并行流的首要原因就是使用的算法改變了共享變量的狀態(tài),因?yàn)樾薷墓蚕碜兞恳馕吨?,而使用同步方法就?huì)使的并行毫無(wú)意義。以下是一些建議:
1. 測(cè)試,并行還是順序執(zhí)行最重要的基準(zhǔn)就是不停的測(cè)試性能。 2. 留意裝箱,自動(dòng)裝箱,拆箱會(huì)大大降低性能,java8提供了LongStream,IntStream,DoubleStream來(lái)避免這兩個(gè)操作。 3. 有些操作本身就是順序執(zhí)行要率高,例如:limit,findFirst等依賴(lài)元素順序的操作。 4. 當(dāng)執(zhí)行單個(gè)任務(wù)的成本高時(shí)使用并行,如果單個(gè)操作的成本很低,并行執(zhí)行反而會(huì)因?yàn)殚_(kāi)啟線(xiàn)程,標(biāo)記狀態(tài)等操作使得效率下降。 5. 小量數(shù)據(jù)不適用并行。 6. 考慮流中背后的數(shù)據(jù)結(jié)構(gòu)是否易于分解。ArrayList的拆分效率比LinkedList高得多,因?yàn)榍罢哂貌恢憷涂梢云骄鸱?。另外,range工廠(chǎng)方法的原始類(lèi)型數(shù)據(jù)流也可以快速分解。以下時(shí)流數(shù)據(jù)源的可分解性: - ArrayList:極佳 - LinkedList:差 - IntStream等:極佳 - Stream.iterate:差 - HashSet:好 - TreeSet:好 7. 中間操作改變流的方法,涉及到排序就不適用并行。 8. 終端操作合并流的代價(jià),涉及到排序就不適用并行。【正確的使用并行
高并發(fā)、任務(wù)執(zhí)行時(shí)間短的業(yè)務(wù),線(xiàn)程池線(xiàn)程數(shù)可以設(shè)置為CPU核數(shù)+1,減少線(xiàn)程上下文的切換
并發(fā)不高、任務(wù)執(zhí)行時(shí)間長(zhǎng)的業(yè)務(wù)要區(qū)分開(kāi)看:
假如是業(yè)務(wù)時(shí)間長(zhǎng)集中在IO操作上,也就是IO密集型的任務(wù),因?yàn)镮O操作并不占用CPU,所以不要讓所有的CPU閑下來(lái),可以加大線(xiàn)程池中的線(xiàn)程數(shù)目,讓CPU處理更多的業(yè)務(wù)
假如是業(yè)務(wù)時(shí)間長(zhǎng)集中在計(jì)算操作上,也就是計(jì)算密集型任務(wù),這個(gè)就沒(méi)辦法了,和(1)一樣吧,線(xiàn)程池中的線(xiàn)程數(shù)設(shè)置得少一些,減少線(xiàn)程上下文的切換
并發(fā)高、業(yè)務(wù)執(zhí)行時(shí)間長(zhǎng),解決這種類(lèi)型任務(wù)的關(guān)鍵不在于線(xiàn)程池而在于整體架構(gòu)的設(shè)計(jì),看看這些業(yè)務(wù)里面某些數(shù)據(jù)是否能做緩存是第一步,增加服務(wù)器是第二步,至于線(xiàn)程池的設(shè)置,設(shè)置參考(2)。最后,業(yè)務(wù)執(zhí)行時(shí)間長(zhǎng)的問(wèn)題,也可能需要分析一下,看看能不能使用中間件對(duì)任務(wù)進(jìn)行拆分和解耦。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/68095.html
摘要:并行流與目前,我們對(duì)集合進(jìn)行計(jì)算有兩種方式并行流而更加的靈活,我們可以配置線(xiàn)程池的大小確保整體的計(jì)算不會(huì)因?yàn)榈却l(fā)生阻塞。 【回顧Future接口 Future接口時(shí)java5引入的,設(shè)計(jì)初衷是對(duì)將來(lái)某個(gè)時(shí)刻會(huì)發(fā)生的結(jié)果建模。它建模了一種異步計(jì)算,返回了一個(gè)執(zhí)行預(yù)算結(jié)果的引用。比如,你去干洗店洗衣服,店員會(huì)告訴你什么時(shí)候可以來(lái)取衣服,而不是讓你一直在干洗店等待。要使用Future只需...
摘要:進(jìn)程線(xiàn)程與協(xié)程它們都是并行機(jī)制的解決方案。選擇是任意性的,并在對(duì)實(shí)現(xiàn)做出決定時(shí)發(fā)生。線(xiàn)程池的大小一旦達(dá)到最大值就會(huì)保持不變,如果某個(gè)線(xiàn)程因?yàn)閳?zhí)行異常而結(jié)束,那么線(xiàn)程池會(huì)補(bǔ)充一個(gè)新線(xiàn)程。此線(xiàn)程池支持定時(shí)以及周期性執(zhí)行任務(wù)的需求。 并發(fā)與并行的概念 并發(fā)(Concurrency): 問(wèn)題域中的概念—— 程序需要被設(shè)計(jì)成能夠處理多個(gè)同時(shí)(或者幾乎同時(shí))發(fā)生的事件 并行(Parallel...
摘要:關(guān)于三者的一些概括總結(jié)離線(xiàn)分析框架,適合離線(xiàn)的復(fù)雜的大數(shù)據(jù)處理內(nèi)存計(jì)算框架,適合在線(xiàn)離線(xiàn)快速的大數(shù)據(jù)處理流式計(jì)算框架,適合在線(xiàn)的實(shí)時(shí)的大數(shù)據(jù)處理我是一個(gè)以架構(gòu)師為年之內(nèi)目標(biāo)的小小白。 整理自《架構(gòu)解密從分布式到微服務(wù)》第七章——聊聊分布式計(jì)算.做了相應(yīng)補(bǔ)充和修改。 [TOC] 前言 不管是網(wǎng)絡(luò)、內(nèi)存、還是存儲(chǔ)的分布式,它們最終目的都是為了實(shí)現(xiàn)計(jì)算的分布式:數(shù)據(jù)在各個(gè)計(jì)算機(jī)節(jié)點(diǎn)上流動(dòng),同...
摘要:限制編寫(xiě)并行流,存在一些與非并行流不一樣的約定。底層框架并行流在底層沿用的框架,遞歸式的分解問(wèn)題,然后每段并行執(zhí)行,最終由合并結(jié)果,返回最后的值。 本書(shū)第六章的讀書(shū)筆記,也是我這個(gè)系列的最后一篇讀書(shū)筆記。后面7、8、9章分別講的測(cè)試、調(diào)試與重構(gòu)、設(shè)計(jì)和架構(gòu)的原則以及使用Lambda表達(dá)式編寫(xiě)并發(fā)程序,因?yàn)楣P記不好整理,就不寫(xiě)了,感興趣的同學(xué)自己買(mǎi)書(shū)來(lái)看吧。 并行化流操作 關(guān)于并行與并發(fā)...
摘要:但有一個(gè)限制它們不能修改定義的方法的局部變量的內(nèi)容。如前所述,這種限制存在的原因在于局部變量保存在棧上,并且隱式表示它們僅限于其所在線(xiàn)程。 2014年,Oracle發(fā)布了Java8新版本。對(duì)于Java來(lái)說(shuō),這顯然是一個(gè)具有里程碑意義的版本。尤其是那函數(shù)式編程的功能,避開(kāi)了Java那煩瑣的語(yǔ)法所帶來(lái)的麻煩。 這可以算是一篇Java8的學(xué)習(xí)筆記。將Java8一些常見(jiàn)的一些特性作了一個(gè)概要的...
閱讀 3532·2021-09-27 13:35
閱讀 3571·2019-08-29 17:09
閱讀 2450·2019-08-26 11:30
閱讀 711·2019-08-26 10:32
閱讀 545·2019-08-26 10:23
閱讀 1206·2019-08-26 10:20
閱讀 3161·2019-08-23 15:26
閱讀 3571·2019-08-23 14:33