摘要:在這種方式中,主線程不會被阻塞,不需要一直等到子線程完成。主線程可以并行的執(zhí)行其他任務(wù)。如果我們不想等待結(jié)果返回,我們可以把需要等待完成執(zhí)行的邏輯寫入到回調(diào)函數(shù)中。任何立即執(zhí)行完成那就是執(zhí)行在主線程中嘗試刪除測試下。可以使用達成目的。
Java 8 有大量的新特性和增強如 Lambda 表達式,Streams,CompletableFuture等。在本篇文章中我將詳細解釋清楚CompletableFuture以及它所有方法的使用。
什么是CompletableFuture?在Java中CompletableFuture用于異步編程,異步編程是編寫非阻塞的代碼,運行的任務(wù)在一個多帶帶的線程,與主線程隔離,并且會通知主線程它的進度,成功或者失敗。
在這種方式中,主線程不會被阻塞,不需要一直等到子線程完成。主線程可以并行的執(zhí)行其他任務(wù)。
使用這種并行方式,可以極大的提高程序的性能。
Future vs CompletableFutureCompletableFuture 是 Future API的擴展。
Future 被用于作為一個異步計算結(jié)果的引用。提供一個 isDone() 方法來檢查計算任務(wù)是否完成。當任務(wù)完成時,get() 方法用來接收計算任務(wù)的結(jié)果。
從 Callbale和 Future 教程可以學習更多關(guān)于 Future 知識.
Future API 是非常好的 Java 異步編程進階,但是它缺乏一些非常重要和有用的特性。
Future 的局限性不能手動完成
當你寫了一個函數(shù),用于通過一個遠程API獲取一個電子商務(wù)產(chǎn)品最新價格。因為這個 API 太耗時,你把它允許在一個獨立的線程中,并且從你的函數(shù)中返回一個 Future?,F(xiàn)在假設(shè)這個API服務(wù)宕機了,這時你想通過該產(chǎn)品的最新緩存價格手工完成這個Future 。你會發(fā)現(xiàn)無法這樣做。
Future 的結(jié)果在非阻塞的情況下,不能執(zhí)行更進一步的操作
Future 不會通知你它已經(jīng)完成了,它提供了一個阻塞的 get() 方法通知你結(jié)果。你無法給 Future 植入一個回調(diào)函數(shù),當 Future 結(jié)果可用的時候,用該回調(diào)函數(shù)自動的調(diào)用 Future 的結(jié)果。
多個 Future 不能串聯(lián)在一起組成鏈式調(diào)用
有時候你需要執(zhí)行一個長時間運行的計算任務(wù),并且當計算任務(wù)完成的時候,你需要把它的計算結(jié)果發(fā)送給另外一個長時間運行的計算任務(wù)等等。你會發(fā)現(xiàn)你無法使用 Future 創(chuàng)建這樣的一個工作流。
不能組合多個 Future 的結(jié)果
假設(shè)你有10個不同的Future,你想并行的運行,然后在它們運行未完成后運行一些函數(shù)。你會發(fā)現(xiàn)你也無法使用 Future 這樣做。
沒有異常處理
Future API 沒有任務(wù)的異常處理結(jié)構(gòu)居然有如此多的限制,幸好我們有CompletableFuture,你可以使用 CompletableFuture 達到以上所有目的。
CompletableFuture 實現(xiàn)了 Future 和 CompletionStage接口,并且提供了許多關(guān)于創(chuàng)建,鏈式調(diào)用和組合多個 Future 的便利方法集,而且有廣泛的異常處理支持。
創(chuàng)建 CompletableFuture1. 簡單的例子
可以使用如下無參構(gòu)造函數(shù)簡單的創(chuàng)建 CompletableFuture:
CompletableFuturecompletableFuture = new CompletableFuture ();
這是一個最簡單的 CompletableFuture,想獲取CompletableFuture 的結(jié)果可以使用 CompletableFuture.get() 方法:
String result = completableFuture.get()
get() 方法會一直阻塞直到 Future 完成。因此,以上的調(diào)用將被永遠阻塞,因為該Future一直不會完成。
你可以使用 CompletableFuture.complete() 手工的完成一個 Future:
completableFuture.complete("Future"s Result")
所有等待這個 Future 的客戶端都將得到一個指定的結(jié)果,并且 completableFuture.complete() 之后的調(diào)用將被忽略。
2. 使用 runAsync() 運行異步計算
如果你想異步的運行一個后臺任務(wù)并且不想改任務(wù)返回任務(wù)東西,這時候可以使用 CompletableFuture.runAsync()方法,它持有一個Runnable 對象,并返回 CompletableFuture
// Run a task specified by a Runnable Object asynchronously. CompletableFuturefuture = CompletableFuture.runAsync(new Runnable() { @Override public void run() { // Simulate a long-running Job try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } System.out.println("I"ll run in a separate thread than the main thread."); } }); // Block and wait for the future to complete future.get()
你也可以以 lambda 表達式的形式傳入 Runnable 對象:
// Using Lambda Expression CompletableFuturefuture = CompletableFuture.runAsync(() -> { // Simulate a long-running Job try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } System.out.println("I"ll run in a separate thread than the main thread."); });
在本文中,我使用lambda表達式會比較頻繁,如果以前你沒有使用過,建議你也多使用lambda 表達式。
3. 使用 supplyAsync() 運行一個異步任務(wù)并且返回結(jié)果
當任務(wù)不需要返回任何東西的時候, CompletableFuture.runAsync() 非常有用。但是如果你的后臺任務(wù)需要返回一些結(jié)果應(yīng)該要怎么樣?
CompletableFuture.supplyAsync() 就是你的選擇。它持有supplier
// Run a task specified by a Supplier object asynchronously CompletableFuturefuture = CompletableFuture.supplyAsync(new Supplier () { @Override public String get() { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of the asynchronous computation"; } }); // Block and get the result of the Future String result = future.get(); System.out.println(result);
Supplier
你可以使用lambda表達式使得上面的示例更加簡明:
// Using Lambda Expression CompletableFuturefuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of the asynchronous computation"; });
一個關(guān)于Executor 和Thread Pool筆記
你可能想知道,我們知道runAsync() 和supplyAsync()方法在多帶帶的線程中執(zhí)行他們的任務(wù)。但是我們不會永遠只創(chuàng)建一個線程。
CompletableFuture可以從全局的 ForkJoinPool.commonPool()獲得一個線程中執(zhí)行這些任務(wù)。
但是你也可以創(chuàng)建一個線程池并傳給runAsync() 和supplyAsync()方法來讓他們從線程池中獲取一個線程執(zhí)行它們的任務(wù)。
CompletableFuture API 的所有方法都有兩個變體-一個接受Executor作為參數(shù),另一個不這樣:
// Variations of runAsync() and supplyAsync() methods static CompletableFuturerunAsync(Runnable runnable) static CompletableFuture runAsync(Runnable runnable, Executor executor) static CompletableFuture supplyAsync(Supplier supplier) static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
創(chuàng)建一個線程池,并傳遞給其中一個方法:
Executor executor = Executors.newFixedThreadPool(10); CompletableFuture在 CompletableFuture 轉(zhuǎn)換和運行future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of the asynchronous computation"; }, executor);
CompletableFuture.get()方法是阻塞的。它會一直等到Future完成并且在完成后返回結(jié)果。
但是,這是我們想要的嗎?對于構(gòu)建異步系統(tǒng),我們應(yīng)該附上一個回調(diào)給CompletableFuture,當Future完成的時候,自動的獲取結(jié)果。
如果我們不想等待結(jié)果返回,我們可以把需要等待Future完成執(zhí)行的邏輯寫入到回調(diào)函數(shù)中。
可以使用 thenApply(), thenAccept() 和thenRun()方法附上一個回調(diào)給CompletableFuture。
1. thenApply()
可以使用 thenApply() 處理和改變CompletableFuture的結(jié)果。持有一個Function
// Create a CompletableFuture CompletableFuturewhatsYourNameFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Rajeev"; }); // Attach a callback to the Future using thenApply() CompletableFuture greetingFuture = whatsYourNameFuture.thenApply(name -> { return "Hello " + name; }); // Block and get the result of the future. System.out.println(greetingFuture.get()); // Hello Rajeev
你也可以通過附加一系列的thenApply()在回調(diào)方法 在CompletableFuture寫一個連續(xù)的轉(zhuǎn)換。這樣的話,結(jié)果中的一個 thenApply方法就會傳遞給該系列的另外一個 thenApply方法。
CompletableFuturewelcomeText = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Rajeev"; }).thenApply(name -> { return "Hello " + name; }).thenApply(greeting -> { return greeting + ", Welcome to the CalliCoder Blog"; }); System.out.println(welcomeText.get()); // Prints - Hello Rajeev, Welcome to the CalliCoder Blog
2. thenAccept() 和 thenRun()
如果你不想從你的回調(diào)函數(shù)中返回任何東西,僅僅想在Future完成后運行一些代碼片段,你可以使用thenAccept() 和 thenRun()方法,這些方法經(jīng)常在調(diào)用鏈的最末端的最后一個回調(diào)函數(shù)中使用。
CompletableFuture.thenAccept() 持有一個Consumer
// thenAccept() example CompletableFuture.supplyAsync(() -> { return ProductService.getProductDetail(productId); }).thenAccept(product -> { System.out.println("Got product detail from remote service " + product.getName()) });
雖然thenAccept()可以訪問CompletableFuture的結(jié)果,但thenRun()不能訪Future的結(jié)果,它持有一個Runnable返回CompletableFuture
// thenRun() example CompletableFuture.supplyAsync(() -> { // Run some computation }).thenRun(() -> { // Computation Finished. });
異步回調(diào)方法的筆記
CompletableFuture提供的所有回調(diào)方法都有兩個變體:
`// thenApply() variants
CompletableFuture thenApply(Function super T,? extends U> fn)
CompletableFuture thenApplyAsync(Function super T,? extends U> fn)
CompletableFuture thenApplyAsync(Function super T,? extends U> fn, Executor executor)`
這些異步回調(diào)變體通過在獨立的線程中執(zhí)行回調(diào)任務(wù)幫助你進一步執(zhí)行并行計算。
以下示例:
CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Some Result" }).thenApply(result -> { /* Executed in the same thread where the supplyAsync() task is executed or in the main thread If the supplyAsync() task completes immediately (Remove sleep() call to verify) */ return "Processed Result" })
在以上示例中,在thenApply()中的任務(wù)和在supplyAsync()中的任務(wù)執(zhí)行在相同的線程中。任何supplyAsync()立即執(zhí)行完成,那就是執(zhí)行在主線程中(嘗試刪除sleep測試下)。
為了控制執(zhí)行回調(diào)任務(wù)的線程,你可以使用異步回調(diào)。如果你使用thenApplyAsync()回調(diào),將從ForkJoinPool.commonPool()獲取不同的線程執(zhí)行。
CompletableFuture.supplyAsync(() -> { return "Some Result" }).thenApplyAsync(result -> { // Executed in a different thread from ForkJoinPool.commonPool() return "Processed Result" })
此外,如果你傳入一個Executor到thenApplyAsync()回調(diào)中,,任務(wù)將從Executor線程池獲取一個線程執(zhí)行。
Executor executor = Executors.newFixedThreadPool(2); CompletableFuture.supplyAsync(() -> { return "Some result" }).thenApplyAsync(result -> { // Executed in a thread obtained from the executor return "Processed Result" }, executor);組合兩個CompletableFuture
1. 使用 thenCompose() 組合兩個獨立的future
假設(shè)你想從一個遠程API中獲取一個用戶的詳細信息,一旦用戶信息可用,你想從另外一個服務(wù)中獲取他的貸方。
考慮下以下兩個方法getUserDetail() 和getCreditRating()的實現(xiàn):
CompletableFuturegetUsersDetail(String userId) { return CompletableFuture.supplyAsync(() -> { UserService.getUserDetails(userId); }); } CompletableFuture getCreditRating(User user) { return CompletableFuture.supplyAsync(() -> { CreditRatingService.getCreditRating(user); }); }
現(xiàn)在讓我們弄明白當使用了thenApply()后是否會達到我們期望的結(jié)果-
CompletableFuture> result = getUserDetail(userId) .thenApply(user -> getCreditRating(user));
在更早的示例中,Supplier函數(shù)傳入thenApply將返回一個簡單的值,但是在本例中,將返回一個CompletableFuture。以上示例的最終結(jié)果是一個嵌套的CompletableFuture。
如果你想獲取最終的結(jié)果給最頂層future,使用 thenCompose()方法代替-
CompletableFutureresult = getUserDetail(userId) .thenCompose(user -> getCreditRating(user));
因此,規(guī)則就是-如果你的回調(diào)函數(shù)返回一個CompletableFuture,但是你想從CompletableFuture鏈中獲取一個直接合并后的結(jié)果,這時候你可以使用thenCompose()。
2. 使用thenCombine()組合兩個獨立的 future
雖然thenCompose()被用于當一個future依賴另外一個future的時候用來組合兩個future。thenCombine()被用來當兩個獨立的Future都完成的時候,用來做一些事情。
System.out.println("Retrieving weight."); CompletableFutureweightInKgFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return 65.0; }); System.out.println("Retrieving height."); CompletableFuture heightInCmFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return 177.8; }); System.out.println("Calculating BMI."); CompletableFuture combinedFuture = weightInKgFuture .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> { Double heightInMeter = heightInCm/100; return weightInKg/(heightInMeter*heightInMeter); }); System.out.println("Your BMI is - " + combinedFuture.get());
當兩個Future都完成的時候,傳給``thenCombine()的回調(diào)函數(shù)將被調(diào)用。
組合多個CompletableFuture我們使用thenCompose() 和 thenCombine()把兩個CompletableFuture組合在一起。現(xiàn)在如果你想組合任意數(shù)量的CompletableFuture,應(yīng)該怎么做?我們可以使用以下兩個方法組合任意數(shù)量的CompletableFuture。
static CompletableFutureallOf(CompletableFuture>... cfs) static CompletableFuture
1. CompletableFuture.allOf()
CompletableFuture.allOf的使用場景是當你一個列表的獨立future,并且你想在它們都完成后并行的做一些事情。
假設(shè)你想下載一個網(wǎng)站的100個不同的頁面。你可以串行的做這個操作,但是這非常消耗時間。因此你想寫一個函數(shù),傳入一個頁面鏈接,返回一個CompletableFuture,異步的下載頁面內(nèi)容。
CompletableFuturedownloadWebPage(String pageLink) { return CompletableFuture.supplyAsync(() -> { // Code to download and return the web page"s content }); }
現(xiàn)在,當所有的頁面已經(jīng)下載完畢,你想計算包含關(guān)鍵字CompletableFuture頁面的數(shù)量??梢允褂?b>CompletableFuture.allOf()達成目的。
ListwebPageLinks = Arrays.asList(...) // A list of 100 web page links // Download contents of all the web pages asynchronously List > pageContentFutures = webPageLinks.stream() .map(webPageLink -> downloadWebPage(webPageLink)) .collect(Collectors.toList()); // Create a combined Future using allOf() CompletableFuture allFutures = CompletableFuture.allOf( pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()]) );
使用CompletableFuture.allOf()的問題是它返回CompletableFuture
// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list - CompletableFuture> allPageContentsFuture = allFutures.thenApply(v -> { return pageContentFutures.stream() .map(pageContentFuture -> pageContentFuture.join()) .collect(Collectors.toList()); });
花一些時間理解下以上代碼片段。當所有future完成的時候,我們調(diào)用了future.join(),因此我們不會在任何地方阻塞。
join()方法和get()方法非常類似,這唯一不同的地方是如果最頂層的CompletableFuture完成的時候發(fā)生了異常,它會拋出一個未經(jīng)檢查的異常。
現(xiàn)在讓我們計算包含關(guān)鍵字頁面的數(shù)量。
// Count the number of web pages having the "CompletableFuture" keyword. CompletableFuturecountFuture = allPageContentsFuture.thenApply(pageContents -> { return pageContents.stream() .filter(pageContent -> pageContent.contains("CompletableFuture")) .count(); }); System.out.println("Number of Web Pages having CompletableFuture keyword - " + countFuture.get());
2. CompletableFuture.anyOf()
CompletableFuture.anyOf()和其名字介紹的一樣,當任何一個CompletableFuture完成的時候【相同的結(jié)果類型】,返回一個新的CompletableFuture。以下示例:
CompletableFuturefuture1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of Future 1"; }); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of Future 2"; }); CompletableFuture future3 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of Future 3"; }); CompletableFuture
在以上示例中,當三個中的任何一個CompletableFuture完成, anyOfFuture就會完成。因為future2的休眠時間最少,因此她最先完成,最終的結(jié)果將是future2的結(jié)果。
CompletableFuture.anyOf()傳入一個Future可變參數(shù),返回CompletableFuture
CompletableFuture 異常處理我們探尋了怎樣創(chuàng)建CompletableFuture,轉(zhuǎn)換它們,并組合多個CompletableFuture?,F(xiàn)在讓我們弄明白當發(fā)生錯誤的時候我們應(yīng)該怎么做。
首先讓我們明白在一個回調(diào)鏈中錯誤是怎么傳遞的。思考下以下回調(diào)鏈:
CompletableFuture.supplyAsync(() -> { // Code which might throw an exception return "Some result"; }).thenApply(result -> { return "processed result"; }).thenApply(result -> { return "result after further processing"; }).thenAccept(result -> { // do something with the final result });
如果在原始的supplyAsync()任務(wù)中發(fā)生一個錯誤,這時候沒有任何thenApply會被調(diào)用并且future將以一個異常結(jié)束。如果在第一個thenApply發(fā)生錯誤,這時候第二個和第三個將不會被調(diào)用,同樣的,future將以異常結(jié)束。
1. 使用 exceptionally() 回調(diào)處理異常
exceptionally()回調(diào)給你一個從原始Future中生成的錯誤恢復(fù)的機會。你可以在這里記錄這個異常并返回一個默認值。
Integer age = -1; CompletableFuturematurityFuture = CompletableFuture.supplyAsync(() -> { if(age < 0) { throw new IllegalArgumentException("Age can not be negative"); } if(age > 18) { return "Adult"; } else { return "Child"; } }).exceptionally(ex -> { System.out.println("Oops! We have an exception - " + ex.getMessage()); return "Unknown!"; }); System.out.println("Maturity : " + maturityFuture.get());
2. 使用 handle() 方法處理異常
API提供了一個更通用的方法 - handle()從異?;謴?fù),無論一個異常是否發(fā)生它都會被調(diào)用。
Integer age = -1; CompletableFuturematurityFuture = CompletableFuture.supplyAsync(() -> { if(age < 0) { throw new IllegalArgumentException("Age can not be negative"); } if(age > 18) { return "Adult"; } else { return "Child"; } }).handle((res, ex) -> { if(ex != null) { System.out.println("Oops! We have an exception - " + ex.getMessage()); return "Unknown!"; } return res; }); System.out.println("Maturity : " + maturityFuture.get());
如果異常發(fā)生,res參數(shù)將是 null,否則,ex將是 null。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/69149.html
摘要:方法接收的是的實例,但是它沒有返回值方法是函數(shù)式接口,無參數(shù),會返回一個結(jié)果這兩個方法是的升級,表示讓任務(wù)在指定的線程池中執(zhí)行,不指定的話,通常任務(wù)是在線程池中執(zhí)行的。該的接口是在線程使用舊的接口,它不允許返回值。 簡介 作為Java 8 Concurrency API改進而引入,本文是CompletableFuture類的功能和用例的介紹。同時在Java 9 也有對Completab...
摘要:中使用了提供的原生接口對自身的異步化做了改進??梢灾С趾蛢煞N調(diào)用方式。實戰(zhàn)通過下面的例子,可以看出的最大好處特性。 showImg(https://segmentfault.com/img/remote/1460000020032427?w=1240&h=655); 前段時間工作上比較忙,這篇文章一直沒來得及寫,本文是閱讀《Java8實戰(zhàn)》的時候,了解到Java 8里已經(jīng)提供了一個異步...
摘要:方法接受一個生產(chǎn)者作為參數(shù),返回一個對象,該對象完成異步執(zhí)行后會讀取調(diào)用生產(chǎn)者方法的返回值。該方法接收一個對象構(gòu)成的數(shù)組,返回由第一個執(zhí)行完畢的對象的返回值構(gòu)成的。 一、Future 接口 在Future中觸發(fā)那些潛在耗時的操作把調(diào)用線程解放出來,讓它能繼續(xù)執(zhí)行其他有價值的工作,不再需要呆呆等待耗時的操作完成。打個比方,你可以把它想象成這樣的場景:你拿了一袋子衣服到你中意的干洗店去洗。...
摘要:首先想到的是開啟一個新的線程去做某項工作。再進一步,為了讓新線程可以返回一個值,告訴主線程事情做完了,于是乎粉墨登場。然而提供的方式是主線程主動問詢新線程,要是有個回調(diào)函數(shù)就爽了。極大的提高效率。 showImg(https://segmentfault.com/img/bVbvgBJ?w=1920&h=1200); 引子 為了讓程序更加高效,讓CPU最大效率的工作,我們會采用異步編程...
摘要:這個方法返回與等待所有返回等待多個返回取多個當中最快的一個返回等待多個當中最快的一個返回二詳解終極指南并發(fā)編程中的風格 thenApply(等待并轉(zhuǎn)化future) @Test public void testThen() throws ExecutionException, InterruptedException { CompletableFutur...
閱讀 3562·2021-08-31 09:39
閱讀 1869·2019-08-30 13:14
閱讀 2932·2019-08-30 13:02
閱讀 2778·2019-08-29 13:22
閱讀 2357·2019-08-26 13:54
閱讀 778·2019-08-26 13:45
閱讀 1597·2019-08-26 11:00
閱讀 990·2019-08-26 10:58