摘要:在這個示例中我們使用了一個單線程線程池的。在延遲消逝后,任務(wù)將會并發(fā)執(zhí)行。這是并發(fā)系列教程的第一部分。第一部分線程和執(zhí)行器第二部分同步和鎖第三部分原子操作和
Java 8 并發(fā)教程:線程和執(zhí)行器
原文:Java 8 Concurrency Tutorial: Threads and Executors
譯者:BlankKelly
來源:Java8并發(fā)教程:Threads和Executors
歡迎閱讀我的Java8并發(fā)教程的第一部分。這份指南將會以簡單易懂的代碼示例來教給你如何在Java8中進(jìn)行并發(fā)編程。這是一系列教程中的第一部分。在接下來的15分鐘,你將會學(xué)會如何通過線程,任務(wù)(tasks)和 exector services來并行執(zhí)行代碼。
第一部分:線程和執(zhí)行器
第二部分:同步和鎖
第三部分:原子操作和 ConcurrentMap
并發(fā)在Java5中首次被引入并在后續(xù)的版本中不斷得到增強(qiáng)。在這篇文章中介紹的大部分概念同樣適用于以前的Java版本。不過我的代碼示例聚焦于Java8,大量使用lambda表達(dá)式和其他新特性。如果你對lambda表達(dá)式不屬性,我推薦你首先閱讀我的Java 8 教程。
Thread 和 Runnable所有的現(xiàn)代操作系統(tǒng)都通過進(jìn)程和線程來支持并發(fā)。進(jìn)程是通常彼此獨(dú)立運(yùn)行的程序的實(shí)例,比如,如果你啟動了一個Java程序,操作系統(tǒng)產(chǎn)生一個新的進(jìn)程,與其他程序一起并行執(zhí)行。在這些進(jìn)程的內(nèi)部,我們使用線程并發(fā)執(zhí)行代碼,因此,我們可以最大限度的利用CPU可用的核心(core)。
Java從JDK1.0開始執(zhí)行線程。在開始一個新的線程之前,你必須指定由這個線程執(zhí)行的代碼,通常稱為task。這可以通過實(shí)現(xiàn)Runnable——一個定義了一個無返回值無參數(shù)的run()方法的函數(shù)接口,如下面的代碼所示:
Runnable task = () -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName); }; task.run(); Thread thread = new Thread(task); thread.start(); System.out.println("Done!");
因?yàn)?b>Runnable是一個函數(shù)接口,所以我們利用lambda表達(dá)式將當(dāng)前的線程名打印到控制臺。首先,在開始一個線程前我們在主線程中直接運(yùn)行runnable。
控制臺輸出的結(jié)果可能像下面這樣:
Hello main Hello Thread-0 Done!
或者這樣:
Hello main Done! Hello Thread-0
由于我們不能預(yù)測這個runnable是在打印"done"前執(zhí)行還是在之后執(zhí)行。順序是不確定的,因此在大的程序中編寫并發(fā)程序是一個復(fù)雜的任務(wù)。
我們可以將線程休眠確定的時間。在這篇文章接下來的代碼示例中我們可以通過這種方法來模擬長時間運(yùn)行的任務(wù)。
Runnable runnable = () -> { try { String name = Thread.currentThread().getName(); System.out.println("Foo " + name); TimeUnit.SECONDS.sleep(1); System.out.println("Bar " + name); } catch (InterruptedException e) { e.printStackTrace(); } }; Thread thread = new Thread(runnable); thread.start();
當(dāng)你運(yùn)行上面的代碼時,你會注意到在第一條打印語句和第二條打印語句之間存在一分鐘的延遲。TimeUnit在處理單位時間時一個有用的枚舉類。你可以通過調(diào)用Thread.sleep(1000)來達(dá)到同樣的目的。
使用Thread類是很單調(diào)的且容易出錯。由于并發(fā)API在2004年Java5發(fā)布的時候才被引入。這些API位于java.util.concurrent包下,包含很多處理并發(fā)編程的有用的類。自從這些并發(fā)API引入以來,在隨后的新的Java版本發(fā)布過程中得到不斷的增強(qiáng),甚至Java8提供了新的類和方法來處理并發(fā)。
接下來,讓我們走進(jìn)并發(fā)API中最重要的一部——executor services。
Executor并發(fā)API引入了ExecutorService作為一個在程序中直接使用Thread的高層次的替換方案。Executos支持運(yùn)行異步任務(wù),通常管理一個線程池,這樣一來我們就不需要手動去創(chuàng)建新的線程。在不斷地處理任務(wù)的過程中,線程池內(nèi)部線程將會得到復(fù)用,因此,在我們可以使用一個executor service來運(yùn)行和我們想在我們整個程序中執(zhí)行的一樣多的并發(fā)任務(wù)。
下面是使用executors的第一個代碼示例:
ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(() -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName); }); // => Hello pool-1-thread-1
Executors類提供了便利的工廠方法來創(chuàng)建不同類型的 executor services。在這個示例中我們使用了一個單線程線程池的 executor。
代碼運(yùn)行的結(jié)果類似于上一個示例,但是當(dāng)運(yùn)行代碼時,你會注意到一個很大的差別:Java進(jìn)程從沒有停止!Executors必須顯式的停止-否則它們將持續(xù)監(jiān)聽新的任務(wù)。
ExecutorService提供了兩個方法來達(dá)到這個目的——shutdwon()會等待正在執(zhí)行的任務(wù)執(zhí)行完而shutdownNow()會終止所有正在執(zhí)行的任務(wù)并立即關(guān)閉execuotr。
這是我喜歡的通常關(guān)閉executors的方式:
try { System.out.println("attempt to shutdown executor"); executor.shutdown(); executor.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { System.err.println("tasks interrupted"); } finally { if (!executor.isTerminated()) { System.err.println("cancel non-finished tasks"); } executor.shutdownNow(); System.out.println("shutdown finished"); }
executor通過等待指定的時間讓當(dāng)前執(zhí)行的任務(wù)終止來“溫柔的”關(guān)閉executor。在等待最長5分鐘的時間后,execuote最終會通過中斷所有的正在執(zhí)行的任務(wù)關(guān)閉。
Callable 和 Future除了Runnable,executor還支持另一種類型的任務(wù)——Callable。Callables也是類似于runnables的函數(shù)接口,不同之處在于,Callable返回一個值。
下面的lambda表達(dá)式定義了一個callable:在休眠一分鐘后返回一個整數(shù)。
Callabletask = () -> { try { TimeUnit.SECONDS.sleep(1); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); } };
Callbale也可以像runnbales一樣提交給 executor services。但是callables的結(jié)果怎么辦?因?yàn)?b>submit()不會等待任務(wù)完成,executor service不能直接返回callable的結(jié)果。不過,executor 可以返回一個Future類型的結(jié)果,它可以用來在稍后某個時間取出實(shí)際的結(jié)果。
ExecutorService executor = Executors.newFixedThreadPool(1); Futurefuture = executor.submit(task); System.out.println("future done? " + future.isDone()); Integer result = future.get(); System.out.println("future done? " + future.isDone()); System.out.print("result: " + result);
在將callable提交給exector之后,我們先通過調(diào)用isDone()來檢查這個future是否已經(jīng)完成執(zhí)行。我十分確定這會發(fā)生什么,因?yàn)樵诜祷啬莻€整數(shù)之前callable會休眠一分鐘、
在調(diào)用get()方法時,當(dāng)前線程會阻塞等待,直到callable在返回實(shí)際的結(jié)果123之前執(zhí)行完成?,F(xiàn)在future執(zhí)行完畢,我們可以在控制臺看到如下的結(jié)果:
future done? false future done? true result: 123
Future與底層的executor service緊密的結(jié)合在一起。記住,如果你關(guān)閉executor,所有的未中止的future都會拋出異常。
executor.shutdownNow(); future.get();
你可能注意到我們這次創(chuàng)建executor的方式與上一個例子稍有不同。我們使用newFixedThreadPool(1)來創(chuàng)建一個單線程線程池的 execuot service。
這等同于使用newSingleThreadExecutor不過使用第二種方式我們可以稍后通過簡單的傳入一個比1大的值來增加線程池的大小。
任何future.get()調(diào)用都會阻塞,然后等待直到callable中止。在最糟糕的情況下,一個callable持續(xù)運(yùn)行——因此使你的程序?qū)]有響應(yīng)。我們可以簡單的傳入一個時長來避免這種情況。
ExecutorService executor = Executors.newFixedThreadPool(1); Futurefuture = executor.submit(() -> { try { TimeUnit.SECONDS.sleep(2); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); } }); future.get(1, TimeUnit.SECONDS);
運(yùn)行上面的代碼將會產(chǎn)生一個TimeoutException:
Exception in thread "main" java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask.get(FutureTask.java:205)
你可能已經(jīng)猜到俄為什么會排除這個異常。我們指定的最長等待時間為1分鐘,而這個callable在返回結(jié)果之前實(shí)際需要兩分鐘。
invokeAllExecutors支持通過invokeAll()一次批量提交多個callable。這個方法結(jié)果一個callable的集合,然后返回一個future的列表。
ExecutorService executor = Executors.newWorkStealingPool(); List> callables = Arrays.asList( () -> "task1", () -> "task2", () -> "task3"); executor.invokeAll(callables) .stream() .map(future -> { try { return future.get(); } catch (Exception e) { throw new IllegalStateException(e); } }) .forEach(System.out::println);
在這個例子中,我們利用Java8中的函數(shù)流(stream)來處理invokeAll()調(diào)用返回的所有future。我們首先將每一個future映射到它的返回值,然后將每個值打印到控制臺。如果你還不屬性stream,可以閱讀我的Java8 Stream 教程。
invokeAny批量提交callable的另一種方式就是invokeAny(),它的工作方式與invokeAll()稍有不同。在等待future對象的過程中,這個方法將會阻塞直到第一個callable中止然后返回這一個callable的結(jié)果。
為了測試這種行為,我們利用這個幫助方法來模擬不同執(zhí)行時間的callable。這個方法返回一個callable,這個callable休眠指定 的時間直到返回給定的結(jié)果。
Callablecallable(String result, long sleepSeconds) { return () -> { TimeUnit.SECONDS.sleep(sleepSeconds); return result; }; }
我們利用這個方法創(chuàng)建一組callable,這些callable擁有不同的執(zhí)行時間,從1分鐘到3分鐘。通過invokeAny()將這些callable提交給一個executor,返回最快的callable的字符串結(jié)果-在這個例子中為任務(wù)2:
ExecutorService executor = Executors.newWorkStealingPool(); List> callables = Arrays.asList( callable("task1", 2), callable("task2", 1), callable("task3", 3)); String result = executor.invokeAny(callables); System.out.println(result); // => task2
上面這個例子又使用了另一種方式來創(chuàng)建executor——調(diào)用newWorkStealingPool()。這個工廠方法是Java8引入的,返回一個ForkJoinPool類型的 executor,它的工作方法與其他常見的execuotr稍有不同。與使用一個固定大小的線程池不同,ForkJoinPools使用一個并行因子數(shù)來創(chuàng)建,默認(rèn)值為主機(jī)CPU的可用核心數(shù)。
ForkJoinPools 在Java7時引入,將會在這個系列后面的教程中詳細(xì)講解。讓我們深入了解一下 scheduled executors 來結(jié)束本次教程。
ScheduledExecutor我們已經(jīng)學(xué)習(xí)了如何在一個 executor 中提交和運(yùn)行一次任務(wù)。為了持續(xù)的多次執(zhí)行常見的任務(wù),我們可以利用調(diào)度線程池。
ScheduledExecutorService支持任務(wù)調(diào)度,持續(xù)執(zhí)行或者延遲一段時間后執(zhí)行。
下面的實(shí)例,調(diào)度一個任務(wù)在延遲3分鐘后執(zhí)行:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime()); ScheduledFuture> future = executor.schedule(task, 3, TimeUnit.SECONDS); TimeUnit.MILLISECONDS.sleep(1337); long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS); System.out.printf("Remaining Delay: %sms", remainingDelay);
調(diào)度一個任務(wù)將會產(chǎn)生一個專門的future類型——ScheduleFuture,它除了提供了Future的所有方法之外,他還提供了getDelay()方法來獲得剩余的延遲。在延遲消逝后,任務(wù)將會并發(fā)執(zhí)行。
為了調(diào)度任務(wù)持續(xù)的執(zhí)行,executors 提供了兩個方法scheduleAtFixedRate()和scheduleWithFixedDelay()。第一個方法用來以固定頻率來執(zhí)行一個任務(wù),比如,下面這個示例中,每分鐘一次:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime()); int initialDelay = 0; int period = 1; executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);
另外,這個方法還接收一個初始化延遲,用來指定這個任務(wù)首次被執(zhí)行等待的時長。
請記?。?b>scheduleAtFixedRate()并不考慮任務(wù)的實(shí)際用時。所以,如果你指定了一個period為1分鐘而任務(wù)需要執(zhí)行2分鐘,那么線程池為了性能會更快的執(zhí)行。
在這種情況下,你應(yīng)該考慮使用scheduleWithFixedDelay()。這個方法的工作方式與上我們上面描述的類似。不同之處在于等待時間 period 的應(yīng)用是在一次任務(wù)的結(jié)束和下一個任務(wù)的開始之間。例如:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Runnable task = () -> { try { TimeUnit.SECONDS.sleep(2); System.out.println("Scheduling: " + System.nanoTime()); } catch (InterruptedException e) { System.err.println("task interrupted"); } }; executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);
這個例子調(diào)度了一個任務(wù),并在一次執(zhí)行的結(jié)束和下一次執(zhí)行的開始之間設(shè)置了一個1分鐘的固定延遲。初始化延遲為0,任務(wù)執(zhí)行時間為0。所以我們分別在0s,3s,6s,9s等間隔處結(jié)束一次執(zhí)行。如你所見,scheduleWithFixedDelay()在你不能預(yù)測調(diào)度任務(wù)的執(zhí)行時長時是很有用的。
這是并發(fā)系列教程的第一部分。我推薦你親手實(shí)踐一下上面的代碼示例。你可以從 Github 上找到這篇文章中所有的代碼示例,所以歡迎你fork這個倉庫,并收藏它。
我希望你會喜歡這篇文章。如果你有任何的問題都可以在下面評論或者通過 Twitter 向我反饋。
第一部分:線程和執(zhí)行器
第二部分:同步和鎖
第三部分:原子操作和 ConcurrentMap
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/64965.html
摘要:在接下來的分鐘,你將會學(xué)會如何通過同步關(guān)鍵字,鎖和信號量來同步訪問共享可變變量。所以在使用樂觀鎖時,你需要每次在訪問任何共享可變變量之后都要檢查鎖,來確保讀鎖仍然有效。 原文:Java 8 Concurrency Tutorial: Synchronization and Locks譯者:飛龍 協(xié)議:CC BY-NC-SA 4.0 歡迎閱讀我的Java8并發(fā)教程的第二部分。這份指南將...
摘要:并發(fā)教程原子變量和原文譯者飛龍協(xié)議歡迎閱讀我的多線程編程系列教程的第三部分。如果你能夠在多線程中同時且安全地執(zhí)行某個操作,而不需要關(guān)鍵字或上一章中的鎖,那么這個操作就是原子的。當(dāng)多線程的更新比讀取更頻繁時,這個類通常比原子數(shù)值類性能更好。 Java 8 并發(fā)教程:原子變量和 ConcurrentMap 原文:Java 8 Concurrency Tutorial: Synchroni...
以下是Java技術(shù)棧微信公眾號發(fā)布的關(guān)于 Java 的技術(shù)干貨,從以下幾個方面匯總。 Java 基礎(chǔ)篇 Java 集合篇 Java 多線程篇 Java JVM篇 Java 進(jìn)階篇 Java 新特性篇 Java 工具篇 Java 書籍篇 Java基礎(chǔ)篇 8張圖帶你輕松溫習(xí) Java 知識 Java父類強(qiáng)制轉(zhuǎn)換子類原則 一張圖搞清楚 Java 異常機(jī)制 通用唯一標(biāo)識碼UUID的介紹及使用 字符串...
并發(fā) 計(jì)算機(jī)用戶想當(dāng)然地認(rèn)為他們的系統(tǒng)一次可以做不止一件事,他們設(shè)想他們可以繼續(xù)在文字處理器中工作,而其他應(yīng)用程序則下載文件、管理打印隊(duì)列和流音頻,即使是單個應(yīng)用程序通常也希望一次完成多個任務(wù)。例如,流式音頻應(yīng)用程序必須同時從網(wǎng)絡(luò)上讀取數(shù)字音頻、解壓縮、管理回放并更新其顯示,甚至文字處理器應(yīng)始終準(zhǔn)備好響應(yīng)鍵盤和鼠標(biāo)事件,無論重新格式化文本或更新顯示有多繁忙,可以執(zhí)行此類操作的軟件稱為并發(fā)軟件。 J...
閱讀 3083·2023-04-25 18:54
閱讀 2601·2021-11-02 14:40
閱讀 3210·2021-09-23 11:58
閱讀 2441·2019-08-30 13:50
閱讀 1247·2019-08-29 12:46
閱讀 3134·2019-08-28 17:51
閱讀 690·2019-08-26 11:47
閱讀 912·2019-08-23 16:17