摘要:序本文主要來(lái)展示一下簡(jiǎn)版的線程池的實(shí)現(xiàn)。默認(rèn)提供了幾個(gè)工廠方法思路主要用到的是雙端隊(duì)列,不過(guò)這里我們粗糙的實(shí)現(xiàn)的話,也可以不用到。測(cè)試實(shí)例輸出從數(shù)據(jù)來(lái)看,還是相對(duì)均勻的。
序
本文主要來(lái)展示一下簡(jiǎn)版的work stealing線程池的實(shí)現(xiàn)。
ExecutorsExecutors默認(rèn)提供了幾個(gè)工廠方法
/** * Creates a thread pool that maintains enough threads to support * the given parallelism level, and may use multiple queues to * reduce contention. The parallelism level corresponds to the * maximum number of threads actively engaged in, or available to * engage in, task processing. The actual number of threads may * grow and shrink dynamically. A work-stealing pool makes no * guarantees about the order in which submitted tasks are * executed. * * @param parallelism the targeted parallelism level * @return the newly created thread pool * @throws IllegalArgumentException if {@code parallelism <= 0} * @since 1.8 */ public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } /** * Creates a work-stealing thread pool using all * {@link Runtime#availableProcessors available processors} * as its target parallelism level. * @return the newly created thread pool * @see #newWorkStealingPool(int) * @since 1.8 */ public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }思路
ForkJoinPool主要用到的是雙端隊(duì)列,不過(guò)這里我們粗糙的實(shí)現(xiàn)的話,也可以不用到deque。
public class WorkStealingChannel{ private static final Logger LOGGER = LoggerFactory.getLogger(WorkStealingChannel.class); BlockingDeque [] managedQueues; AtomicLongMap stat = AtomicLongMap.create(); public WorkStealingChannel() { int nCPU = Runtime.getRuntime().availableProcessors(); int queueCount = nCPU / 2 + 1; managedQueues = new LinkedBlockingDeque[queueCount]; for(int i=0;i (); } } public void put(T item) throws InterruptedException { int targetIndex = Math.abs(item.hashCode() % managedQueues.length); BlockingQueue targetQueue = managedQueues[targetIndex]; targetQueue.put(item); } public T take() throws InterruptedException { int rdnIdx = ThreadLocalRandom.current().nextInt(managedQueues.length); int idx = rdnIdx; while (true){ idx = idx % managedQueues.length; T item = null; if(idx == rdnIdx){ item = managedQueues[idx].poll(); }else{ item = managedQueues[idx].pollLast(); } if(item != null){ LOGGER.info("take ele from queue {}",idx); stat.addAndGet(idx,1); return item; } idx++; if(idx == rdnIdx){ break; } } //走完一輪沒(méi)有,則隨機(jī)取一個(gè)等待 LOGGER.info("wait for queue:{}",rdnIdx); stat.addAndGet(rdnIdx,1); return managedQueues[rdnIdx].take(); } public AtomicLongMap getStat() { return stat; } }
這里根據(jù)cpu的數(shù)量建立了幾個(gè)deque,然后每次put的時(shí)候,根據(jù)hashcode取模放到對(duì)應(yīng)的隊(duì)列。然后獲取的時(shí)候,先從隨機(jī)一個(gè)隊(duì)列取,沒(méi)有的話,再robbin round取其他隊(duì)列的,還沒(méi)有的話,則阻塞等待指定隊(duì)列的元素。
測(cè)試實(shí)例
public class WorkStealingDemo { static final WorkStealingChannelchannel = new WorkStealingChannel<>(); static volatile boolean running = true; static class Producer extends Thread{ @Override public void run() { while(running){ try { channel.put(UUID.randomUUID().toString()); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class Consumer extends Thread{ @Override public void run() { while(running){ try { String value = channel.take(); System.out.println(value); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void stop(){ running = false; System.out.println(channel.getStat()); } public static void main(String[] args) throws InterruptedException { int nCPU = Runtime.getRuntime().availableProcessors(); int consumerCount = nCPU / 2 + 1; for (int i = 0; i < nCPU; i++) { new Producer().start(); } for (int i = 0; i < consumerCount; i++) { new Consumer().start(); } Thread.sleep(30*1000); stop(); } }
輸出
{0=660972, 1=660613, 2=661537, 3=659846, 4=659918}
從數(shù)據(jù)來(lái)看,還是相對(duì)均勻的。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/70405.html
執(zhí)行器 在前面的所有示例中,由新的線程(由其Runnable對(duì)象定義)和線程本身(由Thread對(duì)象定義)完成的任務(wù)之間存在緊密的聯(lián)系,這適用于小型應(yīng)用程序,但在大型應(yīng)用程序中,將線程管理和創(chuàng)建與應(yīng)用程序的其余部分分開(kāi)是有意義的,封裝這些函數(shù)的對(duì)象稱為執(zhí)行器,以下小節(jié)詳細(xì)描述了執(zhí)行器。 執(zhí)行器接口定義三個(gè)執(zhí)行器對(duì)象類型。 線程池是最常見(jiàn)的執(zhí)行器實(shí)現(xiàn)類型。 Fork/Join是一個(gè)利用多個(gè)處理器的...
摘要:大多數(shù)待遇豐厚的開(kāi)發(fā)職位都要求開(kāi)發(fā)者精通多線程技術(shù)并且有豐富的程序開(kāi)發(fā)調(diào)試優(yōu)化經(jīng)驗(yàn),所以線程相關(guān)的問(wèn)題在面試中經(jīng)常會(huì)被提到。掌握了這些技巧,你就可以輕松應(yīng)對(duì)多線程和并發(fā)面試了。進(jìn)入等待通行準(zhǔn)許時(shí),所提供的對(duì)象。 最近看到網(wǎng)上流傳著,各種面試經(jīng)驗(yàn)及面試題,往往都是一大堆技術(shù)題目貼上去,而沒(méi)有答案。 不管你是新程序員還是老手,你一定在面試中遇到過(guò)有關(guān)線程的問(wèn)題。Java語(yǔ)言一個(gè)重要的特點(diǎn)就...
摘要:同時(shí),它會(huì)通過(guò)的方法將自己注冊(cè)到線程池中。線程池中的每個(gè)工作線程都有一個(gè)自己的任務(wù)隊(duì)列,工作線程優(yōu)先處理自身隊(duì)列中的任務(wù)或順序,由線程池構(gòu)造時(shí)的參數(shù)決定,自身隊(duì)列為空時(shí),以的順序隨機(jī)竊取其它隊(duì)列中的任務(wù)。 showImg(https://segmentfault.com/img/bVbizJb?w=1802&h=762); 本文首發(fā)于一世流云的專欄:https://segmentfau...
摘要:方法接受對(duì)象數(shù)組作為參數(shù),目標(biāo)是對(duì)數(shù)組進(jìn)行升序排序。創(chuàng)建一個(gè)對(duì)象,并調(diào)用方法將它提交給線程池。此排序算法不直接返回結(jié)果給調(diào)用方,因此基于類。 分支/合并框架 說(shuō)明 重點(diǎn)是那個(gè)浮點(diǎn)數(shù)數(shù)組排序的例子,從主函數(shù)展開(kāi),根據(jù)序號(hào)看 1、GitHub代碼歡迎star。你們輕輕的一點(diǎn),對(duì)我鼓勵(lì)特大,我有一個(gè)習(xí)慣,看完別人的文章是會(huì)點(diǎn)贊的。 2、個(gè)人認(rèn)為學(xué)習(xí)語(yǔ)言最好的方式就是模仿、思考別人為什么這么寫...
摘要:并不會(huì)為每個(gè)任務(wù)都創(chuàng)建工作線程,而是根據(jù)實(shí)際情況構(gòu)造線程池時(shí)的參數(shù)確定是喚醒已有空閑工作線程,還是新建工作線程。 showImg(https://segmentfault.com/img/bVbiYSP?w=1071&h=707); 本文首發(fā)于一世流云的專欄:https://segmentfault.com/blog... 一、引言 前一章——Fork/Join框架(1) 原理,我們...
閱讀 1587·2021-10-18 13:35
閱讀 2370·2021-10-09 09:44
閱讀 825·2021-10-08 10:05
閱讀 2723·2021-09-26 09:47
閱讀 3578·2021-09-22 15:22
閱讀 441·2019-08-29 12:24
閱讀 2004·2019-08-29 11:06
閱讀 2862·2019-08-26 12:23