成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

MySql實(shí)現(xiàn)事務(wù)型消息隊(duì)列以及php多進(jìn)程消費(fèi)設(shè)計(jì)

Cristic / 1498人閱讀

摘要:因公司業(yè)務(wù)需要,最近在設(shè)計(jì)一個(gè)通用隊(duì)列功能模塊,主體要求兩大點(diǎn)用實(shí)現(xiàn)事務(wù)型消息隊(duì)列當(dāng)然,主流的隊(duì)列服務(wù)可使用或者等,此處討論的是實(shí)現(xiàn)多進(jìn)程消費(fèi)隊(duì)列消息用實(shí)現(xiàn)事務(wù)型消息隊(duì)列消息隊(duì)列的作用有異步化解耦和消除峰值等。

因公司業(yè)務(wù)需要,最近在設(shè)計(jì)一個(gè)通用隊(duì)列功能模塊,主體要求兩大點(diǎn):

用MySql實(shí)現(xiàn)事務(wù)型消息隊(duì)列(當(dāng)然,主流的隊(duì)列服務(wù)可使用redis或者rabbitmq等,此處討論的是mysql實(shí)現(xiàn))

php多進(jìn)程消費(fèi)隊(duì)列消息

用MySql實(shí)現(xiàn)事務(wù)型消息隊(duì)列

消息隊(duì)列的作用有:異步化、解耦和消除峰值等。目前異步化對(duì)于我來(lái)說(shuō)使用最頻繁,在很多業(yè)務(wù)場(chǎng)景下,我們可以將實(shí)時(shí)性要求較低的請(qǐng)求轉(zhuǎn)為異步處理,減小系統(tǒng)負(fù)載壓力,提高系統(tǒng)穩(wěn)定性。在離線數(shù)據(jù)異步處理過(guò)程中,消息隊(duì)列要滿足以下要求:

消息不能丟失,即使在系統(tǒng)失敗的情況下。消息一旦被插入就一定會(huì)被至少處理一次(只被處理一次是最好的,但是實(shí)現(xiàn)起來(lái)有難度,所以只要求at-least-once semantic);

FIFO順序。(mysql id自增可滿足此特性。當(dāng)然,可以設(shè)計(jì)特殊參數(shù)做特殊處理)

支持多生產(chǎn)者(mysql支持并發(fā)操作,支持此特點(diǎn))

支持多消費(fèi)者。每個(gè)消息只能被其中一個(gè)消費(fèi)者處理(業(yè)務(wù)的處理需要考慮冪等性)。

以上是隊(duì)列實(shí)現(xiàn)的說(shuō)明,具體用MySql實(shí)現(xiàn)事務(wù)型消息隊(duì)列可以參考文章
https://spockwangs.github.io/...

此次設(shè)計(jì)的表結(jié)構(gòu)如下:

CREATE TABLE `comom_queue` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT "自增id",
  `type` tinyint(4) NOT NULL DEFAULT "0" COMMENT "隊(duì)列類型,代碼業(yè)務(wù)備注",
  `conn_id` int(11) NOT NULL DEFAULT "0" COMMENT "消費(fèi)者標(biāo)識(shí)",
  `param_content` text COMMENT "隊(duì)列入?yún)?,
  `callback` varchar(255) NOT NULL DEFAULT "" COMMENT "隊(duì)列消費(fèi)回調(diào)函數(shù)",
  `status` tinyint(2) NOT NULL DEFAULT "0" COMMENT "0新建 1消費(fèi)中 2成功 3失敗 4需重試",
  `create_time` int(11) NOT NULL DEFAULT "0" COMMENT "創(chuàng)建時(shí)間",
  `update_time` int(11) NOT NULL DEFAULT "0" COMMENT "狀態(tài)變更時(shí)間",
  `preexec_time` int(11) NOT NULL DEFAULT "0" COMMENT "預(yù)消費(fèi)時(shí)間",
  `p_key` varchar(100) NOT NULL DEFAULT "" COMMENT "業(yè)務(wù)唯一標(biāo)識(shí)key,查詢用",
  `mark` varchar(255) NOT NULL DEFAULT "" COMMENT "備注",
  PRIMARY KEY (`id`),
  KEY `indx_s` (`p_key`,`type`) USING BTREE,
  KEY `indx_exec` (`conn_id`,`status`) USING BTREE,
  KEY `indx_ty` (`type`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

說(shuō)明下幾個(gè)字段的設(shè)計(jì):

callback 隊(duì)列中不同的業(yè)務(wù)消息有不同的業(yè)務(wù)處理,利用callback值回調(diào)對(duì)應(yīng)的業(yè)務(wù)方法

type 隊(duì)列業(yè)務(wù)類型,區(qū)分不同的業(yè)務(wù),可用不同的消費(fèi)者分開(kāi)消費(fèi)。在FIFO的特點(diǎn)外,可多帶帶開(kāi)消費(fèi)者對(duì)有特殊要求(消息優(yōu)先級(jí)高)的業(yè)務(wù)消息進(jìn)行消費(fèi)

preexec_time 預(yù)消費(fèi)時(shí)間,有的業(yè)務(wù)消息有消費(fèi)時(shí)間要求,可設(shè)置出隊(duì)列時(shí)間

php多進(jìn)程消費(fèi)設(shè)計(jì)

此次php多進(jìn)程的實(shí)現(xiàn)依賴pcntl,posix擴(kuò)展,讀者可自行檢查是否安裝了此拓展。queue隊(duì)列服務(wù)設(shè)計(jì)和實(shí)現(xiàn)包括以下功能點(diǎn):

主進(jìn)程和子進(jìn)程的運(yùn)行時(shí)間可配

主進(jìn)程(master進(jìn)程)創(chuàng)建和監(jiān)聽(tīng)子進(jìn)程行為

創(chuàng)建定時(shí)器信號(hào),主進(jìn)程(master進(jìn)程)定時(shí)監(jiān)聽(tīng)隊(duì)列信息,可用于消息堆積通知等

子進(jìn)程(worker進(jìn)程)消費(fèi)消息

針對(duì)不同的業(yè)務(wù)消息可配置不同數(shù)量的子進(jìn)程

各個(gè)業(yè)務(wù)子進(jìn)程數(shù)可配置正常拉起數(shù)和最大進(jìn)程數(shù),根據(jù)隊(duì)列積壓情況,子進(jìn)程動(dòng)態(tài)啟動(dòng)進(jìn)程數(shù)(暫未實(shí)現(xiàn),后續(xù)添加)

不多說(shuō)了,直接看代碼,抽離出來(lái)的queue服務(wù)類代碼如下:

 "process_num"]
    protected $child   = []; // 子進(jìn)程pid數(shù)組
    protected $result  = []; // 計(jì)算的結(jié)果
    protected $overTime = 0; //主進(jìn)程超時(shí)時(shí)間
    protected $startTime; //主進(jìn)程運(yùn)行時(shí)間
    protected $childOverTime = 3600; //子進(jìn)程超時(shí)時(shí)間
    protected $alarm_time = 2;
    public function __construct($process = [], $overTime = 0, $childOverTime = 3600)
    {
        if (!function_exists("pcntl_fork")) {
            die("pcntl_fork not existing");
        }
        $this->process  = $process;
        $this->overTime = $overTime;
        $this->childOverTime = $childOverTime;
        $this->startTime = time();
    }
    /**
     * 設(shè)置子進(jìn)程
     */
    public function setProcess($process)
    {
        $this->process = $process;
    }

    /**
     * 設(shè)置檢測(cè)時(shí)間間隔 單位s
     */
    public function setAlarmTime($time){
        $this->alarm_time = $time;
    }

    /**
     * fork 子進(jìn)程
     */
    protected function forkProcess()
    {
        //循環(huán)創(chuàng)建每個(gè)type 的消費(fèi)子進(jìn)程
        $process  = $this->process;
        foreach($process as $key => $num) {
            for ($i = 0; $i < $num; $i++){
                $this->forkOneProcess($key);
            }
        }
        return $this;
    }

    /**
     * 創(chuàng)建子進(jìn)程操作
     * @param $key
     * @return $this
     */
    private function forkOneProcess($key)
    {
        $pid = pcntl_fork();
        if ($pid == 0) {
            $id = getmypid();
            $this->processDo($id, $key);
            exit(0);
        } else if ($pid > 0) {
            //記錄子進(jìn)程信息
            $childProcess = array(
                "pid" => $pid,
                "type" => $key,
                "create_time" => time()
            );
            $this->child[$pid] = $childProcess;
        }
        return $this;
    }

    /**
     * 子進(jìn)程做的事情,消費(fèi)者
     */
    abstract protected function processDo($id, $key);

    /**
     * 隊(duì)列數(shù)量檢測(cè)
     */
    abstract protected function checkQueueNum();

    /**
     * 等待子進(jìn)程結(jié)束
     */
    protected function waiteProcess()
    {
        while(count($this->child)) {
            foreach($this->child as $pid => $item){
                $res = pcntl_waitpid($pid,$status,WNOHANG);
                pcntl_signal_dispatch();
                if ( -1 == $res || $res > 0 ) {
                    unset($this->child[$pid]);
                    echo "pid $pid 退出", PHP_EOL;
                    //判斷主進(jìn)程是否超時(shí) 未超時(shí)拉起新的子進(jìn)程
                    $leftTime = time() - $this->startTime;
                    if ($this->overTime > $leftTime){
                        $this->forkOneProcess($item["type"]);
                        echo "創(chuàng)建新進(jìn)程", PHP_EOL;
                    }
                }//判斷子進(jìn)程是否存在且超時(shí),超過(guò)時(shí)限20分鐘則強(qiáng)制退出
                elseif (posix_kill($pid, 0) && (time() - $item["create_time"] - 20*60) > $this->childOverTime){
                    posix_kill($pid, SIGUSR1);
                    echo "pid $pid 退出2", PHP_EOL;
                }
            }
        }

        return $this;
    }

    /**
     * 隊(duì)列檢測(cè)
     */
    protected function timeHandler(){
        $this->checkQueueNum();
        pcntl_alarm($this->alarm_time);
    }

    /**
     * 啟動(dòng)
     */
    public function runProcess() {
        //注冊(cè)信號(hào)
        pcntl_signal(SIGALRM, array($this, "timeHandler"));
        pcntl_alarm($this->alarm_time);
        $leftTime = time() - $this->startTime;
        while(($this->overTime ==0 || $this->overTime > $leftTime)){
            echo "新進(jìn)程processlist", PHP_EOL;
            $this->forkProcess()->waiteProcess();
            $leftTime = time() - $this->startTime;
        }
    }
}

最后一個(gè)功能點(diǎn):各個(gè)業(yè)務(wù)子進(jìn)程數(shù)可配置正常拉起數(shù)和最大進(jìn)程數(shù),根據(jù)隊(duì)列積壓情況,子進(jìn)程動(dòng)態(tài)啟動(dòng)進(jìn)程數(shù) 暫未實(shí)現(xiàn)。目前的queue服務(wù)設(shè)計(jì)如上,請(qǐng)各位看官多多指教!

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/29768.html

相關(guān)文章

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<