摘要:接下來(lái)就是會(huì)把任務(wù)提交到隊(duì)列中給線程池調(diào)度處理因?yàn)橹饕P(guān)心的是這個(gè)線程怎么執(zhí)行,異常的拋出和處理,所以我們暫時(shí)不解析多余的邏輯。
前言
今天遇到了一個(gè)bug,現(xiàn)象是,一個(gè)任務(wù)放入線程池中,似乎“沒(méi)有被執(zhí)行”,日志也沒(méi)有打。
經(jīng)過(guò)本地代碼調(diào)試之后,發(fā)現(xiàn)在任務(wù)邏輯的前半段,拋出了NPE,但是代碼外層沒(méi)有try-catch,導(dǎo)致這個(gè)異常被吃掉。
這個(gè)問(wèn)題解決起來(lái)是很簡(jiǎn)單的,外層加個(gè)try-catch就好了,但是這個(gè)異常如果沒(méi)有被catch,線程池內(nèi)部邏輯是怎么處理這個(gè)異常的呢?這個(gè)異常最后會(huì)跑到哪里呢?
帶著疑問(wèn)和好奇心,我研究了一下線程池那一塊的源碼,并且做了以下的總結(jié)。
源碼分析項(xiàng)目中出問(wèn)題的代碼差不多就是下面這個(gè)樣子
ExecutorService threadPool = Executors.newFixedThreadPool(3); threadPool.submit(() -> { String pennyStr = null; Double penny = Double.valueOf(pennyStr); ... })
先進(jìn)到newFixedThreadPool這個(gè)工廠方法中看生成的具體實(shí)現(xiàn)類,發(fā)現(xiàn)是ThreadPoolExecutor
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
再看這個(gè)類的繼承關(guān)系,
再進(jìn)到submit方法,這個(gè)方法在ExecutorService接口中約定,其實(shí)是在AbstractExectorService中實(shí)現(xiàn),ThreadPoolExecutor并沒(méi)有override這個(gè)方法。
public Future> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFutureftask = newTaskFor(task, null); execute(ftask); return ftask; } protected RunnableFuture newTaskFor(Runnable runnable, T value) { return new FutureTask (runnable, value); }
對(duì)應(yīng)的FutureTask對(duì)象的構(gòu)造方法
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // state由volatile 修飾 保證多線程下的可見(jiàn)性 }
對(duì)應(yīng)Callable 對(duì)象的構(gòu)造方法
public staticCallable callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter (task, result); }
對(duì)應(yīng)RunnableAdapter 對(duì)象的構(gòu)造方法
/** * A callable that runs given task and returns given result * 一個(gè)能執(zhí)行所給任務(wù)并且返回結(jié)果的Callable對(duì)象 */ static final class RunnableAdapterimplements Callable { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
總結(jié)上面的,newTaskFor就是把我們提交的Runnable 對(duì)象包裝成了一個(gè)Future。
接下來(lái)就是會(huì)把任務(wù)提交到隊(duì)列中給線程池調(diào)度處理:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
因?yàn)橹饕P(guān)心的是這個(gè)線程怎么執(zhí)行,異常的拋出和處理,所以我們暫時(shí)不解析多余的邏輯。很容易發(fā)現(xiàn),如果任務(wù)要被執(zhí)行,肯定是進(jìn)到了addWorker方法當(dāng)中,所以我們?cè)龠M(jìn)去看,鑒于addWorker方法的很長(zhǎng),不想列太多的代碼,我就摘了關(guān)鍵代碼段:
private boolean addWorker(Runnable firstTask, boolean core) { ... boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 實(shí)例化一個(gè)worker對(duì)象 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 從Worker對(duì)象的構(gòu)造方法看,當(dāng)這個(gè)thread對(duì)象start之后, // 之后實(shí)際上就是調(diào)用Worker對(duì)象的run() t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } // Worker的構(gòu)造方法 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
我們?cè)倏催@個(gè)ThreadPoolExecutor的內(nèi)部類Worker對(duì)象:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { ... /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } ... }
看來(lái)真正執(zhí)行任務(wù)的是在這個(gè)外部的runWorker當(dāng)中,讓我們?cè)倏纯催@個(gè)方法是怎么消費(fèi)Worker線程的。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; // ==== 關(guān)鍵代碼 start ==== try { // 很簡(jiǎn)潔明了,調(diào)用了任務(wù)的run方法 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } // ==== 關(guān)鍵代碼 end ==== } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
終于走到底了,可以看到關(guān)鍵代碼中的try-catch block代碼塊中,調(diào)用了本次執(zhí)行任務(wù)的run方法。
// ==== 關(guān)鍵代碼 start ==== try { // 很簡(jiǎn)潔明了,調(diào)用了任務(wù)的run方法 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } // ==== 關(guān)鍵代碼 end ====
可以看到捕捉了異常之后,會(huì)再向外拋出,只不過(guò)再finally block 中有個(gè)afterExecute()方法,似乎在這里是可以處理這個(gè)異常信息的,進(jìn)去看看
protected void afterExecute(Runnable r, Throwable t) { }
可以看到ThreadPoolExecutor#afterExecute()方法中,是什么都沒(méi)做的,看來(lái)是讓使用者通過(guò)override這個(gè)方法來(lái)定制化任務(wù)執(zhí)行之后的邏輯,其中可以包括異常處理。
那么這個(gè)異常到底是拋到哪里去了呢。我在一個(gè)大佬的文章找到了hotSpot JVM處理線程異常的邏輯,
if (!destroy_vm || JDK_Version::is_jdk12x_version()) { // JSR-166: change call from from ThreadGroup.uncaughtException to // java.lang.Thread.dispatchUncaughtException if (uncaught_exception.not_null()) { //如果有未捕獲的異常 Handle group(this, java_lang_Thread::threadGroup(threadObj())); { KlassHandle recvrKlass(THREAD, threadObj->klass()); CallInfo callinfo; KlassHandle thread_klass(THREAD, SystemDictionary::Thread_klass()); /* 這里類似一個(gè)方法表,實(shí)際就會(huì)去調(diào)用Thread#dispatchUncaughtException方法 template(dispatchUncaughtException_name, "dispatchUncaughtException") */ LinkResolver::resolve_virtual_call(callinfo, threadObj, recvrKlass, thread_klass, vmSymbols::dispatchUncaughtException_name(), vmSymbols::throwable_void_signature(), KlassHandle(), false, false, THREAD); CLEAR_PENDING_EXCEPTION; methodHandle method = callinfo.selected_method(); if (method.not_null()) { JavaValue result(T_VOID); JavaCalls::call_virtual(&result, threadObj, thread_klass, vmSymbols::dispatchUncaughtException_name(), vmSymbols::throwable_void_signature(), uncaught_exception, THREAD); } else { KlassHandle thread_group(THREAD, SystemDictionary::ThreadGroup_klass()); JavaValue result(T_VOID); JavaCalls::call_virtual(&result, group, thread_group, vmSymbols::uncaughtException_name(), vmSymbols::thread_throwable_void_signature(), threadObj, // Arg 1 uncaught_exception, // Arg 2 THREAD); } if (HAS_PENDING_EXCEPTION) { ResourceMark rm(this); jio_fprintf(defaultStream::error_stream(), " Exception: %s thrown from the UncaughtExceptionHandler" " in thread "%s" ", pending_exception()->klass()->external_name(), get_thread_name()); CLEAR_PENDING_EXCEPTION; } } }
代碼是C寫(xiě)的,有興趣可以去全文,根據(jù)英文注釋能稍微看懂一點(diǎn)
http://hg.openjdk.java.net/jd...
可以看到這里最終會(huì)去調(diào)用Thread#dispatchUncaughtException方法:
/** * Dispatch an uncaught exception to the handler. This method is * intended to be called only by the JVM. */ private void dispatchUncaughtException(Throwable e) { getUncaughtExceptionHandler().uncaughtException(this, e); }
/** * Called by the Java Virtual Machine when a thread in this * thread group stops because of an uncaught exception, and the thread * does not have a specific {@link Thread.UncaughtExceptionHandler} * installed. * */ public void uncaughtException(Thread t, Throwable e) { if (parent != null) { parent.uncaughtException(t, e); } else { Thread.UncaughtExceptionHandler ueh = Thread.getDefaultUncaughtExceptionHandler(); if (ueh != null) { ueh.uncaughtException(t, e); } else if (!(e instanceof ThreadDeath)) { //可以看到會(huì)打到System.err里面 System.err.print("Exception in thread "" + t.getName() + "" "); e.printStackTrace(System.err); } } }
jdk的注釋也說(shuō)明的很清楚了,當(dāng)一個(gè)線程拋出了一個(gè)未捕獲的異常,JVM會(huì)去調(diào)用這個(gè)方法。如果當(dāng)前線程沒(méi)有聲明UncaughtExceptionHandler成員變量并且重寫(xiě)uncaughtException方法的時(shí)候,就會(huì)看線程所屬的線程組(如果有線程組的話)有沒(méi)有這個(gè)類,沒(méi)有就會(huì)打到System.err里面。
IBM這篇文章也提倡我們使用ThreadGroup 提供的 uncaughtException 處理程序來(lái)在線程異常終止時(shí)進(jìn)行檢測(cè)。
https://www.ibm.com/developer...總結(jié) (解決方法)
從上述源碼分析中可以看到,對(duì)于本篇的異?!氨怀缘簟钡膯?wèn)題,有以下幾種方法
用try-catch 捕捉,一般都是用這種
線程或者線程組對(duì)象設(shè)置UncaughtExceptionHandler成員變量
Thread t = new Thread(r); t.setUncaughtExceptionHandler( (t1, e) -> LOGGER.error(t1 + " throws exception: " + e)); return t;
override 線程池的afterExecute方法。
本篇雖然是提出問(wèn)題的解決方法,但主旨還是分析源碼,了解了整個(gè)過(guò)程中異常的經(jīng)過(guò)的流程,希望能對(duì)您產(chǎn)生幫助。
參考https://www.jcp.org/en/jsr/de...
https://www.ibm.com/developer...
http://ifeve.com/%E6%B7%B1%E5...
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/74301.html
摘要:先看寫(xiě)的簡(jiǎn)略的代碼線程池中發(fā)現(xiàn)異常,被中斷線程池中發(fā)現(xiàn)異常,被中斷我這是一個(gè)訂單處理流程,主要用到了一個(gè)方法,就是。好了,以上就是對(duì)線程池異常捕捉的一個(gè)記錄。 開(kāi)發(fā)自己的項(xiàng)目有一段時(shí)間了,因?yàn)槭莻€(gè)長(zhǎng)時(shí)間跑的服務(wù)器端程序,所以異常處理顯得尤為重要。 對(duì)于異常的抓取和日志(狹義上的日志)的分析一點(diǎn)都不能落下。 我們使用了Java自帶的Executor模塊,我只是稍微看了下Executor...
摘要:死亡狀態(tài)有兩個(gè)原因會(huì)導(dǎo)致線程死亡方法正常退出而自然死亡。一個(gè)未捕獲的異常終止了方法而使線程猝死。注意,放入的線程不必?fù)?dān)心其結(jié)束,超過(guò)不活動(dòng),其會(huì)自動(dòng)被終止。線程間相互干擾描述了當(dāng)多個(gè)線程訪問(wèn)共享數(shù)據(jù)時(shí)可能出現(xiàn)的錯(cuò)誤。 線程 進(jìn)程與線程的區(qū)別 線程是指進(jìn)程內(nèi)的一個(gè)執(zhí)行單元,也是進(jìn)程內(nèi)的可調(diào)度實(shí)體。一個(gè)程序至少有一個(gè)進(jìn)程,一個(gè)進(jìn)程至少有一個(gè)線程。 線程的五大狀態(tài) 新建狀態(tài)(New):例如...
摘要:進(jìn)程線程與協(xié)程它們都是并行機(jī)制的解決方案。選擇是任意性的,并在對(duì)實(shí)現(xiàn)做出決定時(shí)發(fā)生。線程池的大小一旦達(dá)到最大值就會(huì)保持不變,如果某個(gè)線程因?yàn)閳?zhí)行異常而結(jié)束,那么線程池會(huì)補(bǔ)充一個(gè)新線程。此線程池支持定時(shí)以及周期性執(zhí)行任務(wù)的需求。 并發(fā)與并行的概念 并發(fā)(Concurrency): 問(wèn)題域中的概念—— 程序需要被設(shè)計(jì)成能夠處理多個(gè)同時(shí)(或者幾乎同時(shí))發(fā)生的事件 并行(Parallel...
摘要:本文對(duì)多線程基礎(chǔ)知識(shí)進(jìn)行梳理,主要包括多線程的基本使用,對(duì)象及變量的并發(fā)訪問(wèn),線程間通信,的使用,定時(shí)器,單例模式,以及線程狀態(tài)與線程組。源碼采用構(gòu)建,多線程這部分源碼位于模塊中。通知可能等待該對(duì)象的對(duì)象鎖的其他線程。 本文對(duì)多線程基礎(chǔ)知識(shí)進(jìn)行梳理,主要包括多線程的基本使用,對(duì)象及變量的并發(fā)訪問(wèn),線程間通信,lock的使用,定時(shí)器,單例模式,以及線程狀態(tài)與線程組。 寫(xiě)在前面 花了一周時(shí)...
摘要:基礎(chǔ)問(wèn)題的的性能及原理之區(qū)別詳解備忘筆記深入理解流水線抽象關(guān)鍵字修飾符知識(shí)點(diǎn)總結(jié)必看篇中的關(guān)鍵字解析回調(diào)機(jī)制解讀抽象類與三大特征時(shí)間和時(shí)間戳的相互轉(zhuǎn)換為什么要使用內(nèi)部類對(duì)象鎖和類鎖的區(qū)別,,優(yōu)缺點(diǎn)及比較提高篇八詳解內(nèi)部類單例模式和 Java基礎(chǔ)問(wèn)題 String的+的性能及原理 java之yield(),sleep(),wait()區(qū)別詳解-備忘筆記 深入理解Java Stream流水...
閱讀 2091·2023-04-25 19:03
閱讀 1244·2021-10-14 09:42
閱讀 3423·2021-09-22 15:16
閱讀 1008·2021-09-10 10:51
閱讀 1600·2021-09-06 15:00
閱讀 2414·2019-08-30 15:55
閱讀 497·2019-08-29 16:22
閱讀 905·2019-08-26 13:49