摘要:另外,返回的兩個如果一個是數(shù)據(jù)那么另外一個就只能接收數(shù)據(jù)了已經(jīng)執(zhí)行到子進(jìn)程等待所有子進(jìn)程跑完下面打印向傳輸?shù)男畔⑤敵鰹槿谶M(jìn)行并發(fā)編程時,應(yīng)盡量避免使用共享狀態(tài),因為多進(jìn)程同時修改數(shù)據(jù)會導(dǎo)致數(shù)據(jù)破壞。
前言:
python多進(jìn)程,經(jīng)常在使用,卻沒有怎么系統(tǒng)的學(xué)習(xí)過,官網(wǎng)上面講得比較細(xì),結(jié)合自己的學(xué)習(xí),整理記錄下 官網(wǎng):https://docs.python.org/3/library/multiprocessing.htmlmultiprocessing簡介
multiprocessing是python自帶的多進(jìn)程模塊,可以大批量的生成進(jìn)程,在服務(wù)器為多核CPU時效果更好,類似于threading模塊。相對于多線程,多進(jìn)程由于獨享內(nèi)存空間,更穩(wěn)定安全,在運維里面做些批量操作時,多進(jìn)程有更多適用的場景
multiprocessing包提供了本地和遠(yuǎn)程兩種并發(fā)操作,有效的避開了使用子進(jìn)程而不是全局解釋鎖的線程,因此,multiprocessing可以有效利用到多核處理
Process類在multiporcessing中,通過Process類對象來批量產(chǎn)生進(jìn)程,使用start()方法來啟動這個進(jìn)程
1.語法multiprocessing.Process(group=None,target=None,name=None,args=(),kwargs={},*) group: 這個參數(shù)一般為空,它只是為了兼容threading.Tread target: 這個參數(shù)就是通過run()可調(diào)用對象的方法,默認(rèn)為空,表示沒有方法被調(diào)用 name: 表示進(jìn)程名 args: 傳給target調(diào)用方法的tuple(元組)參數(shù) kwargs: 傳給target調(diào)用方法的dict(字典)參數(shù)2.Process類的方法及對象
run()
該方法是進(jìn)程的運行過程,可以在子類中重寫此方法,一般也很少去重構(gòu)
start()
啟動進(jìn)程,每個進(jìn)程對象都必須被該方法調(diào)用
join([timeout])
等待進(jìn)程終止,再往下執(zhí)行,可以設(shè)置超時時間
name
可以獲取進(jìn)程名字,多個進(jìn)程也可以是相同的名字
is_alive()
返回進(jìn)程是否還存活,True or False,進(jìn)程存活是指start()開始到子進(jìn)程終止
daemon
守護(hù)進(jìn)程的標(biāo)記,一個布爾值,在start()之后設(shè)置該值,表示是否后臺運行
注意:如果設(shè)置了后臺運行,那么后臺程序不運行再創(chuàng)建子進(jìn)程
pid
可以獲取進(jìn)程ID
exitcode
子進(jìn)程退出時的值,如果進(jìn)程還沒有終止,值將是None,如果是負(fù)值,表示子進(jìn)程被終止
terminate()
終止進(jìn)程,如果是Windows,則使用terminateprocess(),該方法對已經(jīng)退出和結(jié)束的進(jìn)程,將不會執(zhí)行
以下為一個簡單的例子:
#-*- coding:utf8 -*- import multiprocessing import time def work(x): time.sleep(1) print time.ctime(),"這是子進(jìn)程[{0}]...".format(x) if __name__ == "__main__": for i in range(5): p = multiprocessing.Process(target=work,args=(i,)) print "啟動進(jìn)程數(shù):{0}".format(i) p.start() p.deamon = True
當(dāng)然也可以顯示每個進(jìn)程的ID
#-*- coding:utf8 -*- import multiprocessing import time import os def work(x): time.sleep(1) ppid = os.getppid() pid = os.getpid() print time.ctime(),"這是子進(jìn)程[{0},父進(jìn)程:{1},子進(jìn)程:{2}]...".format(x,ppid,pid) if __name__ == "__main__": for i in range(5): p = multiprocessing.Process(target=work,args=(i,)) print "啟動進(jìn)程數(shù):{0}".format(i) p.start() p.deamon = True
但在實際使用的過程中,并不只是并發(fā)完就可以了,比如,有30個任務(wù),由于服務(wù)器資源有限,每次并發(fā)5個任務(wù),這里還涉及到30個任務(wù)怎么獲取的問題,另外并發(fā)的進(jìn)程任務(wù)執(zhí)行時間很難保證一致,尤其是需要時間的任務(wù),可能并發(fā)5個任務(wù),有3個已經(jīng)執(zhí)行完了,2個還需要很長時間執(zhí)行,總不能等到這兩個進(jìn)程執(zhí)行完了,再繼續(xù)執(zhí)行后面的任務(wù),因此進(jìn)程控制就在此有了使用場景,可以利用Process的方法和一些multiprocessing的包,類等結(jié)合使用
進(jìn)程控制及通信常用類 一、Queue類類似于python自帶的Queue.Queue,主要用在比較小的隊列上面
語法:
multiprocessing.Queue([maxsize])
類方法:
qsize()
返回隊列的大致大小,因為多進(jìn)程或者多線程一直在消耗隊列,因此該數(shù)據(jù)不一定正確
empty()
判斷隊列是否為空,如果是,則返回True,否則False
full()
判斷隊列是否已滿,如果是,則返回True,否則False
put(obj[, block[, timeout]])
將對象放入隊列,可選參數(shù)block為True,timeout為None
get()
從隊列取出對象
#-*- coding:utf8 -*- from multiprocessing import Process, Queue def f(q): q.put([42,None,"hi"]) if __name__ == "__main__": q = Queue() p = Process(target=f, args=(q,)) p.start() print q.get() #打印內(nèi)容: [42,None,"hi"] p.join()二、Pipe類
pipe()函數(shù)返回一對對象的連接,可以為進(jìn)程間傳輸消息,在打印一些日志、進(jìn)程控制上面有一些用處,Pip()對象返回兩個對象connection,代表兩個通道,每個connection對象都有send()和recv()方法,需要注意的是兩個或以上的進(jìn)程同時讀取或者寫入同一管道,可能會導(dǎo)致數(shù)據(jù)混亂,測試了下,是直接覆蓋了。另外,返回的兩個connection,如果一個是send()數(shù)據(jù),那么另外一個就只能recv()接收數(shù)據(jù)了
#-*- coding:utf8 -*- from multiprocessing import Process, Pipe import time def f(conn,i): print "[{0}]已經(jīng)執(zhí)行到子進(jìn)程:{1}".format(time.ctime(),i) time.sleep(1) w = "[{0}]hi,this is :{1}".format(time.ctime(),i) conn.send(w) conn.close() if __name__ == "__main__": reader = [] parent_conn, child_conn = Pipe() for i in range(4): p = Process(target=f, args=(child_conn,i)) p.start() reader.append(parent_conn) p.deamon=True # 等待所有子進(jìn)程跑完 time.sleep(3) print " [{0}]下面打印child_conn向parent_conn傳輸?shù)男畔?".format(time.ctime()) for i in reader: print i.recv()
輸出為:
在進(jìn)行并發(fā)編程時,應(yīng)盡量避免使用共享狀態(tài),因為多進(jìn)程同時修改數(shù)據(jù)會導(dǎo)致數(shù)據(jù)破壞。但如果確實需要在多進(jìn)程間共享數(shù)據(jù),multiprocessing也提供了方法Value、Array
from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == "__main__": num = Value("d",0.0) arr = Array("i", range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print num.value print arr[:]
*print
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]*
Manager類管理進(jìn)程使用得較多,它返回對象可以操控子進(jìn)程,并且支持很多類型的操作,如: list, dict, Namespace、lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array,因此使用Manager基本上就夠了
from multiprocessing import Process, Manager def f(d, l): d[1] = "1" d["2"] = 2 d[0.25] = None l.reverse() if __name__ == "__main__": with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() #等待進(jìn)程結(jié)束后往下執(zhí)行 print d," ",l
輸出:
{0.25: None, 1: "1", "2": 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
可以看到,跟共享數(shù)據(jù)一樣的效果,大部分管理進(jìn)程的方法都集成到了Manager()模塊了
#-*- coding:utf8 -*- from multiprocessing import Process, Queue import time def work(pname,q): time.sleep(1) print_some = "{0}|this is process: {1}".format(time.ctime(),pname) print print_some q.put(pname) if __name__ == "__main__": p_manag_num = 2 # 進(jìn)程并發(fā)控制數(shù)量2 # 并發(fā)的進(jìn)程名 q_process = ["process_1","process_2","process_3","process_4","process_5"] q_a = Queue() # 將進(jìn)程名放入隊列 q_b = Queue() # 將q_a的進(jìn)程名放往q_b進(jìn)程,由子進(jìn)程完成 for i in q_process: q_a.put(i) p_list = [] # 完成的進(jìn)程隊列 while not q_a.empty(): if len(p_list) <= 2: pname=q_a.get() p = Process(target=work, args=(pname,q_b)) p.start() p_list.append(p) print pname for p in p_list: if not p.is_alive(): p_list.remove(p) # 等待5秒,預(yù)估執(zhí)行完后看隊列通信信息 # 當(dāng)然也可以循環(huán)判斷隊列里面的進(jìn)程是否執(zhí)行完成 time.sleep(5) print "打印p_b隊列:" while not q_b.empty(): print q_b.get()
執(zhí)行結(jié)果:
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/44890.html
摘要:效率高當(dāng)然,對于爬蟲這種密集型任務(wù)來說,多線程和多進(jìn)程影響差別并不大。對于計算密集型任務(wù)來說,的多進(jìn)程相比多線程,其多核運行效率會有成倍的提升。 一、進(jìn)程介紹 進(jìn)程...
摘要:協(xié)程的基本行為協(xié)程包含四種狀態(tài)等待開始執(zhí)行。協(xié)程中重要的兩個方法調(diào)用方把數(shù)據(jù)提供給協(xié)程。注意使用調(diào)用協(xié)程時會自動預(yù)激,因此與裝飾器不兼容標(biāo)準(zhǔn)庫中的裝飾器不會預(yù)激協(xié)程,因此能兼容句法。因此,終止協(xié)程的本質(zhì)在于向協(xié)程發(fā)送其無法處理的異常。 導(dǎo)語:本文章記錄了本人在學(xué)習(xí)Python基礎(chǔ)之控制流程篇的重點知識及個人心得,打算入門Python的朋友們可以來一起學(xué)習(xí)并交流。 本文重點: 1、掌握協(xié)...
摘要:協(xié)程定義協(xié)程是指一個過程,這個過程與調(diào)用方協(xié)作,產(chǎn)出由調(diào)用方提供的值。當(dāng)?shù)玫娇刂茩?quán)時,會阻塞,同時等待終止。終止協(xié)程的方法該方法致使生成器在暫停的表達(dá)式處拋出異常。 協(xié)程 定義:協(xié)程是指一個過程,這個過程與調(diào)用方協(xié)作,產(chǎn)出由調(diào)用方提供的值。(協(xié)程中必定含有一條yield語句) 協(xié)程與生成器類似,都是定義體內(nèi)包含yield關(guān)鍵字的函數(shù)。不過,在協(xié)程中,yield通常出現(xiàn)在表達(dá)式的右邊(例...
摘要:并發(fā)用于制定方案,用來解決可能但未必并行的問題。在協(xié)程中使用需要注意兩點使用鏈接的多個協(xié)程最終必須由不是協(xié)程的調(diào)用方驅(qū)動,調(diào)用方顯式或隱式在最外層委派生成器上調(diào)用函數(shù)或方法。對象可以取消取消后會在協(xié)程當(dāng)前暫停的處拋出異常。 導(dǎo)語:本文章記錄了本人在學(xué)習(xí)Python基礎(chǔ)之控制流程篇的重點知識及個人心得,打算入門Python的朋友們可以來一起學(xué)習(xí)并交流。 本文重點: 1、了解asyncio...
摘要:所以與多線程相比,線程的數(shù)量越多,協(xié)程性能的優(yōu)勢越明顯。值得一提的是,在此過程中,只有一個線程在執(zhí)行,因此這與多線程的概念是不一樣的。 真正有知識的人的成長過程,就像麥穗的成長過程:麥穗空的時候,麥子長得很快,麥穗驕傲地高高昂起,但是,麥穗成熟飽滿時,它們開始謙虛,垂下麥芒。 ——蒙田《蒙田隨筆全集》 上篇論述了關(guān)于python多線程是否是雞肋的問題,得到了一些網(wǎng)友的認(rèn)可,當(dāng)然也有...
閱讀 2918·2021-09-22 15:20
閱讀 2991·2021-09-22 15:19
閱讀 3506·2021-09-22 15:15
閱讀 2432·2021-09-08 09:35
閱讀 2408·2019-08-30 15:44
閱讀 3041·2019-08-30 10:50
閱讀 3791·2019-08-29 16:25
閱讀 1616·2019-08-26 13:55