摘要:里提供了多個(gè)用于控制多線程同步的同步原語,這些原語,包含在的標(biāo)準(zhǔn)庫當(dāng)中。例如總結(jié)多線程同步,說難也難,說不難也很容易,關(guān)鍵是要看你的業(yè)務(wù)場景和解決問題的思路,盡量降低多線程之間的依賴,理清楚業(yè)務(wù)流程,選擇合適的方法,則事盡成。
概述
多線程給我們帶來的好處是可以并發(fā)的執(zhí)行多個(gè)任務(wù),特別是對(duì)于I/O密集型的業(yè)務(wù),使用多線程,可以帶來成倍的性能增長。
可是當(dāng)我們多個(gè)線程需要修改同一個(gè)數(shù)據(jù),在不做任何同步控制的情況下,產(chǎn)生的結(jié)果往往是不可預(yù)料的,比如兩個(gè)線程,一個(gè)輸出hello,一個(gè)輸出world,實(shí)際運(yùn)行的結(jié)果,往往可能是一個(gè)是hello world,一個(gè)是world hello。
python里提供了多個(gè)用于控制多線程同步的同步原語,這些原語,包含在python的標(biāo)準(zhǔn)庫threading.py當(dāng)中。我今天簡單的介紹一下python里的這些控制多線程同步的原語,包括:Locks、RLocks、Semaphores、Events、Conditions和Barriers,你也可以繼承這些類,實(shí)現(xiàn)自己的同步控制原語。
Lock(鎖)Locks是python里最簡單的同步原語,只包括兩個(gè)狀態(tài):locked和unlocked,剛創(chuàng)建時(shí)狀態(tài)是unlocked。Locks有兩個(gè)方法,acquire和release。acquire方法加鎖,release方法釋放鎖,如果acquire枷鎖失敗,則阻塞,表明其他線程已經(jīng)加鎖。release方法只有當(dāng)狀態(tài)是locked調(diào)用方法True,如果是unlocked狀態(tài),調(diào)用release方法會(huì)拋出RunTimeError異常。例如代碼:
from threading import Lock, Thread lock = Lock() g = 0 def add_one(): """ Just used for demonstration. It’s bad to use the ‘global’ statement in general. """ global g lock.acquire() g += 1 lock.release() def add_two(): global g lock.acquire() g += 2 lock.release() threads = [] for func in [add_one, add_two]: threads.append(Thread(target=func)) threads[-1].start() for thread in threads: """ Waits for threads to complete before moving on with the main script. """ thread.join() print(g)
最終輸出的結(jié)果是3,通過Lock的使用,雖然在兩個(gè)線程中修改了同一個(gè)全局變量,但兩個(gè)線程是順序計(jì)算出結(jié)果的。
RLock(循環(huán)鎖)上面的Lock對(duì)象雖然能達(dá)到同步的效果,但是無法得知當(dāng)前是那個(gè)線程獲取到了鎖。如果鎖沒被釋放,則其他獲取這個(gè)鎖的線程都會(huì)被阻塞住。如果不想阻塞,可以使用RLock,例如:
# 使用Lock import threading num = 0 lock = Threading.Lock() lock.acquire() num += 1 lock.acquire() # 這個(gè)地方阻塞 num += 2 lock.release() # 使用RLock lock = Threading.RLock() lock.acquire() num += 3 lock.acquire() # 這不會(huì)阻塞 num += 4 lock.release() lock.release() # 這個(gè)地方注意是釋放兩次鎖Semaphores
Semaphores是個(gè)最簡單的計(jì)數(shù)器,有兩個(gè)方法acquire()和release(),如果有多個(gè)線程調(diào)用acquire()方法,acquire()方法會(huì)阻塞住,每當(dāng)調(diào)用次acquire方法,就做一次減1操作,每當(dāng)release()方法調(diào)用此次,就加1,如果最后的計(jì)數(shù)數(shù)值大于調(diào)用acquire()方法的線程數(shù)目,release()方法會(huì)拋出ValueError異常。下面是個(gè)生產(chǎn)者消費(fèi)者的示例。
import random, time from threading import BoundedSemaphore, Thread max_items = 5 container = BoundedSemaphore(max_items) def producer(nloops): for i in range(nloops): time.sleep(random.randrange(2, 5)) print(time.ctime(), end=": ") try: container.release() print("Produced an item.") except ValueError: print("Full, skipping.") def consumer(nloops): for i in range(nloops): time.sleep(random.randrange(2, 5)) print(time.ctime(), end=": ") if container.acquire(False): print("Consumed an item.") else: print("Empty, skipping.") threads = [] nloops = random.randrange(3, 6) print("Starting with %s items." % max_items) threads.append(Thread(target=producer, args=(nloops,))) threads.append(Thread(target=consumer, args=(random.randrange(nloops, nloops+max_items+2),))) for thread in threads: # Starts all the threads. thread.start() for thread in threads: # Waits for threads to complete before moving on with the main script. thread.join() print("All done.")
threading模塊還提供了一個(gè)Semaphore對(duì)象,它允許你可以任意次的調(diào)用release函數(shù),但是最好還是使用BoundedSemaphore對(duì)象,這樣在release調(diào)用次數(shù)過多時(shí)會(huì)報(bào)錯(cuò),有益于查找錯(cuò)誤。Semaphores最長用來限制資源的使用,比如最多十個(gè)進(jìn)程。
Eventsevent可以充當(dāng)多進(jìn)程之間的通信工具,基于一個(gè)內(nèi)部的標(biāo)志,線程可以調(diào)用set()和clear()方法來操作這個(gè)標(biāo)志,其他線程則阻塞在wait()函數(shù),直到標(biāo)志被設(shè)置為True。下面的代碼展示了如何利用Events來追蹤行為。
import random, time from threading import Event, Thread event = Event() def waiter(event, nloops): for i in range(nloops): print(“%s. Waiting for the flag to be set.” % (i+1)) event.wait() # Blocks until the flag becomes true. print(“Wait complete at:”, time.ctime()) event.clear() # Resets the flag. print() def setter(event, nloops): for i in range(nloops): time.sleep(random.randrange(2, 5)) # Sleeps for some time. event.set() threads = [] nloops = random.randrange(3, 6) threads.append(Thread(target=waiter, args=(event, nloops))) threads[-1].start() threads.append(Thread(target=setter, args=(event, nloops))) threads[-1].start() for thread in threads: thread.join() print(“All done.”)Conditions
conditions是比events更加高級(jí)一點(diǎn)的同步原語,可以用戶多線程間的通信和通知。比如A線程通知B線程資源已經(jīng)可以被消費(fèi)。其他的線程必須在調(diào)用wait()方法前調(diào)用acquire()方法。同樣的,每個(gè)線程在資源使用完以后,要調(diào)用release()方法,這樣其他線程就可以繼續(xù)執(zhí)行了。下面是使用conditions實(shí)現(xiàn)的一個(gè)生產(chǎn)者消費(fèi)者的例子。
import random, time from threading import Condition, Thread condition = Condition() box = [] def producer(box, nitems): for i in range(nitems): time.sleep(random.randrange(2, 5)) # Sleeps for some time. condition.acquire() num = random.randint(1, 10) box.append(num) # Puts an item into box for consumption. condition.notify() # Notifies the consumer about the availability. print("Produced:", num) condition.release() def consumer(box, nitems): for i in range(nitems): condition.acquire() condition.wait() # Blocks until an item is available for consumption. print("%s: Acquired: %s" % (time.ctime(), box.pop())) condition.release() threads = [] nloops = random.randrange(3, 6) for func in [producer, consumer]: threads.append(Thread(target=func, args=(box, nloops))) threads[-1].start() # Starts the thread. for thread in threads: thread.join() print("All done.")
conditions還有其他很多用戶,比如實(shí)現(xiàn)一個(gè)數(shù)據(jù)流API,當(dāng)數(shù)據(jù)準(zhǔn)備好了可以通知其他線程去處理數(shù)據(jù)。
Barriersbarriers是個(gè)簡單的同步原語,可以用戶多個(gè)線程之間的相互等待。每個(gè)線程都調(diào)用wait()方法,然后阻塞,直到所有線程調(diào)用了wait(),然后所有線程同時(shí)開始運(yùn)行。例如:
from random import randrange from threading import Barrier, Thread from time import ctime, sleep num = 4 b = Barrier(num) names = [“Harsh”, “Lokesh”, “George”, “Iqbal”] def player(): name = names.pop() sleep(randrange(2, 5)) print(“%s reached the barrier at: %s” % (name, ctime())) b.wait() threads = [] print(“Race starts now…”) for i in range(num): threads.append(Thread(target=player)) threads[-1].start() for thread in threads: thread.join() print() print(“Race over!”)總結(jié)
多線程同步,說難也難,說不難也很容易,關(guān)鍵是要看你的業(yè)務(wù)場景和解決問題的思路,盡量降低多線程之間的依賴,理清楚業(yè)務(wù)流程,選擇合適的方法,則事盡成。
轉(zhuǎn)載自我的博客:捕蛇者說
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/40778.html
摘要:最近看前端都展開了幾場而我大知乎最熱語言還沒有相關(guān)。有關(guān)書籍的介紹,大部分截取自是官方介紹。但從開始,標(biāo)準(zhǔn)庫為我們提供了模塊,它提供了和兩個(gè)類,實(shí)現(xiàn)了對(duì)和的進(jìn)一步抽象,對(duì)編寫線程池進(jìn)程池提供了直接的支持。 《流暢的python》閱讀筆記 《流暢的python》是一本適合python進(jìn)階的書, 里面介紹的基本都是高級(jí)的python用法. 對(duì)于初學(xué)python的人來說, 基礎(chǔ)大概也就夠用了...
摘要:具有以下基本同步原語子進(jìn)程提供了通過創(chuàng)建和管理子進(jìn)程的。雖然隊(duì)列不是線程安全的,但它們被設(shè)計(jì)為專門用于代碼。表示異步操作的最終結(jié)果。 Python的asyncio是使用 async/await 語法編寫并發(fā)代碼的標(biāo)準(zhǔn)庫。通過上一節(jié)的講解,我們了解了它不斷變化的發(fā)展歷史。到了Python最新穩(wěn)定版 3.7 這個(gè)版本,asyncio又做了比較大的調(diào)整,把這個(gè)庫的API分為了 高層級(jí)API和...
摘要:定時(shí)檢測器定時(shí)拿出一部分重新的用過濾器進(jìn)行檢測剔除不能用的代理。重載是讓類以統(tǒng)一的方式處理不同類型數(shù)據(jù)的一種手段。雖然在內(nèi)存中存儲(chǔ)表數(shù)據(jù)確實(shí)會(huì)提供很高的性能,但當(dāng)守護(hù)進(jìn)程崩潰時(shí),所有的數(shù)據(jù)都會(huì)丟失。第1題: 如何解決驗(yàn)證碼的問題,用什么模塊,聽過哪些人工打碼平臺(tái)? PIL、pytesser、tesseract模塊 平臺(tái)的話有:(打碼平臺(tái)特殊,不保證時(shí)效性) 云打碼 掙碼 斐斐打碼 若快打碼...
摘要:定時(shí)檢測器定時(shí)拿出一部分重新的用過濾器進(jìn)行檢測剔除不能用的代理。重載是讓類以統(tǒng)一的方式處理不同類型數(shù)據(jù)的一種手段。雖然在內(nèi)存中存儲(chǔ)表數(shù)據(jù)確實(shí)會(huì)提供很高的性能,但當(dāng)守護(hù)進(jìn)程崩潰時(shí),所有的數(shù)據(jù)都會(huì)丟失。第1題: 如何解決驗(yàn)證碼的問題,用什么模塊,聽過哪些人工打碼平臺(tái)? PIL、pytesser、tesseract模塊 平臺(tái)的話有:(打碼平臺(tái)特殊,不保證時(shí)效性) 云打碼 掙碼 斐斐打碼 若快打碼...
摘要:首發(fā)于我的博客線程池進(jìn)程池網(wǎng)絡(luò)編程之同步異步阻塞非阻塞后端掘金本文為作者原創(chuàng),轉(zhuǎn)載請先與作者聯(lián)系。在了解的數(shù)據(jù)結(jié)構(gòu)時(shí),容器可迭代對(duì)象迭代器使用進(jìn)行并發(fā)編程篇二掘金我們今天繼續(xù)深入學(xué)習(xí)。 Python 算法實(shí)戰(zhàn)系列之棧 - 后端 - 掘金原文出處: 安生??? 棧(stack)又稱之為堆棧是一個(gè)特殊的有序表,其插入和刪除操作都在棧頂進(jìn)行操作,并且按照先進(jìn)后出,后進(jìn)先出的規(guī)則進(jìn)行運(yùn)作。 如...
閱讀 1813·2021-11-22 09:34
閱讀 3097·2019-08-30 15:55
閱讀 676·2019-08-30 15:53
閱讀 2067·2019-08-30 15:52
閱讀 3009·2019-08-29 18:32
閱讀 1999·2019-08-29 17:15
閱讀 2405·2019-08-29 13:14
閱讀 3566·2019-08-28 18:05