摘要:把因執(zhí)行超時(shí)的隊(duì)列從集合重新到當(dāng)前執(zhí)行的隊(duì)列中。從要執(zhí)行的隊(duì)列中取任務(wù)可以看到在取要執(zhí)行的隊(duì)列的時(shí)候,同時(shí)會(huì)放一份到一個(gè)有序集合中,并使用過期時(shí)間戳作為分值。
(原文鏈接:https://blog.tanteng.me/2017/...)
在 Laravel 中使用 Redis 處理隊(duì)列任務(wù),框架提供的功能非常強(qiáng)大,但是最近遇到一個(gè)問題,就是發(fā)現(xiàn)一個(gè)任務(wù)被多次執(zhí)行,這是為什么呢?
先說原因:因?yàn)樵?Laravel 中如果一個(gè)隊(duì)列(任務(wù))執(zhí)行時(shí)間大于 60 秒,就會(huì)被認(rèn)為執(zhí)行失敗并重新加入隊(duì)列中,這樣就會(huì)導(dǎo)致重復(fù)執(zhí)行同一個(gè)任務(wù)。
這個(gè)任務(wù)的邏輯就是給用戶推送內(nèi)容,需要根據(jù)隊(duì)列內(nèi)容取出用戶并遍歷,通過請(qǐng)求后端 HTTP 接口發(fā)送。比如有 10000 個(gè)用戶,在用戶數(shù)量多或接口處理速度沒那么快的情況下,執(zhí)行時(shí)間肯定會(huì)大于 60 秒,于是這個(gè)任務(wù)就被重新加入隊(duì)列。情況更糟糕一點(diǎn),前面的任務(wù)如果都沒有在 60 秒執(zhí)行完,就都會(huì)重新加入隊(duì)列,這樣同一個(gè)任務(wù)就不止重復(fù)執(zhí)行一次了,而是多次。
下面從 Laravel 源代碼找一下罪魁禍?zhǔn)住?/p>
源代碼文件:vendor/laravel/framework/src/Illuminate/Queue/RedisQueue.php
/** * The expiration time of a job. * * @var int|null */ protected $expire = 60;
這個(gè) $expire 成員變量是一個(gè)固定的值,Laravel 認(rèn)為一個(gè)隊(duì)列再怎么 60 秒也該執(zhí)行完了吧。取隊(duì)列方法:
public function pop($queue = null) { $original = $queue ?: $this->default; $queue = $this->getQueue($queue); $this->migrateExpiredJobs($queue.":delayed", $queue); if (! is_null($this->expire)) { $this->migrateExpiredJobs($queue.":reserved", $queue); } list($job, $reserved) = $this->getConnection()->eval( LuaScripts::pop(), 2, $queue, $queue.":reserved", $this->getTime() + $this->expire ); if ($reserved) { return new RedisJob($this->container, $this, $job, $reserved, $original); } }
取隊(duì)列有幾步操作,因?yàn)殛?duì)列執(zhí)行失敗,或執(zhí)行超時(shí)等都會(huì)放入另外的集合保存起來,以便重試,過程如下:
1.把因執(zhí)行失敗的隊(duì)列從 delayed 集合重新 rpush 到當(dāng)前執(zhí)行的隊(duì)列中。
2.把因執(zhí)行超時(shí)的隊(duì)列從 reserved 集合重新 rpush 到當(dāng)前執(zhí)行的隊(duì)列中。
3.然后才是從隊(duì)列中取任務(wù)開始執(zhí)行,同時(shí)把隊(duì)列放入 reserved 的有序集合。
這里使用了 eval 命令執(zhí)行這個(gè)過程,用到了幾個(gè) lua 腳本。
從要執(zhí)行的隊(duì)列中取任務(wù):
local job = redis.call("lpop", KEYS[1]) local reserved = false if(job ~= false) then reserved = cjson.decode(job) reserved["attempts"] = reserved["attempts"] + 1 reserved = cjson.encode(reserved) redis.call("zadd", KEYS[2], ARGV[1], reserved) end return {job, reserved}
可以看到 Laravel 在取 Redis 要執(zhí)行的隊(duì)列的時(shí)候,同時(shí)會(huì)放一份到一個(gè)有序集合中,并使用過期時(shí)間戳作為分值。
只有當(dāng)這個(gè)任務(wù)完成后,再把有序集合中這個(gè)任務(wù)移除。從這個(gè)有序集合移除隊(duì)列的代碼就省略,我們看一下 Laravel 如何處理執(zhí)行時(shí)間大于 60 秒的隊(duì)列。
也就是這段 lua 腳本執(zhí)行的操作:
local val = redis.call("zrangebyscore", KEYS[1], "-inf", ARGV[1]) if(next(val) ~= nil) then redis.call("zremrangebyrank", KEYS[1], 0, #val - 1) for i = 1, #val, 100 do redis.call("rpush", KEYS[2], unpack(val, i, math.min(i+99, #val))) end end return true
這里 zrangebyscore 找出分值從無限小到當(dāng)前時(shí)間戳的元素,也就是 60 秒之前加入到集合的任務(wù),然后通過 zremrangebyrank 從集合移除這些元素并 rpush 到隊(duì)列中。
看到這里應(yīng)該就恍然大悟了。
如果一個(gè)隊(duì)列 60 秒沒執(zhí)行完,那么進(jìn)程在取隊(duì)列的時(shí)候從 reserved 集合中把這些任務(wù)又重新 rpush 到隊(duì)列中。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/26260.html
摘要:已經(jīng)取消了參數(shù),都用來執(zhí)行。取數(shù)據(jù)的過程事物處理已經(jīng)打開。取得符合條件的隊(duì)列后程序會(huì)更新該條數(shù)據(jù),并且更新完后即。 connections => [ .... database => [ driver => database, table => jobs, queue => defaul...
摘要:對(duì)于定時(shí)任務(wù)的基本用法,官網(wǎng)文檔已經(jīng)描述得很詳細(xì)了,這里不再多說。這種情況下如果定時(shí)任務(wù)能夠并行執(zhí)行,就不會(huì)有這樣的問題。這個(gè)時(shí)候我們希望能夠像隊(duì)列那樣,將定時(shí)任務(wù)分散到多臺(tái)服務(wù)器上。 定時(shí)任務(wù) Scheduled Tasks 是 Laravel 提供的組件之一,稍微上點(diǎn)規(guī)模的項(xiàng)目應(yīng)該都會(huì)用到,比如開發(fā)微信應(yīng)用時(shí)通過定時(shí)任務(wù)去刷新access token,比如每天定時(shí)發(fā)推送提現(xiàn)用戶要記...
摘要:當(dāng)查詢數(shù)據(jù)時(shí),本地范圍允許我們創(chuàng)建自己的查詢構(gòu)造器鏈?zhǔn)椒椒?。這樣便會(huì)知道這是一個(gè)本地范圍并且可以在查詢構(gòu)造器中使用。某些查詢構(gòu)造器不可用或者說可用但是方法名不同,關(guān)于這些請(qǐng)查閱所有集合的方法。 showImg(https://segmentfault.com/img/remote/1460000017877956?w=800&h=267); Laravel 因可編寫出干凈,可用可調(diào)試的...
摘要:高性能高精度定時(shí)服務(wù),輕松管理千萬級(jí)定時(shí)任務(wù)。支持任務(wù)到期觸發(fā)和。支持創(chuàng)建延時(shí)任務(wù)和定時(shí)到期任務(wù),和原生保持相同接口,輕松使用。不支持任務(wù)輸出任務(wù)鉤子及維護(hù)模式。是不指定任務(wù)名時(shí)自動(dòng)生成,每個(gè)任務(wù)名必須唯一,相同任務(wù)名重復(fù)定義將會(huì)自動(dòng)覆蓋。 Forsun高性能高精度定時(shí)服務(wù),輕松管理千萬級(jí)定時(shí)任務(wù)。 定時(shí)服務(wù)項(xiàng)目地址:https://github.com/snower/forsun l...
閱讀 2480·2021-09-27 13:36
閱讀 2172·2019-08-29 18:47
閱讀 2140·2019-08-29 15:21
閱讀 1404·2019-08-29 11:14
閱讀 1989·2019-08-28 18:29
閱讀 1634·2019-08-28 18:04
閱讀 581·2019-08-26 13:58
閱讀 3217·2019-08-26 12:12