摘要:使用方式要把任務(wù)提交到線程池,必須創(chuàng)建的一個子類,其中是并行化任務(wù)產(chǎn)生的結(jié)果如果沒有結(jié)果使用類型。對一個子任務(wù)調(diào)用的話,可以使一個子任務(wù)重用當(dāng)前線程,避免線程池中多分配一個任務(wù)帶來的開銷。
【概念
分支和并框架的目的是以遞歸的方式將可以并行的任務(wù)拆分成更小的任務(wù),然后將每個子任務(wù)的結(jié)果合并起來生成整體的結(jié)果,它是ExecutorService的一個實現(xiàn),它把子任務(wù)分配給線程池(ForkJoinPool)中的工作線程。某些應(yīng)用可能對每個處理器內(nèi)核飯別試用一個線程,來完成計算密集任務(wù),例如圖像處理。java7引入forkjoin框架,專門用來支持這一類應(yīng)用。假設(shè)有一個處理任務(wù),它可以很自然的分解成子任務(wù)。
【使用方式要把任務(wù)提交到線程池,必須創(chuàng)建RecursiveTask
if(任務(wù)足夠小){ 順序計算該任務(wù)的值; }else{ 將任務(wù)分成兩個子任務(wù); 遞歸調(diào)用本方法; 合并每個子任務(wù)的結(jié)果; }【最佳實踐
對一個任務(wù)調(diào)用join()方法會阻塞調(diào)用方,直到該任務(wù)結(jié)束。因此,有必要在兩個子任務(wù)的計算都開始之后再調(diào)用join()。否則,你得到的版本會比原始的順序執(zhí)行更加緩慢,因為每個子任務(wù)都需要等到另一個子任務(wù)完成后才能開始計算,中途還要加上開啟線程的開銷。
不應(yīng)該在RecursiveTask
對子任務(wù)調(diào)用fork()可以把他排進(jìn)ForkJoinPool。同時對左邊和右邊的子任務(wù)調(diào)用它似乎很自然,但是這樣做的效率比直接對其中一個調(diào)用compute()低。對一個子任務(wù)調(diào)用compute()的話,可以使一個子任務(wù)重用當(dāng)前線程,避免線程池中多分配一個任務(wù)帶來的開銷。
不應(yīng)該認(rèn)為多核系統(tǒng)中,分支合并就比順序計算要快。一個任務(wù)可以分解成多個獨(dú)立的子任務(wù),才能讓性能在并行化時有所提升。所有的子任務(wù)運(yùn)行時間應(yīng)該比分出新任務(wù)花費(fèi)的時間要長。通常我們把輸入輸出放在一個方法中,計算在另一個方法中。
【工作竊取我們很難確定要滿足什么條件才可以不再拆分任務(wù)。但是分出大量的小任務(wù)是一個好的選擇,因為在理想情況下,劃分并行任務(wù)時因該讓每個任務(wù)都花費(fèi)相同的時間。讓cpu的所有內(nèi)核都一樣的繁忙,但是現(xiàn)實中我們的子任務(wù)花費(fèi)的時間大不相同,這是因為有許多我們無法確認(rèn)的情況出現(xiàn):io,rpc,分配效率等等。分支合并框架使用:工作竊取來解決內(nèi)核之間任務(wù)不匹配的問題。讓所有任務(wù)大致相同的被平均分配到forkjoinpool的每個線程上。每個線程都擁有一個雙向鏈?zhǔn)疥犃衼肀4嫠娜蝿?wù),每完成一個任務(wù)就從隊列頭部取出下一個任務(wù)執(zhí)行。當(dāng)一個線程的任務(wù)隊列已空,而其他線程還在繁忙,這個空閑線程就隨機(jī)選擇一個繁忙線程并從其隊列尾部拿走一個任務(wù)開始執(zhí)行,直到所有的任務(wù)執(zhí)行完畢。
【例子1.輸出數(shù)組中有多少個數(shù)字小于0.5
public class ExerciseFilter { //數(shù)據(jù)源 static double numbers[] = new double[100]; static { for(int i = 0 ; i < 100 ; i++){ numbers[i] = i + 1; } numbers[0] = 0.08; numbers[1] = 0.18; numbers[11] = 0.18; } public static void main(String[] args) { Counter counter = new Counter(numbers,x -> x < 0.5); //使用單例 ForkJoinPool pool = ForkJoinPool.commonPool(); long st = System.currentTimeMillis(); //啟動并行任務(wù) pool.invoke(counter); System.out.println((System.currentTimeMillis() - st) + " : " + counter.join()); } } class Counter extends RecursiveTask{ //分界線,當(dāng)一個數(shù)組的長度 < 1000 就不再繼續(xù)拆分 public static final int THRESHOLD = 1000; //數(shù)組 private double[] values; //判斷條件 private DoublePredicate filter; public Counter(double [] values,DoublePredicate filter){ this.values = values; this.filter = filter; } @Override protected Integer compute() { //任務(wù)足夠小就不再拆分 if(values.length < THRESHOLD ){ //返回該數(shù)組中有多少數(shù)字滿足判斷邏輯 int count = 0; for(int i = 0; i < values.length ; i++){ if(filter.test(values[i])){ count++; } } return count; }else { //將打數(shù)組拆分成兩個 int mid = values.length / 2; Counter first = new Counter(Arrays.copyOfRange(values,0,mid),filter); //第一個子任務(wù)提交到線程池 first.fork(); Counter second = new Counter(Arrays.copyOfRange(values,mid,values.length),filter); //當(dāng)前線程執(zhí)行第二個子任務(wù),節(jié)約一個線程的開銷 int secondResult = second.compute(); //等待第一個子任務(wù)執(zhí)行完畢 int firstResult = first.join(); return firstResult + secondResult; } } }
2.列表中求和
public class ExerciseSum { //數(shù)據(jù)源 static int sum[] = new int[100]; static { for(int i = 0 ; i < 100 ; i++){ sum[i] = i + 1; } } public static void main(String[] args) { CounterSum counter = new CounterSum(sum); ForkJoinPool pool = ForkJoinPool.commonPool(); long st = System.currentTimeMillis(); pool.invoke(counter); System.out.println((System.currentTimeMillis() - st) + " : " + counter.join()); } } class CounterSum extends RecursiveTask{ //最小拆分單位:每個小數(shù)組length = 10 public static final int THRESHOLD = 10; private int[] values; public CounterSum(int [] values){ this.values = values; } @Override protected Integer compute() { if(values.length < THRESHOLD ){ int count = 0; for(int i = 0; i < values.length ; i++){ count += values[i]; } return count; }else { int mid = values.length / 2; CounterSum first = new CounterSum(Arrays.copyOfRange(values,0,mid)); first.fork(); CounterSum second = new CounterSum(Arrays.copyOfRange(values,mid,values.length)); int secondResult = second.compute(); int firstResult = first.join(); return firstResult + secondResult; } } }
3.排序
public class ExerciseSort { //數(shù)據(jù)源 static int num[] = new int[100]; static { Random r = new Random(); for(int i = 0 ; i < 100 ; i++){ num[i] = r.nextInt(100); } } public static void main(String[] args) { CounterSort counter = new CounterSort(num); //使用單例 ForkJoinPool pool = ForkJoinPool.commonPool(); long st = System.currentTimeMillis(); //啟動并行任務(wù) pool.invoke(counter); System.out.println((System.currentTimeMillis() - st)); Arrays.stream(counter.join()).forEach(System.out::println); } } class CounterSort extends RecursiveTask{ //最小拆分單位:每個小數(shù)組length = 10 public static final int THRESHOLD = 10; //待排序數(shù)組 private int[] values; public CounterSort(int [] values){ this.values = values; } @Override protected int[] compute() { if(values.length < THRESHOLD ){ int[] result = new int[values.length]; //1.單數(shù)組排序 result = Arrays.stream(values).sorted().toArray(); return result; }else { int mid = values.length / 2; CounterSort first = new CounterSort(Arrays.copyOfRange(values,0,mid)); first.fork(); CounterSort second = new CounterSort(Arrays.copyOfRange(values,mid,values.length)); int[] secondResult = second.compute(); int[] firstResult = first.join(); //兩個數(shù)組混合排序 int[] combineRsult = combineIntArray(firstResult,secondResult); return combineRsult; } } private int[] combineIntArray(int[] a1,int[] a2){ int result[] = new int[a1.length + a2.length]; int a1Len = a1.length; int a2Len = a2.length; int destLen = result.length; for(int index = 0 , a1Index = 0 , a2Index = 0 ; index < destLen ; index++) { int value1 = a1Index >= a1Len?Integer.MAX_VALUE:a1[a1Index]; int value2 = a2Index >= a2Len?Integer.MAX_VALUE:a2[a2Index]; if(value1 < value2) { a1Index++; result[index] = value1; } else { a2Index++; result[index] = value2; } } return result; } }
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/68096.html
摘要:相比與其他操作系統(tǒng)包括其他類系統(tǒng)有很多的優(yōu)點(diǎn),其中有一項就是,其上下文切換和模式切換的時間消耗非常少。因為多線程競爭鎖時會引起上下文切換。減少線程的使用。很多編程語言中都有協(xié)程。所以如何避免死鎖的產(chǎn)生,在我們使用并發(fā)編程時至關(guān)重要。 系列文章傳送門: Java多線程學(xué)習(xí)(一)Java多線程入門 Java多線程學(xué)習(xí)(二)synchronized關(guān)鍵字(1) java多線程學(xué)習(xí)(二)syn...
摘要:因為多線程競爭鎖時會引起上下文切換。減少線程的使用。舉個例子如果說服務(wù)器的帶寬只有,某個資源的下載速度是,系統(tǒng)啟動個線程下載該資源并不會導(dǎo)致下載速度編程,所以在并發(fā)編程時,需要考慮這些資源的限制。 最近私下做一項目,一bug幾日未解決,總惶恐。一日頓悟,bug不可怕,怕的是項目不存在bug,與其懼怕,何不與其剛正面。 系列文章傳送門: Java多線程學(xué)習(xí)(一)Java多線程入門 Jav...
摘要:開始學(xué)習(xí)也有一段時間了,一些基礎(chǔ)的書也掃了一遍了。最近慢慢開始看和,后者的話和有類似之處,都是一些編程經(jīng)驗的編程的世界里好多的東西都是相同的。這里其實是對的最佳實踐,之后該對象已經(jīng)變成一個過期的引用了,此時就應(yīng)該清空這個引用。 開始學(xué)習(xí)java也有一段時間了,一些基礎(chǔ)的書也掃了一遍了(think in java/core java volume 1)。最近慢慢開始看和,后者的話和有類似...
摘要:學(xué)習(xí)編程的本最佳書籍這些書涵蓋了各個領(lǐng)域,包括核心基礎(chǔ)知識,集合框架,多線程和并發(fā),內(nèi)部和性能調(diào)優(yōu),設(shè)計模式等。擅長解釋錯誤及錯誤的原因以及如何解決簡而言之,這是學(xué)習(xí)中并發(fā)和多線程的最佳書籍之一。 showImg(https://segmentfault.com/img/remote/1460000018913016); 來源 | 愿碼(ChainDesk.CN)內(nèi)容編輯 愿碼Slo...
閱讀 3491·2021-10-13 09:39
閱讀 1473·2021-10-08 10:05
閱讀 2277·2021-09-26 09:56
閱讀 2294·2021-09-03 10:28
閱讀 2693·2019-08-29 18:37
閱讀 2050·2019-08-29 17:07
閱讀 613·2019-08-29 16:23
閱讀 2201·2019-08-29 11:24