摘要:限制同時運行線程數(shù)使用類就行,在內(nèi)部管理著一個計數(shù)器。當(dāng)計數(shù)器到時,再調(diào)用就會阻塞,直到其他線程來調(diào)用,這樣就限制了同時運行線程的數(shù)量。
事前最好了解一下什么是進(jìn)程,什么是線程,什么是GIL,本文不再贅述,直接介紹模塊的使用:
推薦1,推薦2,推薦3,更多自尋
普通的python爬蟲是單進(jìn)程單線程的,這樣在遇到大量重復(fù)的操作時就只能逐個進(jìn)行,我們就很難過了。舉個栗子:你有1000個美圖的鏈接,逐個喂給下載器(函數(shù)),看著圖片只能一個個蹦出來,你不心急嗎?于是我們想,能不能同時跑多個下載器,實現(xiàn)多圖同時下載?——答案是可以的,使用多進(jìn)程/多線程,把每個帶著不同參數(shù)下載器分給每個進(jìn)程/線程就,然后同時跑多個進(jìn)程/線程就行了。
本文就介紹如何用多線程和多進(jìn)程給爬蟲加速
補(bǔ)充主線程與子線程(進(jìn)程同理):
一個py程序就有一個主線程,主線程負(fù)責(zé)整個py程序的代碼,當(dāng)主線程處理到啟用多線程的代碼時,就會創(chuàng)建若干個子線程,這里就有選擇了,主線程是等待子線程的結(jié)束再繼續(xù)處理還是直接繼續(xù)處理讓子線程在外頭跑
多進(jìn)程Python標(biāo)準(zhǔn)庫原本有threading和multiprocessing模塊編寫相應(yīng)的多線程/多進(jìn)程代碼。但從Python3.2開始,標(biāo)準(zhǔn)庫為我們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現(xiàn)了對threading和multiprocessing的更高級的抽象,對編寫線程池/進(jìn)程池提供了直接的支持。多進(jìn)程我們介紹futures的ProcessPoolExecutor
注:python 2.7 請安裝future模塊,pip install future
ProcessPoolExecutor類是Executor類的子類,實例化ProcessPoolExecutor類以創(chuàng)建進(jìn)程池,在實例化的過程中應(yīng)指定同時運行的最大進(jìn)程數(shù)
from concurrent.futures import ProcessPoolExecutor pool = ProcessPoolExecutor(max_workers=4) # 運行最大進(jìn)程數(shù)4 #進(jìn)程池的操作... pool.shutdown(wait=True) # 關(guān)閉進(jìn)程池,默認(rèn)等待所有進(jìn)程的完成。 print("Deep") # 有shutdown的情況下所有進(jìn)程完成后才會運行下面的print,沒有的話會馬上運行 "創(chuàng)建進(jìn)程也可用with,這時會自帶shutdown功能 with ProcessPoolExecutor(4) as pool: #進(jìn)程池的操作... "
該類有兩種方法對進(jìn)程池提交任務(wù)建立進(jìn)程(函數(shù)及一組參數(shù)構(gòu)成一個任務(wù)),分別是submit()和map(),如果單純想多開進(jìn)程別無他想,用哪個都行,但submit()會有更靈活的用法
map(fn,*iterables)fn:函數(shù)
*iterables:函數(shù)每個參數(shù)的集合,N個參數(shù)就接N個集合
可以理解這是python自帶map()的多進(jìn)程版,他返回的是一個迭代器,包含每個任務(wù)對應(yīng)的返回值(有序的),下面用例子來分析
from concurrent.futures import ProcessPoolExecutor import time def test(x): time.sleep(x) # 時間阻塞 print(str(x)+"s") return x if __name__ == "__main__": with ProcessPoolExecutor(4) as pool: p = pool.map(test,[2,3,10,5,6]) for i in p: print(i)
輸出
2s 2 3s 3 5s 6s 10s 10 5 6
分析(下面以參數(shù)代替某個進(jìn)程):
帶s的是函數(shù)輸出的,進(jìn)程池最大允許4個進(jìn)程同時運行,所以參數(shù) 2,3,10,5 首先一起進(jìn)去。2最快完成,馬上讓給6進(jìn)去,2+6<10 ,所以后進(jìn)6完成得比10快,最后輸出順序就是 2s,3s,5s,6s,10s
不帶s的是for循環(huán)打印迭代器中的結(jié)果,由輸出可見,i的值分配是會等待進(jìn)程完成返回值的,等2的完成返回2,等3的完成返回3,等10的完成返回10,由于10完成前5和6早就完成了,所以返回10后緊接著返回5和6,最后輸出順序為2,3,10,5,6,是有序的,對應(yīng)各任務(wù)的返回值
在爬蟲中,上面代碼中的時間阻塞會對應(yīng)著網(wǎng)絡(luò)I/O阻塞,任務(wù)中往往包含著網(wǎng)絡(luò)請求。比如你有很多個圖片鏈接,就寫一個下載圖片的函數(shù)(接收一個圖片鏈接的參數(shù)),把函數(shù)和圖片鏈接的集合喂給map()就實現(xiàn)多進(jìn)程了加速了。
submit(fn, *arg)fn:函數(shù)
*arg:函數(shù)的參數(shù)
該方法是往進(jìn)程池中提交可回調(diào)的任務(wù),并返回一個future實例。提交多個任務(wù)可用循環(huán)實現(xiàn),返回的future實例用列表存起來,每個future代表一個進(jìn)程。關(guān)于future對象有許多方法:
future.running():判斷某個future(進(jìn)程)是否運行中
future.done():判斷某個future(進(jìn)程)是否正常結(jié)束
future.cancel():終止某個future(進(jìn)程),終止失敗返回False,成功返回True
future.result():獲取future對應(yīng)任務(wù)返回的結(jié)果。如果future還沒完成就會去等待
future.add_done_callback(fn):接收函數(shù)fn,將fn綁定到future對象上。當(dāng)future對象被終止或完成時,fn將會被調(diào)用并接受該future對象
as_completed(fs):接收futures列表,futures列表中一旦有某個future(進(jìn)程)完成就將該future對象yield回來,是個迭代器
from concurrent.futures import ProcessPoolExecutor,as_completed import time def test(x): time.sleep(x) print(str(x)+"s") return x if __name__ == "__main__": with ProcessPoolExecutor(4) as pool: futures = [pool.submit(test,i) for i in [2,3,10,5,6]] """for j in futures: print(j.result()) # 對應(yīng)接收參數(shù)有序輸出,輸出2,3,10,5,6 """ for j in as_completed(futures): print(j.result()) # 對應(yīng)進(jìn)程完成順序輸出,輸出2,3,5,6,10多線程
建議小心使用,雖然多線程能實現(xiàn)高并發(fā),但由于線程資源共享的特性,某個線程操作這些共享的資源時可能操到一半就停止讓給另一個線程操作,導(dǎo)致錯亂的發(fā)生。為避免此情況發(fā)生對某些操作需要加鎖,所以這里介紹對鎖有支持的threading模塊,python自帶直接導(dǎo)入。
如果你確信這些操作不會發(fā)生錯亂,可以直接使用concurrent.future 的 ThreadPoolExecutor,方法什么的和ProcessPoolExecutor的一樣
創(chuàng)建線程有兩種方法:
實例化 threading.Thread 類,target接收函數(shù),arg以可迭代形式接收參數(shù)。這種方法最簡單
import threading import time def test(x): time.sleep(x) print(str(x)+"s") return x t1 = threading.Thread(target=test, args=(1,)) # 創(chuàng)建線程 t2 = threading.Thread(target=test, args=(3,)) t1.start() # 啟動線程 t2.start()
繼承threading.Thread 類,重寫run方法,把函數(shù)及參數(shù)接收寫進(jìn)自己寫的多線程類中。這種方法更靈活,threading.Thread 類并沒有供獲取線程調(diào)用函數(shù)返回值的方法,如果需要函數(shù)返回值就需要繼承該類自己實現(xiàn)
import threading import time class TestThread(threading.Thread): def __init__(self,x): threading.Thread.__init__(self) self.x = x # 參數(shù)接收 def run(self): time.sleep(self.x) # 原來的函數(shù)寫到run中 print(str(self.x)+"s") def result(self): # 實現(xiàn)獲取調(diào)用函數(shù)的返回值的方法 return self.x t1 = TestThread(1) #創(chuàng)建線程 t2 = TestThread(3) t1.start() # 啟動線程 t2.start() t1.join() # 等待線程結(jié)束 t2.join() print(t1.result(),t2.result())
線程相關(guān)方法和屬性:
Thread.start():啟動線程
Thread.join():等待線程的結(jié)束,沒有join的話會接著運行join下面的代碼
Thread.is_alive():判斷線程是否在運行,線程未開啟/結(jié)束時返回 False
Thread.name:返回線程的名字,默認(rèn)線程名是Thread-N,N指第N個開啟的線程
Thread.setName(str):給線程命名
Thread.setDaemon(True/False):設(shè)置子線程是否會隨主線程結(jié)束而結(jié)束,原本所有子線程默認(rèn)是不會隨主線程結(jié)束而結(jié)束的
鎖線程間資源共享,如果多個線程共同對某個數(shù)據(jù)修改,可能會出現(xiàn)錯誤,為了保證數(shù)據(jù)的正確性,需要對多個線程進(jìn)行同步。這時就需要引入鎖了(利用GIL),鎖只有一個,一個線程在持有鎖的狀態(tài)下對某些數(shù)據(jù)進(jìn)行操作,其他線程就無法對該數(shù)據(jù)進(jìn)行操作,直至該線程釋放鎖讓其他線程搶,誰搶到誰就有權(quán)修改。
threading提供Lock和RLock兩類鎖,前者一個線程只能獲取獲取一次鎖,后者允許一個線程能重復(fù)獲取鎖。如果某個線程對全局?jǐn)?shù)據(jù)的操作是割裂的(分塊的),那就使用RLock。
acquire():獲取鎖
release():釋放鎖
有數(shù)據(jù)操作放在acquire 和 release 之間,就不會出現(xiàn)多個線程修改同一個數(shù)據(jù)的風(fēng)險了
acquire 和 release 必須成對存在,如果一個線程只拿不放,其他線程沒有鎖能搶就只能永遠(yuǎn)阻塞(停止)
一個錯亂的例子及鎖的使用:
import time, threading lock = threading.Lock() # rlock = threading.RLock() balance = [0] def test(n): for i in range(100000): # 理想的情況是執(zhí)行了+n,-n操作后才讓另一個線程處理,結(jié)果永0 #lock.acquire() balance[0] = balance[0] + n # 某個線程可能處理到這里就終止讓給另一個線程處理了,循環(huán)一大,結(jié)果可能錯亂不為0 balance[0] = balance[0] - n #lock.release() t1 = threading.Thread(target=test, args=(5,)) t2 = threading.Thread(target=test, args=(8.0,)) t1.start() t2.start() t1.join() t2.join() print(balance[0])
在不加鎖的情況下多跑幾次,你會的到不同的結(jié)果。但是加了鎖之后,+n,-n兩個操作完整執(zhí)行,不會中途中斷,結(jié)果永0。
限制同時運行線程數(shù)使用 threading.Semaphore 類就行,Semaphore 在內(nèi)部管理著一個計數(shù)器。調(diào)用 acquire() 會使這個計數(shù)器減1,release() 則是加1。計數(shù)器的值永遠(yuǎn)不會小于 0。當(dāng)計數(shù)器到 0 時,再調(diào)用 acquire() 就會阻塞,直到其他線程來調(diào)用release(),這樣就限制了同時運行線程的數(shù)量。
使用上非常簡單,實例化Semaphore并指定線程數(shù)后,給函數(shù)的頭加個acquire(),尾加個release()就行。
import threading, time def test(x): semaphore.acquire() time.sleep(x) print(x) semaphore.release() semaphore = threading.Semaphore(4) # 最大4個線程同時進(jìn)行 ts = [threading.Thread(target=test,args=(i,)) for i in [2,3,5,10,6]] [t.start() for t in ts] "輸出:2,3,5,6,10 (原理和上面多進(jìn)程的那個差不多)"
關(guān)于threading的其他高級用法本文并未提及,以上都是些常用的用法,如果有更高級的需要,可以參考這文章
應(yīng)用在爬蟲上講了這么多,都是模塊的用法,沒怎么提到爬蟲。那么最后大概的講下如何把多進(jìn)程/多線程運用到爬蟲中,并給個代碼實例用作參考。
如果爬蟲需要重復(fù)進(jìn)行某個操作(如下載一張圖片,爬取一張網(wǎng)頁的源碼,破解一次加密【加密耗cpu最好多進(jìn)程】),那把這個操作抽象成一個接收相應(yīng)參數(shù)的函數(shù),把函數(shù)喂給進(jìn)程/線程即可。
沒了,大概就這么用?? ?
下面給個多進(jìn)程/多線程結(jié)合的網(wǎng)易云音樂評論下載器(下載某首音樂的多頁評論),包含加密算法,如不清楚可看之前的文章,我們用多進(jìn)程加速加密過程,用多線程加速爬取過程。
本代碼較長,長到高亮效果都沒有了,因此該長代碼分為兩部分,前半部分是之前文章提到的加密方法,后半部分是本文的多進(jìn)程多線程重點代碼:
import json, re, base64, random, requests, binascii, threading from Crypto.Cipher import AES#新的加密模塊只接受bytes數(shù)據(jù),否者報錯,密匙明文什么的要先轉(zhuǎn)碼 from concurrent.futures import ProcessPoolExecutor from math import ceil secret_key = b"0CoJUm6Qyw8W8jud"#第四參數(shù),aes密匙 pub_key ="010001"#第二參數(shù),rsa公匙組成 modulus = "00e0b509f6259df8642dbc35662901477df22677ec152b5ff68ace615bb7b725152b3ab17a876aea8a5aa76d2e417629ec4ee341f56135fccf695280104e0312ecbda92557c93870114af6c9d05c4f7f0c3685b7a46bee255932575cce10b424d813cfe4875d3e82047b97ddef52741d546b8e289dc6935b3ece0462db0a22b8e7"#第三參數(shù),rsa公匙組成 headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.146 Safari/537.36"} def random_16(): return bytes("".join(random.sample("1234567890DeepDarkFantasy",16)),"utf-8") #aes加密 def aes_encrypt(text,key): pad = 16 - len(text)%16#對長度不是16倍數(shù)的字符串進(jìn)行補(bǔ)全,然后在轉(zhuǎn)為bytes數(shù)據(jù) try: #如果接到bytes數(shù)據(jù)(如第一次aes加密得到的密文)要解碼再進(jìn)行補(bǔ)全 text = text.decode() except: pass text = text + pad * chr(pad) try: text = text.encode() except: pass encryptor = AES.new(key,AES.MODE_CBC,b"0102030405060708") ciphertext = encryptor.encrypt(text) ciphertext = base64.b64encode(ciphertext)#得到的密文還要進(jìn)行base64編碼 return ciphertext #rsa加密 def rsa_encrypt(ran_16,pub_key,modulus): text = ran_16[::-1]#明文處理,反序并hex編碼 rsa = int(binascii.hexlify(text), 16) ** int(pub_key, 16) % int(modulus, 16) return format(rsa, "x").zfill(256) #返回加密后內(nèi)容 def encrypt_data(data): ran_16 = random_16() text = json.dumps(data) params = aes_encrypt(text,secret_key) params = aes_encrypt(params,ran_16) encSecKey = rsa_encrypt(ran_16,pub_key,modulus) return {"params":params.decode(), "encSecKey":encSecKey }
class OnePageComment(threading.Thread): # 下載一頁評論的線程類 def __init__(self,post_url, enc_data): threading.Thread.__init__(self) self.post_url = post_url self.enc_data = enc_data self.comment = "" # 創(chuàng)建一個comment變量儲存爬到的數(shù)據(jù) def run(self): semaphore.acquire() content = requests.post(self.post_url, headers = headers, data = self.enc_data ).json() if "hotComments" in content: if content["hotComments"]: self.comment += "*************精彩評論 " self.common(content, "hotComments") self.comment += " *************最新評論 " self.common(content, "comments") else: self.common(content, "comments") semaphore.release() def common(self, content,c_type): for each in content[c_type]: if each ["beReplied"]: if each["beReplied"][0]["content"]: self.comment += each["content"] + " 回復(fù): " + each["beReplied"][0]["content"] + " " + "-" * 60 + " " else: self.comment += each["content"] + " " + "-" * 60 + " " def get_comment(self): # 選擇返回評論而不是直接寫入文件,因為多個線程同時操作一個文件有風(fēng)險,應(yīng)先返回,后統(tǒng)一寫入 return self.comment def get_enc_datas(pages, max_workers=4): # 多進(jìn)程加密 raw_datas = [] for i in range(pages): if i == 0: raw_datas.append({"rid":"", "offset":"0", "total":"true", "limit":"20", "csrf_token":""}) else: raw_datas.append({"rid":"", "offset":str(i*20), "total":"false", "limit":"20", "csrf_token":""}) with ProcessPoolExecutor(max_workers) as pool: # 多進(jìn)程適合計算密集型任務(wù),如加密 result = pool.map(encrypt_data,raw_datas) return list(result) def one_song_comment(id_): # 爬取一首歌的評論并寫入txt,網(wǎng)絡(luò)I/O密集使用多線程 post_url = "http://music.163.com/weapi/v1/resource/comments/R_SO_4_" + str(id_) + "?csrf_token=" ts = [OnePageComment(post_url,i) for i in enc_datas] [i.start() for i in ts] [i.join() for i in ts] comments = [i.get_comment() for i in ts] with open(id_ + ".txt", "w", encoding="utf-8") as f: f.writelines(comments) if __name__ == "__main__": semaphore = threading.Semaphore(4) # 最大線程4 enc_datas = get_enc_datas(10) # 獲取加密后的數(shù)據(jù),對所有歌曲都是通用的,這里有十頁的加密數(shù)據(jù),對應(yīng)爬十頁評論 one_song_comment("29498682")
效果提升驚人!!不信你跑一下上面的程序,然后和自己寫的單線程/單進(jìn)程比較
cpu和網(wǎng)絡(luò)都跑到了峰值,網(wǎng)絡(luò)峰值在cpu峰值之后,因為是先多進(jìn)程加密數(shù)據(jù),后多線程爬取
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/42342.html
摘要:所以只要得到登錄后的并必要時進(jìn)行更新,服務(wù)器就會認(rèn)定其為登錄狀態(tài)??纯慈思抑?,加密到連名字都沒有了,還混淆,如何下手綜上,適用于沒有加密的登錄或者加密算法比較簡單并且不常更新的網(wǎng)站。遇上無解的加密算法要么手操拷貝,要么請大佬出場。 某些網(wǎng)站,登錄和沒登錄,用戶的權(quán)限是不一樣的,帳號登錄之后才能獲取更多的信息。更有甚者一上來就是登錄界面,不登錄就不給你進(jìn)去(如p站)。爬取目標(biāo)不用登錄固...
摘要:以下這些項目,你拿來學(xué)習(xí)學(xué)習(xí)練練手。當(dāng)你每個步驟都能做到很優(yōu)秀的時候,你應(yīng)該考慮如何組合這四個步驟,使你的爬蟲達(dá)到效率最高,也就是所謂的爬蟲策略問題,爬蟲策略學(xué)習(xí)不是一朝一夕的事情,建議多看看一些比較優(yōu)秀的爬蟲的設(shè)計方案,比如說。 (一)如何學(xué)習(xí)Python 學(xué)習(xí)Python大致可以分為以下幾個階段: 1.剛上手的時候肯定是先過一遍Python最基本的知識,比如說:變量、數(shù)據(jù)結(jié)構(gòu)、語法...
摘要:一般用進(jìn)程池維護(hù),的設(shè)為數(shù)量。多線程爬蟲多線程版本可以在單進(jìn)程下進(jìn)行異步采集,但線程間的切換開銷也會隨著線程數(shù)的增大而增大。異步協(xié)程爬蟲引入了異步協(xié)程語法。 Welcome to the D-age 對于網(wǎng)絡(luò)上的公開數(shù)據(jù),理論上只要由服務(wù)端發(fā)送到前端都可以由爬蟲獲取到。但是Data-age時代的到來,數(shù)據(jù)是新的黃金,毫不夸張的說,數(shù)據(jù)是未來的一切?;诮y(tǒng)計學(xué)數(shù)學(xué)模型的各種人工智能的出現(xiàn)...
摘要:批評的人通常都會說的多線程編程太困難了,眾所周知的全局解釋器鎖,或稱使得多個線程的代碼無法同時運行。多線程起步首先讓我們來創(chuàng)建一個名為的模塊。多進(jìn)程可能比多線程更易使用,但需要消耗更大的內(nèi)存。 批評 Python 的人通常都會說 Python 的多線程編程太困難了,眾所周知的全局解釋器鎖(Global Interpreter Lock,或稱 GIL)使得多個線程的 Python 代碼無...
閱讀 3237·2021-11-23 09:51
閱讀 1042·2021-08-05 09:58
閱讀 676·2019-08-29 16:05
閱讀 985·2019-08-28 18:17
閱讀 3039·2019-08-26 14:06
閱讀 2734·2019-08-26 12:20
閱讀 2171·2019-08-26 12:18
閱讀 3074·2019-08-26 11:56