摘要:上一篇文章進(jìn)程專題進(jìn)程池下一篇文章進(jìn)程專題共享數(shù)據(jù)與同步模塊支持的進(jìn)程間通信主要有兩種管道和隊(duì)列。隊(duì)列底層使用管道和鎖,同時(shí)運(yùn)行支持線程講隊(duì)列中的數(shù)據(jù)傳輸?shù)降讓庸艿乐?,?lái)實(shí)習(xí)進(jìn)程間通信。
上一篇文章:Python進(jìn)程專題4:進(jìn)程池Pool
下一篇文章:Python進(jìn)程專題6:共享數(shù)據(jù)與同步
multiprocessing模塊支持的進(jìn)程間通信主要有兩種:管道和隊(duì)列。一般來(lái)說(shuō),發(fā)送較少的大對(duì)象比發(fā)送大量的小對(duì)象要好。
Queue隊(duì)列底層使用管道和鎖,同時(shí)運(yùn)行支持線程講隊(duì)列中的數(shù)據(jù)傳輸?shù)降讓庸艿乐?,?lái)實(shí)習(xí)進(jìn)程間通信。
Queue([maxsize]) 創(chuàng)建共享隊(duì)列。使用multiprocessing模塊的Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。Queue本身是一個(gè)消息隊(duì)列, maxsize是隊(duì)列運(yùn)行的最大項(xiàng)數(shù),如果不指定,則不限制大小。
q.close():關(guān)閉隊(duì)列,不再向隊(duì)列中添加數(shù)據(jù),那些已經(jīng)進(jìn)入隊(duì)列的數(shù)據(jù)會(huì)繼續(xù)處理。q被回收時(shí)將自動(dòng)調(diào)用此方法。 q.empty():如果調(diào)用此方法時(shí),隊(duì)列為null返回True,單由于此時(shí)其他進(jìn)程或者線程正在添加或刪除項(xiàng), 所以結(jié)果不可靠,而且有些平臺(tái)運(yùn)行該方法會(huì)直接報(bào)錯(cuò),我的mac系統(tǒng)運(yùn)行該方法,直接報(bào)錯(cuò)。 q.full():如果調(diào)用此方法時(shí),隊(duì)列已滿,返回True,同q.empty()方法,結(jié)果不可靠。 q.get([block,timeout]):返回q中的一個(gè)項(xiàng),block如果設(shè)置為True,如果q隊(duì)列為空,該方法會(huì)阻塞(就是不往下運(yùn)行了,處于等待狀態(tài)), 直到隊(duì)列中有項(xiàng)可用為止,如果同時(shí)頁(yè)設(shè)置了timeout,那么在改時(shí)間間隔內(nèi),都沒(méi)有等到有用的項(xiàng),就會(huì)引發(fā)Queue.Empty異常。 如果block設(shè)置為false,timeout沒(méi)有意義,如果隊(duì)列為空,將引發(fā)Queue.Empt異常。 q.get_nowait():等同于q.get(False) q.put(item,block,timeout):將item放入隊(duì)列,如果此時(shí)隊(duì)列已滿: 如果block=True,timeout沒(méi)有設(shè)置,就會(huì)阻塞,直到有可用空間為止。 如果block=True,timeout也設(shè)置,就會(huì)阻塞到timeout,超過(guò)這個(gè)時(shí)間會(huì)報(bào)Queue.Full異常。 如果block=False,timeout設(shè)置無(wú)效,直接報(bào)Queue.Full異常。 q.put_nowait(item):等同于q.put(item,False) q.qsize():返回當(dāng)前隊(duì)列項(xiàng)的數(shù)量,結(jié)果不可靠,而且mac會(huì)直接報(bào)錯(cuò):NotImplementedError。
實(shí)例:
#驗(yàn)證:put方法會(huì)阻塞 from multiprocessing import Queue queue=Queue(3)#初始化一個(gè)Queue隊(duì)列,可以接受3個(gè)消息 queue.put("我是第1條信息") queue.put("我是第2條信息") queue.put("我是第3條信息") print("插入第4條信息之前") queue.put("我是第4條信息") print("插入第4條信息之后")
效果:程序會(huì)一直阻塞,最后一句輸永遠(yuǎn)也不會(huì)輸出。
代碼:
#closse方法、get方法、put方法簡(jiǎn)單使用:多進(jìn)程訪問(wèn)同一個(gè)Queue from multiprocessing import Queue,Process import time,os #參數(shù)q就是Queue實(shí)例 def mark(q,interval): time.sleep(interval) # 打印信息 print("進(jìn)程%d取出數(shù)據(jù):"%os.getpid()+queue.get(True)) if __name__=="__main__": queue = Queue(3) # 初始化一個(gè)Queue隊(duì)列,可以接受3個(gè)消息 queue.put("我是第1條信息") queue.put("我是第2條信息") queue.put("我是第3條信息") p1=Process(target=mark,args=(queue,1)) p2=Process(target=mark,args=(queue,2)) p3=Process(target=mark,args=(queue,3)) p1.start() p2.start() p3.start() # 關(guān)閉隊(duì)列,不再插入信息 queue.close() # 下面插入會(huì)導(dǎo)致異常 # queue.put("我是第4條信息") # 打印第1條信息 print("程序語(yǔ)句執(zhí)行完成")
效果
JoinableQueue隊(duì)列創(chuàng)建可連接的共享進(jìn)程隊(duì)列,可以看做還是一個(gè)Queue,只不過(guò)這個(gè)Queue除了Queue特有功能外,允許項(xiàng)的消費(fèi)者通知項(xiàng)的生產(chǎn)者,項(xiàng)已經(jīng)處理成功。該通知進(jìn)程時(shí)使用共享的信號(hào)和條件變量來(lái)實(shí)現(xiàn)的。
JoinableQueue實(shí)例除了與Queue對(duì)象相同的方法外,還具有下列方法:
q.task_done():消費(fèi)者使用此方法發(fā)送信號(hào),表示q.get()返回的項(xiàng)已經(jīng)被處理。 注意??:如果調(diào)用此方法的次數(shù)大于隊(duì)列中刪除的項(xiàng)的數(shù)量,將引發(fā)ValueError異常。 q.join():生產(chǎn)者使用此方法進(jìn)行阻塞,直到隊(duì)列中所有的項(xiàng)都被處理完成,即阻塞將持續(xù)到隊(duì)列中的每一項(xiàng)均調(diào)用q.task_done()方法為止。
代碼實(shí)例:
#利用JoinableQueue實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者,并且加入了哨兵,來(lái)監(jiān)聽(tīng)生產(chǎn)者的要求 from multiprocessing import JoinableQueue,Process import time #參數(shù)q為JoinableQueue隊(duì)列實(shí)例 def mark(q): #循環(huán)接受信息,一直運(yùn)行,這也下面為什么要將它設(shè)為后臺(tái)進(jìn)程的原因,必須保證當(dāng)主線程退出時(shí),它可以退出 while True: value = q.get() print(value) # 實(shí)際開(kāi)發(fā)過(guò)程中,此處一般用來(lái)進(jìn)行有用的處理 # 消費(fèi)者發(fā)送信號(hào):任務(wù)完成(此處實(shí)例的任務(wù)就是打印一下下) q.task_done() #我來(lái)方便看出效果,特意停留1s time.sleep(1) #使用哨兵,監(jiān)聽(tīng)生產(chǎn)者的消息,此處通過(guò)判斷value是否為None來(lái)判斷傳遞的消息 if value==None: #執(zhí)行哨兵計(jì)劃后,后面的語(yǔ)句都不會(huì)輸出 break if __name__=="__main__": #實(shí)例化JoinableQueue q=JoinableQueue() #定義消費(fèi)者進(jìn)程 p=Process(target=mark,args=(q,)) #將消費(fèi)者線程設(shè)置為后臺(tái)進(jìn)程,隨創(chuàng)建它的進(jìn)程(此處是主進(jìn)程)的終止而終止 #也就是當(dāng)它的創(chuàng)建進(jìn)程(此處是主現(xiàn)場(chǎng))意外退出時(shí),它也會(huì)跟隨一起退出。 #并且后臺(tái)進(jìn)程無(wú)法創(chuàng)建新的進(jìn)程 p.daemon=True #啟動(dòng)消費(fèi)者進(jìn)程 p.start() #模擬生產(chǎn)者,生產(chǎn)多個(gè)項(xiàng) for xx in range(5): print(xx) #當(dāng)xx==3時(shí)去執(zhí)行哨兵計(jì)劃 if xx==3: print("我用哨兵計(jì)劃了") q.put(None) print("哨兵計(jì)完美執(zhí)行") q.put("第%d條消息"%xx) #等待所有項(xiàng)都處理完成再退出,由于使用了哨兵計(jì)劃,隊(duì)列沒(méi)有完全執(zhí)行,所以會(huì)一直卡在這個(gè)位置 q.join() print("程序真正退出了")
效果:
管道除了使用隊(duì)列來(lái)進(jìn)行進(jìn)程間通信,還可以使用管道來(lái)進(jìn)行消息傳遞。
(connection1,connection2)=Pipe([duplex]) 在進(jìn)程間創(chuàng)建一條管道,并返回元祖(connection1,connection2),其中connection1、connection2表示兩端的Connection對(duì)象。 默認(rèn)情況下,duplex=True,此時(shí)管道是雙向的,如果設(shè)置duplex=false,connection1只能用于接收,connection2只能用于發(fā)送。 注意:必須在多進(jìn)程創(chuàng)建之前創(chuàng)建管道。
connection.close() :關(guān)閉連接,當(dāng)connection被垃圾回收時(shí),默認(rèn)會(huì)調(diào)用該方法。 connection.fileno() :返回連接使用的整數(shù)文件描述符 connection.poll([timeout]):如果連接上的數(shù)據(jù)可用,返回True,timeout為等待的最長(zhǎng)時(shí)間,如果不指定,該方法將立刻返回結(jié)果。 如果指定為None,該方法將會(huì)無(wú)限等待直到數(shù)據(jù)到達(dá)。 connection.send(obj):通過(guò)連接發(fā)送對(duì)象,obj是與序列號(hào)兼容的任意對(duì)象。 connection.send_bytes(buffer[,offset,size]):通過(guò)連接發(fā)送字節(jié)數(shù)據(jù)緩沖區(qū),buffer是支持緩沖區(qū)的任何對(duì)象。 offset是緩沖區(qū)的字節(jié)偏移量,而size是要發(fā)送的字節(jié)數(shù)。 connection.recv():接收connection.send()方法返回的對(duì)象。如果連接的另一端已經(jīng)關(guān)閉,再也不會(huì)存在任何數(shù)據(jù), 該方法將引起EOFError異常。 connection.recv_bytes([maxlength]):接收connection.send_bytes()方法發(fā)送的一條完整字節(jié)信息,maxlength為可以接受的 最大字節(jié)數(shù)。如果消息超過(guò)這個(gè)最大數(shù),將引發(fā)IOError異常,并且在連接上無(wú)法進(jìn)一步讀取。如果連接的另一端已經(jīng)關(guān)閉, 再也不會(huì)有任何數(shù)據(jù),該方法將引發(fā)EOFError異常。 connection.recv_bytes_into(buffer[,offset]):接收一條完整的字節(jié)信息,兵把它保存在buffer對(duì)象中, 該對(duì)象支持可寫入的緩沖區(qū)接口(就是bytearray對(duì)象或類似對(duì)象)。 offset指定緩沖區(qū)放置消息的字節(jié)偏移量。返回值是收到的字節(jié)數(shù)。如果消息長(zhǎng)度大于可用的緩沖區(qū)空間,將引發(fā)BufferTooShort異常。
示意圖:
代碼:
#理解管道的生產(chǎn)者與消費(fèi)者 from multiprocessing import Pipe, Process import time def mark(pipe): #接受參數(shù) output_p, input_p = pipe print("mark方法內(nèi)部調(diào)用input_p.close()") #消費(fèi)者(子進(jìn)程)此實(shí)例只接收,所以把輸入關(guān)閉 input_p.close() while True: try: item = output_p.recv() except EOFError: print("報(bào)錯(cuò)了") break print(item) time.sleep(1) print("mark執(zhí)行完成") if __name__ == "__main__": #必須在多進(jìn)程創(chuàng)建之前,創(chuàng)建管道,該管道是雙向的 (output_p, input_p) = Pipe()#創(chuàng)建管道 #創(chuàng)建一個(gè)進(jìn)程,并把管道兩端都作為參數(shù)傳遞過(guò)去 p = Process(target=mark, args=((output_p, input_p),)) #啟動(dòng)進(jìn)程 p.start() #生產(chǎn)者(主進(jìn)程)此實(shí)例只輸入,所以關(guān)閉輸出(接收端) output_p.close() for item in list(range(5)): input_p.send(item) print("主方法內(nèi)部調(diào)用input_p.close()()") #關(guān)閉生產(chǎn)者(主進(jìn)程)的輸入端 input_p.close()
效果圖:
代碼:
#利用管道實(shí)現(xiàn)多進(jìn)程協(xié)作:子線程計(jì)算結(jié)果,返回給主線程 from multiprocessing import Pipe, Process def mark(pipe): #接受參數(shù) server_p, client_p = pipe #消費(fèi)者(子進(jìn)程)此實(shí)例只接收,所以把輸入關(guān)閉 client_p.close() while True: try: x,y = server_p.recv() except EOFError: print("報(bào)錯(cuò)了") break result=x+y server_p.send(result) print("mark執(zhí)行完成") if __name__ == "__main__": #必須在多進(jìn)程創(chuàng)建之前,創(chuàng)建管道,該管道是雙向的 (server_p, client_p) = Pipe()#創(chuàng)建管道 #創(chuàng)建一個(gè)進(jìn)程,并把管道兩端都作為參數(shù)傳遞過(guò)去 p = Process(target=mark, args=((server_p, client_p),)) #啟動(dòng)進(jìn)程 p.start() #生產(chǎn)者(主進(jìn)程)此實(shí)例只輸入,所以關(guān)閉輸出(接收端) server_p.close() #發(fā)送數(shù)據(jù) client_p.send((4,5)) #打印接受到的數(shù)據(jù) print(client_p.recv()) client_p.send(("Mark", "大帥哥")) # 打印接受到的數(shù)據(jù) print(client_p.recv()) #關(guān)閉生產(chǎn)者(主進(jìn)程)的輸入端 client_p.close()
結(jié)果:
9 Mark大帥哥 報(bào)錯(cuò)了 mark執(zhí)行完成
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/42356.html
摘要:可以使用標(biāo)準(zhǔn)的索引切片迭代操作訪問(wèn)它,其中每項(xiàng)操作均鎖進(jìn)程同步,對(duì)于字節(jié)字符串,還具有屬性,可以把整個(gè)數(shù)組當(dāng)做一個(gè)字符串進(jìn)行訪問(wèn)。當(dāng)所編寫的程序必須一次性操作大量的數(shù)組項(xiàng)時(shí),如果同時(shí)使用這種數(shù)據(jù)類型和用于同步的單獨(dú)大的鎖,性能將極大提升。 上一篇文章:Python進(jìn)程專題5:進(jìn)程間通信下一篇文章:Python進(jìn)程專題7:托管對(duì)象 我們現(xiàn)在知道,進(jìn)程之間彼此是孤立的,唯一通信的方式是隊(duì)...
摘要:類常用屬性布爾值,指示進(jìn)程是否是后臺(tái)進(jìn)程。當(dāng)創(chuàng)建它的進(jìn)程終止時(shí),后臺(tái)進(jìn)程會(huì)自動(dòng)終止。進(jìn)程的整數(shù)退出指令。如果進(jìn)程仍然在運(yùn)行,它的值為,如果值為負(fù)數(shù),就表示進(jìn)程由信號(hào)所終止。 上一篇文章:Python進(jìn)程專題1:fork():創(chuàng)建子進(jìn)程、getpid()、getppid()下一篇文章:Python進(jìn)程專題3:繼承Process來(lái)創(chuàng)建進(jìn)程 由于fork()無(wú)法對(duì)Windows使用,而py...
摘要:上一篇文章進(jìn)程專題繼承來(lái)創(chuàng)建進(jìn)程下一篇文章進(jìn)程專題進(jìn)程間通信當(dāng)我們需要?jiǎng)?chuàng)建大量的進(jìn)程時(shí),利用模塊提供的來(lái)創(chuàng)建進(jìn)程。關(guān)閉進(jìn)程池,不再接受進(jìn)的進(jìn)程請(qǐng)求,但已經(jīng)接受的進(jìn)程還是會(huì)繼續(xù)執(zhí)行。 上一篇文章:Python進(jìn)程專題3:繼承Process來(lái)創(chuàng)建進(jìn)程下一篇文章:Python進(jìn)程專題5:進(jìn)程間通信 當(dāng)我們需要?jiǎng)?chuàng)建大量的進(jìn)程時(shí),利用multiprocessing模塊提供的Pool來(lái)創(chuàng)建進(jìn)程。 ...
摘要:代表網(wǎng)絡(luò)地址的元組或者代表域套接字的文件名,或者代表形式的字符串,代表遠(yuǎn)程系統(tǒng)本地計(jì)算機(jī)的為上的一條命名管道。是一個(gè)整數(shù),當(dāng)參數(shù)指定了一個(gè)網(wǎng)絡(luò)連接時(shí),對(duì)應(yīng)于傳遞給套接字的方法的值,默認(rèn)為。 上一篇文章:Python進(jìn)程專題7:托管對(duì)象下一篇文章:Python進(jìn)程專題9:關(guān)于進(jìn)程的實(shí)用工具函數(shù) 使用multiprocessing模塊的程序不僅可以于運(yùn)行在同一計(jì)算機(jī)的其它程序進(jìn)行消息傳遞...
摘要:連接帶遠(yuǎn)程管理器對(duì)象,該對(duì)象的地址在構(gòu)造函數(shù)中支出。在當(dāng)前進(jìn)程中運(yùn)行管理器服務(wù)器。啟動(dòng)一個(gè)單的子進(jìn)程,并在該子進(jìn)程中啟動(dòng)管理器服務(wù)器。如果無(wú)法序列號(hào)對(duì)象將引發(fā)異常。 上一篇文章:Python進(jìn)程專題6:共享數(shù)據(jù)與同步下一篇文章:Python進(jìn)程專題8:分布集群的消息傳遞 進(jìn)程不支持共享對(duì)象,上面描述的創(chuàng)建共享值和數(shù)組,但都是指定的特殊類型,對(duì)高級(jí)的Python對(duì)象(如:字典、列表、用...
閱讀 3254·2021-11-19 09:40
閱讀 3034·2021-09-09 09:32
閱讀 820·2021-09-02 09:55
閱讀 1420·2019-08-26 13:23
閱讀 2466·2019-08-26 11:46
閱讀 1256·2019-08-26 10:19
閱讀 2099·2019-08-23 16:53
閱讀 1104·2019-08-23 12:44