摘要:年月日參考鏈接使用不得不明白的知識隊列文檔中文文檔本文環(huán)境隊列為什么使用隊列使用隊列的目的一般是異步執(zhí)行出錯重試解釋一下異步執(zhí)行部分代碼執(zhí)行很耗時為了提高響應速度及避免占用過多連接資源可以將這部分代碼放到隊列中異步執(zhí)行網(wǎng)站新用戶注冊后需要
Last-Modified: 2019年5月10日15:04:22
參考鏈接使用 Laravel Queue 不得不明白的知識
Laravel 隊列文檔
Redis 中文文檔
本文環(huán)境Laravel 5.5
隊列 Redis
為什么使用隊列使用隊列的目的一般是:
異步執(zhí)行
出錯重試
解釋一下:
異步執(zhí)行: 部分代碼執(zhí)行很耗時, 為了提高響應速度及避免占用過多連接資源, 可以將這部分代碼放到隊列中異步執(zhí)行.
Eg. 網(wǎng)站新用戶注冊后, 需要發(fā)送歡迎的郵件, 涉及到網(wǎng)絡IO無法控制耗時的這一類就很適合放到隊列中來執(zhí)行.
出錯重試: 為了保證一些任務的正常執(zhí)行, 可以將任務放到隊列中執(zhí)行, 若執(zhí)行出錯則可以延遲一段時間后重試, 直到任務處理成功或出錯超過N次后取消執(zhí)行.
Eg. 用戶需要綁定手機號, 此時發(fā)送短信的接口是依賴第三方, 一個是不確定耗時, 一個是不確定調(diào)用的成功, 為了保證調(diào)用成功, 必然需要在出錯后重試Laravel 中的隊列
以下分析默認使用的隊列及其配置如下
默認隊列引擎: redis
通過在 redis-cli 中使用 monitor 命令查看具體執(zhí)行的命令語句
默認隊列名: default
分發(fā)任務此處以分發(fā) 異步通知(class XxxNotification implement ShouldQueue)為例.
在Laravel中發(fā)起異步通知時, Laravel 會往redis中的任務隊列添加一條新任務
redis 執(zhí)行語句
redis> RPUSH queues:default { "displayName": "AppListenersRebateEventListener", "job": "IlluminateQueueCallQueuedHandler@call", "maxTries": null, "timeout": null, "timeoutAt": null, "data": { "commandName": "IlluminateEventsCallQueuedListener", "command": "O:36:"IlluminateEventsCallQueuedListener":7:{s:5:"class";s:33:"AppListenersRebateEventListener";s:6:"method";s:15:"onRebateCreated";s:4:"data";a:1:{i:0;O:29:"AppEventsRebateCreatedEvent":4:{s:11:"u0000*u0000tbkOrder";O:45:"IlluminateContractsDatabaseModelIdentifier":3:{s:5:"class";s:19:"AppModelsTbkOrder";s:2:"id";i:416;s:10:"connection";s:5:"mysql";}s:15:"u0000*u0000notifyAdmins";b:1;s:13:"u0000*u0000manualBind";b:0;s:6:"socket";N;}}s:5:"tries";N;s:9:"timeoutAt";N;s:7:"timeout";N;s:6:"u0000*u0000job";N;}" }, "id": "iTqpbeDqqFb3VoED2WP3pgmDbLAUQcMB", "attempts": 0 }
上面的redis語句是將任務信息(json格式) rpush 到 redis 隊列 queues:default 的尾部.
任務隊列 WorkerLaravel 處理任務隊列的進程開啟方式: php artisan queue:work, 為了更好的觀察, 這里使用 --once 選項來指定隊列中的單一任務進行處理, 具體的更多參數(shù)請自行參考文檔
php artisan queue:work --once --delay=1 --tries=3
上述執(zhí)行語句參數(shù)含義:
--once 僅執(zhí)行一次任務, 默認是常駐進程一直執(zhí)行
--tries=3 任務出錯最多重試3次, 默認是無限制重試
--delay=1 任務出錯后, 每次延遲1秒后再次執(zhí)行, 默認是延遲0秒
當 Worker 啟動時, 它依次執(zhí)行如下步驟:
此處仍以默認隊列 default 為例講解, 且只講解redis的相關操作
從 queues:default:delayed 有序集合中獲取可以處理的 "延遲任務", 并 rpush 到 queue:default隊列的尾部
具體的執(zhí)行語句:
redis> eval "Lua腳本" 2 queues:default:delayed queues:default 當前時間戳
Lua 腳本內(nèi)容如下:
-- Get all of the jobs with an expired "score"... local val = redis.call("zrangebyscore", KEYS[1], "-inf", ARGV[1]) -- If we have values in the array, we will remove them from the first queue -- and add them onto thedestination queue in chunks of 100, which moves -- all of the appropriate jobs onto the destination queue very safely. if(next(val) ~= nil) then redis.call("zremrangebyrank", KEYS[1], 0, #val - 1) for i = 1, #val, 100 do redis.call("rpush", KEYS[2], unpack(val, i, math.min(i+99, #val))) end end return val
從 queue:default:reserved有序集合中獲取已過期的 "reserved 任務", 并 rpush 到 queue:default隊列的尾部
具體的執(zhí)行語句:
redis> eval "Lua腳本" 2 queues:default:reserved queues:default 當前時間戳
使用的Lua腳本同步驟 1
從 queue:default 隊列中獲取(lpop)一個任務, 增加其 attempts 次數(shù), 并將該任務保存到 queu:default:reserved 有序集合中, 該任務的 score 值為 當前時間 + 90(任務執(zhí)行超時時間)
具體的執(zhí)行語句:
redis> eval “Lua腳本” 2 queues:default queues:default:reserved 任務超時時間戳
Lua腳本
-- Pop the first job off of the queue... local job = redis.call("lpop", KEYS[1]) local reserved = false if(job ~= false) then -- Increment the attempt count and place job on the reserved queue... reserved = cjson.decode(job) reserved["attempts"] = reserved["attempts"] + 1 reserved = cjson.encode(reserved) redis.call("zadd", KEYS[2], ARGV[1], reserved) end return {job, reserved}
這里的 90 是根據(jù)配置而定: config("queue.connections.redis.retry_after")若預計任務耗時過久, 則應增加該數(shù)值, 防止任務還在執(zhí)行時就被重置
在成功執(zhí)行上面獲取的任務后, 就將該任務從 queues:default:reserved 隊列中移除掉
具體執(zhí)行語句: ZREM queues:default:reserved "具體任務"
如果執(zhí)行任務失敗, 此時分為2種情況:
任務失敗次數(shù)未達到指定的重試次數(shù)閥值
將該任務從 queues:default:reserved 中移除, 并將該任務添加到 queue:default:delayed 有序集合中, score 為該任務下一次執(zhí)行的時間戳
執(zhí)行語句:
redis> EVAL "Lua腳本" 2 queues:default:delayed queues:default:reserved "失敗的任務" 任務延遲執(zhí)行的時間戳
Lua腳本
-- Remove the job from the current queue... redis.call("zrem", KEYS[2], ARGV[1]) -- Add the job onto the "delayed" queue... redis.call("zadd", KEYS[1], ARGV[2], ARGV[1]) return true
如果任務失敗次數(shù)超過指定的重試閥值
將該任務從 queue:default:reserved 中移除
執(zhí)行語句:
redis> ZREM queue:default:reserved
注意, 上述使用 Lua 腳本的目的在于操作的原子性, Redis 是單進程單線程模式, 以Lua腳本形式執(zhí)行命令時可以確保執(zhí)行腳本的原子性, 而不會有并發(fā)問題.
關于Redis的原子操作上面 Laravel 使用redis作為隊列存儲引擎時, 在操作redis時使用到了 exec 執(zhí)行Lua腳本, 以確保原子性.
這里給不熟悉redis的同學簡單講一下.
以上面 Worker 啟動時的步驟1為例:
從 queues:default:delayed 有序集合中獲取可以處理的 "延遲任務", 并 rpush 到 queue:default隊列的尾部
具體的執(zhí)行語句:
redis> eval "Lua腳本" 2 queues:default:delayed queues:default 當前時間戳Lua 腳本內(nèi)容如下:
-- Get all of the jobs with an expired "score"... local val = redis.call("zrangebyscore", KEYS[1], "-inf", ARGV[1]) -- If we have values in the array, we will remove them from the first queue -- and add them onto thedestination queue in chunks of 100, which moves -- all of the appropriate jobs onto the destination queue very safely. if(next(val) ~= nil) then redis.call("zremrangebyrank", KEYS[1], 0, #val - 1) for i = 1, #val, 100 do redis.call("rpush", KEYS[2], unpack(val, i, math.min(i+99, #val))) end end return val
上述步驟首先從 queues:default:delayed 有序集合中獲取可以處理的 "延遲任務" 并 rpush 到 queue:default隊列的尾部.
那么如果不使用Lua腳本的話, 一般做法會是如下:
$jobs = $redis->zRangeByScore("queues:default:delayed", "-inf", time()) if (!empty($jobs)) { $redis->zRem("queues:default:delayed", ...$jobs); $redis->rPush("queues:default", ...$jobs); }
如果是單個Worker的話, 上述腳本不會有問題, 但是如果有多個Worker呢? 在php層面上執(zhí)行上述操作是會有并發(fā)問題的.
Worker_1 和 Worker_2 從 queues:default:delayed 隊列中獲取多個任務后, 執(zhí)行 rPush 語句會導致任務被執(zhí)行2次, 如果有多個 Worker 甚至會執(zhí)行更多次.
只要是有可能引起并發(fā)問題的情況, 那么就一定會發(fā)生.以 分布式鎖 為例
鎖的兩大基本操作:
Lock
Unlock
Lock 操作
// 生成唯一的鎖id $identifier = uniqid(php_uname("n") . "_", true); // 僅在該key不存在時設置, 過期時間5秒 $result = $redis->set("lock_key", $identifier, ["NX", "EX" => 5]);
Unlock 操作
$script = <<evaluate($script, ["lock_key", $identifier], 1);
至于 Unlock 操作為什么要這么麻煩, 可以看一下以下兩種有問題的方案, 再想一想.
有問題的方案一
$redis->del("lock_key");
有問題的方案二
if ($redis->get("lock_key") == $identifier) { $redis->del("lock_key"); }
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/31416.html
摘要:在使用中的隊列時,產(chǎn)生沖突干擾。文件中的配置部分至此,兩個項目的隊列沖突原因就找到了。隊列監(jiān)聽最后遇到問題,莫要病急亂投醫(yī)。從代碼入手,分析理解實現(xiàn)原理,找對點,解決方法也許很簡單,。 問題 公司項目使用Laravel的開發(fā)的兩個項目在同一個測試服務器部署,公用同一個redis。在使用laravel中的隊列時,產(chǎn)生沖突干擾。 查找問題原因 在laravel 隊列的操作類 Illumin...
摘要:一前言之前在項目中需要使用的事件廣播,而且項目打算使用作為驅(qū)動,但發(fā)現(xiàn)網(wǎng)上的資料大部分都是驅(qū)動的,只能自己摸索著搭建了一下服務。 一、前言 之前在項目中需要使用laravel的事件廣播,而且項目打算使用redis作為驅(qū)動,但發(fā)現(xiàn)網(wǎng)上的資料大部分都是Pusher驅(qū)動的,只能自己摸索著搭建了一下服務?,F(xiàn)在將這個過程記錄一下,希望能幫到其他人。 二、項目的環(huán)境 事件廣播需要用到redis,n...
摘要:如果任務沒有在規(guī)定時間內(nèi)完成,那么該有序集合的任務將會被重新放入隊列中。這兩個進程操縱了三個隊列,其中一個,負責即時任務,兩個,負責延時任務與待處理任務。如果任務執(zhí)行成功,就會刪除中的任務,否則會被重新放入隊列中。 在實際的項目開發(fā)中,我們經(jīng)常會遇到需要輕量級隊列的情形,例如發(fā)短信、發(fā)郵件等,這些任務不足以使用 kafka、RabbitMQ 等重量級的消息隊列,但是又的確需要異步、重試...
閱讀 3595·2021-11-24 10:19
閱讀 3733·2021-09-30 09:47
閱讀 1300·2019-08-30 15:56
閱讀 798·2019-08-29 15:11
閱讀 909·2019-08-29 13:43
閱讀 3573·2019-08-28 18:25
閱讀 2166·2019-08-26 13:27
閱讀 1441·2019-08-26 11:44