成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

java并發(fā)編程學(xué)習(xí)14--CompletableFuture(一)

VioletJack / 2775人閱讀

摘要:并行流與目前,我們對集合進行計算有兩種方式并行流而更加的靈活,我們可以配置線程池的大小確保整體的計算不會因為等待而發(fā)生阻塞。

【回顧Future接口

Future接口時java5引入的,設(shè)計初衷是對將來某個時刻會發(fā)生的結(jié)果建模。它建模了一種異步計算,返回了一個執(zhí)行預(yù)算結(jié)果的引用。比如,你去干洗店洗衣服,店員會告訴你什么時候可以來取衣服,而不是讓你一直在干洗店等待。要使用Future只需要將耗時操作封裝在一個Callable對象中,再將其提交給ExecutorService就可以了。

 ExecutorService executor = Executors.newFixedThreadPool(10);
        Future future = executor.submit(new Callable() {
            @Override
            public Double call() throws Exception {
                return doSomeLongComputation();
            }
        });
        
        doSomethingElse();

        try {
            //最多等待1秒
            Double result = future.get(1,TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            //當(dāng)前線程等待過程中被打斷
            e.printStackTrace();
        } catch (ExecutionException e) {
            //計算時出現(xiàn)異常
            e.printStackTrace();
        } catch (TimeoutException e) {
            //完成計算前就超時
            e.printStackTrace();
        }

但是Future依然有一些局限性:

無法將兩個異步計算的結(jié)果合并為一個。

等待Future集合中所有任務(wù)完成。

等待Future集合中最快任務(wù)完成(選擇最優(yōu)的執(zhí)行方案)。

通過編程的方式完成一個Future任務(wù)的執(zhí)行(手工設(shè)定異步結(jié)果處理)。

應(yīng)對Future的完成事件,當(dāng)Future的完成事件發(fā)生時會收到通知,并可以使用Future的結(jié)果進行下一步操作,不只是簡單的阻塞等待。

而CompletableFuture類實現(xiàn)了Future接口,可以將上述的問題全部解決。CompletableFuture與Stream的設(shè)計都遵循了類似的設(shè)計模式:使用Lambda表達式以及流水線的思想,從這個角度可以說CompletableFuture與Future的關(guān)系類似于Stream與Collection的關(guān)系。

【構(gòu)建一個異步應(yīng)用

最佳價格查詢器:查詢多個線上商店對同一商品的價格。

首先構(gòu)建商店對象:

package BestPriceFinder;

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class Shop {

    private String name;
    public Shop(String name){
        this.name = name;
    }
    public String getName(){
        return name;
    }

    /**
     * 異步api:使用創(chuàng)建CompletableFuture類提供的工廠方法與getPriceAsync()效果完全一致
     * 可以更輕易的完成這個流程,并且不用擔(dān)心實現(xiàn)細節(jié)
     * @param product
     * @return
     */
    public Future getPriceAsyncByFactory(String product){
        return CompletableFuture.supplyAsync(() -> calculatePrice(product));
    }
    /**
     * 異步api:
     * @param product
     * @return
     */
    public Future getPriceAsync(String product){
        //創(chuàng)建CompletableFuture對象,它將包含計算結(jié)果
        CompletableFuture futurePrice = new CompletableFuture<>();
        //在新線程中異步計算結(jié)果
        new Thread(() -> {
            try {
                double price = calculatePrice(product);
                //需要長時間計算的任務(wù)結(jié)束時,設(shè)置future的返回值
                futurePrice.complete(price);
            }catch (Exception e){
                //如這里沒有使用completeExceptionally,線程不會結(jié)束,調(diào)用方會永遠的執(zhí)行下去
                futurePrice.completeExceptionally(e);
            }
        }).start();
        //無需等待計算結(jié)果,直接返回future對象
        return futurePrice;
    }

    /**
     * 同步api:
     * 每個商店都需要提供的查詢api:根據(jù)名稱返回價格;
     * 模擬查詢數(shù)據(jù)庫等一些耗時操作:使用delay()模擬這些耗時操作。
     * @param product
     * @return
     */
    public double getPrice(String product){
        return calculatePrice(product);
    }

    private double calculatePrice(String product){
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    private Random random = new Random();
    /**
     * 模擬耗時操作:延遲一秒
     */
    private static void delay(){
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

下面我們針對Shop.java提供的同步方法與異步方法來進行測試:

package BestPriceFinder;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;

/**
 * 最佳價格查詢器
 */
public class BestFinder {

    List shops = Arrays.asList(
            new Shop("A"),
            new Shop("B"),
            new Shop("C"),
            new Shop("D"),
            new Shop("E"),
            new Shop("F"),
            new Shop("G"),
            new Shop("H"),
            new Shop("I"),
            new Shop("J")
    );

    /**
     * 順序查詢
     */
    public List findPrices(String product){
        return shops.stream()
                     .map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product)))
                     .collect(Collectors.toList());
    }

    /**
     * 并行流查詢
     */
    public List findPricesParallel(String product){
        return shops.parallelStream()
                .map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product)))
                .collect(Collectors.toList());
    }

    /**
     * 異步查詢
     * 相比并行流的話CompletableFuture更有優(yōu)勢:可以對執(zhí)行器配置,設(shè)置線程池大小
     */
    @SuppressWarnings("all")
    private final Executor myExecutor = Executors.newFixedThreadPool(Math.min(shops.size(), 800), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            //使用守護線程保證不會阻止程序的關(guān)停
            t.setDaemon(true);
            return t;
        }
    });
    @SuppressWarnings("all")
    public List findPricesAsync(String product){
        List> priceFuctures = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product)),myExecutor))
                .collect(Collectors.toList());
        /** 這里需要使用新的stream來等待所有的子線程執(zhí)行完,
         *   因為:如果在一個stream中使用兩個map:
         *   List> priceFuctures = shops.parallelStream()
         *           .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product))))
         *           .map(c -> c.join()).collect(Collectors.toList())
         *           .collect(Collectors.toList());
         *   考慮到流操作之間的延遲特性。如果你在單一的流水線中處理流,發(fā)向不同商家的請求只能以同步順序的方式執(zhí)行才會成功。因此每個創(chuàng)建CompletableFuture
         *   對象只能在前一個操作結(jié)束之后執(zhí)行查詢商家動作。
         */
        return priceFuctures.stream().map(c -> c.join()).collect(Collectors.toList());
    }
}
 @Test
    public void findPrices(){
        BestFinder bestFinder = new BestFinder();
        long st = System.currentTimeMillis();
        System.out.println(bestFinder.findPrices("iPhonX"));
        System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs");
    }
    @Test
    public void findPricesParallel(){
        BestFinder bestFinder = new BestFinder();
        long st = System.currentTimeMillis();
        System.out.println(bestFinder.findPrices("iPhonX"));
        System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs");
    }
    @Test
    public void findPricesAsync(){
        BestFinder bestFinder = new BestFinder();
        long st = System.currentTimeMillis();
        System.out.println(bestFinder.findPricesAsync("iPhonX"));
        System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs");
    }

同步api測試結(jié)果:毫無疑問是10秒之上

并行流獲取同步api測試結(jié)果:也是10秒之上,但是并行流不是很高效嗎?怎么會如此凄慘呢?因為這與并行流可以調(diào)用的系統(tǒng)核數(shù)相關(guān),我的計算機是8核,最多8個線程同時運行。而商店有10個,也就是說,我們的兩個線程會一直等待前面的某一個線程釋放出空閑才能繼續(xù)運行。

異步獲取api測試結(jié)果:一秒左右


為何差距如此大呢?
明智的選擇是創(chuàng)建了一個配有線程池的執(zhí)行器,線程池中線程的數(shù)目取決于你的應(yīng)用需要處理的負擔(dān),但是你該如何選擇合適的線程數(shù)目呢?

【選擇正確的線程池大小

《Java并發(fā)編程實戰(zhàn)》中給出如下公式:

Number = NCpu * Ucpu * ( 1 + W/C)
Number : 線程數(shù)量
NCpu : 處理器核數(shù)
UCpu : 期望cpu利用率
W/C : 等待時間與計算時間比

我們這里:99%d的時間是等待商店響應(yīng) W/C = 99 ,cpu利用率期望 100% ,NCpu = 9,推斷出 number = 800。但是為了避免過多的線程搞死計算機,我們選擇商店數(shù)與計算值中較小的一個。

【并行流與CompletableFuture

目前,我們對集合進行計算有兩種方式:1.并行流 2.CompletableFuture;而CompletableFuture更加的靈活,我們可以配置線程池的大小確保整體的計算不會因為等待IO而發(fā)生阻塞。
書上給出的建議如下:

如果是計算密集型的操作并且沒有IO推薦stream接口,因為實現(xiàn)簡單效率也高,如果所有的線程都是計算密集型的也就沒有必要創(chuàng)建比核數(shù)更多的線程。

反之,如果任務(wù)涉及到IO,網(wǎng)絡(luò)等操作:CompletableFuture靈活性更好,因為大部分線程處于等待狀態(tài),需要讓他們更加忙碌,并且再邏輯中加入異常處理可以更有效的監(jiān)控是什么原因觸發(fā)了等待。

現(xiàn)在我們知道了如何用CompletableFuture提供異步的api,后面的文章會學(xué)習(xí)如何利用CompletableFuture高效的操作同步api。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/68219.html

相關(guān)文章

  • Java多線程學(xué)習(xí)(七)并發(fā)編程些問題

    摘要:相比與其他操作系統(tǒng)包括其他類系統(tǒng)有很多的優(yōu)點,其中有一項就是,其上下文切換和模式切換的時間消耗非常少。因為多線程競爭鎖時會引起上下文切換。減少線程的使用。很多編程語言中都有協(xié)程。所以如何避免死鎖的產(chǎn)生,在我們使用并發(fā)編程時至關(guān)重要。 系列文章傳送門: Java多線程學(xué)習(xí)(一)Java多線程入門 Java多線程學(xué)習(xí)(二)synchronized關(guān)鍵字(1) java多線程學(xué)習(xí)(二)syn...

    dingding199389 評論0 收藏0
  • Java多線程學(xué)習(xí)(七)并發(fā)編程些問題

    摘要:因為多線程競爭鎖時會引起上下文切換。減少線程的使用。舉個例子如果說服務(wù)器的帶寬只有,某個資源的下載速度是,系統(tǒng)啟動個線程下載該資源并不會導(dǎo)致下載速度編程,所以在并發(fā)編程時,需要考慮這些資源的限制。 最近私下做一項目,一bug幾日未解決,總惶恐。一日頓悟,bug不可怕,怕的是項目不存在bug,與其懼怕,何不與其剛正面。 系列文章傳送門: Java多線程學(xué)習(xí)(一)Java多線程入門 Jav...

    yimo 評論0 收藏0
  • 學(xué)習(xí)Java必讀的10本書籍

    摘要:學(xué)習(xí)編程的本最佳書籍這些書涵蓋了各個領(lǐng)域,包括核心基礎(chǔ)知識,集合框架,多線程和并發(fā),內(nèi)部和性能調(diào)優(yōu),設(shè)計模式等。擅長解釋錯誤及錯誤的原因以及如何解決簡而言之,這是學(xué)習(xí)中并發(fā)和多線程的最佳書籍之一。 showImg(https://segmentfault.com/img/remote/1460000018913016); 來源 | 愿碼(ChainDesk.CN)內(nèi)容編輯 愿碼Slo...

    masturbator 評論0 收藏0
  • 并發(fā)

    摘要:表示的是兩個,當(dāng)其中任意一個計算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢娝氖褂?,在開始分析它的高并發(fā)實現(xiàn)機制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個比較典型的互聯(lián)網(wǎng)高并發(fā)場景。 干貨:深度剖析分布式搜索引擎設(shè)計 分布式,高可用,和機器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個名詞,今天我們首先來說說分布式。 探究...

    supernavy 評論0 收藏0
  • 并發(fā)

    摘要:表示的是兩個,當(dāng)其中任意一個計算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢娝氖褂?,在開始分析它的高并發(fā)實現(xiàn)機制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個比較典型的互聯(lián)網(wǎng)高并發(fā)場景。 干貨:深度剖析分布式搜索引擎設(shè)計 分布式,高可用,和機器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個名詞,今天我們首先來說說分布式。 探究...

    ddongjian0000 評論0 收藏0

發(fā)表評論

0條評論

VioletJack

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<