摘要:多進(jìn)程執(zhí)行任務(wù)結(jié)束,創(chuàng)建進(jìn)程和銷毀進(jìn)程是時(shí)間的,如果長度不夠,會(huì)造成多線程快過多進(jìn)程多線程執(zhí)行任務(wù)結(jié)束,進(jìn)程間通信生產(chǎn)者消費(fèi)者模型與隊(duì)列演示了生產(chǎn)者和消費(fèi)者的場(chǎng)景。
進(jìn)程
Python是運(yùn)行在解釋器中的語言,查找資料知道,python中有一個(gè)全局鎖(GIL),在使用多進(jìn)程(Thread)的情況下,不能發(fā)揮多核的優(yōu)勢(shì)。而使用多進(jìn)程(Multiprocess),則可以發(fā)揮多核的優(yōu)勢(shì)真正地提高效率。
如果多線程的進(jìn)程是CPU密集型的,那多線程并不能有多少效率上的提升,相反還可能會(huì)因?yàn)榫€程的頻繁切換,導(dǎo)致效率下降,推薦使用多進(jìn)程;如果是IO密集型,多線程的進(jìn)程可以利用IO阻塞等待時(shí)的空閑時(shí)間執(zhí)行其他線程,提升效率。
1.Linux創(chuàng)建子進(jìn)程的原理:
1). 父進(jìn)程和子進(jìn)程, 如果父進(jìn)程結(jié)束, 子進(jìn)程也隨之結(jié)束;
2). 先有父進(jìn)程, 再有子進(jìn)程, 通過fork函數(shù)實(shí)現(xiàn);
2.fork函數(shù)的返回值:調(diào)用該方法一次, 返回兩次;
產(chǎn)生的子進(jìn)程返回一個(gè)0
父進(jìn)程返回子進(jìn)程的pid;
3.Window也能使用fork函數(shù)么?
Windows沒有fork函數(shù), Mac有fork函數(shù)(Unix -> Linux, Unix-> Mac), 封裝了一個(gè)模塊multiprocessing
4.常用方法:
os.fork()
os.getpid(): 獲取當(dāng)前進(jìn)程的pid;
os.getppid(): parent process id, 獲取當(dāng)前進(jìn)程的父進(jìn)程的id號(hào);
import os import time print("當(dāng)前進(jìn)程(pid=%d)正在運(yùn)行..." %(os.getpid())) print("當(dāng)前進(jìn)程的父進(jìn)程(pid=%d)正在運(yùn)行..." %(os.getppid())) print("正在創(chuàng)建子進(jìn)程......") pid = os.fork() pid2 = os.fork() print("第1個(gè):", pid) print("第2個(gè): ", pid2) if pid == 0: print("這是創(chuàng)建的子進(jìn)程, 子進(jìn)程的id為%s, 父進(jìn)程的id為%s" %(os.getpid(), os.getppid())) else: print("當(dāng)前是父進(jìn)程[%s]的返回值%s" %(os.getpid(), pid)) time.sleep(100)win系統(tǒng)
在win系統(tǒng)下,使用實(shí)例化multiprocessing.Process創(chuàng)建進(jìn)程須添加"if __name__=="__main__"",否則會(huì)出現(xiàn)以下報(bào)錯(cuò):
RuntimeError:
An attempt has been made to start a new process before the current process has finished its bootstrapping phase. This probably means that you are not using fork to start your child processes and you have forgotten to use the proper idiom in the main module: if __name__ == "__main__": freeze_support() ... The "freeze_support()" line can be omitted if the program is not going to be frozen to produce an executable.
import multiprocessing def job(): print("當(dāng)前子進(jìn)程的名稱為%s" %(multiprocessing.current_process())) if __name__=="__main__": #win操作系統(tǒng)需要加上,否則會(huì)出現(xiàn)異常報(bào)錯(cuò)RuntimeError # 創(chuàng)建一個(gè)進(jìn)程對(duì)象(group=None, target=None, name=None, args=(), kwargs={}) p1 = multiprocessing.Process(target=job) p2 = multiprocessing.Process(target=job) # 運(yùn)行多進(jìn)程, 執(zhí)行任務(wù) p1.start() p2.start() # 等待所有的子進(jìn)程執(zhí)行結(jié)束, 再執(zhí)行主進(jìn)程的內(nèi)容 p1.join() p2.join() print("任務(wù)執(zhí)行結(jié)束.......")通過重寫multiprocessing.Process類創(chuàng)建多進(jìn)程
from multiprocessing import Process import multiprocessing class JobProcess(Process): # 重寫Process的構(gòu)造方法, 獲取新的屬性 def __init__(self,queue): super(JobProcess, self).__init__() self.queue = queue # 重寫run方法, 將執(zhí)行的任務(wù)放在里面即可 def run(self): print("當(dāng)前進(jìn)程信息%s" %(multiprocessing.current_process())) if __name__=="__main__": processes = [] # 啟動(dòng)10個(gè)子進(jìn)程, 來處理需要執(zhí)行的任務(wù); for i in range(10): #示例化類,創(chuàng)建進(jìn)程 p = JobProcess(queue=3) processes.append(p) #啟動(dòng)多進(jìn)程,執(zhí)行任務(wù) p.start() #等待所有的子進(jìn)程結(jié)束,再執(zhí)行主進(jìn)程 [pro.join() for pro in processes] print("任務(wù)執(zhí)行結(jié)束")守護(hù)進(jìn)程
守護(hù)線程:
setDeamon: True: 主線程執(zhí)行結(jié)束, 子線程不再繼續(xù)執(zhí)行; Flase:
守護(hù)進(jìn)程:
setDeamon: True: 主進(jìn)程執(zhí)行結(jié)束, 子進(jìn)程不再繼續(xù)執(zhí)行; Flase:
import multiprocessing import time def deamon(): #守護(hù)進(jìn)程:當(dāng)主程序運(yùn)行結(jié)束,子進(jìn)程也結(jié)束 name = multiprocessing.current_process() print("%s開始執(zhí)行" %(name)) time.sleep(3) print("執(zhí)行結(jié)束") if __name__=="__main__": p1 = multiprocessing.Process(target=deamon,name="hello") p1.daemon = True p1.start() time.sleep(2) print("整個(gè)程序執(zhí)行結(jié)束")終止進(jìn)程
有些進(jìn)程或許再執(zhí)行死循環(huán)任務(wù),此時(shí)我們手動(dòng)結(jié)束進(jìn)程
terminate()
import multiprocessing import time def job(): name = multiprocessing.current_process() print("%s進(jìn)程開啟" %(name)) time.sleep(3) print("進(jìn)程結(jié)束") if __name__=="__main__": p = multiprocessing.Process(target=job) print("進(jìn)程開啟:",p.is_alive()) p.start() print("進(jìn)程開啟:",p.is_alive()) p.terminate() print("進(jìn)程開啟:",p.is_alive()) time.sleep(0.001) print("進(jìn)程開啟:",p.is_alive()) print("程序執(zhí)行結(jié)束")計(jì)算密集型和I/O密集型
計(jì)算密集型任務(wù)的特點(diǎn)是要進(jìn)行大量的計(jì)算, 消耗CPU資源, 比如計(jì)算圓周率、 對(duì)視頻進(jìn)行高清解碼等等, 全靠CPU的運(yùn)算能力。 這種計(jì)算密集型任務(wù)雖然也可以用多任務(wù)完成, 但是任務(wù)越多, 花在任務(wù)切換的時(shí)間就越多, CPU執(zhí)行任務(wù)的效率就越低, 所以, 要最高效地利用CPU, 計(jì)算密集型任務(wù)同時(shí)進(jìn)行的數(shù)量應(yīng)當(dāng)?shù)扔贑PU的核心數(shù)。計(jì)算密集型任務(wù)由于主要消耗CPU資源, 因此, 代碼運(yùn)行效率至關(guān)重要。 Python這樣的腳本語言運(yùn)行效率很低, 完全不適合計(jì)算密集型任務(wù)。 對(duì)于計(jì)算密集型任務(wù),最好用C語言編寫。
第二種任務(wù)的類型是IO密集型, 涉及到網(wǎng)絡(luò)、 磁盤IO的任務(wù)都是IO密集型任務(wù), 這類任務(wù)的特點(diǎn)是CPU消耗很少, 任務(wù)的大部分時(shí)間都在等待IO操作完成(因?yàn)镮O的速度遠(yuǎn)遠(yuǎn)低于CPU和內(nèi)存的速度)。對(duì)于IO密集型任務(wù), 任務(wù)越多, CPU效率越高, 但也有一個(gè)限度。 常見的大部分任務(wù)都是IO密集型任務(wù), 比如Web應(yīng)用。
多進(jìn)程和多線程對(duì)比多進(jìn)程模式最大的優(yōu)點(diǎn)就是穩(wěn)定性高, 因?yàn)橐粋€(gè)子進(jìn)程崩潰了, 不會(huì)影響主進(jìn)程和其他子進(jìn)程。(當(dāng)然主進(jìn)程掛了所有進(jìn)程就全掛了, 但是Master進(jìn)程只負(fù)責(zé)分配任務(wù), 掛掉的概率低)著名的Apache最早就是采用多進(jìn)程模式。
多進(jìn)程模式的缺點(diǎn)是創(chuàng)建進(jìn)程的代價(jià)大, 在Unix/Linux系統(tǒng)下, 用 fork 調(diào)用還行, 在Windows下創(chuàng)建進(jìn)程開銷巨大。 另外, 操作系統(tǒng)能同時(shí)運(yùn)行的進(jìn)程數(shù)也是有限的, 在內(nèi)存和。CPU的限制下, 如果有幾千個(gè)進(jìn)程同時(shí)運(yùn)行, 操作系統(tǒng)連調(diào)度都會(huì)成問題。
多線程模式通常比多進(jìn)程快一點(diǎn), 但是也快不到哪去, 而且, 多線程模式致命的缺點(diǎn)就是任何一個(gè)線程掛掉都可能直接造成整個(gè)進(jìn)程崩潰, 因?yàn)樗芯€程共享進(jìn)程的內(nèi)存。 在Windows上, 如果一個(gè)線程執(zhí)行的代碼出了問題, 你經(jīng)常可以看到這樣的提示:“該程序執(zhí)行了非法操作, 即將關(guān)閉”, 其實(shí)往往是某個(gè)線程出了問題, 但是操作系統(tǒng)會(huì)強(qiáng)制結(jié)束整個(gè)進(jìn)程。
這里通過一個(gè)計(jì)算密集型任務(wù),來測(cè)試多進(jìn)程和多線程的執(zhí)行效率。
import multiprocessing import threading from mytimeit import timeit class JobProcess(multiprocessing.Process): def __init__(self,li): super(JobProcess, self).__init__() self.li = li def run(self): for i in self.li: sum(i) class JobThread(threading.Thread): def __init__(self,li): super(JobThread, self).__init__() self.li = li def run(self): for i in self.li: sum(i) @timeit def many_processs(): li = [[24892,23892348,239293,233],[2382394,49230,2321234],[48294,28420,29489]]*10 processes = [] for i in li : p = JobProcess(li) processes.append(p) p.start() [pro.join() for pro in processes] print("多進(jìn)程執(zhí)行任務(wù)結(jié)束,?") @timeit def many_thread(): #創(chuàng)建進(jìn)程和銷毀進(jìn)程是時(shí)間的,如果li長度不夠,會(huì)造成多線程快過多進(jìn)程 li = [[24892,23892348,239293,233],[2382394,49230,2321234],[48294,28420,29489]]*1000 threads = [] for i in li : t = JobThread(li) threads.append(t) t.start() [thread.join() for thread in threads] print("多線程執(zhí)行任務(wù)結(jié)束,?") if __name__ =="__main__": many_processs() many_thread()進(jìn)程間通信-生產(chǎn)者消費(fèi)者模型與隊(duì)列
演示了生產(chǎn)者和消費(fèi)者的場(chǎng)景。生產(chǎn)者生產(chǎn)貨物,然后把貨物放到一個(gè)隊(duì)列之類的數(shù)據(jù)結(jié)構(gòu)中,生產(chǎn)貨物所要花費(fèi)的時(shí)間無法預(yù)先確定。消費(fèi)者消耗生產(chǎn)者生產(chǎn)的貨物的時(shí)間也是不確定的。
通過隊(duì)列來實(shí)現(xiàn)進(jìn)程間的通信
import multiprocessing import threading from multiprocessing import Queue class Producer(multiprocessing.Process): def __init__(self,queue): super(Producer, self).__init__() self.queue = queue def run(self): for i in range(13): #往隊(duì)列添加內(nèi)容 self.queue.put(i) print("生產(chǎn)者傳遞的消息為%s" %(i)) return self.queue class Consumer(multiprocessing.Process): def __init__(self,queue): super(Consumer, self).__init__() self.queue = queue def run(self): #獲取隊(duì)列內(nèi)容 #get會(huì)自動(dòng)判斷隊(duì)列是否為空,如果是空, 跳出循環(huán), 不會(huì)再去從隊(duì)列獲取數(shù)據(jù); while True: print("進(jìn)程獲取消息為:%s" %(self.queue.get())) if __name__=="__main__": queue = Queue(maxsize=100) p = Producer(queue) p.start() c = Consumer(queue) c.start() p.join() c.join(2) c.terminate() #終止進(jìn)程 print("進(jìn)程間通信結(jié)束,( ?? ω ?? )y")
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/42480.html
摘要:在一個(gè)進(jìn)程內(nèi)部,要同時(shí)干多件事,就需要同時(shí)運(yùn)行多個(gè)子任務(wù),我們把進(jìn)程內(nèi)的這些子任務(wù)稱為線程??偨Y(jié)一下,多任務(wù)的實(shí)現(xiàn)方式有三種多進(jìn)程模式多線程模式多進(jìn)程多線程模式線程是最小的執(zhí)行單元,而進(jìn)程由至少一個(gè)線程組成。 進(jìn)程與線程 很多同學(xué)都聽說過,現(xiàn)代操作系統(tǒng)比如Mac OS X,UNIX,Linux,Windows等,都是支持多任務(wù)的操作系統(tǒng)。 什么叫多任務(wù)呢?簡(jiǎn)單地說,就是操作系統(tǒng)可以同時(shí)...
摘要:分布式進(jìn)程在和中,應(yīng)當(dāng)優(yōu)選,因?yàn)楦€(wěn)定,而且,可以分布到多臺(tái)機(jī)器上,而最多只能分布到同一臺(tái)機(jī)器的多個(gè)上。由于模塊封裝很好,不必了解網(wǎng)絡(luò)通信的細(xì)節(jié),就可以很容易地編寫分布式多進(jìn)程程序。 分布式進(jìn)程 在Thread和Process中,應(yīng)當(dāng)優(yōu)選Process,因?yàn)镻rocess更穩(wěn)定,而且,Process可以分布到多臺(tái)機(jī)器上,而Thread最多只能分布到同一臺(tái)機(jī)器的多個(gè)CPU上。 Pytho...
摘要:協(xié)程,又稱微線程,纖程。最大的優(yōu)勢(shì)就是協(xié)程極高的執(zhí)行效率。生產(chǎn)者產(chǎn)出第條數(shù)據(jù)返回更新值更新消費(fèi)者正在調(diào)用第條數(shù)據(jù)查看當(dāng)前進(jìn)行的線程函數(shù)中有,返回值為生成器庫實(shí)現(xiàn)協(xié)程通過提供了對(duì)協(xié)程的基本支持,但是不完全。 協(xié)程,又稱微線程,纖程。英文名Coroutine協(xié)程看上去也是子程序,但執(zhí)行過程中,在子程序內(nèi)部可中斷,然后轉(zhuǎn)而執(zhí)行別的子程序,在適當(dāng)?shù)臅r(shí)候再返回來接著執(zhí)行。 最大的優(yōu)勢(shì)就是協(xié)程極高...
摘要:協(xié)程實(shí)現(xiàn)連接在網(wǎng)絡(luò)通信中,每個(gè)連接都必須創(chuàng)建新線程或進(jìn)程來處理,否則,單線程在處理連接的過程中,無法接受其他客戶端的連接。所以我們嘗試使用協(xié)程來實(shí)現(xiàn)服務(wù)器對(duì)多個(gè)客戶端的響應(yīng)。 協(xié)程實(shí)現(xiàn)TCP連接 在網(wǎng)絡(luò)通信中,每個(gè)連接都必須創(chuàng)建新線程(或進(jìn)程) 來處理,否則,單線程在處理連接的過程中, 無法接受其他客戶端的連接。所以我們嘗試使用協(xié)程來實(shí)現(xiàn)服務(wù)器對(duì)多個(gè)客戶端的響應(yīng)。與單一TCP通信的構(gòu)架...
摘要:一個(gè)包來了之后,到底是交給瀏覽器還是,就需要端口號(hào)來區(qū)分。每個(gè)網(wǎng)絡(luò)程序都向操作系統(tǒng)申請(qǐng)唯一的端口號(hào),這樣,兩個(gè)進(jìn)程在兩臺(tái)計(jì)算機(jī)之間建立網(wǎng)絡(luò)連接就需要各自的地址和各自的端口號(hào)。 網(wǎng)絡(luò)通信的三要素 IP 通信的時(shí)候, 雙方必須知道對(duì)方的標(biāo)識(shí), 好比發(fā)郵件必須知道對(duì)方的郵件地址。 互聯(lián)網(wǎng)上每個(gè)計(jì)算機(jī)的唯一標(biāo)識(shí)就是IP地址, 類似 123.123.123.123 。 IP地址實(shí)際上是一個(gè)32位...
閱讀 2738·2021-11-22 13:54
閱讀 1077·2021-10-14 09:48
閱讀 2302·2021-09-08 09:35
閱讀 1566·2019-08-30 15:53
閱讀 1177·2019-08-30 13:14
閱讀 615·2019-08-30 13:09
閱讀 2531·2019-08-30 10:57
閱讀 3343·2019-08-29 13:18