成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Python進(jìn)程專題5:進(jìn)程間通信

eccozhou / 892人閱讀

摘要:上一篇文章進(jìn)程專題進(jìn)程池下一篇文章進(jìn)程專題共享數(shù)據(jù)與同步模塊支持的進(jìn)程間通信主要有兩種管道和隊(duì)列。隊(duì)列底層使用管道和鎖,同時(shí)運(yùn)行支持線程講隊(duì)列中的數(shù)據(jù)傳輸?shù)降讓庸艿乐?,?lái)實(shí)習(xí)進(jìn)程間通信。

上一篇文章:Python進(jìn)程專題4:進(jìn)程池Pool
下一篇文章:Python進(jìn)程專題6:共享數(shù)據(jù)與同步

multiprocessing模塊支持的進(jìn)程間通信主要有兩種:管道和隊(duì)列。一般來(lái)說(shuō),發(fā)送較少的大對(duì)象比發(fā)送大量的小對(duì)象要好。

Queue隊(duì)列

底層使用管道和鎖,同時(shí)運(yùn)行支持線程講隊(duì)列中的數(shù)據(jù)傳輸?shù)降讓庸艿乐?,?lái)實(shí)習(xí)進(jìn)程間通信。

語(yǔ)法:

Queue([maxsize])
創(chuàng)建共享隊(duì)列。使用multiprocessing模塊的Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。Queue本身是一個(gè)消息隊(duì)列,
maxsize是隊(duì)列運(yùn)行的最大項(xiàng)數(shù),如果不指定,則不限制大小。

常用方法

q.close():關(guān)閉隊(duì)列,不再向隊(duì)列中添加數(shù)據(jù),那些已經(jīng)進(jìn)入隊(duì)列的數(shù)據(jù)會(huì)繼續(xù)處理。q被回收時(shí)將自動(dòng)調(diào)用此方法。

q.empty():如果調(diào)用此方法時(shí),隊(duì)列為null返回True,單由于此時(shí)其他進(jìn)程或者線程正在添加或刪除項(xiàng),
所以結(jié)果不可靠,而且有些平臺(tái)運(yùn)行該方法會(huì)直接報(bào)錯(cuò),我的mac系統(tǒng)運(yùn)行該方法,直接報(bào)錯(cuò)。

q.full():如果調(diào)用此方法時(shí),隊(duì)列已滿,返回True,同q.empty()方法,結(jié)果不可靠。

q.get([block,timeout]):返回q中的一個(gè)項(xiàng),block如果設(shè)置為True,如果q隊(duì)列為空,該方法會(huì)阻塞(就是不往下運(yùn)行了,處于等待狀態(tài)),
直到隊(duì)列中有項(xiàng)可用為止,如果同時(shí)頁(yè)設(shè)置了timeout,那么在改時(shí)間間隔內(nèi),都沒(méi)有等到有用的項(xiàng),就會(huì)引發(fā)Queue.Empty異常。
如果block設(shè)置為false,timeout沒(méi)有意義,如果隊(duì)列為空,將引發(fā)Queue.Empt異常。

q.get_nowait():等同于q.get(False)

q.put(item,block,timeout):將item放入隊(duì)列,如果此時(shí)隊(duì)列已滿:
如果block=True,timeout沒(méi)有設(shè)置,就會(huì)阻塞,直到有可用空間為止。
如果block=True,timeout也設(shè)置,就會(huì)阻塞到timeout,超過(guò)這個(gè)時(shí)間會(huì)報(bào)Queue.Full異常。
如果block=False,timeout設(shè)置無(wú)效,直接報(bào)Queue.Full異常。

q.put_nowait(item):等同于q.put(item,False)

q.qsize():返回當(dāng)前隊(duì)列項(xiàng)的數(shù)量,結(jié)果不可靠,而且mac會(huì)直接報(bào)錯(cuò):NotImplementedError。

實(shí)例1:驗(yàn)證:put方法會(huì)阻塞

實(shí)例:

#驗(yàn)證:put方法會(huì)阻塞
from multiprocessing import Queue
queue=Queue(3)#初始化一個(gè)Queue隊(duì)列,可以接受3個(gè)消息
queue.put("我是第1條信息")
queue.put("我是第2條信息")
queue.put("我是第3條信息")

print("插入第4條信息之前")
queue.put("我是第4條信息")
print("插入第4條信息之后")

效果:程序會(huì)一直阻塞,最后一句輸永遠(yuǎn)也不會(huì)輸出。

實(shí)例2:closse方法、get方法、put方法簡(jiǎn)單使用:多進(jìn)程訪問(wèn)同一個(gè)Queue

代碼:

#closse方法、get方法、put方法簡(jiǎn)單使用:多進(jìn)程訪問(wèn)同一個(gè)Queue
from multiprocessing import Queue,Process
import time,os
#參數(shù)q就是Queue實(shí)例
def mark(q,interval):
    time.sleep(interval)
    # 打印信息
    print("進(jìn)程%d取出數(shù)據(jù):"%os.getpid()+queue.get(True))


if __name__=="__main__":
    queue = Queue(3)  # 初始化一個(gè)Queue隊(duì)列,可以接受3個(gè)消息
    queue.put("我是第1條信息")
    queue.put("我是第2條信息")
    queue.put("我是第3條信息")

    p1=Process(target=mark,args=(queue,1))
    p2=Process(target=mark,args=(queue,2))
    p3=Process(target=mark,args=(queue,3))

    p1.start()
    p2.start()
    p3.start()
    # 關(guān)閉隊(duì)列,不再插入信息
    queue.close()
    # 下面插入會(huì)導(dǎo)致異常
    # queue.put("我是第4條信息")

    # 打印第1條信息
    print("程序語(yǔ)句執(zhí)行完成")

效果

JoinableQueue隊(duì)列

創(chuàng)建可連接的共享進(jìn)程隊(duì)列,可以看做還是一個(gè)Queue,只不過(guò)這個(gè)Queue除了Queue特有功能外,允許項(xiàng)的消費(fèi)者通知項(xiàng)的生產(chǎn)者,項(xiàng)已經(jīng)處理成功。該通知進(jìn)程時(shí)使用共享的信號(hào)和條件變量來(lái)實(shí)現(xiàn)的。

JoinableQueue實(shí)例除了與Queue對(duì)象相同的方法外,還具有下列方法:

q.task_done():消費(fèi)者使用此方法發(fā)送信號(hào),表示q.get()返回的項(xiàng)已經(jīng)被處理。
注意??:如果調(diào)用此方法的次數(shù)大于隊(duì)列中刪除的項(xiàng)的數(shù)量,將引發(fā)ValueError異常。

q.join():生產(chǎn)者使用此方法進(jìn)行阻塞,直到隊(duì)列中所有的項(xiàng)都被處理完成,即阻塞將持續(xù)到隊(duì)列中的每一項(xiàng)均調(diào)用q.task_done()方法為止。

代碼實(shí)例:

#利用JoinableQueue實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者,并且加入了哨兵,來(lái)監(jiān)聽(tīng)生產(chǎn)者的要求
from multiprocessing import JoinableQueue,Process
import time

#參數(shù)q為JoinableQueue隊(duì)列實(shí)例
def mark(q):
    #循環(huán)接受信息,一直運(yùn)行,這也下面為什么要將它設(shè)為后臺(tái)進(jìn)程的原因,必須保證當(dāng)主線程退出時(shí),它可以退出
    while True:
        value = q.get()
        print(value)  # 實(shí)際開(kāi)發(fā)過(guò)程中,此處一般用來(lái)進(jìn)行有用的處理
        # 消費(fèi)者發(fā)送信號(hào):任務(wù)完成(此處實(shí)例的任務(wù)就是打印一下下)
        q.task_done()
        #我來(lái)方便看出效果,特意停留1s
        time.sleep(1)

        #使用哨兵,監(jiān)聽(tīng)生產(chǎn)者的消息,此處通過(guò)判斷value是否為None來(lái)判斷傳遞的消息
        if value==None:
            #執(zhí)行哨兵計(jì)劃后,后面的語(yǔ)句都不會(huì)輸出
            break




if __name__=="__main__":
    #實(shí)例化JoinableQueue
    q=JoinableQueue()

    #定義消費(fèi)者進(jìn)程
    p=Process(target=mark,args=(q,))
    #將消費(fèi)者線程設(shè)置為后臺(tái)進(jìn)程,隨創(chuàng)建它的進(jìn)程(此處是主進(jìn)程)的終止而終止
    #也就是當(dāng)它的創(chuàng)建進(jìn)程(此處是主現(xiàn)場(chǎng))意外退出時(shí),它也會(huì)跟隨一起退出。
    #并且后臺(tái)進(jìn)程無(wú)法創(chuàng)建新的進(jìn)程
    p.daemon=True

    #啟動(dòng)消費(fèi)者進(jìn)程
    p.start()

    #模擬生產(chǎn)者,生產(chǎn)多個(gè)項(xiàng)
    for xx in range(5):
        print(xx)
        #當(dāng)xx==3時(shí)去執(zhí)行哨兵計(jì)劃
        if xx==3:
            print("我用哨兵計(jì)劃了")
            q.put(None)
            print("哨兵計(jì)完美執(zhí)行")
        q.put("第%d條消息"%xx)


    #等待所有項(xiàng)都處理完成再退出,由于使用了哨兵計(jì)劃,隊(duì)列沒(méi)有完全執(zhí)行,所以會(huì)一直卡在這個(gè)位置
    q.join()

    print("程序真正退出了")

效果:

管道

除了使用隊(duì)列來(lái)進(jìn)行進(jìn)程間通信,還可以使用管道來(lái)進(jìn)行消息傳遞。

語(yǔ)法:

(connection1,connection2)=Pipe([duplex])
在進(jìn)程間創(chuàng)建一條管道,并返回元祖(connection1,connection2),其中connection1、connection2表示兩端的Connection對(duì)象。
默認(rèn)情況下,duplex=True,此時(shí)管道是雙向的,如果設(shè)置duplex=false,connection1只能用于接收,connection2只能用于發(fā)送。
注意:必須在多進(jìn)程創(chuàng)建之前創(chuàng)建管道。

常用方法:

connection.close() :關(guān)閉連接,當(dāng)connection被垃圾回收時(shí),默認(rèn)會(huì)調(diào)用該方法。

connection.fileno() :返回連接使用的整數(shù)文件描述符

connection.poll([timeout]):如果連接上的數(shù)據(jù)可用,返回True,timeout為等待的最長(zhǎng)時(shí)間,如果不指定,該方法將立刻返回結(jié)果。
如果指定為None,該方法將會(huì)無(wú)限等待直到數(shù)據(jù)到達(dá)。

connection.send(obj):通過(guò)連接發(fā)送對(duì)象,obj是與序列號(hào)兼容的任意對(duì)象。

connection.send_bytes(buffer[,offset,size]):通過(guò)連接發(fā)送字節(jié)數(shù)據(jù)緩沖區(qū),buffer是支持緩沖區(qū)的任何對(duì)象。
offset是緩沖區(qū)的字節(jié)偏移量,而size是要發(fā)送的字節(jié)數(shù)。

connection.recv():接收connection.send()方法返回的對(duì)象。如果連接的另一端已經(jīng)關(guān)閉,再也不會(huì)存在任何數(shù)據(jù),
該方法將引起EOFError異常。

connection.recv_bytes([maxlength]):接收connection.send_bytes()方法發(fā)送的一條完整字節(jié)信息,maxlength為可以接受的
最大字節(jié)數(shù)。如果消息超過(guò)這個(gè)最大數(shù),將引發(fā)IOError異常,并且在連接上無(wú)法進(jìn)一步讀取。如果連接的另一端已經(jīng)關(guān)閉,
再也不會(huì)有任何數(shù)據(jù),該方法將引發(fā)EOFError異常。

connection.recv_bytes_into(buffer[,offset]):接收一條完整的字節(jié)信息,兵把它保存在buffer對(duì)象中,
該對(duì)象支持可寫入的緩沖區(qū)接口(就是bytearray對(duì)象或類似對(duì)象)。
offset指定緩沖區(qū)放置消息的字節(jié)偏移量。返回值是收到的字節(jié)數(shù)。如果消息長(zhǎng)度大于可用的緩沖區(qū)空間,將引發(fā)BufferTooShort異常。

實(shí)例1:理解管道的生產(chǎn)者與消費(fèi)者

示意圖:

代碼:

#理解管道的生產(chǎn)者與消費(fèi)者
from multiprocessing import Pipe, Process
import time

def mark(pipe):
    #接受參數(shù)
    output_p, input_p = pipe
    print("mark方法內(nèi)部調(diào)用input_p.close()")
    #消費(fèi)者(子進(jìn)程)此實(shí)例只接收,所以把輸入關(guān)閉
    input_p.close()

    while True:
        try:
            item = output_p.recv()
        except EOFError:
            print("報(bào)錯(cuò)了")
            break
        print(item)
        time.sleep(1)
    print("mark執(zhí)行完成")


if __name__ == "__main__":
    #必須在多進(jìn)程創(chuàng)建之前,創(chuàng)建管道,該管道是雙向的
    (output_p, input_p) = Pipe()#創(chuàng)建管道
    #創(chuàng)建一個(gè)進(jìn)程,并把管道兩端都作為參數(shù)傳遞過(guò)去
    p = Process(target=mark, args=((output_p, input_p),))
    #啟動(dòng)進(jìn)程
    p.start()
    #生產(chǎn)者(主進(jìn)程)此實(shí)例只輸入,所以關(guān)閉輸出(接收端)
    output_p.close()

    for item in list(range(5)):
        input_p.send(item)
    print("主方法內(nèi)部調(diào)用input_p.close()()")
    #關(guān)閉生產(chǎn)者(主進(jìn)程)的輸入端
    input_p.close()

效果圖:

實(shí)例2:利用管道實(shí)現(xiàn)多進(jìn)程協(xié)作:子線程計(jì)算結(jié)果,返回給主線程

代碼:

#利用管道實(shí)現(xiàn)多進(jìn)程協(xié)作:子線程計(jì)算結(jié)果,返回給主線程
from multiprocessing import Pipe, Process

def mark(pipe):
    #接受參數(shù)
    server_p, client_p = pipe
    #消費(fèi)者(子進(jìn)程)此實(shí)例只接收,所以把輸入關(guān)閉
    client_p.close()

    while True:
        try:
            x,y = server_p.recv()
        except EOFError:
            print("報(bào)錯(cuò)了")
            break
        result=x+y
        server_p.send(result)
    print("mark執(zhí)行完成")


if __name__ == "__main__":
    #必須在多進(jìn)程創(chuàng)建之前,創(chuàng)建管道,該管道是雙向的
    (server_p, client_p) = Pipe()#創(chuàng)建管道
    #創(chuàng)建一個(gè)進(jìn)程,并把管道兩端都作為參數(shù)傳遞過(guò)去
    p = Process(target=mark, args=((server_p, client_p),))
    #啟動(dòng)進(jìn)程
    p.start()
    #生產(chǎn)者(主進(jìn)程)此實(shí)例只輸入,所以關(guān)閉輸出(接收端)
    server_p.close()

    #發(fā)送數(shù)據(jù)
    client_p.send((4,5))
    #打印接受到的數(shù)據(jù)
    print(client_p.recv())

    client_p.send(("Mark", "大帥哥"))
    # 打印接受到的數(shù)據(jù)
    print(client_p.recv())





    #關(guān)閉生產(chǎn)者(主進(jìn)程)的輸入端
    client_p.close()


結(jié)果:

9
Mark大帥哥
報(bào)錯(cuò)了
mark執(zhí)行完成

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/42356.html

相關(guān)文章

  • Python進(jìn)程專題6:共享數(shù)據(jù)與同步

    摘要:可以使用標(biāo)準(zhǔn)的索引切片迭代操作訪問(wèn)它,其中每項(xiàng)操作均鎖進(jìn)程同步,對(duì)于字節(jié)字符串,還具有屬性,可以把整個(gè)數(shù)組當(dāng)做一個(gè)字符串進(jìn)行訪問(wèn)。當(dāng)所編寫的程序必須一次性操作大量的數(shù)組項(xiàng)時(shí),如果同時(shí)使用這種數(shù)據(jù)類型和用于同步的單獨(dú)大的鎖,性能將極大提升。 上一篇文章:Python進(jìn)程專題5:進(jìn)程間通信下一篇文章:Python進(jìn)程專題7:托管對(duì)象 我們現(xiàn)在知道,進(jìn)程之間彼此是孤立的,唯一通信的方式是隊(duì)...

    Yuanf 評(píng)論0 收藏0
  • Python進(jìn)程專題2:multiprocessing創(chuàng)建進(jìn)程

    摘要:類常用屬性布爾值,指示進(jìn)程是否是后臺(tái)進(jìn)程。當(dāng)創(chuàng)建它的進(jìn)程終止時(shí),后臺(tái)進(jìn)程會(huì)自動(dòng)終止。進(jìn)程的整數(shù)退出指令。如果進(jìn)程仍然在運(yùn)行,它的值為,如果值為負(fù)數(shù),就表示進(jìn)程由信號(hào)所終止。 上一篇文章:Python進(jìn)程專題1:fork():創(chuàng)建子進(jìn)程、getpid()、getppid()下一篇文章:Python進(jìn)程專題3:繼承Process來(lái)創(chuàng)建進(jìn)程 由于fork()無(wú)法對(duì)Windows使用,而py...

    騫諱護(hù) 評(píng)論0 收藏0
  • Python進(jìn)程專題4:進(jìn)程池Pool

    摘要:上一篇文章進(jìn)程專題繼承來(lái)創(chuàng)建進(jìn)程下一篇文章進(jìn)程專題進(jìn)程間通信當(dāng)我們需要?jiǎng)?chuàng)建大量的進(jìn)程時(shí),利用模塊提供的來(lái)創(chuàng)建進(jìn)程。關(guān)閉進(jìn)程池,不再接受進(jìn)的進(jìn)程請(qǐng)求,但已經(jīng)接受的進(jìn)程還是會(huì)繼續(xù)執(zhí)行。 上一篇文章:Python進(jìn)程專題3:繼承Process來(lái)創(chuàng)建進(jìn)程下一篇文章:Python進(jìn)程專題5:進(jìn)程間通信 當(dāng)我們需要?jiǎng)?chuàng)建大量的進(jìn)程時(shí),利用multiprocessing模塊提供的Pool來(lái)創(chuàng)建進(jìn)程。 ...

    Leo_chen 評(píng)論0 收藏0
  • Python進(jìn)程專題8:分布集群的消息傳遞

    摘要:代表網(wǎng)絡(luò)地址的元組或者代表域套接字的文件名,或者代表形式的字符串,代表遠(yuǎn)程系統(tǒng)本地計(jì)算機(jī)的為上的一條命名管道。是一個(gè)整數(shù),當(dāng)參數(shù)指定了一個(gè)網(wǎng)絡(luò)連接時(shí),對(duì)應(yīng)于傳遞給套接字的方法的值,默認(rèn)為。 上一篇文章:Python進(jìn)程專題7:托管對(duì)象下一篇文章:Python進(jìn)程專題9:關(guān)于進(jìn)程的實(shí)用工具函數(shù) 使用multiprocessing模塊的程序不僅可以于運(yùn)行在同一計(jì)算機(jī)的其它程序進(jìn)行消息傳遞...

    wh469012917 評(píng)論0 收藏0
  • Python進(jìn)程專題7:托管對(duì)象

    摘要:連接帶遠(yuǎn)程管理器對(duì)象,該對(duì)象的地址在構(gòu)造函數(shù)中支出。在當(dāng)前進(jìn)程中運(yùn)行管理器服務(wù)器。啟動(dòng)一個(gè)單的子進(jìn)程,并在該子進(jìn)程中啟動(dòng)管理器服務(wù)器。如果無(wú)法序列號(hào)對(duì)象將引發(fā)異常。 上一篇文章:Python進(jìn)程專題6:共享數(shù)據(jù)與同步下一篇文章:Python進(jìn)程專題8:分布集群的消息傳遞 進(jìn)程不支持共享對(duì)象,上面描述的創(chuàng)建共享值和數(shù)組,但都是指定的特殊類型,對(duì)高級(jí)的Python對(duì)象(如:字典、列表、用...

    DevYK 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看
<