摘要:大家知道,一個(gè)消息隊(duì)列處理系統(tǒng)主要分為兩大部分消費(fèi)者和生產(chǎn)者。任務(wù)系統(tǒng)實(shí)時(shí)的對任務(wù)隊(duì)列進(jìn)行,出來一個(gè)任務(wù)就一個(gè)子進(jìn)程,由子進(jìn)程完成具體的任務(wù)邏輯。新的設(shè)計(jì)為了解決并發(fā)的問題,我們計(jì)劃做一個(gè)更加高效強(qiáng)壯的隊(duì)里處理系統(tǒng)。
背景
由于PHP不支持多線程,但是作為一個(gè)完善的系統(tǒng),有很多操作都是需要異步完成的。為了完成這些異步操作,我們做了一個(gè)基于Redis隊(duì)列任務(wù)系統(tǒng)。
大家知道,一個(gè)消息隊(duì)列處理系統(tǒng)主要分為兩大部分:消費(fèi)者和生產(chǎn)者。
在我們的系統(tǒng)中,主系統(tǒng)作為生產(chǎn)者,任務(wù)系統(tǒng)作為消費(fèi)者。
具體的工作流程如下: 1、主系統(tǒng)將需要需要處理的任務(wù)名稱+任務(wù)參數(shù)push到隊(duì)列中。 2、任務(wù)系統(tǒng)實(shí)時(shí)的對任務(wù)隊(duì)列進(jìn)行pop,pop出來一個(gè)任務(wù)就fork一個(gè)子進(jìn)程,由子進(jìn)程完成具體的任務(wù)邏輯。
具體代碼如下:
/** * 啟動守護(hù)進(jìn)程 */ public function runAction() { Tools::log_message("ERROR", "daemon/run" . " | action: restart", "daemon-"); while (true) { $this->fork_process(); } exit; } /** * 創(chuàng)建子進(jìn)程 */ private function fork_process() { $ppid = getmypid(); $pid = pcntl_fork(); if ($pid == 0) {//子進(jìn)程 $pid = posix_getpid(); //echo "* Process {$pid} was created "; $this->mq_process(); exit; } else {//主進(jìn)程 $pid = pcntl_wait($status, WUNTRACED); //取得子進(jìn)程結(jié)束狀態(tài) if (pcntl_wifexited($status)) { //echo " * Sub process: {$pid} exited with {$status}"; //Tools::log_message("INFO", "daemon/run succ" . "|status:" . $status . "|pid:" . $ppid . "|childpid:" . $pid ); } else { Tools::log_message("ERROR", "daemon/run fail" . "|status:" . $status . "|pid:" . $ppid . "|childpid:" . $pid, "daemon-"); } } } /** * 業(yè)務(wù)任務(wù)隊(duì)列處理 */ private function mq_process() { $data_pop = $this->masterRedis->rPop($this->redis_list_key); $data = json_decode($data_pop, 1); if (!$data) { return FALSE; } $worker = "_task_" . $data["worker"]; $class_name = isset($data["class"]) ? $data["class"] : "TaskproModel"; $params = $data["params"]; $class = new $class_name(); $class->$worker($params); return TRUE; }
這是一個(gè)簡單的任務(wù)處理系統(tǒng)。
通過這個(gè)任務(wù)系統(tǒng)幫助我們實(shí)現(xiàn)了異步,到目前為止已經(jīng)穩(wěn)定運(yùn)行了將近一年。
但很可惜,它是一個(gè)單進(jìn)程的系統(tǒng)。它是一直在不斷的fork,如果有任務(wù)就處理,沒有任務(wù)就跳過。
這樣很穩(wěn)定。
但問題有兩個(gè):一是不斷地fork、pop會浪費(fèi)服務(wù)器資源,二是不支持并發(fā)!
第一個(gè)問題還好,但第二個(gè)問題就很嚴(yán)重。
當(dāng)主系統(tǒng) 同時(shí) 拋過來大量的任務(wù)時(shí),任務(wù)的處理時(shí)間就會無限的拉長。
新的設(shè)計(jì)
為了解決并發(fā)的問題,我們計(jì)劃做一個(gè)更加高效強(qiáng)壯的隊(duì)里處理系統(tǒng)。
因?yàn)樵赑HP7之前不支持多線程,所以我們采用多進(jìn)程。
從網(wǎng)上找了不少資料,大多所謂的多進(jìn)程都是N個(gè)進(jìn)程同時(shí)在后臺運(yùn)行。
顯然這是不合適的。
我的預(yù)想是:每pop出一個(gè)任務(wù)就fork一個(gè)任務(wù),任務(wù)執(zhí)行完成后子進(jìn)程結(jié)束。
遇到的問題
1、如何控制最大進(jìn)程數(shù)
這個(gè)問題很簡單,那就是每fork一個(gè)子進(jìn)程就自增一次。而當(dāng)子進(jìn)程執(zhí)行完成就自減一次。
自增沒有問題,我們就在主進(jìn)程中操作就完了。那么該如何自減呢?
可能你會說,當(dāng)然是在子進(jìn)程中啊。但這里你需要注意:當(dāng)fork的時(shí)候是從主進(jìn)程復(fù)制了一份資源給子進(jìn)程,這就意味著你無法在子進(jìn)程中操作主進(jìn)程中的計(jì)數(shù)器!
所以,這里就需要了解一個(gè)知識點(diǎn):信號。
具體的可以自行Google,這里直接看代碼。
// install signal handler for dead kids pcntl_signal(SIGCHLD, array($this, "sig_handler"));
這就安裝了一個(gè)信號處理器。當(dāng)然還缺少一點(diǎn)。
declare(ticks = 1);
declare是一個(gè)控制結(jié)構(gòu)語句,具體的用法也請去Google。
這句代碼的意思就是每執(zhí)行一條低級語句就調(diào)用一次信號處理器。
這樣,每當(dāng)子進(jìn)程結(jié)束的時(shí)候就會調(diào)用信號處理器,我們就可以在信號處理器中進(jìn)行自減。
2、如何解決進(jìn)程殘留
在多進(jìn)程開發(fā)中,如果處理不當(dāng)就會導(dǎo)致進(jìn)程殘留。
為了解決進(jìn)程殘留,必須得將子進(jìn)程回收。
那么如何對子進(jìn)程進(jìn)行回收就是一個(gè)技術(shù)點(diǎn)了。
在pcntl的demo中,包括很多博文中都是說在主進(jìn)程中回收子進(jìn)程。
但我們是基于Redis的brpop的,而brpop是阻塞的。
這就導(dǎo)致一個(gè)問題:當(dāng)執(zhí)行N個(gè)任務(wù)之后,任務(wù)系統(tǒng)空閑的時(shí)候主進(jìn)程是阻塞的,而在發(fā)生阻塞的時(shí)候子進(jìn)程還在執(zhí)行,所以就無法完成最后幾個(gè)子進(jìn)程的進(jìn)程回收。。。
這里本來一直很糾結(jié),但當(dāng)我將信號處理器搞定之后就也很簡單了。
進(jìn)程回收也放到信號處理器中去。
新系統(tǒng)的評估
pcntl是一個(gè)進(jìn)程處理的擴(kuò)展,但很可惜它對多進(jìn)程的支持非常乏力。
所以這里采用Swoole擴(kuò)展中的Process。
具體代碼如下:
declare(ticks = 1); class JobDaemonController extends Yaf_Controller_Abstract{ use Trait_Redis; private $maxProcesses = 800; private $child; private $masterRedis; private $redis_task_wing = "task:wing"; //待處理隊(duì)列 public function init(){ // install signal handler for dead kids pcntl_signal(SIGCHLD, array($this, "sig_handler")); set_time_limit(0); ini_set("default_socket_timeout", -1); //隊(duì)列處理不超時(shí),解決redis報(bào)錯:read error on connection } private function redis_client(){ $rds = new Redis(); $rds->connect("redis.master.host",6379); return $rds; } public function process(swoole_process $worker){// 第一個(gè)處理 $GLOBALS["worker"] = $worker; swoole_event_add($worker->pipe, function($pipe) { $worker = $GLOBALS["worker"]; $recv = $worker->read(); //send data to master sleep(rand(1, 3)); echo "From Master: $recv "; $worker->exit(0); }); exit; } public function testAction(){ for ($i = 0; $i < 10000; $i++){ $data = [ "abc" => $i, "timestamp" => time().rand(100,999) ]; $this->masterRedis->lpush($this->redis_task_wing, json_encode($data)); } exit; } public function runAction(){ while (1){ // echo " now we de have $this->child child processes "; if ($this->child < $this->maxProcesses){ $rds = $this->redis_client(); $data_pop = $rds->brpop($this->redis_task_wing, 3);//無任務(wù)時(shí),阻塞等待 if (!$data_pop){ continue; } echo " Starting new child | now we de have $this->child child processes "; $this->child++; $process = new swoole_process([$this, "process"]); $process->write(json_encode($data_pop)); $pid = $process->start(); } } } private function sig_handler($signo) { // echo "Recive: $signo "; switch ($signo) { case SIGCHLD: while($ret = swoole_process::wait(false)) { // echo "PID={$ret["pid"]} "; $this->child--; } } } }
最終,經(jīng)過測試,單核1G的服務(wù)器執(zhí)行1到3秒的任務(wù)可以做到800的并發(fā)。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/30967.html
摘要:下文如無特殊聲明將使用進(jìn)程同時(shí)表示進(jìn)程線程。收到數(shù)據(jù)后服務(wù)器程序進(jìn)行處理然后使用向客戶端發(fā)送響應(yīng)?,F(xiàn)在各種高并發(fā)異步的服務(wù)器程序都是基于實(shí)現(xiàn)的,比如。 并發(fā) IO 問題一直是服務(wù)器端編程中的技術(shù)難題,從最早的同步阻塞直接 Fork 進(jìn)程,到 Worker 進(jìn)程池/線程池,到現(xiàn)在的異步IO、協(xié)程。PHP 程序員因?yàn)橛袕?qiáng)大的 LAMP 框架,對這類底層方面的知識知之甚少,本文目的就是詳細(xì)介...
摘要:在版本中我們將的進(jìn)程管理模塊封裝成了類,現(xiàn)在可以在代碼中使用的進(jìn)程管理器了。提供的進(jìn)程管理器來自于,經(jīng)過大量生產(chǎn)項(xiàng)目驗(yàn)證,穩(wěn)定性和健壯性都非常高。三任務(wù)投遞進(jìn)程管理器自帶了消息隊(duì)列和消息投遞的支持。 在Swoole-2.1.2版本中我們將Server的進(jìn)程管理模塊封裝成了PHP類,現(xiàn)在可以在PHP代碼中使用Swoole的進(jìn)程管理器了。 在實(shí)際項(xiàng)目中經(jīng)常需要寫一些長期運(yùn)行的腳本,如基于r...
摘要:易用穩(wěn)定,本次想通過對的學(xué)習(xí)和個(gè)人解析,吸收框架的思想和設(shè)計(jì)知識,加強(qiáng)自己對的認(rèn)知和理解。當(dāng)然,筆者能力水平有限,后續(xù)的文章如有錯誤,還請指出和諒解。目錄如下后續(xù)添加文章都會記錄在此服務(wù)啟動過程以及主體設(shè)計(jì)流程源碼解析 前言 swoole是什么?官網(wǎng)的原話介紹是這樣的: Swoole 使用純 C 語言編寫,提供了 PHP 語言的異步多線程服務(wù)器,異步 TCP/UDP 網(wǎng)絡(luò)客戶端,異步 ...
摘要:基于擴(kuò)展實(shí)現(xiàn)真正的數(shù)據(jù)庫連接池這種方案中,項(xiàng)目占用的連接數(shù)僅僅為。一種是連接暫時(shí)不再使用,其占用狀態(tài)解除,可以從使用者手中交回到空閑隊(duì)列中這種我們稱為連接的歸隊(duì)。源碼剖析系列目錄 作者:bromine鏈接:https://www.jianshu.com/p/1a7...來源:簡書著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對原文進(jìn)行了重新的排版。Swoft Github: https:...
摘要:為語言提供了強(qiáng)大的協(xié)程編程模式。提供的協(xié)程語法借鑒自,在此向開發(fā)組致敬協(xié)程可以與很好地互補(bǔ)。并發(fā)執(zhí)行使用創(chuàng)建協(xié)程,可以讓和兩個(gè)函數(shù)變成并發(fā)執(zhí)行。協(xié)程需要拿到請求的結(jié)果。 Swoole4為PHP語言提供了強(qiáng)大的CSP協(xié)程編程模式。底層提供了3個(gè)關(guān)鍵詞,可以方便地實(shí)現(xiàn)各類功能。 Swoole4提供的PHP協(xié)程語法借鑒自Golang,在此向GO開發(fā)組致敬 PHP+Swoole協(xié)程可以與...
閱讀 1724·2021-09-26 09:55
閱讀 3780·2021-09-22 15:31
閱讀 7749·2021-09-22 15:12
閱讀 2239·2021-09-22 10:02
閱讀 4725·2021-09-04 16:40
閱讀 1090·2019-08-30 15:55
閱讀 3069·2019-08-30 12:56
閱讀 1842·2019-08-30 12:44