摘要:前言談到并行,我們可能最先想到的是線程,多個(gè)線程一起運(yùn)行,來(lái)提高我們系統(tǒng)的整體處理速度為什么使用多個(gè)線程就能提高處理速度,因?yàn)楝F(xiàn)在計(jì)算機(jī)普遍都是多核處理器,我們需要充分利用資源如果站的更高一點(diǎn)來(lái)看,我們每臺(tái)機(jī)器都可以是一個(gè)處理節(jié)點(diǎn),多臺(tái)機(jī)器
前言
談到并行,我們可能最先想到的是線程,多個(gè)線程一起運(yùn)行,來(lái)提高我們系統(tǒng)的整體處理速度;為什么使用多個(gè)線程就能提高處理速度,因?yàn)楝F(xiàn)在計(jì)算機(jī)普遍都是多核處理器,我們需要充分利用cpu資源;如果站的更高一點(diǎn)來(lái)看,我們每臺(tái)機(jī)器都可以是一個(gè)處理節(jié)點(diǎn),多臺(tái)機(jī)器并行處理;并行的處理方式可以說(shuō)無(wú)處不在,本文主要來(lái)談?wù)凧ava在并行處理方面的努力。
無(wú)處不在的并行Java的垃圾回收器,我們可以看到每一代版本的更新,伴隨著GC更短的延遲,從serial到cms再到現(xiàn)在的G1,一直在摘掉Java慢的帽子;消息隊(duì)列從早期的ActiveMQ到現(xiàn)在的kafka和RocketMQ,引入的分區(qū)的概念,提高了消息的并行性;數(shù)據(jù)庫(kù)單表數(shù)據(jù)到一定量級(jí)之后,訪問(wèn)速度會(huì)很慢,我們會(huì)對(duì)表進(jìn)行分表處理,引入數(shù)據(jù)庫(kù)中間件;Redis你可能覺(jué)得本身處理是單線程的,但是Redis的集群方案中引入了slot(槽)的概念;更普遍的就是我們很多的業(yè)務(wù)系統(tǒng),通常會(huì)部署多臺(tái),通過(guò)負(fù)載均衡器來(lái)進(jìn)行分發(fā);好了還有其他的一些例子,此處不在一一例舉。
如何并行我覺(jué)得并行的核心在于"拆分",把大任務(wù)變成小任務(wù),然后利用多核CPU也好,還是多節(jié)點(diǎn)也好,同時(shí)并行的處理,Java歷代版本的更新,都在為我們開發(fā)者提供更方便的并行處理,從開始的Thread,到線程池,再到fork/join框架,最后到流處理,下面使用簡(jiǎn)單的求和例子來(lái)看看各種方式是如何并行處理的;
單線程處理首先看一下最簡(jiǎn)單的單線程處理方式,直接使用主線程進(jìn)行求和操作;
public class SingleThread { public static long[] numbers; public static void main(String[] args) { numbers = LongStream.rangeClosed(1, 10_000_000).toArray(); long sum = 0; for (int i = 0; i < numbers.length; i++) { sum += numbers[i]; } System.out.println("sum = " + sum); } }
求和本身是一個(gè)計(jì)算密集型任務(wù),但是現(xiàn)在已經(jīng)是多核時(shí)代,只用單線程,相當(dāng)于只使用了其中一個(gè)cpu,其他cpu被閑置,資源的浪費(fèi);
Thread方式我們把任務(wù)拆分成多個(gè)小任務(wù),然后每個(gè)小任務(wù)分別啟動(dòng)一個(gè)線程,如下所示:
public class ThreadTest { public static final int THRESHOLD = 10_000; public static long[] numbers; private static long allSum; public static void main(String[] args) throws Exception { numbers = LongStream.rangeClosed(1, 10_000_000).toArray(); int taskSize = (int) (numbers.length / THRESHOLD); for (int i = 1; i <= taskSize; i++) { final int key = i; new Thread(new Runnable() { public void run() { sumAll(sum((key - 1) * THRESHOLD, key * THRESHOLD)); } }).start(); } Thread.sleep(100); System.out.println("allSum = " + getAllSum()); } private static synchronized long sumAll(long threadSum) { return allSum += threadSum; } public static synchronized long getAllSum() { return allSum; } private static long sum(int start, int end) { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } }
以上指定了一個(gè)拆分閥值,計(jì)算拆分多少個(gè)認(rèn)為,同時(shí)啟動(dòng)多少線程;這種處理就是啟動(dòng)的線程數(shù)過(guò)多,而CPU數(shù)有限,更重要的是求和是一個(gè)計(jì)算密集型任務(wù),啟動(dòng)過(guò)多的線程只會(huì)帶來(lái)更多的線程上下文切換;同時(shí)線程處理完一個(gè)任務(wù)就終止了,也是對(duì)資源的浪費(fèi);另外可以看到主線程不知道何時(shí)子任務(wù)已經(jīng)處理完了,需要做額外的處理;所有Java后續(xù)引入了線程池。
線程池方式jdk1.5引入了并發(fā)包,其中包括了ThreadPoolExecutor,相關(guān)代碼如下:
public class ExecutorServiceTest { public static final int THRESHOLD = 10_000; public static long[] numbers; public static void main(String[] args) throws Exception { numbers = LongStream.rangeClosed(1, 10_000_000).toArray(); ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1); CompletionServicecompletionService = new ExecutorCompletionService (executor); int taskSize = (int) (numbers.length / THRESHOLD); for (int i = 1; i <= taskSize; i++) { final int key = i; completionService.submit(new Callable () { @Override public Long call() throws Exception { return sum((key - 1) * THRESHOLD, key * THRESHOLD); } }); } long sumValue = 0; for (int i = 0; i < taskSize; i++) { sumValue += completionService.take().get(); } // 所有任務(wù)已經(jīng)完成,關(guān)閉線程池 System.out.println("sumValue = " + sumValue); executor.shutdown(); } private static long sum(int start, int end) { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } }
上面已經(jīng)分析了計(jì)算密集型并不是線程越多越好,這里創(chuàng)建了JDK默認(rèn)的線程數(shù):CPU數(shù)+1,這是一個(gè)經(jīng)過(guò)大量測(cè)試以后給出的一個(gè)結(jié)果;線程池顧名思義,可以重復(fù)利用現(xiàn)有的線程;同時(shí)利用CompletionService來(lái)對(duì)子任務(wù)進(jìn)行匯總;合理的使用線程池已經(jīng)可以充分的并行處理任務(wù),只是在寫法上有點(diǎn)繁瑣,此時(shí)JDK1.7中引入了fork/join框架;
fork/join框架分支/合并框架的目的是以遞歸的方式將可以并行的認(rèn)為拆分成更小的任務(wù),然后將每個(gè)子任務(wù)的結(jié)果合并起來(lái)生成整體結(jié)果;相關(guān)代碼如下:
public class ForkJoinTest extends java.util.concurrent.RecursiveTask{ private static final long serialVersionUID = 1L; private final long[] numbers; private final int start; private final int end; public static final long THRESHOLD = 10_000; public ForkJoinTest(long[] numbers) { this(numbers, 0, numbers.length); } private ForkJoinTest(long[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } @Override protected Long compute() { int length = end - start; if (length <= THRESHOLD) { return computeSequentially(); } ForkJoinTest leftTask = new ForkJoinTest(numbers, start, start + length / 2); leftTask.fork(); ForkJoinTest rightTask = new ForkJoinTest(numbers, start + length / 2, end); Long rightResult = rightTask.compute(); // 注:join方法會(huì)阻塞,因此有必要在兩個(gè)子任務(wù)的計(jì)算都開始之后才執(zhí)行join方法 Long leftResult = leftTask.join(); return leftResult + rightResult; } private long computeSequentially() { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } public static void main(String[] args) { System.out.println(forkJoinSum(10_000_000)); } public static long forkJoinSum(long n) { long[] numbers = LongStream.rangeClosed(1, n).toArray(); ForkJoinTask task = new ForkJoinTest(numbers); return new ForkJoinPool().invoke(task); } }
ForkJoinPool是ExecutorService接口的一個(gè)實(shí)現(xiàn),子認(rèn)為分配給線程池中的工作線程;同時(shí)需要把任務(wù)提交到此線程池中,需要?jiǎng)?chuàng)建RecursiveTask
Java8引入了stream的概念,可以讓我們更好的利用并行,使用流代碼如下:
public class StreamTest { public static void main(String[] args) { System.out.println("sum = " + parallelRangedSum(10_000_000)); } public static long parallelRangedSum(long n) { return LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum); } }
以上代碼是不是非常簡(jiǎn)單,對(duì)于開發(fā)者來(lái)說(shuō)完全不需要手動(dòng)拆分,使用同步機(jī)制等方式,就可以讓任務(wù)并行處理,只需要對(duì)流使用parallel()方法,系統(tǒng)自動(dòng)會(huì)對(duì)任務(wù)進(jìn)行拆分,當(dāng)然前提是沒(méi)有共享可變狀態(tài);其實(shí)并行流內(nèi)部使用的也是fork/join框架;
總結(jié)本文使用一個(gè)求和的實(shí)例,來(lái)介紹了jdk為開發(fā)者提供并行處理的各種方式,可以看到Java一直在為提供更方便的并行處理而努力。
參考<
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/74238.html
摘要:是一個(gè)倡議,它提倡提供一種帶有非阻塞背壓的異步流處理的標(biāo)準(zhǔn)。是標(biāo)準(zhǔn)的實(shí)現(xiàn)之一。的實(shí)現(xiàn)細(xì)節(jié)請(qǐng)求響應(yīng)的與請(qǐng)求響應(yīng)的暴露為是請(qǐng)求的的消費(fèi)者是響應(yīng)的的生產(chǎn)者內(nèi)部的內(nèi)部 北京時(shí)間 9 月 26 日,Oracle 官方宣布 Java 11 正式發(fā)布 一、JDK HTTP Client介紹 JDK11中的17個(gè)新特性 showImg(https://segmentfault.com/img/remo...
摘要:但有一個(gè)限制它們不能修改定義的方法的局部變量的內(nèi)容。如前所述,這種限制存在的原因在于局部變量保存在棧上,并且隱式表示它們僅限于其所在線程。 2014年,Oracle發(fā)布了Java8新版本。對(duì)于Java來(lái)說(shuō),這顯然是一個(gè)具有里程碑意義的版本。尤其是那函數(shù)式編程的功能,避開了Java那煩瑣的語(yǔ)法所帶來(lái)的麻煩。 這可以算是一篇Java8的學(xué)習(xí)筆記。將Java8一些常見的一些特性作了一個(gè)概要的...
摘要:如果僅依靠程序自動(dòng)交出控制的話,那么一些惡意程序?qū)?huì)很容易占用全部時(shí)間而不與其他任務(wù)共享。多個(gè)操作可以在重疊的時(shí)間段內(nèi)進(jìn)行。 PHP下的異步嘗試系列 如果你還不太了解PHP下的生成器,你可以根據(jù)下面目錄翻閱 PHP下的異步嘗試一:初識(shí)生成器 PHP下的異步嘗試二:初識(shí)協(xié)程 PHP下的異步嘗試三:協(xié)程的PHP版thunkify自動(dòng)執(zhí)行器 PHP下的異步嘗試四:PHP版的Promise ...
摘要:內(nèi)部迭代與使用迭代器顯式迭代的集合不同,流的迭代操作是在背后進(jìn)行的。流只能遍歷一次請(qǐng)注意,和迭代器類似,流只能遍歷一次。 流(Stream) 流是什么 流是Java API的新成員,它允許你以聲明性方式處理數(shù)據(jù)集合(通過(guò)查詢語(yǔ)句來(lái)表達(dá),而不是臨時(shí)編寫一個(gè)實(shí)現(xiàn))。就現(xiàn)在來(lái)說(shuō),你可以把它們看成遍歷數(shù)據(jù)集的高級(jí)迭代器。此外,流還可以透明地并行處理,你無(wú)需寫任何多線程代碼了!我會(huì)在后面的筆記中...
閱讀 2634·2021-11-15 11:38
閱讀 2645·2021-11-04 16:13
閱讀 18198·2021-09-22 15:07
閱讀 1062·2019-08-30 15:55
閱讀 3288·2019-08-30 14:15
閱讀 1698·2019-08-29 13:59
閱讀 3252·2019-08-28 18:28
閱讀 1612·2019-08-23 18:29