摘要:方法接受一個生產(chǎn)者作為參數(shù),返回一個對象,該對象完成異步執(zhí)行后會讀取調(diào)用生產(chǎn)者方法的返回值。該方法接收一個對象構(gòu)成的數(shù)組,返回由第一個執(zhí)行完畢的對象的返回值構(gòu)成的。
一、Future 接口
在Future中觸發(fā)那些潛在耗時的操作把調(diào)用線程解放出來,讓它能繼續(xù)執(zhí)行其他有價值的工作,不再需要呆呆等待耗時的操作完成。打個比方,你可以把它想象成這樣的場景:你拿了一袋子衣服到你中意的干洗店去洗。干洗店的員工會給你張發(fā)票,告訴你什么時候你的衣服會洗好(這就是一個Future事件)。衣服干洗的同時,你可以去做其他的事情。Future的另一個優(yōu)點是它比更底層的Thread更易用。要使用Future,通常你只需要將耗時的操作封裝在一個Callable對象中,再將它提交給ExecutorService,就萬事大吉了。
ExecutorService executor = Executors.newCachedThreadPool(); Futurefuture = executor.submit(new Callable () { public Double call() { return doSomeLongComputation(); }}); doSomethingElse(); //異步操作進(jìn)行的同時,你可以做其他的事情 try { Double result = future.get(1, TimeUnit.SECONDS); //獲取異步操作的結(jié)果,如果最終被阻塞,無法得到結(jié) //果,那么在最多等待1秒鐘之后退出 } catch (ExecutionException ee) { // 計算拋出一個異常 } catch (InterruptedException ie) { // 當(dāng)前線程在等待過程中被中斷 } catch (TimeoutException te) { // 在Future對象完成之前超過已過期 }
同步API與異步API二、實現(xiàn)異步 API同步API其實只是對傳統(tǒng)方法調(diào)用的另一種稱呼:你調(diào)用了某個方法,調(diào)用方在被調(diào)用方運行的過程中會等待,被調(diào)用方運行結(jié)束返回,調(diào)用方取得被調(diào)用方的返回值并繼續(xù)運行。即使調(diào)用方和被調(diào)用方在不同的線程中運行,調(diào)用方還是需要等待被調(diào)用方結(jié)束運行,這就是阻塞式調(diào)用這個名詞的由來。
與此相反,異步API會直接返回,或者至少在被調(diào)用方計算完成之前,將它剩余的計算任務(wù)交給另一個線程去做,該線程和調(diào)用方是異步的——這就是非阻塞式調(diào)用的由來。執(zhí)行剩余計算任務(wù)的線程會將它的計算結(jié)果返回給調(diào)用方。返回的方式要么是通過回調(diào)函數(shù),要么是由調(diào)用方再次執(zhí)行一個“等待,直到計算完成”的方法調(diào)用。這種方式的計算在I/O系統(tǒng)程序設(shè)計中非常常見:你發(fā)起了一次磁盤訪問,這次訪問和你的其他計算操作是異步的,你完成其他的任務(wù)時,磁盤塊的數(shù)據(jù)可能還沒載入到內(nèi)存,你只需要等待數(shù)據(jù)的載入完成。
使用CompletableFuture后,getPriceAsync方法的實現(xiàn)
public FuturegetPriceAsync(String product) { CompletableFuture futurePrice = new CompletableFuture<>(); new Thread( () -> { double price = calculatePrice(product); //calculatePrice需長時間計算,任務(wù)結(jié)束并得出結(jié)果時設(shè)置 //Future的返回值 futurePrice.complete(price); }).start(); return futurePrice; //無需等待還沒結(jié)束的計算,直接返回Future對象 }
使用異步API:
Shop shop = new Shop("BestShop"); long start = System.nanoTime(); FuturefuturePrice = shop.getPriceAsync("my favorite product"); long invocationTime = ((System.nanoTime() - start) / 1_000_000); System.out.println("Invocation returned after " + invocationTime + " msecs"); // 執(zhí)行更多任務(wù),比如查詢其他商店 doSomethingElse(); // 在計算商品價格的同時 try { double price = futurePrice.get(); //從Future對象中讀取價格,如果價格未知,會發(fā)生阻塞 System.out.printf("Price is %.2f%n", price); } catch (Exception e) { throw new RuntimeException(e); } long retrievalTime = ((System.nanoTime() - start) / 1_000_000); System.out.println("Price returned after " + retrievalTime + " msecs");
Stream和CompletableFuture的設(shè)計都遵循了類似的模式:它們都使用了Lambda表達(dá)式以及流水線的思想。CompletableFuture和Future的關(guān)系就跟Stream和Collection的關(guān)系一樣。
錯誤處理
如果計算商品價格的方法出現(xiàn)異常,用于提示錯誤的異常會被限制在試圖計算商品價格的當(dāng)前線程的范圍內(nèi),最終會殺死該線程,而這會導(dǎo)致等待get方法返回結(jié)果的客戶端永久地被阻塞。為了避免這種情況,你需要使用CompletableFuture的completeExceptionally方法將導(dǎo)致CompletableFuture內(nèi)發(fā)生問題的異常拋出。
拋出CompletableFuture內(nèi)的異常:
public FuturegetPriceAsync( String product ) { CompletableFuture futurePrice = new CompletableFuture<>(); new Thread( () - > { try { double price = calculatePrice( product ); futurePrice.complete( price ); } catch ( Exception ex ) { futurePrice.completeExceptionally( ex ); } } ).start(); return(futurePrice); }
使用工廠方法supplyAsync創(chuàng)建CompletableFuture對象:
public FuturegetPriceAsync(String product) { return CompletableFuture.supplyAsync(() -> calculatePrice(product)); }
此處getPriceAsync方法返回的CompletableFuture對象和上面你手工創(chuàng)建和完成的CompletableFuture對象是完全等價的,這意味著它提供了同樣的錯誤管理機制。supplyAsync方法接受一個生產(chǎn)者(Supplier)作為參數(shù),返回一個CompletableFuture對象,該對象完成異步執(zhí)行后會讀取調(diào)用生產(chǎn)者方法的返回值。生產(chǎn)者方法會交由ForkJoinPool池中的某個執(zhí)行線程(Executor)運行,但是你也可以使用supplyAsync方法的重載版本,傳遞第二個參數(shù)指定不同的執(zhí)行線程執(zhí)行生產(chǎn)者方法。
三、讓你的代碼免受阻塞之苦在所有店鋪中找出同一商品的價格,使用CompletableFuture實現(xiàn)findPrices方法
public ListfindPrices(String product) { List > priceFutures = shops.stream() .map(shop -> CompletableFuture.supplyAsync( () -> shop.getName() + " price is " + shop.getPrice(product))) .collect(Collectors.toList()); return priceFutures.stream() .map(CompletableFuture::join) .collect(toList()); }
這里使用了兩個不同的Stream流水線,而不是在同一個處理流的流水線上一個接一個地放置兩個map操作——這其實是有緣由的??紤]流操作之間的延遲特性,如果你在單一流水線中處理流,發(fā)向不同商家的請求只能以同步、順序執(zhí)行的方式才會成功。因此,每個創(chuàng)建CompletableFuture對象只能在前一個操作結(jié)束之后執(zhí)行查詢指定商家的動作、通知join方法返回計算結(jié)果。
CompletableFuture類中的join方法和Future接口中的get有相同的含義,并且也聲明在Future接口中,它們唯一的不同是join不會拋出任何檢測到的異常。使用它你不再需要使用try/catch語句塊讓你傳遞給第二個map方法的Lambda表達(dá)式變得過于臃腫。
使用定制的執(zhí)行器:
調(diào)整線程池的大小
Brian Goetz建議,線程池大小與處理器的利用率之比可以使用下面的公式進(jìn)行估算:
Nthreads = NCPU * UCPU * (1 + W/C)
其中:
?NCPU是處理器的核的數(shù)目,可以通過Runtime.getRuntime().availableProcessors()得到
?UCPU是期望的CPU利用率(該值應(yīng)該介于0和1之間)
?W/C是等待時間與計算時間的比率
實際操作中,如果你創(chuàng)建
的線程數(shù)比商店的數(shù)目更多,反而是一種浪費,因為這樣做之后,你線程池中的有些線程根本沒有機會被使用。出于這種考慮,我們建議你將執(zhí)行器使用的線程數(shù),與你需要查詢的商店數(shù)目設(shè)定為同一個值,這樣每個商店都應(yīng)該對應(yīng)一個服務(wù)線程。不過,為了避免發(fā)生由于商店的數(shù)目過多導(dǎo)致服務(wù)器超負(fù)荷而崩潰,你還是需要設(shè)置一個上限,比如100個線程。代碼清單如下所示。為“最優(yōu)價格查詢器”應(yīng)用定制的執(zhí)行器:
private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } });
并行——使用流還是CompletableFutures?四、對多個異步任務(wù)進(jìn)行流水線操作 1.thenCompose
目前為止,你已經(jīng)知道對集合進(jìn)行并行計算有兩種方式:要么將其轉(zhuǎn)化為并行流,利用map這樣的操作開展工作,要么枚舉出集合中的每一個元素,創(chuàng)建新的線程,在CompletableFuture內(nèi)對其進(jìn)行操作。后者提供了更多的靈活性,你可以調(diào)整線程池的大小,而這能幫助你確保整體的計算不會因為線程都在等待I/O而發(fā)生阻塞。
我們對使用這些API的建議如下。
?如果你進(jìn)行的是計算密集型的操作,并且沒有I/O,那么推薦使用Stream接口,因為實現(xiàn)簡單,同時效率也可能是最高的(如果所有的線程都是計算密集型的,那就沒有必要創(chuàng)建比處理器核數(shù)更多的線程)。
?反之,如果你并行的工作單元還涉及等待I/O的操作(包括網(wǎng)絡(luò)連接等待),那么使用CompletableFuture靈活性更好,你可以像前文討論的那樣,依據(jù)等待/計算,或者W/C的比率設(shè)定需要使用的線程數(shù)。這種情況不使用并行流的另一個原因是,處理流的流水線中如果發(fā)生I/O等待,流的延遲特性會讓我們很難判斷到底什么時候觸發(fā)了等待。
使用CompletableFuture實現(xiàn)findPrices方法(獲取商品折扣后價格):
public ListfindPrices(String product) { List > priceFutures = shops.stream() .map(shop -> CompletableFuture.supplyAsync( () -> shop.getPrice(product), executor))//getPrice耗時操作,獲取商品的價格字符串,使用異步方式 .map(future -> future.thenApply(Quote::parse)) //將價格字符串解析成Quote對象(包裝了價格,折扣率等) .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync( () -> Discount.applyDiscount(quote), executor))) //異步計算商品最終價格 .collect(toList()); return priceFutures.stream() .map(CompletableFuture::join) //等待流中的所有Future執(zhí)行完畢,并提取各自的返回值 .collect(toList()); }
thenapply()是返回的是非CompletableFuture類型:它的功能相當(dāng)于將CompletableFuture轉(zhuǎn)換成CompletableFuture。
thenCompose()用來連接兩個CompletableFuture,返回值是新的CompletableFuture:
thenCompose方法允許你對兩個異步操作進(jìn)行流水線,第一個操作完成時,將其結(jié)果作為參數(shù)傳遞給第二個操作。
CompletableFuture類中的其他方法一樣,也提供了一個以Async后綴結(jié)尾的版本thenComposeAsync。通常而言,名稱中不帶Async2.用thenCombine將兩個 CompletableFuture 對象整合起來,無論它們是否存在依賴
的方法和它的前一個任務(wù)一樣,在同一個線程中運行;而名稱以Async結(jié)尾的方法會將后續(xù)的任務(wù)提交到一個線程池,所以每個任務(wù)是由不同的線程處理的。
thenCombine方法,它接收名為BiFunction的第二參數(shù),這個參數(shù)定義了當(dāng)兩個CompletableFuture對象完成計算后,結(jié)果如何合并。同thenCompose方法一樣,thenCombine方法也提供有一個Async的版本。這里,如果使用thenCombineAsync會導(dǎo)致BiFunction中定義的合并操作被提交到線程池中,由另一個任務(wù)以異步的方式執(zhí)行。
eg:有一家商店提供的價格是以歐元(EUR)計價的,但是你希望以美元的方式提供給你的客戶:
Future五、響應(yīng) CompletableFuture 的 completion 事件futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product)) .thenCombine( CompletableFuture.supplyAsync( () -> exchangeService.getRate(Money.EUR, Money.USD)), (price, rate) -> price * rate );
只要有商店返回商品價格就在第一時間顯示返回值,不再等待那些還未返回的商店(有些甚至?xí)l(fā)生超時)。Java 8的CompletableFuture通 過thenAccept方法提供了這一功能,它接收CompletableFuture執(zhí)行完畢后的返回值做參數(shù)。
重構(gòu)findPrices方法返回一個由Future構(gòu)成的流
public Stream> findPricesStream(String product) { return shops.stream() .map(shop -> CompletableFuture.supplyAsync( () -> shop.getPrice(product), executor)) .map(future -> future.thenApply(Quote::parse)) .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync( () -> Discount.applyDiscount(quote), executor))); } findPricesStream("myPhone").map(f -> f.thenAccept(System.out::println));
由 于thenAccept方法已經(jīng)定義了如何處理CompletableFuture返回的結(jié)果,一旦CompletableFuture計算得到結(jié)果,它就返回一個CompletableFuture
你還希望能給最慢的商店一些機會,讓它有機會打印輸出返回的價格。為了實現(xiàn)這一目的,你可以把構(gòu)成Stream的所有CompletableFuture
CompletableFuture[] futures = findPricesStream("myPhone") .map(f -> f.thenAccept(System.out::println)) .toArray(size -> new CompletableFuture[size]); CompletableFuture.allOf(futures).join();
allOf工廠方法接收一個由CompletableFuture構(gòu)成的數(shù)組,數(shù)組中的所有CompletableFuture對象執(zhí)行完成之后,它返回一個CompletableFuture
CompletableFuture執(zhí)行join操作是個不錯的主意。
你可能希望只要CompletableFuture對象數(shù)組中有任何一個執(zhí)行完畢就不再等待,比如,你正在查詢兩個匯率服務(wù)器,任何一個返回了結(jié)果都能滿足你的需求。在這種情況下,你可以使用一個類似的工廠方法anyOf。該方法接收一個CompletableFuture對象構(gòu)成的數(shù)組,返回由第一個執(zhí)行完畢的CompletableFuture對象的返回值構(gòu)成的CompletableFuture
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/74370.html
摘要:中使用了提供的原生接口對自身的異步化做了改進(jìn)。可以支持和兩種調(diào)用方式。實戰(zhàn)通過下面的例子,可以看出的最大好處特性。 showImg(https://segmentfault.com/img/remote/1460000020032427?w=1240&h=655); 前段時間工作上比較忙,這篇文章一直沒來得及寫,本文是閱讀《Java8實戰(zhàn)》的時候,了解到Java 8里已經(jīng)提供了一個異步...
摘要:方法接收的是的實例,但是它沒有返回值方法是函數(shù)式接口,無參數(shù),會返回一個結(jié)果這兩個方法是的升級,表示讓任務(wù)在指定的線程池中執(zhí)行,不指定的話,通常任務(wù)是在線程池中執(zhí)行的。該的接口是在線程使用舊的接口,它不允許返回值。 簡介 作為Java 8 Concurrency API改進(jìn)而引入,本文是CompletableFuture類的功能和用例的介紹。同時在Java 9 也有對Completab...
摘要:在這種方式中,主線程不會被阻塞,不需要一直等到子線程完成。主線程可以并行的執(zhí)行其他任務(wù)。如果我們不想等待結(jié)果返回,我們可以把需要等待完成執(zhí)行的邏輯寫入到回調(diào)函數(shù)中。任何立即執(zhí)行完成那就是執(zhí)行在主線程中嘗試刪除測試下??梢允褂眠_(dá)成目的。 Java 8 有大量的新特性和增強如 Lambda 表達(dá)式,Streams,CompletableFuture等。在本篇文章中我將詳細(xì)解釋清楚Compl...
摘要:并行流與目前,我們對集合進(jìn)行計算有兩種方式并行流而更加的靈活,我們可以配置線程池的大小確保整體的計算不會因為等待而發(fā)生阻塞。 【回顧Future接口 Future接口時java5引入的,設(shè)計初衷是對將來某個時刻會發(fā)生的結(jié)果建模。它建模了一種異步計算,返回了一個執(zhí)行預(yù)算結(jié)果的引用。比如,你去干洗店洗衣服,店員會告訴你什么時候可以來取衣服,而不是讓你一直在干洗店等待。要使用Future只需...
摘要:表示的是兩個,當(dāng)其中任意一個計算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢娝氖褂茫陂_始分析它的高并發(fā)實現(xiàn)機制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個比較典型的互聯(lián)網(wǎng)高并發(fā)場景。 干貨:深度剖析分布式搜索引擎設(shè)計 分布式,高可用,和機器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個名詞,今天我們首先來說說分布式。 探究...
閱讀 3107·2021-10-15 09:41
閱讀 3179·2021-09-22 16:05
閱讀 2419·2021-09-22 15:19
閱讀 2883·2021-09-02 15:11
閱讀 2458·2019-08-30 15:52
閱讀 846·2019-08-30 11:06
閱讀 1010·2019-08-29 16:44
閱讀 1265·2019-08-23 18:18