成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

java并發(fā)編程學(xué)習(xí)5--forkJoin

Neilyo / 1773人閱讀

摘要:使用方式要把任務(wù)提交到線程池,必須創(chuàng)建的一個子類,其中是并行化任務(wù)產(chǎn)生的結(jié)果如果沒有結(jié)果使用類型。對一個子任務(wù)調(diào)用的話,可以使一個子任務(wù)重用當(dāng)前線程,避免線程池中多分配一個任務(wù)帶來的開銷。

【概念

分支和并框架的目的是以遞歸的方式將可以并行的任務(wù)拆分成更小的任務(wù),然后將每個子任務(wù)的結(jié)果合并起來生成整體的結(jié)果,它是ExecutorService的一個實現(xiàn),它把子任務(wù)分配給線程池(ForkJoinPool)中的工作線程。某些應(yīng)用可能對每個處理器內(nèi)核飯別試用一個線程,來完成計算密集任務(wù),例如圖像處理。java7引入forkjoin框架,專門用來支持這一類應(yīng)用。假設(shè)有一個處理任務(wù),它可以很自然的分解成子任務(wù)。

【使用方式

要把任務(wù)提交到線程池,必須創(chuàng)建RecursiveTask的一個子類,其中R是并行化任務(wù)產(chǎn)生的結(jié)果(如果沒有結(jié)果使用RecursiveAction類型)。然后在子類中實現(xiàn)product abstract R compute()方法即可。這個方法同時實現(xiàn)了“拆分子任務(wù)”,“任務(wù)不可拆時”的處理邏輯。如下所示:

if(任務(wù)足夠小){
    順序計算該任務(wù)的值;
}else{
    將任務(wù)分成兩個子任務(wù);
    遞歸調(diào)用本方法;
    合并每個子任務(wù)的結(jié)果;
}
【最佳實踐

對一個任務(wù)調(diào)用join()方法會阻塞調(diào)用方,直到該任務(wù)結(jié)束。因此,有必要在兩個子任務(wù)的計算都開始之后再調(diào)用join()。否則,你得到的版本會比原始的順序執(zhí)行更加緩慢,因為每個子任務(wù)都需要等到另一個子任務(wù)完成后才能開始計算,中途還要加上開啟線程的開銷。

不應(yīng)該在RecursiveTask的內(nèi)部使用ForkJoinPool.invoke(),相反你應(yīng)該始終調(diào)用compute()或者fork(),只有順序代碼才使用ForkJoinPool.invoke()來啟動并行運(yùn)算。

對子任務(wù)調(diào)用fork()可以把他排進(jìn)ForkJoinPool。同時對左邊和右邊的子任務(wù)調(diào)用它似乎很自然,但是這樣做的效率比直接對其中一個調(diào)用compute()低。對一個子任務(wù)調(diào)用compute()的話,可以使一個子任務(wù)重用當(dāng)前線程,避免線程池中多分配一個任務(wù)帶來的開銷。

不應(yīng)該認(rèn)為多核系統(tǒng)中,分支合并就比順序計算要快。一個任務(wù)可以分解成多個獨(dú)立的子任務(wù),才能讓性能在并行化時有所提升。所有的子任務(wù)運(yùn)行時間應(yīng)該比分出新任務(wù)花費(fèi)的時間要長。通常我們把輸入輸出放在一個方法中,計算在另一個方法中。

【工作竊取

我們很難確定要滿足什么條件才可以不再拆分任務(wù)。但是分出大量的小任務(wù)是一個好的選擇,因為在理想情況下,劃分并行任務(wù)時因該讓每個任務(wù)都花費(fèi)相同的時間。讓cpu的所有內(nèi)核都一樣的繁忙,但是現(xiàn)實中我們的子任務(wù)花費(fèi)的時間大不相同,這是因為有許多我們無法確認(rèn)的情況出現(xiàn):io,rpc,分配效率等等。分支合并框架使用:工作竊取來解決內(nèi)核之間任務(wù)不匹配的問題。讓所有任務(wù)大致相同的被平均分配到forkjoinpool的每個線程上。每個線程都擁有一個雙向鏈?zhǔn)疥犃衼肀4嫠娜蝿?wù),每完成一個任務(wù)就從隊列頭部取出下一個任務(wù)執(zhí)行。當(dāng)一個線程的任務(wù)隊列已空,而其他線程還在繁忙,這個空閑線程就隨機(jī)選擇一個繁忙線程并從其隊列尾部拿走一個任務(wù)開始執(zhí)行,直到所有的任務(wù)執(zhí)行完畢。

【例子

1.輸出數(shù)組中有多少個數(shù)字小于0.5

public class ExerciseFilter {

    //數(shù)據(jù)源
    static double numbers[] = new double[100];
    static {
        for(int i = 0 ; i < 100 ; i++){
            numbers[i] = i + 1;
        }
        numbers[0] = 0.08;
        numbers[1] = 0.18;
        numbers[11] = 0.18;
    }

    public static void main(String[] args) {
        Counter counter = new Counter(numbers,x -> x < 0.5);
        //使用單例
        ForkJoinPool pool = ForkJoinPool.commonPool();
        long st = System.currentTimeMillis();
        //啟動并行任務(wù)
        pool.invoke(counter);
        System.out.println((System.currentTimeMillis() - st) + " : " + counter.join());
    }
}

class Counter extends RecursiveTask{
    //分界線,當(dāng)一個數(shù)組的長度 < 1000 就不再繼續(xù)拆分
    public static final int THRESHOLD = 1000;
    //數(shù)組
    private double[] values;
    //判斷條件
    private DoublePredicate filter;

    public Counter(double [] values,DoublePredicate filter){
        this.values = values;
        this.filter = filter;
    }

    @Override
    protected Integer compute() {
        //任務(wù)足夠小就不再拆分
        if(values.length < THRESHOLD ){
            //返回該數(shù)組中有多少數(shù)字滿足判斷邏輯
            int count = 0;
            for(int i = 0; i < values.length ; i++){
                if(filter.test(values[i])){
                    count++;
                }
            }
            return count;
        }else {
            //將打數(shù)組拆分成兩個
            int mid = values.length / 2;
            Counter first = new Counter(Arrays.copyOfRange(values,0,mid),filter);
            //第一個子任務(wù)提交到線程池
            first.fork();
            Counter second = new Counter(Arrays.copyOfRange(values,mid,values.length),filter);
            //當(dāng)前線程執(zhí)行第二個子任務(wù),節(jié)約一個線程的開銷
            int secondResult = second.compute();
            //等待第一個子任務(wù)執(zhí)行完畢
            int firstResult = first.join();
            return firstResult + secondResult;
        }
    }
}

2.列表中求和

public class ExerciseSum {
    //數(shù)據(jù)源
    static int sum[] = new int[100];
    static {
        for(int i = 0 ; i < 100 ; i++){
            sum[i] = i + 1;
        }
    }

    public static void main(String[] args) {
        CounterSum counter = new CounterSum(sum);
        ForkJoinPool pool = ForkJoinPool.commonPool();
        long st = System.currentTimeMillis();
        pool.invoke(counter);
        System.out.println((System.currentTimeMillis() - st) + " : " + counter.join());
    }
}

class CounterSum extends RecursiveTask {

    //最小拆分單位:每個小數(shù)組length = 10
    public static final int THRESHOLD = 10;
    private int[] values;

    public CounterSum(int [] values){
        this.values = values;
    }

    @Override
    protected Integer compute() {
        if(values.length < THRESHOLD ){
            int count = 0;
            for(int i = 0; i < values.length ; i++){
                count += values[i];
            }
            return count;
        }else {
            int mid = values.length / 2;
            CounterSum first = new CounterSum(Arrays.copyOfRange(values,0,mid));
            first.fork();
            CounterSum second = new CounterSum(Arrays.copyOfRange(values,mid,values.length));
            int secondResult = second.compute();
            int firstResult = first.join();
            return firstResult + secondResult;
        }
    }
}

3.排序

public class ExerciseSort {

    //數(shù)據(jù)源
    static int num[] = new int[100];
    static {
        Random r = new Random();
        for(int i = 0 ; i < 100 ; i++){
            num[i] = r.nextInt(100);
        }
    }

    public static void main(String[] args) {
        CounterSort counter = new CounterSort(num);
        //使用單例
        ForkJoinPool pool = ForkJoinPool.commonPool();
        long st = System.currentTimeMillis();
        //啟動并行任務(wù)
        pool.invoke(counter);
        System.out.println((System.currentTimeMillis() - st));
        Arrays.stream(counter.join()).forEach(System.out::println);
    }
}

class CounterSort extends RecursiveTask {

    //最小拆分單位:每個小數(shù)組length = 10
    public static final int THRESHOLD = 10;
    //待排序數(shù)組
    private int[] values;

    public CounterSort(int [] values){
        this.values = values;
    }

    @Override
    protected int[] compute() {
        if(values.length < THRESHOLD ){
            int[] result = new int[values.length];
            //1.單數(shù)組排序
            result = Arrays.stream(values).sorted().toArray();
            return result;
        }else {
            int mid = values.length / 2;
            CounterSort first = new CounterSort(Arrays.copyOfRange(values,0,mid));
            first.fork();
            CounterSort second = new CounterSort(Arrays.copyOfRange(values,mid,values.length));
            int[] secondResult = second.compute();
            int[] firstResult = first.join();
            //兩個數(shù)組混合排序
            int[] combineRsult = combineIntArray(firstResult,secondResult);
            return combineRsult;
        }
    }
    private int[] combineIntArray(int[] a1,int[] a2){
        int result[] = new int[a1.length + a2.length];
        int a1Len = a1.length;
        int a2Len = a2.length;
        int destLen = result.length;

        for(int index = 0 , a1Index = 0 , a2Index = 0 ; index < destLen ; index++) {
            int value1 = a1Index >= a1Len?Integer.MAX_VALUE:a1[a1Index];
            int value2 = a2Index >= a2Len?Integer.MAX_VALUE:a2[a2Index];
            if(value1 < value2) {
                a1Index++;
                result[index] = value1;
            }
            else {
                a2Index++;
                result[index] = value2;
            }
        }
        return result;
    }
}

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/68096.html

相關(guān)文章

  • Java多線程學(xué)習(xí)(七)并發(fā)編程中一些問題

    摘要:相比與其他操作系統(tǒng)包括其他類系統(tǒng)有很多的優(yōu)點(diǎn),其中有一項就是,其上下文切換和模式切換的時間消耗非常少。因為多線程競爭鎖時會引起上下文切換。減少線程的使用。很多編程語言中都有協(xié)程。所以如何避免死鎖的產(chǎn)生,在我們使用并發(fā)編程時至關(guān)重要。 系列文章傳送門: Java多線程學(xué)習(xí)(一)Java多線程入門 Java多線程學(xué)習(xí)(二)synchronized關(guān)鍵字(1) java多線程學(xué)習(xí)(二)syn...

    dingding199389 評論0 收藏0
  • Java多線程學(xué)習(xí)(七)并發(fā)編程中一些問題

    摘要:因為多線程競爭鎖時會引起上下文切換。減少線程的使用。舉個例子如果說服務(wù)器的帶寬只有,某個資源的下載速度是,系統(tǒng)啟動個線程下載該資源并不會導(dǎo)致下載速度編程,所以在并發(fā)編程時,需要考慮這些資源的限制。 最近私下做一項目,一bug幾日未解決,總惶恐。一日頓悟,bug不可怕,怕的是項目不存在bug,與其懼怕,何不與其剛正面。 系列文章傳送門: Java多線程學(xué)習(xí)(一)Java多線程入門 Jav...

    yimo 評論0 收藏0
  • java并發(fā)編程學(xué)習(xí)---之一

    摘要:開始學(xué)習(xí)也有一段時間了,一些基礎(chǔ)的書也掃了一遍了。最近慢慢開始看和,后者的話和有類似之處,都是一些編程經(jīng)驗的編程的世界里好多的東西都是相同的。這里其實是對的最佳實踐,之后該對象已經(jīng)變成一個過期的引用了,此時就應(yīng)該清空這個引用。 開始學(xué)習(xí)java也有一段時間了,一些基礎(chǔ)的書也掃了一遍了(think in java/core java volume 1)。最近慢慢開始看和,后者的話和有類似...

    chavesgu 評論0 收藏0
  • 學(xué)習(xí)Java必讀的10本書籍

    摘要:學(xué)習(xí)編程的本最佳書籍這些書涵蓋了各個領(lǐng)域,包括核心基礎(chǔ)知識,集合框架,多線程和并發(fā),內(nèi)部和性能調(diào)優(yōu),設(shè)計模式等。擅長解釋錯誤及錯誤的原因以及如何解決簡而言之,這是學(xué)習(xí)中并發(fā)和多線程的最佳書籍之一。 showImg(https://segmentfault.com/img/remote/1460000018913016); 來源 | 愿碼(ChainDesk.CN)內(nèi)容編輯 愿碼Slo...

    masturbator 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<