摘要:類提供了一個可擴(kuò)展的線程池實(shí)現(xiàn)。使用舉例這里有一個網(wǎng)絡(luò)服務(wù),其中一個線程池中的線程為請求提供服務(wù)。因此返回的中的每個都是完成狀態(tài)。執(zhí)行給定的一組任務(wù),如果其中一個任務(wù)成功完成沒有拋出異常則返回。
Executor
Executor是java.util.concurrent包中的一個接口,是一個執(zhí)行提交的Runnable任務(wù)的對象。這個接口提供了一種方式把任務(wù)提交從每個任務(wù)會如何執(zhí)行的方法中解耦,包括線城市用,調(diào)度等的細(xì)節(jié)。使用Executor代替了顯式創(chuàng)建線程。例如,比起對一組task中的每一個調(diào)用new Thread(new(RunnableTask())).start(),你可以用:
Executor executor = anExecutor; executor.execute(new RunnableTask1()); executor.execute(new RunnableTask2()); ...
但是,Executor接口不是嚴(yán)格需要執(zhí)行是異步的。在最簡單的情況中,一個executor能夠在調(diào)用者的線程上立即運(yùn)行提交的任務(wù):
class DirectExecutor implements Executor { public void execute(Runnable r) { r.run(); } }
更典型的是,任務(wù)執(zhí)行在非調(diào)用者線程。下面的executor為每個task產(chǎn)出一個新的線程:
class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } }
很多Executor的實(shí)現(xiàn)強(qiáng)制加入了一些關(guān)于如何以及何時任務(wù)被調(diào)度的限制。下面的executor串行提交的任務(wù)到第二個executor,表明它是一個混合的executor:
class SerialExecutor implements Executor { final Queuetasks = new ArrayDeque (); final Executor executor; Runnable active; SerialExecutor(Executor executor) { this.executor = executor; } public synchronized void execute(final Runnable r) { tasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (active == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((active = tasks.poll()) != null) { executor.execute(active); } } }
在java.util.concurrent包中提供的Executor接口的實(shí)現(xiàn)(如ThreadPoolExecutor,ScheduledThreadPoolExecutor,ForkJoinPool,AbstractExecutorService)也同時實(shí)現(xiàn)了ExecutorService,這是一個更廣泛的接口。ThreadPoolExecutor類提供了一個可擴(kuò)展的線程池實(shí)現(xiàn)。Executors類為這些Executors提供了方便的工廠方法。
內(nèi)存一致性效應(yīng):線程中在提交一個Runnable對象給一個Executor之前發(fā)生的操作happen-before執(zhí)行這個Runnable(可能在另一個線程中執(zhí)行)。
實(shí)現(xiàn)Executor接口需要實(shí)現(xiàn)execute方法,定義如下:
void execute(Runnbale command)
這個方法在未來某個時間點(diǎn)執(zhí)行給定的command。這個command可能執(zhí)行在一個新的線程中,在一個池化的線程中,或在調(diào)用者線程中,這取決于Executor的實(shí)現(xiàn)。
ExecutorServiceExecutorService接口繼承Executor接口,是提供管理終止的方法以及produce出Future去跟蹤一個或多個異步任務(wù)進(jìn)度的方法的Executor。
一個ExecutorService可以被shutdown,會導(dǎo)致它拒絕新的tasks。提供了兩個不同的方法去關(guān)閉一個ExecutorService。shutdown方法允許之前提交的任務(wù)在終止之前執(zhí)行,shutdownNow方法禁止等待已經(jīng)開始任務(wù)并試圖結(jié)束正在執(zhí)行的任務(wù)。如果一個ExecutorService終止了,一個executor沒有正在執(zhí)行的活躍任務(wù),沒有等待執(zhí)行的任務(wù),也沒有新任務(wù)能被提交。一個沒有使用的ExecutorService應(yīng)該被關(guān)閉以回收資源。
submit方法根據(jù)Executor的execute(Runnable)方法擴(kuò)展,創(chuàng)建并返回一個Future,能夠用來cancel執(zhí)行以及等待執(zhí)行完成。invokeAny方法以及invokeAll方法執(zhí)行最普通的批量執(zhí)行,執(zhí)行一組任務(wù)然后等待至少一個,或者所有任務(wù)完成。
Executors類為ExecutorService提供工廠方法。
使用舉例這里有一個網(wǎng)絡(luò)服務(wù),其中一個線程池中的線程為請求提供服務(wù)。它使用預(yù)配置的Executors的newFixedThreadPool工廠方法:
class NetworkService implements Runnable { private final ServerSocket serverSocket; private final ExecutorService pool; public NetworkService(int port, int poolSize) throws IOException { serverSocket = new ServerSocket(port); pool = Executors.newFixedTreadPool(poolSize); } public void run() { // run the service try { for (;;) { pool.execute(new Handler(serverSocket.accept())); } } catch (IOException ex) { pool.shutdown(); } } } class Handler implements Runnable { private final Socket socket; Handler(Socket socket) { this.socket = socket; } public void run() { // read and service request on socket } }
下面的方法哦通過兩步關(guān)閉一個ExecutorService,首先調(diào)用shutdown以拒絕新來的tasks,然后調(diào)用shutdownNow(如果有必要的話),去cancel任何執(zhí)行的任務(wù):
void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread.interrupt(); } }ExecutorService中定義的方法 shutdown
void shutdown()
啟動有序關(guān)閉,執(zhí)行先前提交的任務(wù),但是不接受新的任務(wù)。如果已經(jīng)關(guān)閉再次執(zhí)行沒有影響。
這個方法不等待先前提交的任務(wù)完成執(zhí)行,使用awaitTermination去等待任務(wù)執(zhí)行完畢。
shutdownNowListshutdownNow()
試圖停止所有正在執(zhí)行的任務(wù),停止等待任務(wù)的處理,返回一個等待執(zhí)行的任務(wù)列表。
這個方法不等待正在執(zhí)行的任務(wù)終止,使用awaitTermination去等待任務(wù)終止。
盡力(best-effort)去停止正在執(zhí)行的任務(wù)。例如,標(biāo)準(zhǔn)實(shí)現(xiàn)會通過Thread的interrupt去cancel任務(wù),因此任何回應(yīng)終止失敗的任務(wù)永遠(yuǎn)不會終止。
isShutdownboolean isShutdown()
如果executor已經(jīng)被shutdown則返回true。
isTerminatedboolean isTerminated()
如果所有任務(wù)在shutdown之后都執(zhí)行完成則返回true。注意isTerminated永遠(yuǎn)不會為true除非shutdown或shutdownNow先執(zhí)行。
awaitTerminationboolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
在一個shutdown請求后阻塞直到所有任務(wù)完成執(zhí)行,或者timeout發(fā)生,或者當(dāng)前線程被interrupt,看哪個先發(fā)生。
submitFuture submit(Callable task)
提交一個返回值的任務(wù)去執(zhí)行并返回一個代表任務(wù)掛起結(jié)果的Future對象。Future的get方法會返回成功完成的任務(wù)結(jié)果。
如果你想要立即阻塞等待一個任務(wù)完成,你可以使用result = exec.submit(aCallable).get()。
submitFuture submit(Runnable task, T result)
提交一個Runnable任務(wù)去執(zhí)行并返回一個代表這個任務(wù)的Future。Future的get方法會返回成功完成的任務(wù)結(jié)果。
submitFuture> submit(Runnable task)
提交一個Runnable任務(wù)去執(zhí)行并返回一個代表這個任務(wù)的Future。Future的get方法會返回成功完成的任務(wù)結(jié)果null。
invokeAllList > invokeAll(Collection extends Callable > tasks) throws InterruptedException
執(zhí)行給定的一組任務(wù),返回一個包含這些任務(wù)狀態(tài)和結(jié)果的Future列表。Future的isDone方法對返回列表中的每個元素調(diào)用都返回true。
這個方法會阻塞,等待所有task完成。因此返回的list中的每個Future都是完成狀態(tài)。
invokeAllList > invokeAll(Collection extends Callable > tasks, long timeout, TimeUnit unit) throws InterruptedException
執(zhí)行給定的一組任務(wù),返回一個包含這些任務(wù)狀態(tài)和結(jié)果的列表。所有任務(wù)完成或timeout超時時方法返回。Future的isDone方法對返回列表中的每個元素調(diào)用都返回true。
返回后,沒有完成的任務(wù)被cancel。
invokeAnyT invokeAny(Collection extends Callable > tasks) throws InterruptedException, ExcutionException
執(zhí)行給定的一組任務(wù),如果其中一個任務(wù)成功完成(沒有拋出異常)則返回。返回后,所有沒完成的任務(wù)都被cancel。
invokeAnyT invokeAny(Collection extends Callable > tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
執(zhí)行給定的一組任務(wù),如果其中一個任務(wù)成功完成(沒有拋出異常)則返回。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/75979.html
摘要:線程的啟動與銷毀都與本地線程同步。操作系統(tǒng)會調(diào)度所有線程并將它們分配給可用的??蚣艿某蓡T主要成員線程池接口接口接口以及工具類。創(chuàng)建單個線程的接口與其實(shí)現(xiàn)類用于表示異步計算的結(jié)果。參考書籍并發(fā)編程的藝術(shù)方騰飛魏鵬程曉明著 在java中,直接使用線程來異步的執(zhí)行任務(wù),線程的每次創(chuàng)建與銷毀需要一定的計算機(jī)資源開銷。每個任務(wù)創(chuàng)建一個線程的話,當(dāng)任務(wù)數(shù)量多的時候,則對應(yīng)的創(chuàng)建銷毀開銷會消耗大量...
摘要:能夠異步的執(zhí)行任務(wù),并且通常管理一個線程池。這樣我們就不用手動的去創(chuàng)建線程了,線程池中的所有線程都將被重用。在之后不能再提交任務(wù)到線程池。它不使用固定大小的線程池,默認(rèn)情況下是主機(jī)的可用內(nèi)核數(shù)。 原文地址: Java 8 Concurrency Tutorial: Threads and Executors Java 5 初次引入了Concurrency API,并在隨后的發(fā)布版本中...
摘要:注意線程與本地操作系統(tǒng)的線程是一一映射的。固定線程數(shù)的線程池提供了兩種創(chuàng)建具有固定線程數(shù)的的方法,固定線程池在初始化時確定其中的線程總數(shù),運(yùn)行過程中會始終維持線程數(shù)量不變。 showImg(https://segmentfault.com/img/bVbhK58?w=1920&h=1080); 本文首發(fā)于一世流云專欄:https://segmentfault.com/blog... ...
摘要:在這個示例中我們使用了一個單線程線程池的。在延遲消逝后,任務(wù)將會并發(fā)執(zhí)行。這是并發(fā)系列教程的第一部分。第一部分線程和執(zhí)行器第二部分同步和鎖第三部分原子操作和 Java 8 并發(fā)教程:線程和執(zhí)行器 原文:Java 8 Concurrency Tutorial: Threads and Executors 譯者:BlankKelly 來源:Java8并發(fā)教程:Threads和Execut...
摘要:有三種狀態(tài)運(yùn)行關(guān)閉終止。類類,提供了一系列工廠方法用于創(chuàng)建線程池,返回的線程池都實(shí)現(xiàn)了接口。線程池的大小一旦達(dá)到最大值就會保持不變,在提交新任務(wù),任務(wù)將會進(jìn)入等待隊(duì)列中等待。此線程池支持定時以及周期性執(zhí)行任務(wù)的需求。 這是java高并發(fā)系列第19篇文章。 本文主要內(nèi)容 介紹Executor框架相關(guān)內(nèi)容 介紹Executor 介紹ExecutorService 介紹線程池ThreadP...
閱讀 3937·2021-11-24 10:46
閱讀 1822·2021-11-16 11:44
閱讀 2302·2021-09-22 16:02
閱讀 1424·2019-08-30 15:55
閱讀 1139·2019-08-30 12:46
閱讀 573·2019-08-28 18:31
閱讀 2771·2019-08-26 18:38
閱讀 1106·2019-08-23 16:51