小編寫這篇文章的主要目的,主要是給大家介紹關(guān)于python3 queue多線程通信,這里面有很多的技術(shù)性的難點(diǎn),那么,該怎么去進(jìn)行處理呢,下面小編給大家進(jìn)行詳細(xì)的解答一下。
queue分類
python3 queue分三類:
先進(jìn)先出隊(duì)列
后進(jìn)先出的棧
優(yōu)先級(jí)隊(duì)列
他們的導(dǎo)入方式分別是:
from queue import Queue from queue import LifoQueue from queue import
具體方法見下面引用說明。
Queue對(duì)象已經(jīng)包含了必要的鎖,所以你可以通過它在多個(gè)線程間多安全地共享數(shù)據(jù)。當(dāng)使用隊(duì)列時(shí),協(xié)調(diào)生產(chǎn)者和消費(fèi)者的關(guān)閉問題可能會(huì)有一些麻煩。一個(gè)通用的解決方法是在隊(duì)列中放置一個(gè)特殊的值,當(dāng)消費(fèi)者讀到這個(gè)值的時(shí)候,終止執(zhí)行。
例如:
from queue import Queue from threading import Thread #用來表示終止的特殊對(duì)象 _sentinel=object() #A thread that produces data def producer(out_q): for i in range(10): print("生產(chǎn)") out_q.put(i) out_q.put(_sentinel) #A thread that consumes data def consumer(in_q): while True: data=in_q.get() if data is _sentinel: in_q.put(_sentinel) break else: print("消費(fèi)",data) #Create the shared queue and launch both threads q=Queue() t1=Thread(target=consumer,args=(q,)) t2=Thread(target=producer,args=(q,)) t1.start() t2.start()
結(jié)果:
本例里面有一個(gè)不尋常的位置:購買者在學(xué)到這些特殊值過后馬上又將它放返回序列中,將它傳下去。那樣,任何竊聽這一個(gè)序列的用戶進(jìn)程就能夠關(guān)閉所有了。雖然序列是一種常見的線程間通信制度,但仍然能自己根據(jù)構(gòu)建自已的程序設(shè)計(jì)并添加所需的鎖和同步機(jī)制來實(shí)現(xiàn)線程間通信。最常見的方法是使用Condition變量來包裝你的程序設(shè)計(jì)。下邊這個(gè)例子演示了如何創(chuàng)建一個(gè)線程安全的優(yōu)先級(jí)隊(duì)列。
import heapq import threading class PriorityQueue: def __init__(self): self._queue=[] self._count=0 self._cv=threading.Condition() def put(self,item,priority): with self._cv: heapq.heappush(self._queue,(-priority,self._count,item)) self._count+=1 self._cv.notify() def get(self): with self._cv: while len(self._queue)==0: self._cv.wait() return heapq.heappop(self._queue)[-1]
例子二、task_done和join
使用隊(duì)列來進(jìn)行線程間通信是一個(gè)單向、不確定的過程。通常情況下,你沒有辦法知道接收數(shù)據(jù)的線程是什么時(shí)候接收到的數(shù)據(jù)并開始工作的。不過隊(duì)列對(duì)象提供一些基本完成的特性,比如下邊這個(gè)例子中的task_done()和join():
from queue import Queue from threading import Thread class Producer(Thread): def __init__(self,q): super().__init__() self.count=5 self.q=q def run(self): while self.count>0: print("生產(chǎn)") if self.count==1: self.count-=1 self.q.put(2) else: self.count-=1 self.q.put(1) class Consumer(Thread): def __init__(self,q): super().__init__() self.q=q def run(self): while True: print("消費(fèi)") data=self.q.get() if data==2: print("stop because data=",data) #任務(wù)完成,從隊(duì)列中清除一個(gè)元素 self.q.task_done() break else: print("data is good,data=",data) #任務(wù)完成,從隊(duì)列中清除一個(gè)元素 self.q.task_done() def main(): q=Queue() p=Producer(q) c=Consumer(q) p.setDaemon(True) c.setDaemon(True) p.start() c.start() #等待隊(duì)列清空 q.join() print("queue is complete") if __name__=='__main__': main()
結(jié)果:
例子三、多線程里用queue
設(shè)置倆隊(duì)列,一個(gè)是要做的任務(wù)隊(duì)列todo_queue,一個(gè)是已經(jīng)完成的隊(duì)列done_queue。
每次執(zhí)行線程,先從todo_queue隊(duì)列里取出一個(gè)值,然后執(zhí)行完,放入done_queue隊(duì)列。
如果todo_queue為空,就退出。
import logging import logging.handlers import threading import queue log_mgr=None todo_queue=queue.Queue() done_queue=queue.Queue() class LogMgr: def __init__(self,logpath): self.LOG=logging.getLogger('log') loghd=logging.handlers.RotatingFileHandler(logpath,"a",0,1) fmt=logging.Formatter("%(asctime)s%(threadName)-10s%(message)s","%Y-%m-%d%H:%M:%S") loghd.setFormatter(fmt) self.LOG.addHandler(loghd) self.LOG.setLevel(logging.INFO) def info(self,msg): if self.LOG is not None: self.LOG.info(msg) class Worker(threading.Thread): global log_mgr def __init__(self,name): threading.Thread.__init__(self) self.name=name def run(self): while True: try: task=todo_queue.get(False) if task: log_mgr.info("HANDLE_TASK:%s"%task) done_queue.put(1) except queue.Empty: break return def main(): global log_mgr log_mgr=LogMgr("mylog") for i in range(30): todo_queue.put("data"+str(i)) workers=[] for i in range(3): w=Worker("worker"+str(i)) workers.append(w) for i in range(3): workers<i>.start() for i in range(3): workers<i>.join() total_num=done_queue.qsize() log_mgr.info("TOTAL_HANDLE_TASK:%d"%total_num) exit(0) if __name__=='__main__': main()
輸出日志文件結(jié)果:
到此為止,小編就給大家介紹到這里了,希望可以給各位讀者帶來幫助。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/127777.html
摘要:第二節(jié)將任務(wù)添加到隊(duì)列上一個(gè)栗子只是簡單實(shí)現(xiàn)了下網(wǎng)頁與后臺(tái)的通信你可以在這里處理任何你想要的操作你已經(jīng)點(diǎn)到我了但由于是同一個(gè)進(jìn)程,如果你做了很耗時(shí)的操作,比如下載一張圖片之類的操作你會(huì)發(fā)現(xiàn),窗口卡住了,一般表現(xiàn)為窗口泛白,出現(xiàn)未響應(yīng)的提示但 第二節(jié) 將任務(wù)添加到隊(duì)列! 上一個(gè)栗子只是簡單實(shí)現(xiàn)了下網(wǎng)頁與后臺(tái)的通信 def clickMe(self): #你可以在這里處理任何你想要...
摘要:,,等實(shí)用方法可以獲取一個(gè)隊(duì)列的當(dāng)前大小和狀態(tài)。但要注意,這些方法都不是線程安全的??赡苣銓?duì)一個(gè)隊(duì)列使用判斷出這個(gè)隊(duì)列為空,但同時(shí)另外一個(gè)線程可能已經(jīng)向這個(gè)隊(duì)列中插入一個(gè)數(shù)據(jù)項(xiàng)。 python 多線程編程 使用回調(diào)方式 import time def countdown(n): while n > 0: print(T-minus, n) n -...
摘要:默認(rèn)值為,指定為時(shí)代表可以阻塞,若同時(shí)指定,在超時(shí)時(shí)返回。當(dāng)消費(fèi)者線程調(diào)用意味著有消費(fèi)者取得任務(wù)并完成任務(wù),未完成的任務(wù)數(shù)就會(huì)減少。當(dāng)未完成的任務(wù)數(shù)降到,解除阻塞。 學(xué)習(xí)契機(jī) 最近的一個(gè)項(xiàng)目中在使用grpc時(shí)遇到一個(gè)問題,由于client端可多達(dá)200,每個(gè)端口每10s向grpc server發(fā)送一次請(qǐng)求,server端接受client的請(qǐng)求后根據(jù)request信息更新數(shù)據(jù)庫,再將數(shù)據(jù)...
摘要:進(jìn)程線程切換都需要使用一定的時(shí)間。子進(jìn)程在中,如果要運(yùn)行系統(tǒng)命令,會(huì)使用來運(yùn)行,官方建議使用方法來運(yùn)行系統(tǒng)命令,更高級(jí)的用法是直接使用其接口。 多線程 簡單示例 對(duì)于CPU計(jì)算密集型的任務(wù),python的多線程跟單線程沒什么區(qū)別,甚至有可能會(huì)更慢,但是對(duì)于IO密集型的任務(wù),比如http請(qǐng)求這類任務(wù),python的多線程還是有用處。在日常的使用中,經(jīng)常會(huì)結(jié)合多線程和隊(duì)列一起使用,比如,以...
摘要:在中由于歷史原因使得中多線程的效果非常不理想使得任何時(shí)刻只能利用一個(gè)核并且它的調(diào)度算法簡單粗暴多線程中讓每個(gè)線程運(yùn)行一段時(shí)間然后強(qiáng)行掛起該線程繼而去運(yùn)行其他線程如此周而復(fù)始直到所有線程結(jié)束這使得無法有效利用計(jì)算機(jī)系統(tǒng)中的局部性頻繁的線程切換 GIL 在Python中,由于歷史原因(GIL),使得Python中多線程的效果非常不理想.GIL使得任何時(shí)刻Python只能利用一個(gè)CPU核,...
閱讀 956·2023-01-14 11:38
閱讀 936·2023-01-14 11:04
閱讀 787·2023-01-14 10:48
閱讀 2157·2023-01-14 10:34
閱讀 1005·2023-01-14 10:24
閱讀 895·2023-01-14 10:18
閱讀 545·2023-01-14 10:09
閱讀 622·2023-01-14 10:02