摘要:申明守護(hù)線程需要在開啟線程之前。線程池線程都是后臺線程。每個線程都使用默認(rèn)的堆棧大小,以默認(rèn)的優(yōu)先級運(yùn)行,并處于多線程單元中。
創(chuàng)建線程
一個進(jìn)程必有一個線程,進(jìn)程也可由多個線程組成,但有一個線程為主線程。
若一個任務(wù)需要花10Mins,當(dāng)只有一個線程時,花費(fèi)10Mins,當(dāng)有十個線程時,可能就花費(fèi)1Mins,所以多線程可以提升任務(wù)執(zhí)行時間,提高工作效率。
python里與線程有關(guān)的模塊:
_thread 底層
threading
查看當(dāng)前運(yùn)行的線程個數(shù):threading.current_thread()
查看當(dāng)前線程信息:threading.active_count()
import _thread import threading def job(): print("當(dāng)前線程個數(shù):",threading.active_count()) print("當(dāng)前線程信息",threading.current_thread()) if __name__=="__main__": job()_thread創(chuàng)建多線程
調(diào)用thread模塊中的start_new_thread()函數(shù)來產(chǎn)生新線程。
thread.start_new_thread(function,args = ())
#_thread創(chuàng)建多線程 import _thread import time def job(name): print("name:%s,time:%s" %(name,time.ctime())) if __name__=="__main__": # 創(chuàng)建多個線程, 但是沒有開始執(zhí)行任務(wù) _thread.start_new_thread(job,("thread1",)) _thread.start_new_thread(job,("thread2",)) while True: #盲等待 passthreading通過實(shí)例化Thread類創(chuàng)建多線程
_thread模塊提供了低級別的、原始的線程以及一個簡單的鎖。
threading模塊是對_thread再封裝,對使用者更友好
通過實(shí)例化Thread對象創(chuàng)建線程,Thread的方法有:
run() #Method representing the thread"s activity.
start() #Start the thread"s activity.
join() #Wait until the thread terminates.
is_alive() #Return whether the thread is alive.
import threading def job(name): print("當(dāng)前執(zhí)行的任務(wù)名:",name) print("當(dāng)前線程個數(shù):",threading.active_count()) print("當(dāng)前線程信息:",threading.current_thread()) if __name__=="__main__": t1 = threading.Thread(target=job,name="thread1",args=("job1",)) t2 = threading.Thread(target=job,name="thread2",args=("job2",)) t1.start() #Start the thread"s activity. t2.start()使用多線程與不使用多線程的對比
不使用多線程執(zhí)行任務(wù),程序會一直等待sleep時間過去,在執(zhí)行下一條命令。
#不使用多線程 import time def music(name): for i in range(2): print("i am listening :",name) time.sleep(2) def read(book): for i in range(2): print("i am reading :",book) time.sleep(1) if __name__ == "__main__": start_time = time.time() music("空空如也") read("面紗") print("花費(fèi)時間: %s" %(time.time()-start_time))
使用多線程執(zhí)行任務(wù),在遇到某一線程需要等待時,會執(zhí)行其他線程
Thread.join()會等待當(dāng)前線程執(zhí)行結(jié)束,再執(zhí)行主線程。
import threading import time def music(name): for i in range(2): print("i am listening :",name) time.sleep(2) def read(book): for i in range(2): print("i am reading :",book) time.sleep(1) if __name__=="__main__": start_time = time.time() t1 = threading.Thread(target=music,args=("空空如也",)) t2 = threading.Thread(target=read,args=("面紗",)) t1.start() t2.start() t1.join() #等待線程執(zhí)行結(jié)束,才執(zhí)行主程序,防止主線程阻塞子線程 t2.join() end_time = time.time() print("任務(wù)執(zhí)行時間:",end_time-start_time)守護(hù)線程setDeamon
當(dāng)申明一個子線程為守護(hù)線程時,主線程結(jié)束時,子線程也結(jié)束。
申明守護(hù)線程需要在開啟線程之前。
import threading import time def music(name): for i in range(2): print("listening music :",name) time.sleep(4) def code(pro): for i in range(2): print("i am coding :",pro) time.sleep(5) if __name__=="__main__": st_time = time.time() t1 = threading.Thread(target=music,args=("hello",)) t2 = threading.Thread(target=code,args=("mydiff",)) #將線程申明為守護(hù)線程,如果設(shè)置為True,當(dāng)主線程結(jié)束,子線程也結(jié)束 #必須在啟動線程之前進(jìn)行設(shè)置 t1.setDaemon(True) t2.setDaemon(True) #主線程執(zhí)行結(jié)束之后,子線程還沒來得及執(zhí)行結(jié)束,整個程序就退出了 t1.start() t2.start() end_time = time.time() print("運(yùn)行時間:",end_time-st_time)線程同步
如果多個線程共同對某個數(shù)據(jù)修改,則可能出現(xiàn)不可預(yù)料的結(jié)果,為了保證數(shù)據(jù)的正確性,需要對多個線程進(jìn)行同步。
使用Thread對象的Lock和Rlock可以實(shí)現(xiàn)簡單的線程同步,這兩個對象都有acquire方法和release方法,對于那些需要每次只允許一個線程操作的數(shù)據(jù),可以將其操作放到acquire和release方法之間。
import threading def add(lock): #操作變量之前加鎖 lock.acquire() global money for i in range(1389993): money+=1 #變量操作完成之后,解鎖 lock.release() def reduce(lock): #操作變量之前加鎖 lock.acquire() global money for i in range(4728937): money-=1 #變量操作完成之后,解鎖 lock.release() if __name__=="__main__": money = 0 lock = threading.Lock() #示例化一個鎖對象 t1 = threading.Thread(target=add,args=(lock,)) t2 = threading.Thread(target=reduce,args=(lock,)) t1.start() t2.start() t1.join() t2.join() print("最終金額為:",money)GIL全局解釋器鎖
Python 代碼的執(zhí)行由 Python 虛擬機(jī)(也叫解釋器主循環(huán))來控制。Python 在設(shè)計之初就考慮到要在主循環(huán)中,同時只有一個線程在執(zhí)行,就像單 CPU 的系統(tǒng)中運(yùn)行多個進(jìn)程那樣,內(nèi)存中可以存放多個程序,但任意時刻,只有一個程序在 CPU 中運(yùn)行。同樣地,雖然 Python 解釋器中可以“運(yùn)行”,多個線程,但在任意時刻,只有一個線程在解釋器中運(yùn)行。
對 Python 虛擬機(jī)的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運(yùn)行。
執(zhí)行過程:
1). 設(shè)置GIL 2). 切換到線程去運(yùn)行對應(yīng)的任務(wù); 3). 運(yùn)行 - 執(zhí)行完了 - time.sleep() - 獲取其他信息才能繼續(xù)執(zhí)行, eg: 從網(wǎng)絡(luò)上獲取網(wǎng)頁信息等; 3. 把線程設(shè)置為睡眠狀態(tài) 4. 解鎖GIL 5.再次重復(fù)執(zhí)行上述內(nèi)容;生產(chǎn)者消費(fèi)者模型
在工作中,某些模塊生成一些數(shù)據(jù),由另一些模塊負(fù)責(zé)處理。產(chǎn)生數(shù)據(jù)的模塊,就形象地稱為生產(chǎn)者;而處理數(shù)據(jù)的模塊,就稱為消費(fèi)者。在生產(chǎn)者與消費(fèi)者之間在加個緩沖區(qū),我們形象的稱之為倉庫,生產(chǎn)者負(fù)責(zé)往倉庫了進(jìn)商品,而消費(fèi)者負(fù)責(zé)從倉庫里拿商品,這就構(gòu)成了生產(chǎn)者消費(fèi)者模式。
這里,我們用生產(chǎn)者消費(fèi)者模型來實(shí)現(xiàn)多線程的網(wǎng)址訪問,節(jié)省時間。
#多線程實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型 #實(shí)現(xiàn)不同的網(wǎng)址或ip訪問 import threading from urllib.request import urlopen def create_data(): with open("ips.txt","w") as f: f.write("www.baidu.com ") f.write("www.163.com ") for i in range(100): f.write("172.25.254.%s " %(i+1)) def creat_url(filename="ips.txt"): ports=[80,443] with open(filename) as f: ips = [url_info.strip() for url_info in f.readlines()] urls = ["http://%s:%s" %(ip,port) for ip in ips for port in ports] return urls def job(url): try: urlObj = urlopen(url) except Exception as e : print("Warnning!!! %s不可訪問" %(url)) else: print("%s可以訪問" %(url)) if __name__=="__main__": urls = creat_url() threads = [] for url in urls: t = threading.Thread(target=job,args=(url,)) threads.append(t) t.start() [thread.join() for thread in threads] print("任務(wù)執(zhí)行結(jié)束")再封裝threading.Thread類 無參版
對threading.Thread類的再封裝,執(zhí)行時無需傳遞參數(shù)
from threading import Thread class IpThread(Thread): def __init__(self): super(IpThread, self).__init__() # 將多線程需要執(zhí)行的任務(wù)重寫到run方法中; def run(self): print("this is a JOB") print(type(self)) t = IpThread() t.start()含參版
實(shí)現(xiàn)訪問Ip地址
import json from threading import Thread from urllib.request import urlopen class IpThread(Thread): #重寫構(gòu)造方法,如果執(zhí)行的任務(wù)需要傳遞參數(shù),那將參數(shù)與self綁定 def __init__(self,jobname,ip): super(IpThread, self).__init__() self.jobname = jobname self.ip = ip #將多線程需要執(zhí)行的任務(wù)重寫到run方法中 def run(self): print("this is a %s job" %(self.jobname)) #需要有一個參數(shù),傳遞ip url = "http://ip.taobao.com/service/getIpInfo.php?ip=%s" % (self.ip) try : # 根據(jù)url獲取網(wǎng)頁的內(nèi)容, 并且解碼為utf-8格式, 識別中文; result = urlopen(url).read().decode("utf-8") except Exception as e: print("訪問%s失敗" %(self.ip)) else: # 將獲取的字符串類型轉(zhuǎn)換為字典, 方便處理 d = json.loads(result)["data"] country = d["country"] city = d["city"] print("%s位于%s,城市為%s" %(self.ip,country,city)) if __name__=="__main__": ips = ["172.25.254.22","8.8.8.8","89.31.136.0"] threads = [] for ip in ips : t = IpThread(jobname="Clawer",ip=ip) threads.append(t) t.start() [thread.join() for thread in threads] print("程序執(zhí)行結(jié)束")線程池
線程池是一種多線程處理形式,處理過程中將任務(wù)添加到隊列,然后在創(chuàng)建線程后自動啟動這些任務(wù)。線程池線程都是后臺線程。每個線程都使用默認(rèn)的堆棧大小,以默認(rèn)的優(yōu)先級運(yùn)行,并處于多線程單元中。
from concurrent.futures import ThreadPoolExecutor import time #需要執(zhí)行的任務(wù) def job(): print("morning sheen") return "new day" if __name__=="__main__": #示例化對象,線程池里最多有10個線程 pool = ThreadPoolExecutor(max_workers=10) #往線程池里扔需要執(zhí)行的任務(wù),返回一個對象 _base.Future()示例化出來的 f1 = pool.submit(job) f2 = pool.submit(job) #判斷任務(wù)是否執(zhí)行結(jié)束 print(f1.done()) time.sleep(1) print(f2.done()) #判斷是否釋放了線程 #獲取執(zhí)行任務(wù)的結(jié)果 print(f1.result()) print(f2.result())線程池循環(huán)執(zhí)行任務(wù)
略
線程池執(zhí)行任務(wù)方式concurrent.futures.ThreadPoolExecutor,在提交任務(wù)的時候,有兩種方式,一種是submit()函數(shù),另一種是map()函數(shù),兩者的主要區(qū)別在于:
map可以保證輸出的順序, submit輸出的順序是亂的
如果你要提交的任務(wù)的函數(shù)是一樣的,就可以簡化成map。但是假如提交的任務(wù)函數(shù)是不一樣的,或者執(zhí)行的過程之可能出現(xiàn)異常(使用map執(zhí)行過程中發(fā)現(xiàn)問題會直接拋出錯誤)就要用到submit()
submit和map的參數(shù)是不同的,submit每次都需要提交一個目標(biāo)函數(shù)和對應(yīng)的參數(shù),map只需要提交一次目標(biāo)函數(shù),目標(biāo)函數(shù)的參數(shù)放在一個迭代器(列表,字典)里就可以。
from urllib.error import HTTPError from urllib.request import urlopen from concurrent.futures import ThreadPoolExecutor,as_completed import time URLS = ["http://httpbin.org", "http://example.com/", "https://api.github.com/"]*3 def get_page(url,timeout = 0.3): #爬取網(wǎng)頁信息 try: content = urlopen(url).read() return {"url":url, "len":len(content)} except HTTPError as e: return {"url":url, "len":0} # 方法1: submit提交任務(wù) start_time = time.time() pool = ThreadPoolExecutor(max_workers=20) #submit返回的是Future對象,對于Future對象可以簡單地理解為一個在未來完成的操作 futuresObj = [pool.submit(get_page, url) for url in URLS] # # 注意: 傳遞的是包含futures對象的序列, as_complete返回已經(jīng)執(zhí)行完任務(wù)的future對象, # # 直到所有的future對應(yīng)的任務(wù)執(zhí)行完成, 循環(huán)結(jié)束; for finish_fs in as_completed(futuresObj): print(finish_fs.result() ) #submit返回值Future的方法result(self, timeout=None) """Return the result of the call that the future represents. Args: timeout: The number of seconds to wait for the result if the future isn"t done. If None, then there is no limit on the wait time. Returns: The result of the call that the future represents.""" print("執(zhí)行時間:%s" %(time.time()-start_time)) # 方法2:通過map方式執(zhí)行 start2_time = time.time() pool2 = ThreadPoolExecutor(max_workers=20) for res in pool2.map(get_page, URLS): print(res) print("執(zhí)行時間:%s" %(time.time()-start2_time))
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/42484.html
摘要:在一個進(jìn)程內(nèi)部,要同時干多件事,就需要同時運(yùn)行多個子任務(wù),我們把進(jìn)程內(nèi)的這些子任務(wù)稱為線程??偨Y(jié)一下,多任務(wù)的實(shí)現(xiàn)方式有三種多進(jìn)程模式多線程模式多進(jìn)程多線程模式線程是最小的執(zhí)行單元,而進(jìn)程由至少一個線程組成。 進(jìn)程與線程 很多同學(xué)都聽說過,現(xiàn)代操作系統(tǒng)比如Mac OS X,UNIX,Linux,Windows等,都是支持多任務(wù)的操作系統(tǒng)。 什么叫多任務(wù)呢?簡單地說,就是操作系統(tǒng)可以同時...
摘要:協(xié)程,又稱微線程,纖程。最大的優(yōu)勢就是協(xié)程極高的執(zhí)行效率。生產(chǎn)者產(chǎn)出第條數(shù)據(jù)返回更新值更新消費(fèi)者正在調(diào)用第條數(shù)據(jù)查看當(dāng)前進(jìn)行的線程函數(shù)中有,返回值為生成器庫實(shí)現(xiàn)協(xié)程通過提供了對協(xié)程的基本支持,但是不完全。 協(xié)程,又稱微線程,纖程。英文名Coroutine協(xié)程看上去也是子程序,但執(zhí)行過程中,在子程序內(nèi)部可中斷,然后轉(zhuǎn)而執(zhí)行別的子程序,在適當(dāng)?shù)臅r候再返回來接著執(zhí)行。 最大的優(yōu)勢就是協(xié)程極高...
摘要:協(xié)程實(shí)現(xiàn)連接在網(wǎng)絡(luò)通信中,每個連接都必須創(chuàng)建新線程或進(jìn)程來處理,否則,單線程在處理連接的過程中,無法接受其他客戶端的連接。所以我們嘗試使用協(xié)程來實(shí)現(xiàn)服務(wù)器對多個客戶端的響應(yīng)。 協(xié)程實(shí)現(xiàn)TCP連接 在網(wǎng)絡(luò)通信中,每個連接都必須創(chuàng)建新線程(或進(jìn)程) 來處理,否則,單線程在處理連接的過程中, 無法接受其他客戶端的連接。所以我們嘗試使用協(xié)程來實(shí)現(xiàn)服務(wù)器對多個客戶端的響應(yīng)。與單一TCP通信的構(gòu)架...
摘要:多進(jìn)程執(zhí)行任務(wù)結(jié)束,創(chuàng)建進(jìn)程和銷毀進(jìn)程是時間的,如果長度不夠,會造成多線程快過多進(jìn)程多線程執(zhí)行任務(wù)結(jié)束,進(jìn)程間通信生產(chǎn)者消費(fèi)者模型與隊列演示了生產(chǎn)者和消費(fèi)者的場景。 進(jìn)程 Python是運(yùn)行在解釋器中的語言,查找資料知道,python中有一個全局鎖(GIL),在使用多進(jìn)程(Thread)的情況下,不能發(fā)揮多核的優(yōu)勢。而使用多進(jìn)程(Multiprocess),則可以發(fā)揮多核的優(yōu)勢真正地提...
摘要:我們來編寫一個簡單的服務(wù)器程序,它接收客戶端連接,回復(fù)客戶端發(fā)來的請求。如果一切順利,新浪的服務(wù)器接受了我們的連接,一個連接就建立起來的,后面的通信就是發(fā)送網(wǎng)頁內(nèi)容了。 TCP TCP(Transmission Control Protocol 傳輸控制協(xié)議)是一種面向連接的、可靠的、基于字節(jié)流的傳輸層通信協(xié)議,由IETF的RFC 793定義。在簡化的計算機(jī)網(wǎng)絡(luò)OSI模型中,它完成第四...
閱讀 2760·2021-11-16 11:45
閱讀 1668·2021-09-26 10:19
閱讀 2062·2021-09-13 10:28
閱讀 2822·2021-09-08 10:46
閱讀 1547·2021-09-07 10:13
閱讀 1543·2019-08-30 13:50
閱讀 1383·2019-08-30 11:17
閱讀 1463·2019-08-29 13:18