摘要:延遲任務(wù)使用列表結(jié)構(gòu)可以實(shí)現(xiàn)只能執(zhí)行一種任務(wù)的隊(duì)列,也可以實(shí)現(xiàn)通過(guò)調(diào)用不同回調(diào)函數(shù)來(lái)執(zhí)行不同任務(wù)的隊(duì)列,甚至還可以實(shí)現(xiàn)簡(jiǎn)單的優(yōu)先級(jí)隊(duì)列。
在處理Web客戶端發(fā)送的命令請(qǐng)求時(shí),某些操作的執(zhí)行時(shí)間可能會(huì)比我們預(yù)期的更長(zhǎng)一些。通過(guò)將待執(zhí)行任務(wù)的相關(guān)信息放入隊(duì)列里面,并在之后對(duì)隊(duì)列進(jìn)行處理,用戶可以推遲那些需要一段時(shí)間才能完成的操作,這種工作交給任務(wù)處理器來(lái)執(zhí)行的做法被稱為任務(wù)隊(duì)列(task queue)?,F(xiàn)在有很多專門的任務(wù)隊(duì)列軟件(如ActiveMQ、RabbitMQ、Gearman、Amazon SQS),接下來(lái)實(shí)現(xiàn)兩種不同類型的任務(wù)隊(duì)列,第一種隊(duì)列會(huì)根據(jù)任務(wù)被插入隊(duì)列的順序來(lái)盡快地執(zhí)行任務(wù),第二種隊(duì)列具有安排任務(wù)在未來(lái)某個(gè)特定時(shí)間執(zhí)行的能力。
先進(jìn)先出隊(duì)列除了任務(wù)隊(duì)列之外,還有先進(jìn)先出(FIFO)隊(duì)列、后進(jìn)后出(LIFO)隊(duì)列和優(yōu)先級(jí)(priority)隊(duì)列。
使用任務(wù)隊(duì)列來(lái)記錄郵件的收信人以及發(fā)送郵件的原因,并構(gòu)建一個(gè)可以在郵件發(fā)送服務(wù)器運(yùn)行變得緩慢的時(shí)候,以并行方式一次發(fā)送多封郵件的工作進(jìn)程(worker process)。
要編寫的隊(duì)列將以“先到先服務(wù)”(first-come,first-served)的方式發(fā)送郵件,并且無(wú)論發(fā)送是否成功,程序都會(huì)把發(fā)送結(jié)果記錄到日志里面。Redis的列表結(jié)構(gòu)允許用戶通過(guò)RPUSH和LPUSH以及RPOP和LPOP,從列表的兩端推入和彈出元素。郵件隊(duì)列使用RPUSH命令來(lái)將待發(fā)送的郵件推入列表的右端,并且因?yàn)楣ぷ鬟M(jìn)程除了發(fā)送郵件之外不需要執(zhí)行其他工作,所以它將使用阻塞版本的彈出命令BLPOP從隊(duì)列中彈出待發(fā)送的郵件,而命令的最大阻塞時(shí)限為30秒。
郵件隊(duì)列由一個(gè)Redis列表構(gòu)成,包含多個(gè)JSON編碼對(duì)象。為了將待發(fā)送的郵件推入隊(duì)列里面,程序會(huì)獲取發(fā)送郵件所需的全部信息,并將這些信息序列化為JSON對(duì)象,最后使用RPUSH命令將JSON對(duì)象推入郵件隊(duì)列里面。
def send_sold_email_via_queue(conn, seller, item, price, buyer): data = { "seller_id": seller, "item_id": item, "price": price, "buyer_id": buyer, "time": time.time() } conn.rpush("queue:email", json.dumps(data))
從隊(duì)列里獲取待發(fā)送郵件,程序首先使用BLPOP命令從郵件隊(duì)列里面彈出一個(gè)JSON對(duì)象,接著通過(guò)解碼JSON對(duì)象來(lái)取得待發(fā)送郵件的相關(guān)信息,最后根據(jù)這些信息來(lái)發(fā)送郵件。
def process_sold_email_queue(conn): while not QUIT: packed = conn.blpop(["queue:email"], 30) //獲取一封待發(fā)送郵件 if not packed: //隊(duì)列里面暫時(shí)還沒(méi)有待發(fā)送郵件,重試 continue to_send = json.loads(packed[1]) //從JSON對(duì)象中解碼出郵件信息 try: fetch_data_and_send_sold_email(to_send) except EmailSendError as err: log_error("Failed to send sold email", err, to_send) else: log_success("Send sold email", to_send)多個(gè)可執(zhí)行任務(wù)
因?yàn)锽LPOP命令每次只會(huì)從隊(duì)列里面彈出一封待發(fā)送郵件,所以待發(fā)送郵件不會(huì)出現(xiàn)重復(fù),也不會(huì)被重復(fù)發(fā)送。并且因?yàn)殛?duì)列只會(huì)存放待發(fā)送郵件,所以工作進(jìn)程要處理的任務(wù)是非常單一的。下面代碼的工作進(jìn)程會(huì)監(jiān)視用戶提供的多個(gè)隊(duì)列,并從多個(gè)已知的已注冊(cè)回調(diào)函數(shù)里面,選出一個(gè)函數(shù)來(lái)處理JSON編碼的函數(shù)調(diào)用。
def worker_watch_queue(conn, queue, callback): while not QUIT: packed = conn.blpop([queue], 30) if not packed: continue name, args = json.loads(packed[1]) if name not in callbacks: //沒(méi)有找到任務(wù)指定的回調(diào)函數(shù),用日志記錄錯(cuò)誤并重試 log_error("Unknown callback %s"%name) continue callbacks[name](*args) //執(zhí)行任務(wù)任務(wù)優(yōu)先級(jí)
在使用隊(duì)列的時(shí)候,程序可能會(huì)需要讓特定的操作優(yōu)先于其他操作執(zhí)行。
假設(shè)現(xiàn)在我們需要為任務(wù)設(shè)置高、中、低3種優(yōu)先級(jí)別,其中:高優(yōu)先級(jí)任務(wù)在出現(xiàn)之后會(huì)第一時(shí)間被執(zhí)行,而中等優(yōu)先級(jí)任務(wù)則會(huì)在沒(méi)有任何高優(yōu)先級(jí)任務(wù)存在的情況下被執(zhí)行,而低優(yōu)先級(jí)任務(wù)則會(huì)在既沒(méi)有任何高優(yōu)先級(jí)任務(wù),又沒(méi)有任何中等優(yōu)先級(jí)任務(wù)的情況下被執(zhí)行。
def worker_watch_queues(conn, queues, callbacks): while not QUIT: packed = conn.blpop(queues, 30) if not packed: continue name, args = json.loads(packed[1]) if name not in callbacks: log_error("Unknown callback %s"%name) continue callbacks[name](*args)
同時(shí)使用多個(gè)隊(duì)列可以降低實(shí)現(xiàn)優(yōu)先級(jí)特性的難度。除此之外,多隊(duì)列有時(shí)候也會(huì)被用于分隔不同的任務(wù)(如同一個(gè)隊(duì)列存放公告郵件,而另一個(gè)隊(duì)列則存放提醒郵件),在這種情況下,處理不同隊(duì)列時(shí)可能出現(xiàn)不公平現(xiàn)象。為此,我們可以偶爾重新排列各個(gè)隊(duì)列的順序,使得針對(duì)隊(duì)列的處理操作變得更公平一些,當(dāng)某個(gè)隊(duì)列的增長(zhǎng)速度比其他隊(duì)列的增長(zhǎng)速度快的時(shí)候,這種重拍操作尤為重要。
延遲任務(wù)使用列表結(jié)構(gòu)可以實(shí)現(xiàn)只能執(zhí)行一種任務(wù)的隊(duì)列,也可以實(shí)現(xiàn)通過(guò)調(diào)用不同回調(diào)函數(shù)來(lái)執(zhí)行不同任務(wù)的隊(duì)列,甚至還可以實(shí)現(xiàn)簡(jiǎn)單的優(yōu)先級(jí)隊(duì)列。
以下3種方法可以為隊(duì)列中的任務(wù)添加延遲性質(zhì):
在任務(wù)信息中包含任務(wù)的執(zhí)行時(shí)間,如果工作進(jìn)程發(fā)現(xiàn)任務(wù)的執(zhí)行時(shí)間尚未來(lái)臨,那么它將在短暫等待之后,把任務(wù)重新推入隊(duì)列里面。
工作進(jìn)程使用一個(gè)本地的等待列表來(lái)記錄所有需要在未來(lái)執(zhí)行的任務(wù),并在每次進(jìn)行while循環(huán)的時(shí)候,檢查等待列表并執(zhí)行那些已經(jīng)到期的任務(wù)。
把所有需要在未來(lái)執(zhí)行的任務(wù)都添加到有序集合里面,并將任務(wù)的執(zhí)行時(shí)間設(shè)置為分值,另外再使用一個(gè)進(jìn)程來(lái)查找有序集合里面是否存在可以立即被執(zhí)行的任務(wù),如果有的話,就從有序集合里面移除那個(gè)任務(wù),并將它添加到適當(dāng)?shù)萌蝿?wù)隊(duì)列里面。
因?yàn)闊o(wú)論是進(jìn)行短暫的等待,還是將任務(wù)重新推入隊(duì)列里面,都會(huì)浪費(fèi)工作進(jìn)程的時(shí)間,所以我們不會(huì)采用第一種方法。此外,因?yàn)楣ぷ鬟M(jìn)程可能會(huì)因?yàn)楸罎⒍鴣G失本地記錄的所有待執(zhí)行任務(wù),所以我們也不會(huì)采用第二種方法。最后,因?yàn)槭褂糜行蚣系牡谌N方法最簡(jiǎn)單直接,所以我們將采取這一方法,并使用鎖來(lái)保證任務(wù)從有序集合移動(dòng)到任務(wù)隊(duì)列時(shí)的安全性。
有序集合隊(duì)列(ZSET queue)存儲(chǔ)的每個(gè)被延遲的任務(wù)都是一個(gè)包含4個(gè)值的JSON列表,這4個(gè)分值分別是:唯一標(biāo)識(shí)符、處理任務(wù)隊(duì)列的名字、處理任務(wù)的回調(diào)函數(shù)的名字、傳給回調(diào)函數(shù)的參數(shù)。在有序集合里面,任務(wù)的分值會(huì)被設(shè)置為任務(wù)的執(zhí)行時(shí)間,而立即可執(zhí)行的任務(wù)將被直接插入任務(wù)隊(duì)列里面。下面代碼展示了創(chuàng)建延遲任務(wù)(任務(wù)是否延遲是可選的,只要把任務(wù)的延遲時(shí)間設(shè)置為0就可以創(chuàng)建一個(gè)立即執(zhí)行的任務(wù))。
def execute_later(conn, queue, name, args, delay=0): identifier = str(uuid.uuid4()) item = json.dumps([identifier, queue, name, args]) if delay > 0: conn.zadd("delayed:", item, time.time() + delay) else: conn.rpush("queue:" + queue, item) return identifier
因?yàn)镽edis沒(méi)有提供直接的方法可以阻塞有序集合直到元素的分值低于當(dāng)前UNIX時(shí)間戳為止,所以我們需要自己來(lái)查找有序集合里面分值低于當(dāng)前UNIX時(shí)間戳的任務(wù)。因?yàn)樗斜谎舆t的任務(wù)都存儲(chǔ)在同一個(gè)有序集合隊(duì)列里面,所以程序只需要獲取有序集合里面排名第一的元素以及該元素的分值就可以了:如果隊(duì)列里面沒(méi)有任何任務(wù),或者任務(wù)的執(zhí)行時(shí)間尚未來(lái)臨,那么程序?qū)⒃诙虝旱却笾卦?;如果任?wù)的執(zhí)行時(shí)間已到,那么程序?qū)⒏鶕?jù)任務(wù)包含的標(biāo)識(shí)符來(lái)獲取一個(gè)細(xì)粒度鎖,接著從有序集合里面移除要被執(zhí)行的任務(wù),并將它添加到適當(dāng)?shù)娜蝿?wù)隊(duì)列里面。通過(guò)將可執(zhí)行的任務(wù)添加到任務(wù)隊(duì)列里面而不是直接執(zhí)行它們,我們可以把獲取可執(zhí)行任務(wù)的進(jìn)程數(shù)量限制在一兩個(gè)之內(nèi),而不必根據(jù)工作進(jìn)程的數(shù)量來(lái)決定運(yùn)行多少個(gè)獲取進(jìn)程,這減少了獲取可執(zhí)行任務(wù)所需的花銷。
def poll_queue(conn): while not QUIT: item = conn.zrange("delayed:", 0, 0, withscores=True) if not item or item[0][1] > time.time(): time.sleep(.01) continue item = item[0][0] identifier, queue, function, args = json.loads(item) locked = acquire_lock(conn, identifier) if not locked: continue if conn.zrem("delayed:", item): conn.rpush("queue:" + queue, item) release_lock(conn, identifier, locked)
因?yàn)橛行蚣喜⒉痪邆湎窳斜砟菢拥淖枞麖棾鰴C(jī)制,所以程序需要不斷地進(jìn)行循環(huán),并嘗試從隊(duì)列里面獲取要被執(zhí)行的任務(wù),雖然這一操作會(huì)增大網(wǎng)絡(luò)和處理器的負(fù)載,但因?yàn)槲覀冎粫?huì)運(yùn)行一兩個(gè)這樣的程序,所以不會(huì)消耗太多資源。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/44607.html
摘要:如果任務(wù)沒(méi)有在規(guī)定時(shí)間內(nèi)完成,那么該有序集合的任務(wù)將會(huì)被重新放入隊(duì)列中。這兩個(gè)進(jìn)程操縱了三個(gè)隊(duì)列,其中一個(gè),負(fù)責(zé)即時(shí)任務(wù),兩個(gè),負(fù)責(zé)延時(shí)任務(wù)與待處理任務(wù)。如果任務(wù)執(zhí)行成功,就會(huì)刪除中的任務(wù),否則會(huì)被重新放入隊(duì)列中。 在實(shí)際的項(xiàng)目開(kāi)發(fā)中,我們經(jīng)常會(huì)遇到需要輕量級(jí)隊(duì)列的情形,例如發(fā)短信、發(fā)郵件等,這些任務(wù)不足以使用 kafka、RabbitMQ 等重量級(jí)的消息隊(duì)列,但是又的確需要異步、重試...
摘要:配置項(xiàng)用于配置失敗隊(duì)列任務(wù)存放的數(shù)據(jù)庫(kù)及數(shù)據(jù)表。要使用隊(duì)列驅(qū)動(dòng),需要在配置文件中配置數(shù)據(jù)庫(kù)連接。如果應(yīng)用使用了,那么可以使用時(shí)間或并發(fā)來(lái)控制隊(duì)列任務(wù)。你可以使用命令運(yùn)行這個(gè)隊(duì)列進(jìn)程。如果隊(duì)列進(jìn)程意外關(guān)閉,它會(huì)自動(dòng)重啟啟動(dòng)隊(duì)列進(jìn)程。 一、概述 在Web開(kāi)發(fā)中,我們經(jīng)常會(huì)遇到需要批量處理任務(wù)的場(chǎng)景,比如群發(fā)郵件、秒殺資格獲取等,我們將這些耗時(shí)或者高并發(fā)的操作放到隊(duì)列中異步執(zhí)行可以有效緩解系...
摘要:場(chǎng)景說(shuō)明用于處理比較耗時(shí)的請(qǐng)求,例如批量發(fā)送郵件,如果直接在網(wǎng)頁(yè)觸發(fā)執(zhí)行發(fā)送,程序會(huì)出現(xiàn)超時(shí)高并發(fā)場(chǎng)景,當(dāng)某個(gè)時(shí)刻請(qǐng)求瞬間增加時(shí),可以把請(qǐng)求寫入到隊(duì)列,后臺(tái)在去處理這些請(qǐng)求搶購(gòu)場(chǎng)景,先入先出的模式命令或往列表右側(cè)推入數(shù)據(jù)客戶端阻塞直到隊(duì)列有 場(chǎng)景說(shuō)明: 用于處理比較耗時(shí)的請(qǐng)求,例如批量發(fā)送郵件,如果直接在網(wǎng)頁(yè)觸發(fā)執(zhí)行發(fā)送,程序會(huì)出現(xiàn)超時(shí) 高并發(fā)場(chǎng)景,當(dāng)某個(gè)時(shí)刻請(qǐng)求瞬間增加時(shí),可以把請(qǐng)...
閱讀 1758·2021-09-22 15:25
閱讀 1318·2019-08-29 12:34
閱讀 1926·2019-08-26 13:57
閱讀 3201·2019-08-26 10:48
閱讀 1456·2019-08-26 10:45
閱讀 802·2019-08-23 18:23
閱讀 745·2019-08-23 18:01
閱讀 1957·2019-08-23 16:07