摘要:對(duì)線程池的研究是之前對(duì)分析的附加工作。在之前對(duì)源碼分析的文章中,寫(xiě)到調(diào)度器將任務(wù)放入線程池的函數(shù)這里分析的線程池類是,也就是上述代碼中所使用的類。
對(duì)Python線程池的研究是之前對(duì)Apshceduler分析的附加工作。
在之前對(duì)Apshceduler源碼分析的文章中,寫(xiě)到調(diào)度器將任務(wù)放入線程池的函數(shù)
def _do_submit_job(self, job, run_times): def callback(f): exc, tb = (f.exception_info() if hasattr(f, "exception_info") else (f.exception(), getattr(f.exception(), "__traceback__", None))) if exc: self._run_job_error(job.id, exc, tb) else: self._run_job_success(job.id, f.result()) f = self._pool.submit(_run_job, job, job._jobstore_alias, run_times, self._logger.name) f.add_done_callback(callback)
這里分析的線程池類是concurrent.futures.ThreadPoolExecutor,也就是上述代碼中self._pool所使用的類。先上self._pool.submit函數(shù)的代碼,再做詳細(xì)分析
def submit(self, fn, *args, **kwargs): with self._shutdown_lock: if self._shutdown: raise RuntimeError("cannot schedule new futures after shutdown") f = _base.Future() w = _WorkItem(f, fn, args, kwargs) self._work_queue.put(w) self._adjust_thread_count() return f
f和w是兩個(gè)非常重要的變量,f作為submit返回的對(duì)象,submit函數(shù)的調(diào)用者可以對(duì)其添加回調(diào),待fn執(zhí)行完成后,會(huì)在當(dāng)前線程執(zhí)行,具體是如何實(shí)現(xiàn)的,這里先不說(shuō),下面再詳細(xì)分析;w則是封裝了線程需要執(zhí)行的方法和參數(shù),通過(guò)self._work_queue.put(w)方法放入一個(gè)隊(duì)列當(dāng)中。
self._adjust_thread_count()方法則是檢查當(dāng)前線程池的線程數(shù)量,如果小于設(shè)定的最大值,就開(kāi)辟一個(gè)線程,代碼就不上了,直接看這些個(gè)線程都是干嘛的
def _worker(executor_reference, work_queue): try: while True: work_item = work_queue.get(block=True) if work_item is not None: work_item.run() # Delete references to object. See issue16284 del work_item continue executor = executor_reference() # Exit if: # - The interpreter is shutting down OR # - The executor that owns the worker has been collected OR # - The executor that owns the worker has been shutdown. if _shutdown or executor is None or executor._shutdown: # Notice other workers work_queue.put(None) return del executor except BaseException: _base.LOGGER.critical("Exception in worker", exc_info=True)
這些線程就是一個(gè)死循環(huán),不斷的從任務(wù)隊(duì)列中獲取到_WorkItem,然后通過(guò)其封裝方法,執(zhí)行我們需要的任務(wù)。如果取到的任務(wù)為None,就往隊(duì)列中再放入一個(gè)None,以通知其它線程結(jié)束,然后結(jié)束當(dāng)前循環(huán)。
def run(self): if not self.future.set_running_or_notify_cancel(): return try: result = self.fn(*self.args, **self.kwargs) except BaseException as e: self.future.set_exception(e) else: self.future.set_result(result)
如果沒(méi)有異常,執(zhí)行結(jié)束后,會(huì)執(zhí)行之前我們說(shuō)的回調(diào)。在self.future.set_result(result)方法中會(huì)執(zhí)行任務(wù)回調(diào),當(dāng)然了,是在當(dāng)前線程中。如果需要寫(xiě)入數(shù)據(jù)庫(kù)之類的操作,不建議在回調(diào)中直接寫(xiě)入。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/40997.html
摘要:說(shuō)多了都是淚,我之前排查內(nèi)存泄漏的問(wèn)題,超高并發(fā)的程序跑了個(gè)月后就崩潰。以前寫(xiě)中間件的時(shí)候,就總是把用戶當(dāng),要盡量考慮各種情況避免內(nèi)存泄漏。 從 Java 到 Python 本文為我和同事的共同研究成果 當(dāng)跨語(yǔ)言的時(shí)候,有些東西在一門語(yǔ)言中很常見(jiàn),但到了另一門語(yǔ)言中可能會(huì)很少見(jiàn)。 例如 C# 中,經(jīng)常會(huì)關(guān)注拆箱裝箱,但到了 Java 中卻發(fā)現(xiàn),根本沒(méi)人關(guān)注這個(gè)。 后來(lái)才知道,原來(lái)是因?yàn)?..
摘要:獲取正在運(yùn)行的線程數(shù),用于狀態(tài)監(jiān)控。之后初始化組件主要是初始化線程池將到中,初始化開(kāi)始時(shí)間等。如果線程池中運(yùn)行線程數(shù)量為,并且默認(rèn),那么就停止退出,結(jié)束爬蟲(chóng)。 本系列文章,針對(duì)Webmagic 0.6.1版本 一個(gè)普通爬蟲(chóng)啟動(dòng)代碼 public static void main(String[] args) { Spider.create(new GithubRepoPageP...
摘要:開(kāi)頭正式開(kāi)啟我入職的里程,現(xiàn)在已是工作了一個(gè)星期了,這個(gè)星期算是我入職的過(guò)渡期,算是知道了學(xué)校生活和工作的差距了,總之,盡快習(xí)慣這種生活吧。當(dāng)時(shí)是看的廖雪峰的博客自己也用做爬蟲(chóng)寫(xiě)過(guò)幾篇博客,不過(guò)有些是在前人的基礎(chǔ)上寫(xiě)的。 showImg(https://segmentfault.com/img/remote/1460000010867984); 開(kāi)頭 2017.08.21 正式開(kāi)啟我...
閱讀 3729·2021-11-25 09:43
閱讀 2609·2021-11-18 13:11
閱讀 2229·2019-08-30 15:55
閱讀 3279·2019-08-26 11:58
閱讀 2835·2019-08-26 10:47
閱讀 2237·2019-08-26 10:20
閱讀 1279·2019-08-23 17:59
閱讀 3014·2019-08-23 15:54