成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

貓頭鷹的深夜翻譯:使用JAVA CompletableFuture的20例子

AZmake / 3761人閱讀

摘要:這個(gè)例子想要說明兩個(gè)事情中以為結(jié)尾的方法將會(huì)異步執(zhí)行默認(rèn)情況下即指沒有傳入的情況下,異步執(zhí)行會(huì)使用實(shí)現(xiàn),該線程池使用一個(gè)后臺(tái)線程來執(zhí)行任務(wù)。這個(gè)例子展示了如何使用一個(gè)固定大小的線程池來實(shí)現(xiàn)大寫操作。

前言

這篇博客回顧JAVA8的CompletionStageAPI以及其在JAVA庫中的標(biāo)準(zhǔn)實(shí)現(xiàn)CompletableFuture。將會(huì)通過幾個(gè)例子來展示API的各種行為。

因?yàn)?b>CompletableFuture是CompletionInterface接口的實(shí)現(xiàn),所以我們首先要了解該接口的契約。它代表某個(gè)同步或異步計(jì)算的一個(gè)階段。你可以把它理解為是一個(gè)為了產(chǎn)生有價(jià)值最終結(jié)果的計(jì)算的流水線上的一個(gè)單元。這意味著多個(gè)ComletionStage指令可以鏈接起來從而一個(gè)階段的完成可以觸發(fā)下一個(gè)階段的執(zhí)行。

除了實(shí)現(xiàn)了CompletionStage接口,Completion還繼承了Future,這個(gè)接口用于實(shí)現(xiàn)一個(gè)未開始的異步事件。因?yàn)槟軌蝻@式的完成Future,所以取名為CompletableFuture

1.新建一個(gè)完成的CompletableFuture

這個(gè)簡單的示例中創(chuàng)建了一個(gè)已經(jīng)完成的預(yù)先設(shè)置好結(jié)果的CompletableFuture。通常作為計(jì)算的起點(diǎn)階段。

static void completedFutureExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message");
    assertTrue(cf.isDone());
    assertEquals("message", cf.getNow(null));
}

getNow方法會(huì)返回完成后的結(jié)果(這里就是message),如果還未完成,則返回傳入的默認(rèn)值null

2.運(yùn)行一個(gè)簡單的異步stage

下面的例子解釋了如何創(chuàng)建一個(gè)異步運(yùn)行Runnable的stage。

static void runAsyncExample() {
    CompletableFuture cf = CompletableFuture.runAsync(() -> {
        assertTrue(Thread.currentThread().isDaemon());
        randomSleep();
    });
    assertFalse(cf.isDone());
    sleepEnough();
    assertTrue(cf.isDone());
}

這個(gè)例子想要說明兩個(gè)事情:

CompletableFuture中以Async為結(jié)尾的方法將會(huì)異步執(zhí)行

默認(rèn)情況下(即指沒有傳入Executor的情況下),異步執(zhí)行會(huì)使用ForkJoinPool實(shí)現(xiàn),該線程池使用一個(gè)后臺(tái)線程來執(zhí)行Runnable任務(wù)。注意這只是特定于CompletableFuture實(shí)現(xiàn),其它的CompletableStage實(shí)現(xiàn)可以重寫該默認(rèn)行為。

3.將方法作用于前一個(gè)Stage

下面的例子引用了第一個(gè)例子中已經(jīng)完成的CompletableFuture,它將引用生成的字符串結(jié)果并將該字符串大寫。

static void thenApplyExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApply(s -> {
        assertFalse(Thread.currentThread().isDaemon());
        return s.toUpperCase();
    });
    assertEquals("MESSAGE", cf.getNow(null));
}

這里的關(guān)鍵詞是thenApply

then是指在當(dāng)前階段正常執(zhí)行完成后(正常執(zhí)行是指沒有拋出異常)進(jìn)行的操作。在本例中,當(dāng)前階段已經(jīng)完成并得到值message

Apply是指將一個(gè)Function作用于之前階段得出的結(jié)果

Function是阻塞的,這意味著只有當(dāng)大寫操作執(zhí)行完成之后才會(huì)執(zhí)行getNow()方法。

4.異步的的將方法作用于前一個(gè)Stage

通過在方法后面添加Async后綴,該CompletableFuture鏈將會(huì)異步執(zhí)行(使用ForkJoinPool.commonPool())

static void thenApplyAsyncExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
        assertTrue(Thread.currentThread().isDaemon());
        randomSleep();
        return s.toUpperCase();
    });
    assertNull(cf.getNow(null));
    assertEquals("MESSAGE", cf.join());
}
使用一個(gè)自定義的Executor來異步執(zhí)行該方法

異步方法的一個(gè)好處是可以提供一個(gè)Executor來執(zhí)行CompletableStage。這個(gè)例子展示了如何使用一個(gè)固定大小的線程池來實(shí)現(xiàn)大寫操作。

static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {
    int count = 1;
    @Override
    public Thread newThread(Runnable runnable) {
        return new Thread(runnable, "custom-executor-" + count++);
    }
});
static void thenApplyAsyncWithExecutorExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
        assertTrue(Thread.currentThread().getName().startsWith("custom-executor-"));
        assertFalse(Thread.currentThread().isDaemon());
        randomSleep();
        return s.toUpperCase();
    }, executor);
    assertNull(cf.getNow(null));
    assertEquals("MESSAGE", cf.join());
}
6.消費(fèi)(Consume)前一個(gè)Stage的結(jié)果

如果下一個(gè)Stage接收了當(dāng)前Stage的結(jié)果但是在計(jì)算中無需返回值(比如其返回值為void),那么它將使用方法thenAccept并傳入一個(gè)Consumer接口。

static void thenAcceptExample() {
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture("thenAccept message")
            .thenAccept(s -> result.append(s));
    assertTrue("Result was empty", result.length() > 0);
}

Consumer將會(huì)同步執(zhí)行,所以我們無需在返回的CompletableFuture上執(zhí)行join操作。

7.異步執(zhí)行Comsume

同樣,使用Asyn后綴實(shí)現(xiàn):

static void thenAcceptAsyncExample() {
    StringBuilder result = new StringBuilder();
    CompletableFuture cf = CompletableFuture.completedFuture("thenAcceptAsync message")
            .thenAcceptAsync(s -> result.append(s));
    cf.join();
    assertTrue("Result was empty", result.length() > 0);
}
8.計(jì)算出現(xiàn)異常時(shí)

我們現(xiàn)在來模擬一個(gè)出現(xiàn)異常的場(chǎng)景。為了簡潔性,我們還是將一個(gè)字符串大寫,但是我們會(huì)模擬延時(shí)進(jìn)行該操作。我們會(huì)使用thenApplyAsyn(Function, Executor),第一個(gè)參數(shù)是大寫轉(zhuǎn)化方法,第二個(gè)參數(shù)是一個(gè)延時(shí)executor,它會(huì)延時(shí)一秒鐘再將操作提交給ForkJoinPool。

static void completeExceptionallyExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
            CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    CompletableFuture exceptionHandler = cf.handle((s, th) -> { return (th != null) ? "message upon cancel" : ""; });
    cf.completeExceptionally(new RuntimeException("completed exceptionally"));
    assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
    try {
        cf.join();
        fail("Should have thrown an exception");
    } catch(CompletionException ex) { // just for testing
        assertEquals("completed exceptionally", ex.getCause().getMessage());
    }
    assertEquals("message upon cancel", exceptionHandler.join());
}

首先,我們新建了一個(gè)已經(jīng)完成并帶有返回值messageCompletableFuture對(duì)象。然后我們調(diào)用thenApplyAsync方法,該方法會(huì)返回一個(gè)新的CompletableFuture。這個(gè)方法用異步的方式執(zhí)行大寫操作。這里還展示了如何使用delayedExecutor(timeout, timeUnit)方法來延時(shí)異步操作。

然后我們創(chuàng)建了一個(gè)handler stage,exceptionHandler,這個(gè)階段會(huì)處理一切異常并返回另一個(gè)消息message upon cancel。

最后,我們顯式的完成第二個(gè)階段并拋出異常,它會(huì)導(dǎo)致進(jìn)行大寫操作的階段拋出CompletionException。它還會(huì)觸發(fā)handler階段。

API補(bǔ)充:
CompletableFuture handle(BiFunction fn)
返回一個(gè)新的CompletionStage,無論之前的Stage是否正常運(yùn)行完畢。傳入的參數(shù)包括上一個(gè)階段的結(jié)果和拋出異常。
9.取消計(jì)算

和計(jì)算時(shí)異常處理很相似,我們可以通過Future接口中的cancel(boolean mayInterruptIfRunning)來取消計(jì)算。

static void cancelExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
            CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    CompletableFuture cf2 = cf.exceptionally(throwable -> "canceled message");
    assertTrue("Was not canceled", cf.cancel(true));
    assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
    assertEquals("canceled message", cf2.join());
}
API補(bǔ)充
public CompletableFuture exceptionally(Function fn)
返回一個(gè)新的CompletableFuture,如果出現(xiàn)異常,則為該方法中執(zhí)行的結(jié)果,否則就是正常執(zhí)行的結(jié)果。
10.將Function作用于兩個(gè)已完成Stage的結(jié)果之一

下面的例子創(chuàng)建了一個(gè)CompletableFuture對(duì)象并將Function作用于已完成的兩個(gè)Stage中的任意一個(gè)(沒有保證哪一個(gè)將會(huì)傳遞給Function)。這兩個(gè)階段分別如下:一個(gè)將字符串大寫,另一個(gè)小寫。

static void applyToEitherExample() {
    String original = "Message";
    CompletableFuture cf1 = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s));
    CompletableFuture cf2 = cf1.applyToEither(
            CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
            s -> s + " from applyToEither");
    assertTrue(cf2.join().endsWith(" from applyToEither"));
}
public  CompletableFuture applyToEitherAsync(CompletionStage other,Function fn)
返回一個(gè)全新的CompletableFuture,包含著this或是other操作完成之后,在二者中的任意一個(gè)執(zhí)行fn
11.消費(fèi)兩個(gè)階段的任意一個(gè)結(jié)果

和前一個(gè)例子類似,將Function替換為Consumer

static void acceptEitherExample() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture cf = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s))
            .acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                    s -> result.append(s).append("acceptEither"));
    cf.join();
    assertTrue("Result was empty", result.toString().endsWith("acceptEither"));
}
12.在兩個(gè)階段都完成后運(yùn)行Runnable

注意這里的兩個(gè)Stage都是同步運(yùn)行的,第一個(gè)stage將字符串轉(zhuǎn)化為大寫之后,第二個(gè)stage將其轉(zhuǎn)化為小寫。

static void runAfterBothExample() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth(
            CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
            () -> result.append("done"));
    assertTrue("Result was empty", result.length() > 0);
}
13.用Biconsumer接收兩個(gè)stage的結(jié)果

Biconsumer支持同時(shí)對(duì)兩個(gè)Stage的結(jié)果進(jìn)行操作。

static void thenAcceptBothExample() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth(
            CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
            (s1, s2) -> result.append(s1 + s2));
    assertEquals("MESSAGEmessage", result.toString());
}
14.將Bifunction同時(shí)作用于兩個(gè)階段的結(jié)果

如果CompletableFuture想要合并兩個(gè)階段的結(jié)果并且返回值,我們可以使用方法thenCombine。這里的計(jì)算流都是同步的,所以最后的getNow()方法會(huì)獲得最終結(jié)果,即大寫操作和小寫操作的結(jié)果的拼接。

static void thenCombineExample() {
    String original = "Message";
    CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
            .thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)),
                    (s1, s2) -> s1 + s2);
    assertEquals("MESSAGEmessage", cf.getNow(null));
}
15.異步將Bifunction同時(shí)作用于兩個(gè)階段的結(jié)果

和之前的例子類似,只是這里用了不同的方法:即兩個(gè)階段的操作都是異步的。那么thenCombine也會(huì)異步執(zhí)行,及時(shí)它沒有Async后綴。

static void thenCombineAsyncExample() {
    String original = "Message";
    CompletableFuture cf = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s))
            .thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                    (s1, s2) -> s1 + s2);
    assertEquals("MESSAGEmessage", cf.join());
}
16.Compose CompletableFuture

我們可以使用thenCompose來完成前兩個(gè)例子中的操作。

static void thenComposeExample() {
    String original = "Message";
    CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
            .thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s))
                    .thenApply(s -> upper + s));
    assertEquals("MESSAGEmessage", cf.join());
}
17.當(dāng)多個(gè)階段中有有何一個(gè)完成,即新建一個(gè)完成階段
static void anyOfExample() {
    StringBuilder result = new StringBuilder();
    List messages = Arrays.asList("a", "b", "c");
    List futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> {
        if(th == null) {
            assertTrue(isUpperCase((String) res));
            result.append(res);
        }
    });
    assertTrue("Result was empty", result.length() > 0);
}
18.當(dāng)所有的階段完成,新建一個(gè)完成階段
static void allOfExample() {
    StringBuilder result = new StringBuilder();
    List messages = Arrays.asList("a", "b", "c");
    List futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
        .whenComplete((v, th) -> {
            futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
            result.append("done");
        });
    assertTrue("Result was empty", result.length() > 0);
}
19.當(dāng)所有階段完成以后,新建一個(gè)異步完成階段
static void allOfAsyncExample() {
    StringBuilder result = new StringBuilder();
    List messages = Arrays.asList("a", "b", "c");
    List futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
            .whenComplete((v, th) -> {
                futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
                result.append("done");
            });
    allOf.join();
    assertTrue("Result was empty", result.length() > 0);
}
20.真實(shí)場(chǎng)景

下面展示了一個(gè)實(shí)踐CompletableFuture的場(chǎng)景:

先通過調(diào)用cars()方法異步獲得Car列表。它將會(huì)返回一個(gè)CompletionStage>。cars()方法應(yīng)當(dāng)使用一個(gè)遠(yuǎn)程的REST端點(diǎn)來實(shí)現(xiàn)。

我們將該Stage和另一個(gè)Stage組合,另一個(gè)Stage會(huì)通過調(diào)用rating(manufactureId)來異步獲取每輛車的評(píng)分。

當(dāng)所有的Car對(duì)象都填入評(píng)分后,我們調(diào)用allOf()來進(jìn)入最終Stage,它將在這兩個(gè)階段完成后執(zhí)行

在最終Stage上使用whenComplete(),打印出車輛的評(píng)分。

cars().thenCompose(cars -> {
    List updatedCars = cars.stream()
            .map(car -> rating(car.manufacturerId).thenApply(r -> {
                car.setRating(r);
                return car;
            })).collect(Collectors.toList());
    CompletableFuture done = CompletableFuture
            .allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()]));
    return done.thenApply(v -> updatedCars.stream().map(CompletionStage::toCompletableFuture)
            .map(CompletableFuture::join).collect(Collectors.toList()));
}).whenComplete((cars, th) -> {
    if (th == null) {
        cars.forEach(System.out::println);
    } else {
        throw new RuntimeException(th);
    }
}).toCompletableFuture().join();
參考資料
Java CompletableFuture 詳解
Guide To CompletableFuture


想要了解更多開發(fā)技術(shù),面試教程以及互聯(lián)網(wǎng)公司內(nèi)推,歡迎關(guān)注我的微信公眾號(hào)!將會(huì)不定期的發(fā)放福利哦~

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/68632.html

相關(guān)文章

  • 頭鷹深夜翻譯Java 10JEP 286-局部變量類型推斷

    摘要:在此基礎(chǔ)上又向前邁進(jìn)了一步局部變量類型推斷允許開發(fā)人員跳過局部變量的類型聲明局部變量是指在方法定義,初始化塊,循環(huán)和其它的如代碼塊,會(huì)推斷該局部變量的類型。 前言 之前面試的時(shí)候問了我是否了解JDK10的變化,一時(shí)回答不出來,所以只回答了JDK8中的函數(shù)式編程和流編程。今天看到這篇講JAVA10的文章,順便了解一下。 正文 JAVA10的所有新特性請(qǐng)參考這里。在所有的JEP中,JEP-...

    chavesgu 評(píng)論0 收藏0
  • 頭鷹深夜翻譯:Volatile原子性, 可見性和有序性

    摘要:有可能一個(gè)線程中的動(dòng)作相對(duì)于另一個(gè)線程出現(xiàn)亂序。當(dāng)實(shí)際輸出取決于線程交錯(cuò)的結(jié)果時(shí),這種情況被稱為競爭條件。這里的問題在于代碼塊不是原子性的,而且實(shí)例的變化對(duì)別的線程不可見。這種不能同時(shí)在多個(gè)線程上執(zhí)行的部分被稱為關(guān)鍵部分。 為什么要額外寫一篇文章來研究volatile呢?是因?yàn)檫@可能是并發(fā)中最令人困惑以及最被誤解的結(jié)構(gòu)。我看過不少解釋volatile的博客,但是大多數(shù)要么不完整,要么難...

    Lionad-Morotar 評(píng)論0 收藏0
  • 頭鷹深夜翻譯:核心JAVA并發(fā)(二)

    摘要:前言上一篇文章請(qǐng)參考貓頭鷹的深夜翻譯核心并發(fā)一安全發(fā)布發(fā)布一個(gè)對(duì)象是指該對(duì)象的引用對(duì)當(dāng)前的域之外也可見比如,從方法中獲取一個(gè)引用。任務(wù)的功能性接口表示一個(gè)沒有返回值的任務(wù)表示一個(gè)包含返回值的計(jì)算。 前言 上一篇文章請(qǐng)參考貓頭鷹的深夜翻譯:核心JAVA并發(fā)(一) 安全發(fā)布 發(fā)布一個(gè)對(duì)象是指該對(duì)象的引用對(duì)當(dāng)前的域之外也可見(比如,從getter方法中獲取一個(gè)引用)。要確保一個(gè)對(duì)象被安全的發(fā)...

    Pink 評(píng)論0 收藏0
  • 頭鷹深夜翻譯Java WeakHashMap

    摘要:本文簡介類概覽類構(gòu)造器總結(jié)類構(gòu)造方法類使用舉例類概覽是一個(gè)實(shí)現(xiàn)了接口,并且鍵為型的哈希表。中的條目不再被正常使用時(shí),會(huì)被自動(dòng)刪除。它的鍵值均支持。和絕大多數(shù)的集合類一樣,這個(gè)類不是同步的。 本文簡介 WeakHashMap類概覽 WeakHashMap類構(gòu)造器總結(jié) WeakHashMap類構(gòu)造方法 WeakHasjMap類使用舉例 1. WeakHashMap類概覽 Wea...

    BothEyes1993 評(píng)論0 收藏0
  • 頭鷹深夜翻譯:JDK Vs. JRE Vs. JVM之間區(qū)別

    摘要:什么是為執(zhí)行字節(jié)碼提供一個(gè)運(yùn)行環(huán)境。它的實(shí)現(xiàn)主要包含三個(gè)部分,描述實(shí)現(xiàn)規(guī)格的文檔,具體實(shí)現(xiàn)和滿足要求的計(jì)算機(jī)程序以及實(shí)例具體執(zhí)行字節(jié)碼。該類先被轉(zhuǎn)化為一組字節(jié)碼并放入文件中。字節(jié)碼校驗(yàn)器通過字節(jié)碼校驗(yàn)器檢查格式并找出非法代碼。 什么是Java Development Kit (JDK)? JDK通常用來開發(fā)Java應(yīng)用和插件。基本上可以認(rèn)為是一個(gè)軟件開發(fā)環(huán)境。JDK包含Java Run...

    blair 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<