成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

《Java8實(shí)戰(zhàn)》-第十一章筆記(CompletableFuture:組合式異步編程)

hlcfan / 3701人閱讀

摘要:組合式異步編程最近這些年,兩種趨勢(shì)不斷地推動(dòng)我們反思我們?cè)O(shè)計(jì)軟件的方式。第章中介紹的分支合并框架以及并行流是實(shí)現(xiàn)并行處理的寶貴工具它們將一個(gè)操作切分為多個(gè)子操作,在多個(gè)不同的核甚至是機(jī)器上并行地執(zhí)行這些子操作。

CompletableFuture:組合式異步編程

最近這些年,兩種趨勢(shì)不斷地推動(dòng)我們反思我們?cè)O(shè)計(jì)軟件的方式。第一種趨勢(shì)和應(yīng)用運(yùn)行的硬件平臺(tái)相關(guān),第二種趨勢(shì)與應(yīng)用程序的架構(gòu)相關(guān),尤其是它們之間如何交互。我們?cè)诘?章中已經(jīng)討論過硬件平臺(tái)的影響。我們注意到隨著多核處理器的出現(xiàn),提升應(yīng)用程序處理速度最有效的方式是編寫能充分發(fā)揮多核能力的軟件。你已經(jīng)看到通過切分大型的任務(wù),讓每個(gè)子任務(wù)并行運(yùn)行,這一目標(biāo)是能夠?qū)崿F(xiàn)的;你也已經(jīng)了解相對(duì)直接使用線程的方式,使用分支/合并框架(在Java 7中引入)和并行流(在Java 8中新引入)能以更簡(jiǎn)單、更有效的方式實(shí)現(xiàn)這一目標(biāo)。

第二種趨勢(shì)反映在公共API日益增長(zhǎng)的互聯(lián)網(wǎng)服務(wù)應(yīng)用。著名的互聯(lián)網(wǎng)大鱷們紛紛提供了自己的公共API服務(wù),比如谷歌提供了地理信息服務(wù),F(xiàn)acebook提供了社交信息服務(wù),Twitter提供了新聞服務(wù)。現(xiàn)在,很少有網(wǎng)站或者網(wǎng)絡(luò)應(yīng)用會(huì)以完全隔離的方式工作。更多的時(shí)候,我們看到的下一代網(wǎng)絡(luò)應(yīng)用都采用“混聚”(mash-up)的方式:它會(huì)使用來自多個(gè)來源的內(nèi)容,將這些內(nèi)容聚合在一起,方便用戶的生活。

比如,你可能希望為你的法國(guó)客戶提供指定主題的熱點(diǎn)報(bào)道。為實(shí)現(xiàn)這一功能,你需要向谷歌或者Twitter的API請(qǐng)求所有語言中針對(duì)該主題最熱門的評(píng)論,可能還需要依據(jù)你的內(nèi)部算法對(duì)它們的相關(guān)性進(jìn)行排序。之后,你可能還需要使用谷歌的翻譯服務(wù)把它們翻譯成法語,甚至利用谷歌地圖服務(wù)定位出評(píng)論作者的位置信息,最終將所有這些信息聚集起來,呈現(xiàn)在你的網(wǎng)站上。

當(dāng)然,如果某些外部網(wǎng)絡(luò)服務(wù)發(fā)生響應(yīng)慢的情況,你希望依舊能為用戶提供部分信息,比如提供帶問號(hào)標(biāo)記的通用地圖,以文本的方式顯示信息,而不是呆呆地顯示一片空白屏幕,直到地圖服務(wù)器返回結(jié)果或者超時(shí)退出。

要實(shí)現(xiàn)類似的服務(wù),你需要與互聯(lián)網(wǎng)上的多個(gè)Web服務(wù)通信??墒?,你并不希望因?yàn)榈却承┓?wù)的響應(yīng),阻塞應(yīng)用程序的運(yùn)行,浪費(fèi)數(shù)十億寶貴的CPU時(shí)鐘周期。比如,不要因?yàn)榈却鼺acebook的數(shù)據(jù),暫停對(duì)來自Twitter的數(shù)據(jù)處理。

這些場(chǎng)景體現(xiàn)了多任務(wù)程序設(shè)計(jì)的另一面。第7章中介紹的分支/合并框架以及并行流是實(shí)現(xiàn)并行處理的寶貴工具;它們將一個(gè)操作切分為多個(gè)子操作,在多個(gè)不同的核、CPU甚至是機(jī)器上并行地執(zhí)行這些子操作。

與此相反,如果你的意圖是實(shí)現(xiàn)并發(fā),而非并行,或者你的主要目標(biāo)是在同一個(gè)CPU上執(zhí)行幾個(gè)松耦合的任務(wù),充分利用CPU的核,讓其足夠忙碌,從而最大化程序的吞吐量,那么你其實(shí)真正想做的是避免因?yàn)榈却h(yuǎn)程服務(wù)的返回,或者對(duì)數(shù)據(jù)庫的查詢,而阻塞線程的執(zhí)行,浪費(fèi)寶貴的計(jì)算資源,因?yàn)檫@種等待的時(shí)間很可能相當(dāng)長(zhǎng)。通過本章中你會(huì)了解,F(xiàn)uture接口,尤其是它的新版實(shí)現(xiàn)CompletableFuture,是處理這種情況的利器。

Future 接口

Future接口在Java 5中被引入,設(shè)計(jì)初衷是對(duì)將來某個(gè)時(shí)刻會(huì)發(fā)生的結(jié)果進(jìn)行建模。它建模了一種異步計(jì)算,返回一個(gè)執(zhí)行運(yùn)算結(jié)果的引用,當(dāng)運(yùn)算結(jié)束后,這個(gè)引用被返回給調(diào)用方。在Future中觸發(fā)那些潛在耗時(shí)的操作把調(diào)用線程解放出來,讓它能繼續(xù)執(zhí)行其他有價(jià)值的工作,不再需要呆呆等待耗時(shí)的操作完成。打個(gè)比方,你可以把它想象成這樣的場(chǎng)景:你拿了一袋子衣服到你中意的干洗店去洗。干洗店的員工會(huì)給你張發(fā)票,告訴你什么時(shí)候你的衣服會(huì)洗好(這就是一個(gè)Future事件)。衣服干洗的同時(shí),你可以去做其他的事情。Future的另一個(gè)優(yōu)點(diǎn)是它比更底層的Thread更易用。要使用Future,通常你只需要將耗時(shí)的操作封裝在一個(gè)Callable對(duì)象中,再將它提交給ExecutorService,就萬事大吉了。下面這段代碼展示了Java 8之前使用Future的一個(gè)例子。

ExecutorService executor = Executors.newCachedThreadPool();
Future future = executor.submit(new Callable() {
    public Double call() {
        return doSomeLongComputation();
    }
});
// 異步操作進(jìn)行的同時(shí),你可以做其他的事情
doSomethingElse();

try {
    Double result = future.get(1, TimeUnit.SECONDS);
} catch (ExecutionException ee) {
    // 計(jì)算拋出一個(gè)異常
} catch (InterruptedException ie) {
    // 當(dāng)前線程在等待過程中被中斷
} catch (TimeoutException te) {
    // 在Future對(duì)象完成之前超過已過期
}

這種編程方式讓你的線程可以在ExecutorService以并發(fā)方式調(diào)用另一個(gè)線程執(zhí)行耗時(shí)操作的同時(shí),去執(zhí)行一些其他的任務(wù)。接著,如果你已經(jīng)運(yùn)行到?jīng)]有異步操作的結(jié)果就無法繼續(xù)任何有意義的工作時(shí),可以調(diào)用它的get方法去獲取操作的結(jié)果。如果操作已經(jīng)完成,該方法會(huì)立刻返回操作的結(jié)果,否則它會(huì)阻塞你的線程,直到操作完成,返回相應(yīng)的結(jié)果。

你能想象這種場(chǎng)景存在怎樣的問題嗎?如果該長(zhǎng)時(shí)間運(yùn)行的操作永遠(yuǎn)不返回了會(huì)怎樣?為了處理這種可能性,雖然Future提供了一個(gè)無需任何參數(shù)的get方法,我們還是推薦大家使用重載版本的get方法,它接受一個(gè)超時(shí)的參數(shù),通過它,你可以定義你的線程等待Future結(jié)果的最長(zhǎng)時(shí)間,而不是樣永無止境地等待下去。

Future 接口的局限性

通過第一個(gè)例子,我們知道Future接口提供了方法來檢測(cè)異步計(jì)算是否已經(jīng)結(jié)束(使用isDone方法),等待異步操作結(jié)束,以及獲取計(jì)算的結(jié)果。但是這些特性還不足以讓你編寫簡(jiǎn)潔的并發(fā)代碼。比如,我們很難表述Future結(jié)果之間的依賴性;從文字描述上這很簡(jiǎn)單,“當(dāng)長(zhǎng)時(shí)間計(jì)算任務(wù)完成時(shí),請(qǐng)將該計(jì)算的結(jié)果通知到另一個(gè)長(zhǎng)時(shí)間運(yùn)行的計(jì)算任務(wù),這兩個(gè)計(jì)算任務(wù)都完成后,將計(jì)算的結(jié)果與另一個(gè)查詢操作結(jié)果合并”。但是,使用Future中提供的方法完成這樣的操作又是另外一回事。這也是我們需要更具描述能力的特性的原因,比如下面這些。

將兩個(gè)異步計(jì)算合并為一個(gè)——這兩個(gè)異步計(jì)算之間相互獨(dú)立,同時(shí)第二個(gè)又依賴于第一個(gè)的結(jié)果。

等待Future集合中的所有任務(wù)都完成。

僅等待Future集合中最快結(jié)束的任務(wù)完成(有可能因?yàn)樗鼈冊(cè)噲D通過不同的方式計(jì)算同一個(gè)值),并返回它的結(jié)果。

通過編程方式完成一個(gè)Future任務(wù)的執(zhí)行(即以手工設(shè)定異步操作結(jié)果的方式)。

應(yīng)對(duì)Future的完成事件(即當(dāng)Future的完成事件發(fā)生時(shí)會(huì)收到通知,并能使用Future計(jì)算的結(jié)果進(jìn)行下一步的操作,不只是簡(jiǎn)單地阻塞等待操作的結(jié)果)。

這一章中,你會(huì)了解新的CompletableFuture類(它實(shí)現(xiàn)了Future接口)如何利用Java 8的新特性以更直觀的方式將上述需求都變?yōu)榭赡?。Stream和CompletableFuture的設(shè)計(jì)都遵循了類似的模式:它們都使用了Lambda表達(dá)式以及流水線的思想。從這個(gè)角度,你可以說CompletableFuture和Future的關(guān)系就跟Stream和Collection的關(guān)系一樣。

使用CompletableFuture 構(gòu)建異步應(yīng)用

為了展示CompletableFuture的強(qiáng)大特性,我們會(huì)創(chuàng)建一個(gè)名為“最佳價(jià)格查詢器”(best-price-finder)的應(yīng)用,它會(huì)查詢多個(gè)在線商店,依據(jù)給定的產(chǎn)品或服務(wù)找出最低的價(jià)格。這個(gè)過程中,你會(huì)學(xué)到幾個(gè)重要的技能。

首先,你會(huì)學(xué)到如何為你的客戶提供異步API(如果你擁有一間在線商店的話,這是非常有幫助的)。

其次,你會(huì)掌握如何讓你使用了同步API的代碼變?yōu)榉亲枞a。你會(huì)了解如何使用流水線將兩個(gè)接續(xù)的異步操作合并為一個(gè)異步計(jì)算操作。這種情況肯定會(huì)出現(xiàn),比如,在線商店返回了你想要購(gòu)買商品的原始價(jià)格,并附帶著一個(gè)折扣代碼——最終,要計(jì)算出該商品的實(shí)際價(jià)格,你不得不訪問第二個(gè)遠(yuǎn)程折扣服務(wù),查詢?cè)撜劭鄞a對(duì)應(yīng)的折扣比率。

你還會(huì)學(xué)到如何以響應(yīng)式的方式處理異步操作的完成事件,以及隨著各個(gè)商店返回它的商品價(jià)格,最佳價(jià)格查詢器如何持續(xù)地更新每種商品的最佳推薦,而不是等待所有的商店都返回他們各自的價(jià)格(這種方式存在著一定的風(fēng)險(xiǎn),一旦某家商店的服務(wù)中斷,用戶可能遭遇白屏)。

實(shí)現(xiàn)異步API

為了實(shí)現(xiàn)最佳價(jià)格查詢器應(yīng)用,讓我們從每個(gè)商店都應(yīng)該提供的API定義入手。首先,商店應(yīng)該聲明依據(jù)指定產(chǎn)品名稱返回價(jià)格的方法:

public double getPrice(String product) {
    // 待實(shí)現(xiàn)
}

該方法的內(nèi)部實(shí)現(xiàn)會(huì)查詢商店的數(shù)據(jù)庫,但也有可能執(zhí)行一些其他耗時(shí)的任務(wù),比如聯(lián)系其他外部服務(wù)(比如,商店的供應(yīng)商,或者跟制造商相關(guān)的推廣折扣)。我們?cè)诒菊率O碌膬?nèi)容中,采用delay方法模擬這些長(zhǎng)期運(yùn)行的方法的執(zhí)行,它會(huì)人為地引入1秒鐘的延遲,方法聲明如下。

public class Util {
    public static void delay() {
        int delay = 1000;
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

為了介紹本章的內(nèi)容,getPrice方法會(huì)調(diào)用delay方法,并返回一個(gè)隨機(jī)計(jì)算的值,代碼清單如下所示。返回隨機(jī)計(jì)算的價(jià)格這段代碼看起來有些取巧。它使用charAt,依據(jù)產(chǎn)品的名稱,生成一個(gè)隨機(jī)值作為價(jià)格。

public class Shop {
    private final String name;
    private final Random random;

    public Shop(String name) {
        this.name = name;
        random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2));
    }

    public double getPrice(String product) {
        return calculatePrice(product);
    }

    private double calculatePrice(String product) {
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }
}

很明顯,這個(gè)API的使用者(這個(gè)例子中為最佳價(jià)格查詢器)調(diào)用該方法時(shí),它依舊會(huì)被阻塞。為等待同步事件完成而等待1秒鐘,這是無法接受的,尤其是考慮到最佳價(jià)格查詢器對(duì)網(wǎng)絡(luò)中的所有商店都要重復(fù)這種操作。本章接下來的小節(jié)中,你會(huì)了解如何以異步方式使用同步API解決這個(gè)問題。但是,出于學(xué)習(xí)如何設(shè)計(jì)異步API的考慮,我們會(huì)繼續(xù)這一節(jié)的內(nèi)容,假裝我們還在深受這一困難的煩擾:你是一個(gè)睿智的商店店主,你已經(jīng)意識(shí)到了這種同步API會(huì)為你的用戶帶來多么痛苦的體驗(yàn),你希望以異步API的方式重寫這段代碼,讓用戶更流暢地訪問你的網(wǎng)站。

將同步方法轉(zhuǎn)換為異步方法

為了實(shí)現(xiàn)這個(gè)目標(biāo),你首先需要將getPrice轉(zhuǎn)換為getPriceAsync方法,并修改它的返回值:

public class Shop {
    ...
    public Future getPriceAsync(String product) {
        CompletableFuture futurePrice = new CompletableFuture<>();
        new Thread(() -> {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        }).start();
        return futurePrice;
    }
    ...
}

在這段代碼中,你創(chuàng)建了一個(gè)代表異步計(jì)算的CompletableFuture對(duì)象實(shí)例,它在計(jì)算完成時(shí)會(huì)包含計(jì)算的結(jié)果。接著,你調(diào)用fork創(chuàng)建了另一個(gè)線程去執(zhí)行實(shí)際的價(jià)格計(jì)算工作,不等該耗時(shí)計(jì)算任務(wù)結(jié)束,直接返回一個(gè)Future實(shí)例。當(dāng)請(qǐng)求的產(chǎn)品價(jià)格最終計(jì)算得出時(shí),你可以使用它的complete方法,結(jié)束completableFuture對(duì)象的運(yùn)行,并設(shè)置變量的值。很顯然,這個(gè)新版Future的名稱也解釋了它所具有的特性。使用這個(gè)API的客戶端,可以通過下面的這段代碼對(duì)其進(jìn)行調(diào)用。

public class ShopMain {

    public static void main(String[] args) {
        Shop shop = new Shop("最好的商店");
        long start = System.nanoTime();
        Future futurePrice = shop.getPriceAsync("我最喜歡的商品");
        long invocationTime = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("調(diào)用時(shí)間 " + invocationTime);
        // 這里可以做其他的事情,比如查詢其他的商店
        doSomethingElse();
        // 計(jì)算商品價(jià)格
        try {
            double price = futurePrice.get();
            System.out.printf("價(jià)格是 %.2f%n", price);
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
        long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("計(jì)算價(jià)格時(shí)間 " + retrievalTime);
    }

    private static void doSomethingElse() {
        System.out.println("正在查詢其他的商店...");
    }
}

我們看到這段代碼中,客戶向商店查詢了某種商品的價(jià)格。由于商店提供了異步API,該次調(diào)用立刻返回了一個(gè)Future對(duì)象,通過該對(duì)象客戶可以在將來的某個(gè)時(shí)刻取得商品的價(jià)格。這種方式下,客戶在進(jìn)行商品價(jià)格查詢的同時(shí),還能執(zhí)行一些其他的任務(wù),比如查詢其他家商店中商品的價(jià)格,不會(huì)呆呆地阻塞在那里等待第一家商店返回請(qǐng)求的結(jié)果。最后,如果所有有意義的工作都已經(jīng)完成,客戶所有要執(zhí)行的工作都依賴于商品價(jià)格時(shí),再調(diào)用Future的get方法。執(zhí)行了這個(gè)操作后,客戶要么獲得Future中封裝的值(如果異步任務(wù)已經(jīng)完成),要么發(fā)生阻塞,直到該異步任務(wù)完成,期望的值能夠訪問。上面的代碼中,輸出的結(jié)果:

調(diào)用時(shí)間 116
正在查詢其他的商店...
價(jià)格是 49107.07
計(jì)算價(jià)格時(shí)間 1172

你一定已經(jīng)發(fā)現(xiàn)getPriceAsync方法的調(diào)用時(shí)間遠(yuǎn)遠(yuǎn)早于最終價(jià)格計(jì)算完成的時(shí)間,在之前的代碼,你還會(huì)知道我們有可能避免發(fā)生客戶端被阻塞的風(fēng)險(xiǎn)。實(shí)際上這非常簡(jiǎn)單,F(xiàn)uture執(zhí)行完畢可以發(fā)送一個(gè)通知,僅在計(jì)算結(jié)果可用時(shí)執(zhí)行一個(gè)由Lambda表達(dá)式或者方法引用定義的回調(diào)函數(shù)。不過,我們當(dāng)下不會(huì)對(duì)此進(jìn)行討論,現(xiàn)在我們要解決的是另一個(gè)問題:如何正確地管理異步任務(wù)執(zhí)行過程中可能出現(xiàn)的錯(cuò)誤。

錯(cuò)誤處理

如果沒有意外,我們目前開發(fā)的代碼工作得很正常。但是,如果價(jià)格計(jì)算過程中產(chǎn)生了錯(cuò)誤會(huì)怎樣呢?非常不幸,這種情況下你會(huì)得到一個(gè)相當(dāng)糟糕的結(jié)果:用于提示錯(cuò)誤的異常會(huì)被限制在試圖計(jì)算商品價(jià)格的當(dāng)前線程的范圍內(nèi),最終會(huì)殺死該線程,而這會(huì)導(dǎo)致等待get方法返回結(jié)果的客戶端永久地被阻塞。

客戶端可以使用重載版本的get方法,它使用一個(gè)超時(shí)參數(shù)來避免發(fā)生這樣的情況。這是一種值得推薦的做法,你應(yīng)該盡量在你的代碼中添加超時(shí)判斷的邏輯,避免發(fā)生類似的問題。使用這種方法至少能防止程序永久地等待下去,超時(shí)發(fā)生時(shí),程序會(huì)得到通知發(fā)生了TimeoutException。不過,也因?yàn)槿绱?,你不?huì)有機(jī)會(huì)發(fā)現(xiàn)計(jì)算商品價(jià)格的線程內(nèi)到底發(fā)生了什么問題才引發(fā)了這樣的失效。為了讓客戶端能了解商店無法提供請(qǐng)求商品價(jià)格的原因,你需要使用CompletableFuture的completeExceptionally方法將導(dǎo)致CompletableFuture內(nèi)發(fā)生問題的異常拋出。對(duì)代碼優(yōu)化后的結(jié)果如下所示。

public Future getPriceAsync(String product) {
    CompletableFuture futurePrice = new CompletableFuture<>();
    new Thread(() -> {
        try {
            double price = calculatePrice(product);
            // 如果價(jià)格計(jì)算正常結(jié)束,完成Future操作并設(shè)置商品價(jià)格
            futurePrice.complete(price);
        } catch (Exception e) {
            // 否則就拋出導(dǎo)致失敗的異常,完成這次Future操作
            futurePrice.completeExceptionally(e);
        }
    }).start();
    return futurePrice;
}

客戶端現(xiàn)在會(huì)收到一個(gè)ExecutionException異常,該異常接收了一個(gè)包含失敗原因的Exception參數(shù),即價(jià)格計(jì)算方法最初拋出的異常。所以,舉例來說,如果該方法拋出了一個(gè)運(yùn)行時(shí)異?!皃roduct not available”,客戶端就會(huì)得到像下面這樣一段ExecutionException:

java.util.concurrent.ExecutionException: java.lang.RuntimeException: product
    not availableat java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2237)
    at xin.codedream.java8.chap11.AsyncShopClient.main(AsyncShopClient.java:14)
    ... 5 more
Caused by: java.lang.RuntimeException: product not available
    at xin.codedream.java8.chap11.AsyncShop.calculatePrice(AsyncShop.java:36)
    atxin.codedream.java8.chap11.AsyncShop.lambda$getPrice$0(AsyncShop.java:23)
    at xin.codedream.java8.chap11.AsyncShop$$Lambda$1/24071475.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:744)

使用工廠方法supplyAsync創(chuàng)建CompletableFuture
目前為止我們已經(jīng)了解了如何通過編程創(chuàng)建CompletableFuture對(duì)象以及如何獲取返回值,雖然看起來這些操作已經(jīng)比較方便,但還有進(jìn)一步提升的空間,CompletableFuture類自身提供了大量精巧的工廠方法,使用這些方法能更容易地完成整個(gè)流程,還不用擔(dān)心實(shí)現(xiàn)的細(xì)節(jié)。比如,采用supplyAsync方法后,你可以用一行語句重寫getPriceAsync方法,如下所示。

public Future getPriceAsync(String product) {
    return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

太棒了!七八行才能實(shí)現(xiàn)的功能,我們現(xiàn)在只需要一行就可以搞定了!supplyAsync方法接受一個(gè)生產(chǎn)者(Supplier)作為參數(shù),返回一個(gè)CompletableFuture對(duì)象,該對(duì)象完成異步執(zhí)行后會(huì)讀取調(diào)用生產(chǎn)者方法的返回值。生產(chǎn)者方法會(huì)交由ForkJoinPool池中的某個(gè)執(zhí)行線程(Executor)運(yùn)行,但是你也可以使用supplyAsync方法的重載版本,傳遞第二個(gè)參數(shù)指定不同的執(zhí)行線程執(zhí)行生產(chǎn)者方法。一般而言,向CompletableFuture的工廠方法傳遞可選參數(shù),指定生產(chǎn)者方法的執(zhí)行線程是可行的,在后面,你會(huì)使用這一能力,后面我們將使用適合你應(yīng)用特性的執(zhí)行線程改善程序的性能。

接下來剩余部分中,我們會(huì)假設(shè)你非常不幸,無法控制Shop類提供API的具體實(shí)現(xiàn),最終提供給你的API都是同步阻塞式的方法。這也是當(dāng)你試圖使用服務(wù)提供的HTTP API時(shí)最常發(fā)生的情況。你會(huì)學(xué)到如何以異步的方式查詢多個(gè)商店,避免被單一的請(qǐng)求所阻塞,并由此提升你的“最佳價(jià)格查詢器”的性能和吞吐量。

讓你的代碼免受阻塞之苦

所以,你已經(jīng)被要求進(jìn)行“最佳價(jià)格查詢器”應(yīng)用的開發(fā)了,不過你需要查詢的所有商店都如上面開始時(shí)介紹的那樣,只提供了同步API。換句話說,你有一個(gè)商家的列表,如下所示:

public class BestPriceFinder {
    private final List shops = Arrays.asList(new Shop("BestPrice"),
            new Shop("LetsSaveBig"),
            new Shop("MyFavoriteShop"),
            new Shop("BuyItAll"));
    ...
}

你需要使用下面這樣的簽名實(shí)現(xiàn)一個(gè)方法,它接受產(chǎn)品名作為參數(shù),返回一個(gè)字符串列表,這個(gè)字符串列表中包括商店的名稱、該商店中指定商品的價(jià)格:

public List findPrices(String product);

你的第一個(gè)想法可能是使用我們?cè)谇懊娴恼鹿?jié)中學(xué)習(xí)的Stream特性。你可能試圖寫出類似下面這個(gè)代碼(是的,作為第一個(gè)方案,如果你想到這些已經(jīng)相當(dāng)棒了!)。

好吧,這段代碼看起來非常直白?,F(xiàn)在試著用該方法去查詢你最近這些天瘋狂著迷的唯一產(chǎn)品(是的,你已經(jīng)猜到了,它就是Old-Mi-Mix3)。此外,也請(qǐng)記錄下方法的執(zhí)行時(shí)間,通過這些數(shù)據(jù),我們可以比較優(yōu)化之后的方法會(huì)帶來多大的性能提升,具體的代碼如下。

public class BestPriceFinder {
    private final List shops = Arrays.asList(new Shop("BestPrice"),
            new Shop("LetsSaveBig"),
            new Shop("MyFavoriteShop"),
            new Shop("BuyItAll"));

    public static void main(String[] args) {
        BestPriceFinder finder = new BestPriceFinder();
        finder.testFindPrices();
    }

    public void testFindPrices() {
        long start = System.nanoTime();
        System.out.println(findPrices("Old-Mi-Mix3"));
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("完成時(shí)間 " + duration);
    }

    public List findPrices(String product) {
        return shops.stream()
                .map(shop -> String.format("%s 價(jià)格 %.2f",
                        shop.getName(), shop.getPrice(product)))
                .collect(toList());
    }
}

輸出結(jié)果:

[BestPrice 價(jià)格 109.64, LetsSaveBig 價(jià)格 143.13, MyFavoriteShop 價(jià)格 175.50, BuyItAll 價(jià)格 154.20]
完成時(shí)間 4184

正如你預(yù)期的,findPrices方法的執(zhí)行時(shí)間僅比4秒鐘多了那么幾百毫秒,因?yàn)閷?duì)這4個(gè)商店的查詢是順序進(jìn)行的,并且一個(gè)查詢操作會(huì)阻塞另一個(gè),每一個(gè)操作都要花費(fèi)大約1秒左右的時(shí)間計(jì)算請(qǐng)求商品的價(jià)格。你怎樣才能改進(jìn)這個(gè)結(jié)果呢?

使用并行流對(duì)請(qǐng)求進(jìn)行并行操作

如果你看了第七章的筆記,那么你應(yīng)該想到的第一個(gè),可能也是最快的改善方法是使用并行流來避免順序計(jì)算,如下所示。

public List findPricesParallel(String product) {
    return shops.parallelStream()
            .map(shop -> String.format("%s 價(jià)格 %.2f",
                    shop.getName(), shop.getPrice(product)))
            .collect(toList());
}

運(yùn)行代碼,與最初的代碼執(zhí)行結(jié)果相比較,你發(fā)現(xiàn)了新版findPrices的改進(jìn)了吧。

[BestPrice 價(jià)格 109.64, LetsSaveBig 價(jià)格 143.13, MyFavoriteShop 價(jià)格 175.50, BuyItAll 價(jià)格 154.20]
完成時(shí)間 1248

相當(dāng)不錯(cuò)啊!看起來這是個(gè)簡(jiǎn)單但有效的主意:現(xiàn)在對(duì)四個(gè)不同商店的查詢實(shí)現(xiàn)了并行,所以完成所有操作的總耗時(shí)只有1秒多一點(diǎn)兒。你能做得更好嗎?讓我們嘗試使用剛學(xué)過的CompletableFuture,將findPrices方法中對(duì)不同商店的同步調(diào)用替換為異步調(diào)用。

使用CompletableFuture 發(fā)起異步請(qǐng)求

你已經(jīng)知道我們可以使用工廠方法supplyAsync創(chuàng)建CompletableFuture對(duì)象。讓我們把它利用起來:

public List> findPricesFuture(String product) {
    return shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s 價(jià)格 %.2f",
                    shop.getName(), shop.getPrice(product))))
            .collect(toList());
}

使用這種方式,你會(huì)得到一個(gè)List>,列表中的每個(gè)CompletableFuture對(duì)象在計(jì)算完成后都包含商店的String類型的名稱。但是,由于你用CompletableFutures實(shí)現(xiàn)的findPrices方法要求返回一個(gè)List,你需要等待所有的future執(zhí)行完畢,將其包含的值抽取出來,填充到列表中才能返回。

為了實(shí)現(xiàn)這個(gè)效果,你可以向最初的List>施加第二個(gè)map操作,對(duì)List中的所有future對(duì)象執(zhí)行join操作,一個(gè)接一個(gè)地等待它們運(yùn)行結(jié)束。注意CompletableFuture類中的join方法和Future接口中的get有相同的含義,并且也聲明在Future接口中,它們唯一的不同是join不會(huì)拋出任何檢測(cè)到的異常。使用它你不再需要使用try/catch語句塊讓你傳遞給第二個(gè)map方法的Lambda表達(dá)式變得過于臃腫。所有這些整合在一起,你就可以重新實(shí)現(xiàn)findPrices了,具體代碼如下。

public List findPrices(String product) {
    List> priceFutures = shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s 價(jià)格 %.2f",
                    shop.getName(), shop.getPrice(product))))
            .collect(toList());

    return priceFutures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}

運(yùn)行下代碼了解下第三個(gè)版本findPrices方法的性能,你會(huì)得到下面這幾行輸出:

[BestPrice 價(jià)格 109.64, LetsSaveBig 價(jià)格 143.13, MyFavoriteShop 價(jià)格 175.50, BuyItAll 價(jià)格 154.20]
完成時(shí)間 2207

這個(gè)結(jié)果讓人相當(dāng)失望,不是嗎?超過2秒意味著利用CompletableFuture實(shí)現(xiàn)的版本,比剛開始的代碼中的原生順序執(zhí)行且會(huì)發(fā)生阻塞的版本快。但是它的用時(shí)也差不多是使用并行流的前一個(gè)版本的兩倍。尤其是,考慮到從順序執(zhí)行的版本轉(zhuǎn)換到并行流的版本只做了非常小的改動(dòng),就讓人更加沮喪。

與此形成鮮明對(duì)比的是,我們?yōu)椴捎肅ompletableFutures完成的新版方法做了大量的工作!但,這就是全部的真相嗎?這種場(chǎng)景下使用CompletableFutures真的是浪費(fèi)時(shí)間嗎?或者我們可能漏掉了某些重要的東西?繼續(xù)往下探究之前,讓我們休息幾分鐘,尤其是想想你測(cè)試代碼的機(jī)器是否足以以并行方式運(yùn)行四個(gè)線程。

尋找更好的方案

并行流的版本工作得非常好,那是因?yàn)樗懿⑿械貓?zhí)行四個(gè)任務(wù),所以它幾乎能為每個(gè)商家分配一個(gè)線程。但是,如果你想要增加第五個(gè)商家到商店列表中,讓你的“最佳價(jià)格查詢”應(yīng)用對(duì)其進(jìn)行處理,這時(shí)會(huì)發(fā)生什么情況?

public class BestPriceFinder {
    private final List shops = Arrays.asList(new Shop("BestPrice"),
            new Shop("LetsSaveBig"),
            new Shop("MyFavoriteShop"),
            new Shop("BuyItAll"),
            new Shop("ShopEasy"));
    ...

    public List findPricesParallel(String product) {
        return shops.parallelStream()
                .map(shop -> String.format("%s 價(jià)格 %.2f",
                        shop.getName(), shop.getPrice(product)))
                .collect(toList());
    }

    public List findPricesSequential(String product) {
        return shops.stream()
                .map(shop -> String.format("%s 價(jià)格 %.2f",
                        shop.getName(), shop.getPrice(product)))
                .collect(toList());
    }


    public List findPricesFuture(String product) {
        List> priceFutures = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s 價(jià)格 %.2f",
                        shop.getName(), shop.getPrice(product))))
                .collect(toList());

        return priceFutures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }
}

public class BestPriceFinderMain {

    private static BestPriceFinder bestPriceFinder = new BestPriceFinder();

    public static void main(String[] args) {
        execute("sequential", () -> bestPriceFinder.findPricesSequential("Old-Mi-Mix3"));
    }

    private static void execute(String msg, Supplier> s) {
        long start = System.nanoTime();
        System.out.println(s.get());
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println(msg + " 完成時(shí)間 " + duration);
    }
}

毫不意外,順序執(zhí)行版本的執(zhí)行還是需要大約5秒多鐘的時(shí)間,下面是執(zhí)行的輸出:

[BestPrice 價(jià)格 109.64, LetsSaveBig 價(jià)格 143.13, MyFavoriteShop 價(jià)格 175.50, BuyItAll 價(jià)格 154.20, ShopEasy 價(jià)格 147.92]
sequential 完成時(shí)間 5139

非常不幸,并行流版本的程序這次比之前也多消耗了差不多1秒鐘的時(shí)間,因?yàn)榭梢圆⑿羞\(yùn)行(通用線程池中處于可用狀態(tài)的)的四個(gè)線程現(xiàn)在都處于繁忙狀態(tài),都在對(duì)前4個(gè)商店進(jìn)行查詢。第五個(gè)查詢只能等到前面某一個(gè)操作完成釋放出空閑線程才能繼續(xù),它的運(yùn)行結(jié)果如下:

[BestPrice 價(jià)格 163.19, LetsSaveBig 價(jià)格 141.77, MyFavoriteShop 價(jià)格 159.81, BuyItAll 價(jià)格 165.02, ShopEasy 價(jià)格 165.81]
parallel 完成時(shí)間 2106

CompletableFuture版本的程序結(jié)果如何呢?我們也試著添加第5個(gè)商店對(duì)其進(jìn)行了測(cè)試,結(jié)果如下:

[BestPrice 價(jià)格 144.31, LetsSaveBig 價(jià)格 142.49, MyFavoriteShop 價(jià)格 146.99, BuyItAll 價(jià)格 132.52, ShopEasy 價(jià)格 139.15]
composed CompletableFuture 完成時(shí)間 2004

CompletableFuture版本的程序似乎比并行流版本的程序還快那么一點(diǎn)兒。但是最后這個(gè)版本也不太令人滿意。比如,如果你試圖讓你的代碼處理9個(gè)商店,并行流版本耗時(shí)3143毫秒,而CompletableFuture版本耗時(shí)3009毫秒。它們看起來不相伯仲,究其原因都一樣:它們內(nèi)部采用的是同樣的通用線程池,默認(rèn)都使用固定數(shù)目的線程,具體線程數(shù)取決于Runtime.getRuntime().availableProcessors()的返回值。然而,CompletableFuture具有一定的優(yōu)勢(shì),因?yàn)樗试S你對(duì)執(zhí)行器(Executor)進(jìn)行配置,尤其是線程池的大小,讓它以更適合應(yīng)用需求的方式進(jìn)行配置,滿足程序的要求,而這是并行流API無法提供的。讓我們看看你怎樣利用這種配置上的靈活性帶來實(shí)際應(yīng)用程序性能上的提升。

使用定制的執(zhí)行器

就這個(gè)主題而言,明智的選擇似乎是創(chuàng)建一個(gè)配有線程池的執(zhí)行器,線程池中線程的數(shù)目取決于你預(yù)計(jì)你的應(yīng)用需要處理的負(fù)荷,但是你該如何選擇合適的線程數(shù)目呢?

你的應(yīng)用99%的時(shí)間都在等待商店的響應(yīng),所以估算出的W/C比率為100。這意味著如果你期望的CPU利用率是100%,你需要?jiǎng)?chuàng)建一個(gè)擁有400個(gè)線程的線程池。實(shí)際操作中,如果你創(chuàng)建的線程數(shù)比商店的數(shù)目更多,反而是一種浪費(fèi),因?yàn)檫@樣做之后,你線程池中的有些線程根本沒有機(jī)會(huì)被使用。出于這種考慮,我們建議你將執(zhí)行器使用的線程數(shù),與你需要查詢的商店數(shù)目設(shè)定為同一個(gè)值,這樣每個(gè)商店都應(yīng)該對(duì)應(yīng)一個(gè)服務(wù)線程。不過,為了避免發(fā)生由于商店的數(shù)目過多導(dǎo)致服務(wù)器超負(fù)荷而崩潰,你還是需要設(shè)置一個(gè)上限,比如100個(gè)線程。代碼清單如下所示。

private final Executor executor = Executors.newFixedThreadPool(100, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    });

注意,你現(xiàn)在正創(chuàng)建的是一個(gè)由守護(hù)線程構(gòu)成的線程池。Java程序無法終止或者退出一個(gè)正在運(yùn)行中的線程,所以最后剩下的那個(gè)線程會(huì)由于一直等待無法發(fā)生的事件而引發(fā)問題。與此相反,如果將線程標(biāo)記為守護(hù)進(jìn)程,意味著程序退出時(shí)它也會(huì)被回收。這二者之間沒有性能上的差異。現(xiàn)在,你可以將執(zhí)行器作為第二個(gè)參數(shù)傳遞給supplyAsync工廠方法了。比如,你現(xiàn)在可以按照下面的方式創(chuàng)建一個(gè)可查詢指定商品價(jià)格的CompletableFuture對(duì)象:

CompletableFuture.supplyAsync(() -> String.format("%s 價(jià)格 %.2f",
                        shop.getName(), shop.getPrice(product)), executor)

改進(jìn)之后,使用CompletableFuture方案的程序處理5個(gè)商店結(jié)果:

[BestPrice 價(jià)格 144.31, LetsSaveBig 價(jià)格 142.49, MyFavoriteShop 價(jià)格 146.99, BuyItAll 價(jià)格 132.52, ShopEasy 價(jià)格 139.15]
composed CompletableFuture 完成時(shí)間 1004

這個(gè)例子證明了要?jiǎng)?chuàng)建更適合你的應(yīng)用特性的執(zhí)行器,利用CompletableFutures向其提交任務(wù)執(zhí)行是個(gè)不錯(cuò)的主意。處理需大量使用異步操作的情況時(shí),這幾乎是最有效的策略。

并行——使用流還是CompletableFutures?

目前為止,你已經(jīng)知道對(duì)集合進(jìn)行并行計(jì)算有兩種方式:要么將其轉(zhuǎn)化為并行流,利用map這樣的操作開展工作,要么枚舉出集合中的每一個(gè)元素,創(chuàng)建新的線程,在CompletableFuture內(nèi)對(duì)其進(jìn)行操作。后者提供了更多的靈活性,你可以調(diào)整線程池的大小,而這能幫助你確保整體的計(jì)算不會(huì)因?yàn)榫€程都在等待I/O而發(fā)生阻塞。書中使用這些API的建議如下。

如果你進(jìn)行的是計(jì)算密集型的操作,并且沒有I/O,那么推薦使用Stream接口,因?yàn)閷?shí)現(xiàn)簡(jiǎn)單,同時(shí)效率也可能是最高的(如果所有的線程都是計(jì)算密集型的,那就沒有必要?jiǎng)?chuàng)建比處理器核數(shù)更多的線程)。

反之,如果你并行的工作單元還涉及等待I/O的操作(包括網(wǎng)絡(luò)連接等待),那么使用CompletableFuture靈活性更好,你可以像前文討論的那樣,依據(jù)等待/計(jì)算,或者W/C的比率設(shè)定需要使用的線程數(shù)。這種情況不使用并行流的另一個(gè)原因是,處理流的流水線中如果發(fā)生I/O等待,流的延遲特性會(huì)讓我們很難判斷到底什么時(shí)候觸發(fā)了等待。

現(xiàn)在你已經(jīng)了解了如何利用CompletableFuture為你的用戶提供異步API,以及如何將一個(gè)同步又緩慢的服務(wù)轉(zhuǎn)換為異步的服務(wù)。不過到目前為止,我們每個(gè)Future中進(jìn)行的都是單次的操作。

代碼

Github: chap11

Gitee: chap11

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/72126.html

相關(guān)文章

  • 《java 8 實(shí)戰(zhàn)》讀書筆記 -第十一章 CompletableFuture合式異步編程

    摘要:方法接受一個(gè)生產(chǎn)者作為參數(shù),返回一個(gè)對(duì)象,該對(duì)象完成異步執(zhí)行后會(huì)讀取調(diào)用生產(chǎn)者方法的返回值。該方法接收一個(gè)對(duì)象構(gòu)成的數(shù)組,返回由第一個(gè)執(zhí)行完畢的對(duì)象的返回值構(gòu)成的。 一、Future 接口 在Future中觸發(fā)那些潛在耗時(shí)的操作把調(diào)用線程解放出來,讓它能繼續(xù)執(zhí)行其他有價(jià)值的工作,不再需要呆呆等待耗時(shí)的操作完成。打個(gè)比方,你可以把它想象成這樣的場(chǎng)景:你拿了一袋子衣服到你中意的干洗店去洗。...

    zhangqh 評(píng)論0 收藏0
  • Java8CompletableFuture進(jìn)階之道

    摘要:方法接收的是的實(shí)例,但是它沒有返回值方法是函數(shù)式接口,無參數(shù),會(huì)返回一個(gè)結(jié)果這兩個(gè)方法是的升級(jí),表示讓任務(wù)在指定的線程池中執(zhí)行,不指定的話,通常任務(wù)是在線程池中執(zhí)行的。該的接口是在線程使用舊的接口,它不允許返回值。 簡(jiǎn)介 作為Java 8 Concurrency API改進(jìn)而引入,本文是CompletableFuture類的功能和用例的介紹。同時(shí)在Java 9 也有對(duì)Completab...

    SunZhaopeng 評(píng)論0 收藏0
  • Java 8原生API也可以開發(fā)響應(yīng)式代碼?

    摘要:中使用了提供的原生接口對(duì)自身的異步化做了改進(jìn)??梢灾С趾蛢煞N調(diào)用方式。實(shí)戰(zhàn)通過下面的例子,可以看出的最大好處特性。 showImg(https://segmentfault.com/img/remote/1460000020032427?w=1240&h=655); 前段時(shí)間工作上比較忙,這篇文章一直沒來得及寫,本文是閱讀《Java8實(shí)戰(zhàn)》的時(shí)候,了解到Java 8里已經(jīng)提供了一個(gè)異步...

    HtmlCssJs 評(píng)論0 收藏0
  • 《深入理解ES6》筆記——導(dǎo)讀

    摘要:最近買了深入理解的書籍來看,為什么學(xué)習(xí)這么久還要買這本書呢主要是看到核心團(tuán)隊(duì)成員及的創(chuàng)造者為本書做了序,作為一個(gè)粉絲,還是挺看好這本書能給我?guī)硪粋€(gè)新的升華,而且本書的作者也非常厲害。 使用ES6開發(fā)已經(jīng)有1年多了,以前看的是阮一峰老師的ES6教程,也看過MDN文檔的ES6語法介紹。 最近買了《深入理解ES6》的書籍來看,為什么學(xué)習(xí)ES6這么久還要買這本書呢?主要是看到Daniel A...

    Godtoy 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

hlcfan

|高級(jí)講師

TA的文章

閱讀更多
最新活動(dòng)
閱讀需要支付1元查看
<