摘要:并行流與目前,我們對集合進行計算有兩種方式并行流而更加的靈活,我們可以配置線程池的大小確保整體的計算不會因為等待而發(fā)生阻塞。
【回顧Future接口
Future接口時java5引入的,設(shè)計初衷是對將來某個時刻會發(fā)生的結(jié)果建模。它建模了一種異步計算,返回了一個執(zhí)行預(yù)算結(jié)果的引用。比如,你去干洗店洗衣服,店員會告訴你什么時候可以來取衣服,而不是讓你一直在干洗店等待。要使用Future只需要將耗時操作封裝在一個Callable對象中,再將其提交給ExecutorService就可以了。
ExecutorService executor = Executors.newFixedThreadPool(10); Futurefuture = executor.submit(new Callable () { @Override public Double call() throws Exception { return doSomeLongComputation(); } }); doSomethingElse(); try { //最多等待1秒 Double result = future.get(1,TimeUnit.SECONDS); } catch (InterruptedException e) { //當(dāng)前線程等待過程中被打斷 e.printStackTrace(); } catch (ExecutionException e) { //計算時出現(xiàn)異常 e.printStackTrace(); } catch (TimeoutException e) { //完成計算前就超時 e.printStackTrace(); }
但是Future依然有一些局限性:
無法將兩個異步計算的結(jié)果合并為一個。
等待Future集合中所有任務(wù)完成。
等待Future集合中最快任務(wù)完成(選擇最優(yōu)的執(zhí)行方案)。
通過編程的方式完成一個Future任務(wù)的執(zhí)行(手工設(shè)定異步結(jié)果處理)。
應(yīng)對Future的完成事件,當(dāng)Future的完成事件發(fā)生時會收到通知,并可以使用Future的結(jié)果進行下一步操作,不只是簡單的阻塞等待。
而CompletableFuture類實現(xiàn)了Future接口,可以將上述的問題全部解決。CompletableFuture與Stream的設(shè)計都遵循了類似的設(shè)計模式:使用Lambda表達式以及流水線的思想,從這個角度可以說CompletableFuture與Future的關(guān)系類似于Stream與Collection的關(guān)系。
【構(gòu)建一個異步應(yīng)用最佳價格查詢器:查詢多個線上商店對同一商品的價格。
首先構(gòu)建商店對象:
package BestPriceFinder; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; public class Shop { private String name; public Shop(String name){ this.name = name; } public String getName(){ return name; } /** * 異步api:使用創(chuàng)建CompletableFuture類提供的工廠方法與getPriceAsync()效果完全一致 * 可以更輕易的完成這個流程,并且不用擔(dān)心實現(xiàn)細節(jié) * @param product * @return */ public FuturegetPriceAsyncByFactory(String product){ return CompletableFuture.supplyAsync(() -> calculatePrice(product)); } /** * 異步api: * @param product * @return */ public Future getPriceAsync(String product){ //創(chuàng)建CompletableFuture對象,它將包含計算結(jié)果 CompletableFuture futurePrice = new CompletableFuture<>(); //在新線程中異步計算結(jié)果 new Thread(() -> { try { double price = calculatePrice(product); //需要長時間計算的任務(wù)結(jié)束時,設(shè)置future的返回值 futurePrice.complete(price); }catch (Exception e){ //如這里沒有使用completeExceptionally,線程不會結(jié)束,調(diào)用方會永遠的執(zhí)行下去 futurePrice.completeExceptionally(e); } }).start(); //無需等待計算結(jié)果,直接返回future對象 return futurePrice; } /** * 同步api: * 每個商店都需要提供的查詢api:根據(jù)名稱返回價格; * 模擬查詢數(shù)據(jù)庫等一些耗時操作:使用delay()模擬這些耗時操作。 * @param product * @return */ public double getPrice(String product){ return calculatePrice(product); } private double calculatePrice(String product){ delay(); return random.nextDouble() * product.charAt(0) + product.charAt(1); } private Random random = new Random(); /** * 模擬耗時操作:延遲一秒 */ private static void delay(){ try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
下面我們針對Shop.java提供的同步方法與異步方法來進行測試:
package BestPriceFinder; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.stream.Collectors; /** * 最佳價格查詢器 */ public class BestFinder { Listshops = Arrays.asList( new Shop("A"), new Shop("B"), new Shop("C"), new Shop("D"), new Shop("E"), new Shop("F"), new Shop("G"), new Shop("H"), new Shop("I"), new Shop("J") ); /** * 順序查詢 */ public List findPrices(String product){ return shops.stream() .map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product))) .collect(Collectors.toList()); } /** * 并行流查詢 */ public List findPricesParallel(String product){ return shops.parallelStream() .map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product))) .collect(Collectors.toList()); } /** * 異步查詢 * 相比并行流的話CompletableFuture更有優(yōu)勢:可以對執(zhí)行器配置,設(shè)置線程池大小 */ @SuppressWarnings("all") private final Executor myExecutor = Executors.newFixedThreadPool(Math.min(shops.size(), 800), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); //使用守護線程保證不會阻止程序的關(guān)停 t.setDaemon(true); return t; } }); @SuppressWarnings("all") public List findPricesAsync(String product){ List > priceFuctures = shops.stream() .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product)),myExecutor)) .collect(Collectors.toList()); /** 這里需要使用新的stream來等待所有的子線程執(zhí)行完, * 因為:如果在一個stream中使用兩個map: * List > priceFuctures = shops.parallelStream() * .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product)))) * .map(c -> c.join()).collect(Collectors.toList()) * .collect(Collectors.toList()); * 考慮到流操作之間的延遲特性。如果你在單一的流水線中處理流,發(fā)向不同商家的請求只能以同步順序的方式執(zhí)行才會成功。因此每個創(chuàng)建CompletableFuture * 對象只能在前一個操作結(jié)束之后執(zhí)行查詢商家動作。 */ return priceFuctures.stream().map(c -> c.join()).collect(Collectors.toList()); } }
@Test public void findPrices(){ BestFinder bestFinder = new BestFinder(); long st = System.currentTimeMillis(); System.out.println(bestFinder.findPrices("iPhonX")); System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs"); } @Test public void findPricesParallel(){ BestFinder bestFinder = new BestFinder(); long st = System.currentTimeMillis(); System.out.println(bestFinder.findPrices("iPhonX")); System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs"); } @Test public void findPricesAsync(){ BestFinder bestFinder = new BestFinder(); long st = System.currentTimeMillis(); System.out.println(bestFinder.findPricesAsync("iPhonX")); System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs"); }
同步api測試結(jié)果:毫無疑問是10秒之上
并行流獲取同步api測試結(jié)果:也是10秒之上,但是并行流不是很高效嗎?怎么會如此凄慘呢?因為這與并行流可以調(diào)用的系統(tǒng)核數(shù)相關(guān),我的計算機是8核,最多8個線程同時運行。而商店有10個,也就是說,我們的兩個線程會一直等待前面的某一個線程釋放出空閑才能繼續(xù)運行。
異步獲取api測試結(jié)果:一秒左右
為何差距如此大呢?
明智的選擇是創(chuàng)建了一個配有線程池的執(zhí)行器,線程池中線程的數(shù)目取決于你的應(yīng)用需要處理的負擔(dān),但是你該如何選擇合適的線程數(shù)目呢?
《Java并發(fā)編程實戰(zhàn)》中給出如下公式:
Number = NCpu * Ucpu * ( 1 + W/C) Number : 線程數(shù)量 NCpu : 處理器核數(shù) UCpu : 期望cpu利用率 W/C : 等待時間與計算時間比
我們這里:99%d的時間是等待商店響應(yīng) W/C = 99 ,cpu利用率期望 100% ,NCpu = 9,推斷出 number = 800。但是為了避免過多的線程搞死計算機,我們選擇商店數(shù)與計算值中較小的一個。
【并行流與CompletableFuture目前,我們對集合進行計算有兩種方式:1.并行流 2.CompletableFuture;而CompletableFuture更加的靈活,我們可以配置線程池的大小確保整體的計算不會因為等待IO而發(fā)生阻塞。
書上給出的建議如下:
如果是計算密集型的操作并且沒有IO推薦stream接口,因為實現(xiàn)簡單效率也高,如果所有的線程都是計算密集型的也就沒有必要創(chuàng)建比核數(shù)更多的線程。
反之,如果任務(wù)涉及到IO,網(wǎng)絡(luò)等操作:CompletableFuture靈活性更好,因為大部分線程處于等待狀態(tài),需要讓他們更加忙碌,并且再邏輯中加入異常處理可以更有效的監(jiān)控是什么原因觸發(fā)了等待。
現(xiàn)在我們知道了如何用CompletableFuture提供異步的api,后面的文章會學(xué)習(xí)如何利用CompletableFuture高效的操作同步api。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/68219.html
摘要:相比與其他操作系統(tǒng)包括其他類系統(tǒng)有很多的優(yōu)點,其中有一項就是,其上下文切換和模式切換的時間消耗非常少。因為多線程競爭鎖時會引起上下文切換。減少線程的使用。很多編程語言中都有協(xié)程。所以如何避免死鎖的產(chǎn)生,在我們使用并發(fā)編程時至關(guān)重要。 系列文章傳送門: Java多線程學(xué)習(xí)(一)Java多線程入門 Java多線程學(xué)習(xí)(二)synchronized關(guān)鍵字(1) java多線程學(xué)習(xí)(二)syn...
摘要:因為多線程競爭鎖時會引起上下文切換。減少線程的使用。舉個例子如果說服務(wù)器的帶寬只有,某個資源的下載速度是,系統(tǒng)啟動個線程下載該資源并不會導(dǎo)致下載速度編程,所以在并發(fā)編程時,需要考慮這些資源的限制。 最近私下做一項目,一bug幾日未解決,總惶恐。一日頓悟,bug不可怕,怕的是項目不存在bug,與其懼怕,何不與其剛正面。 系列文章傳送門: Java多線程學(xué)習(xí)(一)Java多線程入門 Jav...
摘要:學(xué)習(xí)編程的本最佳書籍這些書涵蓋了各個領(lǐng)域,包括核心基礎(chǔ)知識,集合框架,多線程和并發(fā),內(nèi)部和性能調(diào)優(yōu),設(shè)計模式等。擅長解釋錯誤及錯誤的原因以及如何解決簡而言之,這是學(xué)習(xí)中并發(fā)和多線程的最佳書籍之一。 showImg(https://segmentfault.com/img/remote/1460000018913016); 來源 | 愿碼(ChainDesk.CN)內(nèi)容編輯 愿碼Slo...
摘要:表示的是兩個,當(dāng)其中任意一個計算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢娝氖褂?,在開始分析它的高并發(fā)實現(xiàn)機制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個比較典型的互聯(lián)網(wǎng)高并發(fā)場景。 干貨:深度剖析分布式搜索引擎設(shè)計 分布式,高可用,和機器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個名詞,今天我們首先來說說分布式。 探究...
閱讀 2814·2019-08-30 15:55
閱讀 2861·2019-08-30 15:53
閱讀 2299·2019-08-26 13:47
閱讀 2562·2019-08-26 13:43
閱讀 3161·2019-08-26 13:33
閱讀 2809·2019-08-26 11:53
閱讀 1801·2019-08-23 18:35
閱讀 804·2019-08-23 17:16