摘要:因公司業(yè)務(wù)需要,最近在設(shè)計(jì)一個(gè)通用隊(duì)列功能模塊,主體要求兩大點(diǎn)用實(shí)現(xiàn)事務(wù)型消息隊(duì)列當(dāng)然,主流的隊(duì)列服務(wù)可使用或者等,此處討論的是實(shí)現(xiàn)多進(jìn)程消費(fèi)隊(duì)列消息用實(shí)現(xiàn)事務(wù)型消息隊(duì)列消息隊(duì)列的作用有異步化解耦和消除峰值等。
因公司業(yè)務(wù)需要,最近在設(shè)計(jì)一個(gè)通用隊(duì)列功能模塊,主體要求兩大點(diǎn):
用MySql實(shí)現(xiàn)事務(wù)型消息隊(duì)列(當(dāng)然,主流的隊(duì)列服務(wù)可使用redis或者rabbitmq等,此處討論的是mysql實(shí)現(xiàn))
php多進(jìn)程消費(fèi)隊(duì)列消息
用MySql實(shí)現(xiàn)事務(wù)型消息隊(duì)列消息隊(duì)列的作用有:異步化、解耦和消除峰值等。目前異步化對(duì)于我來(lái)說(shuō)使用最頻繁,在很多業(yè)務(wù)場(chǎng)景下,我們可以將實(shí)時(shí)性要求較低的請(qǐng)求轉(zhuǎn)為異步處理,減小系統(tǒng)負(fù)載壓力,提高系統(tǒng)穩(wěn)定性。在離線數(shù)據(jù)異步處理過(guò)程中,消息隊(duì)列要滿足以下要求:
消息不能丟失,即使在系統(tǒng)失敗的情況下。消息一旦被插入就一定會(huì)被至少處理一次(只被處理一次是最好的,但是實(shí)現(xiàn)起來(lái)有難度,所以只要求at-least-once semantic);
FIFO順序。(mysql id自增可滿足此特性。當(dāng)然,可以設(shè)計(jì)特殊參數(shù)做特殊處理)
支持多生產(chǎn)者(mysql支持并發(fā)操作,支持此特點(diǎn))
支持多消費(fèi)者。每個(gè)消息只能被其中一個(gè)消費(fèi)者處理(業(yè)務(wù)的處理需要考慮冪等性)。
以上是隊(duì)列實(shí)現(xiàn)的說(shuō)明,具體用MySql實(shí)現(xiàn)事務(wù)型消息隊(duì)列可以參考文章
https://spockwangs.github.io/...
此次設(shè)計(jì)的表結(jié)構(gòu)如下:
CREATE TABLE `comom_queue` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT "自增id", `type` tinyint(4) NOT NULL DEFAULT "0" COMMENT "隊(duì)列類型,代碼業(yè)務(wù)備注", `conn_id` int(11) NOT NULL DEFAULT "0" COMMENT "消費(fèi)者標(biāo)識(shí)", `param_content` text COMMENT "隊(duì)列入?yún)?, `callback` varchar(255) NOT NULL DEFAULT "" COMMENT "隊(duì)列消費(fèi)回調(diào)函數(shù)", `status` tinyint(2) NOT NULL DEFAULT "0" COMMENT "0新建 1消費(fèi)中 2成功 3失敗 4需重試", `create_time` int(11) NOT NULL DEFAULT "0" COMMENT "創(chuàng)建時(shí)間", `update_time` int(11) NOT NULL DEFAULT "0" COMMENT "狀態(tài)變更時(shí)間", `preexec_time` int(11) NOT NULL DEFAULT "0" COMMENT "預(yù)消費(fèi)時(shí)間", `p_key` varchar(100) NOT NULL DEFAULT "" COMMENT "業(yè)務(wù)唯一標(biāo)識(shí)key,查詢用", `mark` varchar(255) NOT NULL DEFAULT "" COMMENT "備注", PRIMARY KEY (`id`), KEY `indx_s` (`p_key`,`type`) USING BTREE, KEY `indx_exec` (`conn_id`,`status`) USING BTREE, KEY `indx_ty` (`type`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
說(shuō)明下幾個(gè)字段的設(shè)計(jì):
callback 隊(duì)列中不同的業(yè)務(wù)消息有不同的業(yè)務(wù)處理,利用callback值回調(diào)對(duì)應(yīng)的業(yè)務(wù)方法
type 隊(duì)列業(yè)務(wù)類型,區(qū)分不同的業(yè)務(wù),可用不同的消費(fèi)者分開(kāi)消費(fèi)。在FIFO的特點(diǎn)外,可多帶帶開(kāi)消費(fèi)者對(duì)有特殊要求(消息優(yōu)先級(jí)高)的業(yè)務(wù)消息進(jìn)行消費(fèi)
preexec_time 預(yù)消費(fèi)時(shí)間,有的業(yè)務(wù)消息有消費(fèi)時(shí)間要求,可設(shè)置出隊(duì)列時(shí)間
php多進(jìn)程消費(fèi)設(shè)計(jì)此次php多進(jìn)程的實(shí)現(xiàn)依賴pcntl,posix擴(kuò)展,讀者可自行檢查是否安裝了此拓展。queue隊(duì)列服務(wù)設(shè)計(jì)和實(shí)現(xiàn)包括以下功能點(diǎn):
主進(jìn)程和子進(jìn)程的運(yùn)行時(shí)間可配
主進(jìn)程(master進(jìn)程)創(chuàng)建和監(jiān)聽(tīng)子進(jìn)程行為
創(chuàng)建定時(shí)器信號(hào),主進(jìn)程(master進(jìn)程)定時(shí)監(jiān)聽(tīng)隊(duì)列信息,可用于消息堆積通知等
子進(jìn)程(worker進(jìn)程)消費(fèi)消息
針對(duì)不同的業(yè)務(wù)消息可配置不同數(shù)量的子進(jìn)程
各個(gè)業(yè)務(wù)子進(jìn)程數(shù)可配置正常拉起數(shù)和最大進(jìn)程數(shù),根據(jù)隊(duì)列積壓情況,子進(jìn)程動(dòng)態(tài)啟動(dòng)進(jìn)程數(shù)(暫未實(shí)現(xiàn),后續(xù)添加)
不多說(shuō)了,直接看代碼,抽離出來(lái)的queue服務(wù)類代碼如下:
"process_num"] protected $child = []; // 子進(jìn)程pid數(shù)組 protected $result = []; // 計(jì)算的結(jié)果 protected $overTime = 0; //主進(jìn)程超時(shí)時(shí)間 protected $startTime; //主進(jìn)程運(yùn)行時(shí)間 protected $childOverTime = 3600; //子進(jìn)程超時(shí)時(shí)間 protected $alarm_time = 2; public function __construct($process = [], $overTime = 0, $childOverTime = 3600) { if (!function_exists("pcntl_fork")) { die("pcntl_fork not existing"); } $this->process = $process; $this->overTime = $overTime; $this->childOverTime = $childOverTime; $this->startTime = time(); } /** * 設(shè)置子進(jìn)程 */ public function setProcess($process) { $this->process = $process; } /** * 設(shè)置檢測(cè)時(shí)間間隔 單位s */ public function setAlarmTime($time){ $this->alarm_time = $time; } /** * fork 子進(jìn)程 */ protected function forkProcess() { //循環(huán)創(chuàng)建每個(gè)type 的消費(fèi)子進(jìn)程 $process = $this->process; foreach($process as $key => $num) { for ($i = 0; $i < $num; $i++){ $this->forkOneProcess($key); } } return $this; } /** * 創(chuàng)建子進(jìn)程操作 * @param $key * @return $this */ private function forkOneProcess($key) { $pid = pcntl_fork(); if ($pid == 0) { $id = getmypid(); $this->processDo($id, $key); exit(0); } else if ($pid > 0) { //記錄子進(jìn)程信息 $childProcess = array( "pid" => $pid, "type" => $key, "create_time" => time() ); $this->child[$pid] = $childProcess; } return $this; } /** * 子進(jìn)程做的事情,消費(fèi)者 */ abstract protected function processDo($id, $key); /** * 隊(duì)列數(shù)量檢測(cè) */ abstract protected function checkQueueNum(); /** * 等待子進(jìn)程結(jié)束 */ protected function waiteProcess() { while(count($this->child)) { foreach($this->child as $pid => $item){ $res = pcntl_waitpid($pid,$status,WNOHANG); pcntl_signal_dispatch(); if ( -1 == $res || $res > 0 ) { unset($this->child[$pid]); echo "pid $pid 退出", PHP_EOL; //判斷主進(jìn)程是否超時(shí) 未超時(shí)拉起新的子進(jìn)程 $leftTime = time() - $this->startTime; if ($this->overTime > $leftTime){ $this->forkOneProcess($item["type"]); echo "創(chuàng)建新進(jìn)程", PHP_EOL; } }//判斷子進(jìn)程是否存在且超時(shí),超過(guò)時(shí)限20分鐘則強(qiáng)制退出 elseif (posix_kill($pid, 0) && (time() - $item["create_time"] - 20*60) > $this->childOverTime){ posix_kill($pid, SIGUSR1); echo "pid $pid 退出2", PHP_EOL; } } } return $this; } /** * 隊(duì)列檢測(cè) */ protected function timeHandler(){ $this->checkQueueNum(); pcntl_alarm($this->alarm_time); } /** * 啟動(dòng) */ public function runProcess() { //注冊(cè)信號(hào) pcntl_signal(SIGALRM, array($this, "timeHandler")); pcntl_alarm($this->alarm_time); $leftTime = time() - $this->startTime; while(($this->overTime ==0 || $this->overTime > $leftTime)){ echo "新進(jìn)程processlist", PHP_EOL; $this->forkProcess()->waiteProcess(); $leftTime = time() - $this->startTime; } } }
最后一個(gè)功能點(diǎn):各個(gè)業(yè)務(wù)子進(jìn)程數(shù)可配置正常拉起數(shù)和最大進(jìn)程數(shù),根據(jù)隊(duì)列積壓情況,子進(jìn)程動(dòng)態(tài)啟動(dòng)進(jìn)程數(shù) 暫未實(shí)現(xiàn)。目前的queue服務(wù)設(shè)計(jì)如上,請(qǐng)各位看官多多指教!
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/29768.html
閱讀 2378·2021-11-23 09:51
閱讀 2025·2021-10-14 09:43
閱讀 2812·2021-09-27 13:35
閱讀 1182·2021-09-22 15:54
閱讀 2549·2021-09-13 10:36
閱讀 3876·2019-08-30 15:56
閱讀 3439·2019-08-30 14:09
閱讀 1747·2019-08-30 12:57