成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Java8的CompletableFuture進(jìn)階之道

SunZhaopeng / 1959人閱讀

摘要:方法接收的是的實例,但是它沒有返回值方法是函數(shù)式接口,無參數(shù),會返回一個結(jié)果這兩個方法是的升級,表示讓任務(wù)在指定的線程池中執(zhí)行,不指定的話,通常任務(wù)是在線程池中執(zhí)行的。該的接口是在線程使用舊的接口,它不允許返回值。

簡介

作為Java 8 Concurrency API改進(jìn)而引入,本文是CompletableFuture類的功能和用例的介紹。同時在Java 9 也有對CompletableFuture有一些改進(jìn),之后再進(jìn)入講解。

Future計算

Future異步計算很難操作,通常我們希望將任何計算邏輯視為一系列步驟。但是在異步計算的情況下,表示為回調(diào)的方法往往分散在代碼中或者深深地嵌套在彼此內(nèi)部。但是當(dāng)我們需要處理其中一個步驟中可能發(fā)生的錯誤時,情況可能會變得更復(fù)雜。

Futrue接口是Java 5中作為異步計算而新增的,但它沒有任何方法去進(jìn)行計算組合或者處理可能出現(xiàn)的錯誤。

在Java 8中,引入了CompletableFuture類。與Future接口一起,它還實現(xiàn)了CompletionStage接口。此接口定義了可與其他Future組合成異步計算契約。

CompletableFuture同時是一個組合和一個框架,具有大約50種不同的構(gòu)成,結(jié)合,執(zhí)行異步計算步驟和處理錯誤。

如此龐大的API可能會令人難以招架,下文將調(diào)一些重要的做重點介紹。

使用CompletableFuture作為Future實現(xiàn)

首先,CompletableFuture類實現(xiàn)Future接口,因此你可以將其用作Future實現(xiàn),但需要額外的完成實現(xiàn)邏輯。

例如,你可以使用無構(gòu)參構(gòu)造函數(shù)創(chuàng)建此類的實例,然后使用complete方法完成。消費(fèi)者可以使用get方法來阻塞當(dāng)前線程,直到get()結(jié)果。

在下面的示例中,我們有一個創(chuàng)建CompletableFuture實例的方法,然后在另一個線程中計算并立即返回Future。

計算完成后,該方法通過將結(jié)果提供給完整方法來完成Future:

public Future calculateAsync() throws InterruptedException {
    CompletableFuture completableFuture 
      = new CompletableFuture<>();
 
    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.complete("Hello");
        return null;
    });
 
    return completableFuture;
}

為了分離計算,我們使用了Executor?API?,這種創(chuàng)建和完成CompletableFuture的方法可以與任何并發(fā)包(包括原始線程)一起使用。

請注意,calculateAsync方法返回一個Future實例。

我們只是調(diào)用方法,接收Future實例并在我們準(zhǔn)備阻塞結(jié)果時調(diào)用它的get方法。

另請注意,get方法拋出一些已檢查的異常,即ExecutionException(封裝計算期間發(fā)生的異常)和InterruptedException(表示執(zhí)行方法的線程被中斷的異常):

Future completableFuture = calculateAsync();
 
// ... 
 
String result = completableFuture.get();
assertEquals("Hello", result);

如果你已經(jīng)知道計算的結(jié)果,也可以用變成同步的方式來返回結(jié)果。

Future completableFuture = 
  CompletableFuture.completedFuture("Hello");
 
// ...
 
String result = completableFuture.get();
assertEquals("Hello", result);

作為在某些場景中,你可能希望取消Future任務(wù)的執(zhí)行。

假設(shè)我們沒有找到結(jié)果并決定完全取消異步執(zhí)行任務(wù)。這可以通過Future的取消方法完成。此方法mayInterruptIfRunning,但在CompletableFuture的情況下,它沒有任何效果,因為中斷不用于控制CompletableFuture的處理。

這是異步方法的修改版本:

public Future calculateAsyncWithCancellation() throws InterruptedException {
    CompletableFuture completableFuture = new CompletableFuture<>();
 
    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.cancel(false);
        return null;
    });
 
    return completableFuture;
}

當(dāng)我們使用Future.get()方法阻塞結(jié)果時,cancel()表示取消執(zhí)行,它將拋出CancellationException:

Future future = calculateAsyncWithCancellation();
future.get(); // CancellationException
API介紹 static方法說明

上面的代碼很簡單,下面介紹幾個 static 方法,它們使用任務(wù)來實例化一個 CompletableFuture 實例。

CompletableFuture.runAsync(Runnable runnable);
CompletableFuture.runAsync(Runnable runnable, Executor executor);

CompletableFuture.supplyAsync(Supplier supplier);
CompletableFuture.supplyAsync(Supplier supplier, Executor executor)

runAsync 方法接收的是 Runnable 的實例,但是它沒有返回值

supplyAsync 方法是JDK8函數(shù)式接口,無參數(shù),會返回一個結(jié)果

這兩個方法是 executor 的升級,表示讓任務(wù)在指定的線程池中執(zhí)行,不指定的話,通常任務(wù)是在 ForkJoinPool.commonPool() 線程池中執(zhí)行的。

supplyAsync()使用

靜態(tài)方法runAsyncsupplyAsync允許我們相應(yīng)地從Runnable和Supplier功能類型中創(chuàng)建CompletableFuture實例。

該Runnable的接口是在線程使用舊的接口,它不允許返回值。

Supplier接口是一個不具有參數(shù),并返回參數(shù)化類型的一個值的單個方法的通用功能接口。

這允許將Supplier的實例作為lambda表達(dá)式提供,該表達(dá)式執(zhí)行計算并返回結(jié)果:

CompletableFuture future
  = CompletableFuture.supplyAsync(() -> "Hello");
 
// ...
 
assertEquals("Hello", future.get());
thenRun()使用

在兩個任務(wù)任務(wù)A,任務(wù)B中,如果既不需要任務(wù)A的值也不想在任務(wù)B中引用,那么你可以將Runnable lambda 傳遞給thenRun()方法。在下面的示例中,在調(diào)用future.get()方法之后,我們只需在控制臺中打印一行:

模板

CompletableFuture.runAsync(() -> {}).thenRun(() -> {}); 
CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {});

第一行用的是 thenRun(Runnable runnable),任務(wù) A 執(zhí)行完執(zhí)行 B,并且 B 不需要 A 的結(jié)果。

第二行用的是 thenRun(Runnable runnable),任務(wù) A 執(zhí)行完執(zhí)行 B,會返回resultA,但是 B 不需要 A 的結(jié)果。

實戰(zhàn)

CompletableFuture completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture future = completableFuture
  .thenRun(() -> System.out.println("Computation finished."));
 
future.get();
thenAccept()使用

在兩個任務(wù)任務(wù)A,任務(wù)B中,如果你不需要在Future中有返回值,則可以用 thenAccept方法接收將計算結(jié)果傳遞給它。最后的future.get()調(diào)用返回Void類型的實例。

模板

CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {}); 

CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});

第一行中,runAsync不會有返回值,第二個方法thenAccept,接收到的resultA值為null,同時任務(wù)B也不會有返回結(jié)果

第二行中,supplyAsync有返回值,同時任務(wù)B不會有返回結(jié)果。

實戰(zhàn)

CompletableFuture completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture future = completableFuture
  .thenAccept(s -> System.out.println("Computation returned: " + s));
 
future.get();
thenApply()使用

在兩個任務(wù)任務(wù)A,任務(wù)B中,任務(wù)B想要任務(wù)A計算的結(jié)果,可以用thenApply方法來接受一個函數(shù)實例,用它來處理結(jié)果,并返回一個Future函數(shù)的返回值:

模板

CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB");
CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");

第二行用的是 thenApply(Function fn),任務(wù) A 執(zhí)行完執(zhí)行 B,B 需要 A 的結(jié)果,同時任務(wù) B 有返回值。

實戰(zhàn)

CompletableFuture completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture future = completableFuture
  .thenApply(s -> s + " World");
 
assertEquals("Hello World", future.get());

當(dāng)然,多個任務(wù)的情況下,如果任務(wù) B 后面還有任務(wù) C,往下繼續(xù)調(diào)用 .thenXxx() 即可。

thenCompose()使用
接下來會有一個很有趣的設(shè)計模式;

CompletableFuture API 的最佳場景是能夠在一系列計算步驟中組合CompletableFuture實例。

這種組合結(jié)果本身就是CompletableFuture,允許進(jìn)一步再續(xù)組合。這種方法在函數(shù)式語言中無處不在,通常被稱為monadic設(shè)計模式。

簡單說,Monad就是一種設(shè)計模式,表示將一個運(yùn)算過程,通過函數(shù)拆解成互相連接的多個步驟。你只要提供下一步運(yùn)算所需的函數(shù),整個運(yùn)算就會自動進(jìn)行下去。

在下面的示例中,我們使用thenCompose方法按順序組合兩個Futures。

請注意,此方法采用返回CompletableFuture實例的函數(shù)。該函數(shù)的參數(shù)是先前計算步驟的結(jié)果。這允許我們在下一個CompletableFuture的lambda中使用這個值:

CompletableFuture completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
 
assertEquals("Hello World", completableFuture.get());

該thenCompose方法連同thenApply一樣實現(xiàn)了結(jié)果的合并計算。但是他們的內(nèi)部形式是不一樣的,它們與Java 8中可用的Stream和Optional類的map和flatMap方法是有著類似的設(shè)計思路在里面的。

兩個方法都接收一個CompletableFuture并將其應(yīng)用于計算結(jié)果,但thenCompose(flatMap)方法接收一個函數(shù),該函數(shù)返回相同類型的另一個CompletableFuture對象。此功能結(jié)構(gòu)允許將這些類的實例繼續(xù)進(jìn)行組合計算。

thenCombine()
取兩個任務(wù)的結(jié)果

如果要執(zhí)行兩個獨(dú)立的任務(wù),并對其結(jié)果執(zhí)行某些操作,可以用Future的thenCombine方法:

模板

CompletableFuture cfA = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture cfB = CompletableFuture.supplyAsync(() -> "resultB");

cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {});

cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");

實戰(zhàn)

CompletableFuture completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCombine(CompletableFuture.supplyAsync(
      () -> " World"), (s1, s2) -> s1 + s2));
 
assertEquals("Hello World", completableFuture.get());

更簡單的情況是,當(dāng)你想要使用兩個Future結(jié)果時,但不需要將任何結(jié)果值進(jìn)行返回時,可以用thenAcceptBoth,它表示后續(xù)的處理不需要返回值,而 thenCombine 表示需要返回值:

CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
  .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
    (s1, s2) -> System.out.println(s1 + s2));
thenApply()和thenCompose()之間的區(qū)別

在前面的部分中,我們展示了關(guān)于thenApply()和thenCompose()的示例。這兩個API都是使用的CompletableFuture調(diào)用,但這兩個API的使用是不同的。

thenApply()

此方法用于處理先前調(diào)用的結(jié)果。但是,要記住的一個關(guān)鍵點是返回類型是轉(zhuǎn)換泛型中的類型,是同一個CompletableFuture。

因此,當(dāng)我們想要轉(zhuǎn)換CompletableFuture 調(diào)用的結(jié)果時,效果是這樣的 :

CompletableFuture finalResult = compute().thenApply(s-> s + 1);

thenCompose()

該thenCompose()方法類似于thenApply()在都返回一個新的計算結(jié)果。但是,thenCompose()使用前一個Future作為參數(shù)。它會直接使結(jié)果變新的Future,而不是我們在thenApply()中到的嵌套Future,而是用來連接兩個CompletableFuture,是生成一個新的CompletableFuture:

CompletableFuture computeAnother(Integer i){
    return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture finalResult = compute().thenCompose(this::computeAnother);

因此,如果想要繼續(xù)嵌套鏈接CompletableFuture??方法,那么最好使用thenCompose()。

并行運(yùn)行多個任務(wù)

當(dāng)我們需要并行執(zhí)行多個任務(wù)時,我們通常希望等待所有它們執(zhí)行,然后處理它們的組合結(jié)果。

CompletableFuture.allOf靜態(tài)方法允許等待所有的完成任務(wù):

API

public static CompletableFuture allOf(CompletableFuture... cfs){...}

實戰(zhàn)

CompletableFuture future1  
  = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture future2  
  = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture future3  
  = CompletableFuture.supplyAsync(() -> "World");
 
CompletableFuture combinedFuture 
  = CompletableFuture.allOf(future1, future2, future3);
 
// ...
 
combinedFuture.get();
 
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());

請注意,CompletableFuture.allOf()的返回類型是CompletableFuture 。這種方法的局限性在于它不會返回所有任務(wù)的綜合結(jié)果。相反,你必須手動從Futures獲取結(jié)果。幸運(yùn)的是,CompletableFuture.join()方法和Java 8 Streams API可以解決:

String combined = Stream.of(future1, future2, future3)
  .map(CompletableFuture::join)
  .collect(Collectors.joining(" "));
 
assertEquals("Hello Beautiful World", combined);

CompletableFuture 提供了 join() 方法,它的功能和 get() 方法是一樣的,都是阻塞獲取值,它們的區(qū)別在于 join() 拋出的是 unchecked Exception。這使得它可以在Stream.map()方法中用作方法引用。

異常處理

說到這里,我們順便來說下 CompletableFuture 的異常處理。這里我們要介紹兩個方法:

public CompletableFuture exceptionally(Function fn);
public  CompletionStage handle(BiFunction fn);

看下代碼

CompletableFuture.supplyAsync(() -> "resultA")
    .thenApply(resultA -> resultA + " resultB")
    .thenApply(resultB -> resultB + " resultC")
    .thenApply(resultC -> resultC + " resultD");

上面的代碼中,任務(wù) A、B、C、D 依次執(zhí)行,如果任務(wù) A 拋出異常(當(dāng)然上面的代碼不會拋出異常),那么后面的任務(wù)都得不到執(zhí)行。如果任務(wù) C 拋出異常,那么任務(wù) D 得不到執(zhí)行。

那么我們怎么處理異常呢?看下面的代碼,我們在任務(wù) A 中拋出異常,并對其進(jìn)行處理:

CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException();
})
        .exceptionally(ex -> "errorResultA")
        .thenApply(resultA -> resultA + " resultB")
        .thenApply(resultB -> resultB + " resultC")
        .thenApply(resultC -> resultC + " resultD");

System.out.println(future.join());

上面的代碼中,任務(wù) A 拋出異常,然后通過 .exceptionally() 方法處理了異常,并返回新的結(jié)果,這個新的結(jié)果將傳遞給任務(wù) B。所以最終的輸出結(jié)果是:

errorResultA resultB resultC resultD
String name = null;
 
// ...
 
CompletableFuture completableFuture  
  =  CompletableFuture.supplyAsync(() -> {
      if (name == null) {
          throw new RuntimeException("Computation error!");
      }
      return "Hello, " + name;
  })}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
 
assertEquals("Hello, Stranger!", completableFuture.get());

當(dāng)然,它們也可以都為 null,因為如果它作用的那個 CompletableFuture 實例沒有返回值的時候,s 就是 null。

Async后綴方法

CompletableFuture類中的API的大多數(shù)方法都有兩個帶有Async后綴的附加修飾。這些方法表示用于異步線程。

沒有Async后綴的方法使用調(diào)用線程運(yùn)行下一個執(zhí)行線程階段。不帶Async方法使用ForkJoinPool.commonPool()線程池的fork / join實現(xiàn)運(yùn)算任務(wù)。帶有Async方法使用傳遞式的Executor任務(wù)去運(yùn)行。

下面附帶一個案例,可以看到有thenApplyAsync方法。在程序內(nèi)部,線程被包裝到ForkJoinTask實例中。這樣可以進(jìn)一步并行化你的計算并更有效地使用系統(tǒng)資源。

CompletableFuture completableFuture  
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture future = completableFuture
  .thenApplyAsync(s -> s + " World");
 
assertEquals("Hello World", future.get());
JDK 9 CompletableFuture API

在Java 9中, CompletableFuture API通過以下更改得到了進(jìn)一步增強(qiáng):

新工廠方法增加了

支持延遲和超時

改進(jìn)了對子類化的支持。

引入了新的實例API:

Executor defaultExecutor()

CompletableFuture newIncompleteFuture()

CompletableFuture copy()

CompletionStage minimalCompletionStage()

CompletableFuture completeAsync(Supplier supplier, Executor executor)

CompletableFuture completeAsync(Supplier supplier)

CompletableFuture orTimeout(long timeout, TimeUnit unit)

CompletableFuture completeOnTimeout(T value, long timeout, TimeUnit unit)

還有一些靜態(tài)實用方法:

Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)

Executor delayedExecutor(long delay, TimeUnit unit)

CompletionStage completedStage(U value)

CompletionStage failedStage(Throwable ex)

CompletableFuture failedFuture(Throwable ex)

最后,為了解決超時問題,Java 9又引入了兩個新功能:

orTimeout()

completeOnTimeout()

結(jié)論

在本文中,我們描述了CompletableFuture類的方法和典型用例。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/74065.html

相關(guān)文章

  • java8CompletableFuture使用實例

    摘要:這個方法返回與等待所有返回等待多個返回取多個當(dāng)中最快的一個返回等待多個當(dāng)中最快的一個返回二詳解終極指南并發(fā)編程中的風(fēng)格 thenApply(等待并轉(zhuǎn)化future) @Test public void testThen() throws ExecutionException, InterruptedException { CompletableFutur...

    kycool 評論0 收藏0
  • 強(qiáng)大CompletableFuture

    摘要:首先想到的是開啟一個新的線程去做某項工作。再進(jìn)一步,為了讓新線程可以返回一個值,告訴主線程事情做完了,于是乎粉墨登場。然而提供的方式是主線程主動問詢新線程,要是有個回調(diào)函數(shù)就爽了。極大的提高效率。 showImg(https://segmentfault.com/img/bVbvgBJ?w=1920&h=1200); 引子 為了讓程序更加高效,讓CPU最大效率的工作,我們會采用異步編程...

    skinner 評論0 收藏0
  • Java8實戰(zhàn)》-第十一章筆記(CompletableFuture:組合式異步編程)

    摘要:組合式異步編程最近這些年,兩種趨勢不斷地推動我們反思我們設(shè)計軟件的方式。第章中介紹的分支合并框架以及并行流是實現(xiàn)并行處理的寶貴工具它們將一個操作切分為多個子操作,在多個不同的核甚至是機(jī)器上并行地執(zhí)行這些子操作。 CompletableFuture:組合式異步編程 最近這些年,兩種趨勢不斷地推動我們反思我們設(shè)計軟件的方式。第一種趨勢和應(yīng)用運(yùn)行的硬件平臺相關(guān),第二種趨勢與應(yīng)用程序的架構(gòu)相關(guān)...

    hlcfan 評論0 收藏0
  • CompletableFuture實現(xiàn)異步任務(wù)

    摘要:項目需求項目中需要優(yōu)化一個接口,這個接口需要拉取個第三方接口,需求延遲時間小于技術(shù)選型是提出的一個支持非阻塞的多功能的,同樣也是實現(xiàn)了接口,是添加的類,用來描述一個異步計算的結(jié)果。對進(jìn)一步完善,擴(kuò)展了諸多功能形成了。 項目需求: 項目中需要優(yōu)化一個接口,這個接口需要拉取23個第三方接口,需求延遲時間小于200ms; 技術(shù)選型: CompletableFuture是JDK8提出的一個支持...

    劉東 評論0 收藏0
  • Java 8原生API也可以開發(fā)響應(yīng)式代碼?

    摘要:中使用了提供的原生接口對自身的異步化做了改進(jìn)??梢灾С趾蛢煞N調(diào)用方式。實戰(zhàn)通過下面的例子,可以看出的最大好處特性。 showImg(https://segmentfault.com/img/remote/1460000020032427?w=1240&h=655); 前段時間工作上比較忙,這篇文章一直沒來得及寫,本文是閱讀《Java8實戰(zhàn)》的時候,了解到Java 8里已經(jīng)提供了一個異步...

    HtmlCssJs 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<