摘要:基礎(chǔ)系列的與方法類(lèi)初始化順序線(xiàn)程池如何彈性伸縮的幾個(gè)要點(diǎn)的緩存什么場(chǎng)景下使用阻塞隊(duì)列的使用及模式中的序本文主要分析線(xiàn)程池是如何進(jìn)行線(xiàn)程的彈性伸縮。線(xiàn)程池最小是,最大是,除非設(shè)置了和超時(shí)時(shí)間,這種情況線(xiàn)程數(shù)可能減少到,最大可能是。
Java基礎(chǔ)系列
Java的hashcode與equals方法
Java類(lèi)初始化順序
ThreadPoolExecutor線(xiàn)程池如何彈性伸縮
HashMap的幾個(gè)要點(diǎn)
Integer的緩存
什么場(chǎng)景下使用阻塞隊(duì)列
volatile的使用及DCL模式
try-catch-finally中的return
序本文主要分析Java7線(xiàn)程池是如何進(jìn)行線(xiàn)程的彈性伸縮。
一、worker線(xiàn)程while循環(huán)利用空閑線(xiàn)程final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }二、獲取任務(wù)時(shí)從等待隊(duì)列中取任務(wù)
/** * Performs blocking or timed wait for a task, depending on * current configuration settings, or returns null if this worker * must exit because of any of: * 1. There are more than maximumPoolSize workers (due to * a call to setMaximumPoolSize). * 2. The pool is stopped. * 3. The pool is shutdown and the queue is empty. * 4. This worker timed out waiting for a task, and timed-out * workers are subject to termination (that is, * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) * both before and after the timed wait. * * @return task, or null if the worker must exit, in which case * workerCount is decremented */ private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } boolean timed; // Are workers subject to culling? for (;;) { int wc = workerCountOf(c); timed = allowCoreThreadTimeOut || wc > corePoolSize; if (wc <= maximumPoolSize && ! (timedOut && timed)) break; if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }三、等待隊(duì)列沒(méi)有任務(wù)時(shí)銷(xiāo)毀并維持必要的線(xiàn)程池大小
/** * Performs cleanup and bookkeeping for a dying worker. Called * only from worker threads. Unless completedAbruptly is set, * assumes that workerCount has already been adjusted to account * for exit. This method removes thread from worker set, and * possibly terminates the pool or replaces the worker if either * it exited due to user task exception or if fewer than * corePoolSize workers are running or queue is non-empty but * there are no workers. * * @param w the worker * @param completedAbruptly if the worker died due to user exception */ private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn"t adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
如果設(shè)置了keepAliveTime參數(shù),那么當(dāng)timeout的時(shí)候,就return null,就會(huì)跳出循環(huán),回收線(xiàn)程
if (wc <= maximumPoolSize && ! (timedOut && timed)) break; if (compareAndDecrementWorkerCount(c)) return null;
每銷(xiāo)毀一個(gè),判斷如果銷(xiāo)毀后,Worker個(gè)數(shù)小于corePoolSize,就新增一個(gè)新Worker。
線(xiàn)程池最小是corePoolSize,最大是maximumPoolSize,除非設(shè)置了allowCoreThreadTimeOut和超時(shí)時(shí)間,這種情況線(xiàn)程數(shù)可能減少到0,最大可能是Integer.MAX_VALUE。
Core pool size is the minimum number of workers to keep alive
(and not allow to time out etc) unless allowCoreThreadTimeOut
is set, in which case the minimum is zero.
四、執(zhí)行任務(wù)時(shí)不夠時(shí)添加/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn"t, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }五、運(yùn)行時(shí)更改corePoolSize
/** * Sets the core number of threads. This overrides any value set * in the constructor. If the new value is smaller than the * current value, excess existing threads will be terminated when * they next become idle. If larger, new threads will, if needed, * be started to execute any queued tasks. * * @param corePoolSize the new core size * @throws IllegalArgumentException if {@code corePoolSize < 0} * @see #getCorePoolSize */ public void setCorePoolSize(int corePoolSize) { if (corePoolSize < 0) throw new IllegalArgumentException(); int delta = corePoolSize - this.corePoolSize; this.corePoolSize = corePoolSize; if (workerCountOf(ctl.get()) > corePoolSize) interruptIdleWorkers(); else if (delta > 0) { // We don"t really know how many new threads are "needed". // As a heuristic, prestart enough new workers (up to new // core size) to handle the current number of tasks in // queue, but stop if queue becomes empty while doing so. int k = Math.min(delta, workQueue.size()); while (k-- > 0 && addWorker(null, true)) { if (workQueue.isEmpty()) break; } } }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/65558.html
摘要:最后,我們會(huì)通過(guò)對(duì)源代碼的剖析深入了解線(xiàn)程池的運(yùn)行過(guò)程和具體設(shè)計(jì),真正達(dá)到知其然而知其所以然的水平。創(chuàng)建線(xiàn)程池既然線(xiàn)程池是一個(gè)類(lèi),那么最直接的使用方法一定是一個(gè)類(lèi)的對(duì)象,例如。單線(xiàn)程線(xiàn)程池單線(xiàn)程線(xiàn)程 我們一般不會(huì)選擇直接使用線(xiàn)程類(lèi)Thread進(jìn)行多線(xiàn)程編程,而是使用更方便的線(xiàn)程池來(lái)進(jìn)行任務(wù)的調(diào)度和管理。線(xiàn)程池就像共享單車(chē),我們只要在我們有需要的時(shí)候去獲取就可以了。甚至可以說(shuō)線(xiàn)程池更棒,...
摘要:提交任務(wù)當(dāng)創(chuàng)建了一個(gè)線(xiàn)程池之后我們就可以將任務(wù)提交到線(xiàn)程池中執(zhí)行了。提交任務(wù)到線(xiàn)程池中相當(dāng)簡(jiǎn)單,我們只要把原來(lái)傳入類(lèi)構(gòu)造器的對(duì)象傳入線(xiàn)程池的方法或者方法就可以了。 我們一般不會(huì)選擇直接使用線(xiàn)程類(lèi)Thread進(jìn)行多線(xiàn)程編程,而是使用更方便的線(xiàn)程池來(lái)進(jìn)行任務(wù)的調(diào)度和管理。線(xiàn)程池就像共享單車(chē),我們只要在我們有需要的時(shí)候去獲取就可以了。甚至可以說(shuō)線(xiàn)程池更棒,我們只需要把任務(wù)提交給它,它就會(huì)在合...
摘要:也是自帶的一個(gè)基于線(xiàn)程池設(shè)計(jì)的定時(shí)任務(wù)類(lèi)。其每個(gè)調(diào)度任務(wù)都會(huì)分配到線(xiàn)程池中的一個(gè)線(xiàn)程執(zhí)行,所以其任務(wù)是并發(fā)執(zhí)行的,互不影響。 原創(chuàng)不易,如需轉(zhuǎn)載,請(qǐng)注明出處https://www.cnblogs.com/baixianlong/p/10659045.html,否則將追究法律責(zé)任!??! 一、在JAVA開(kāi)發(fā)領(lǐng)域,目前可以通過(guò)以下幾種方式進(jìn)行定時(shí)任務(wù) 1、單機(jī)部署模式 Timer:jdk中...
摘要:在中一般來(lái)說(shuō)通過(guò)來(lái)創(chuàng)建所需要的線(xiàn)程池,如高并發(fā)原理初探后端掘金閱前熱身為了更加形象的說(shuō)明同步異步阻塞非阻塞,我們以小明去買(mǎi)奶茶為例。 AbstractQueuedSynchronizer 超詳細(xì)原理解析 - 后端 - 掘金今天我們來(lái)研究學(xué)習(xí)一下AbstractQueuedSynchronizer類(lèi)的相關(guān)原理,java.util.concurrent包中很多類(lèi)都依賴(lài)于這個(gè)類(lèi)所提供的隊(duì)列式...
摘要:在中一般來(lái)說(shuō)通過(guò)來(lái)創(chuàng)建所需要的線(xiàn)程池,如高并發(fā)原理初探后端掘金閱前熱身為了更加形象的說(shuō)明同步異步阻塞非阻塞,我們以小明去買(mǎi)奶茶為例。 AbstractQueuedSynchronizer 超詳細(xì)原理解析 - 后端 - 掘金今天我們來(lái)研究學(xué)習(xí)一下AbstractQueuedSynchronizer類(lèi)的相關(guān)原理,java.util.concurrent包中很多類(lèi)都依賴(lài)于這個(gè)類(lèi)所提供的隊(duì)列式...
閱讀 3527·2021-10-08 10:04
閱讀 872·2019-08-30 15:54
閱讀 2189·2019-08-29 16:09
閱讀 1354·2019-08-29 15:41
閱讀 2285·2019-08-29 11:01
閱讀 1743·2019-08-26 13:51
閱讀 1035·2019-08-26 13:25
閱讀 1834·2019-08-26 13:24