摘要:定義任務(wù)處理方法。讀取來自命令行的參數(shù),開始執(zhí)行任務(wù)。該函數(shù)有兩個(gè)參數(shù)和,是引用類型,用來存儲(chǔ)子進(jìn)程的狀態(tài),有兩個(gè)可選常量,分別表示不等待子進(jìn)程結(jié)束立即返回和等待子進(jìn)程結(jié)束。
用PHP來實(shí)現(xiàn)異步任務(wù)一直是個(gè)難題,現(xiàn)有的解決方案中:PHP知名的異步框架有 swoole 和 Workerman,但都是無法在 web 環(huán)境中直接使用的,即便強(qiáng)行搭建 web 環(huán)境,異步調(diào)用也是使用多進(jìn)程模式實(shí)現(xiàn)的。但有時(shí)真的不需要用啟動(dòng)服務(wù)的方式,讓服務(wù)端一直等待客戶端消息,何況中間還不能改動(dòng)服務(wù)端代碼。本文就介紹一下不使用任何框架和第三方庫的情況下,在 CLI 環(huán)境中如何實(shí)現(xiàn)多進(jìn)程以及在web環(huán)境中的異步調(diào)用。在 web 環(huán)境的異步調(diào)用
常用的方式有兩種
1. 使用 socket 連接這種方式就是典型的C/S架構(gòu),需要有服務(wù)端支持。
// 1. 創(chuàng)建socket套接字 $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); // 2. 進(jìn)行socket連接 socket_connect($socket, "127.0.0.1", "3939"); //socket_set_nonblock($socket); // 以非阻塞模式運(yùn)行,由于在客戶端不實(shí)用,所以這里不考慮 // 3. 向服務(wù)端發(fā)送請(qǐng)求 socket_write($socket, $request, strlen($request)); // 4. 接受服務(wù)端的回應(yīng)消息(忽略非阻塞的情況,如果服務(wù)端不是提供異步服務(wù),那這一步可以省略) $recv = socket_read($socket, 2048); // 5. 關(guān)閉socket連接 socket_close($socket);2. 使用 popen 打開進(jìn)程管道
這種方式是使用操作系統(tǒng)命令,由操作系統(tǒng)直接執(zhí)行。
本文討論的異步調(diào)用就是使用這種方式。
$sf = "/path/to/cli_async_task.php"; //要執(zhí)行的腳本文件 $op = "call"; //腳本文件接收的參數(shù)1 $data = base64_encode(serialize(["TestTask", "arg1", "arg2"])); //腳本文件接收的參數(shù)2 pclose(popen("php "$sf" --op $op --data $data &", "r")); //打開之后接著就關(guān)閉進(jìn)程管道,讓該進(jìn)程以守護(hù)模式運(yùn)行 echo PHP_EOL."異步任務(wù)已執(zhí)行。".PHP_EOL;
這種方式的優(yōu)點(diǎn)就是:一步解決,當(dāng)前進(jìn)程不需要任何開銷。
缺點(diǎn)也很明顯:無法跟蹤任務(wù)腳本的運(yùn)行狀態(tài)。
所以重頭戲會(huì)是在執(zhí)行任務(wù)的腳本文件上,下面就介紹任務(wù)處理和多進(jìn)程的實(shí)現(xiàn)方式。
注意:多進(jìn)程模式僅支持Linux,不支持Windows?。?/p>
這里會(huì)從0開始(未使用任何框架和類庫)介紹每一個(gè)步驟,最后會(huì)附帶一份完整的代碼。
任何腳本不可忽視的地方就是錯(cuò)誤處理。所以寫一個(gè)任務(wù)處理腳本首先就是寫錯(cuò)誤處理方式。
在PHP中就是調(diào)用 set_exception_handler set_error_handler register_shutdown_function 這三個(gè)函數(shù),然后寫上自定義的處理方法。
接著是定義自動(dòng)加載函數(shù) spl_autoload_register 免去每使用一個(gè)新類都要 require / include 的煩惱。
定義日志操作方法。
定義任務(wù)處理方法。
讀取來自命令行的參數(shù),開始執(zhí)行任務(wù)。
2. 多進(jìn)程處理PHP 創(chuàng)建多進(jìn)程是使用 pcntl_fork 函數(shù),該函數(shù)會(huì) fork 一份當(dāng)前進(jìn)程(影分身術(shù)),于是就有了兩個(gè)進(jìn)程,當(dāng)前進(jìn)程是主進(jìn)程(本體),fork 出的進(jìn)程是子進(jìn)程(影分身)。需要注意的是兩個(gè)進(jìn)程代碼環(huán)境是一樣的,兩個(gè)進(jìn)程都是執(zhí)行到了 pcntl_fork 函數(shù)位置。區(qū)別就是 getmypid 獲得的進(jìn)程號(hào)不一樣,最重要的區(qū)分是當(dāng)調(diào)用 pcntl_fork函數(shù)時(shí),子進(jìn)程獲得的返回值是 0,而主進(jìn)程獲得的是子進(jìn)程的進(jìn)程號(hào) pid。
好了,當(dāng)我們知道誰是子進(jìn)程后,就可以讓該子進(jìn)程執(zhí)行任務(wù)了。
那么主進(jìn)程是如何得知子進(jìn)程的狀態(tài)呢?
使用 pcntl_wait。該函數(shù)有兩個(gè)參數(shù) $status 和 $options ,$status 是引用類型,用來存儲(chǔ)子進(jìn)程的狀態(tài),$options 有兩個(gè)可選常量WNOHANG| WUNTRACED,分別表示不等待子進(jìn)程結(jié)束立即返回和等待子進(jìn)程結(jié)束。很明顯使用WUNTRACED會(huì)阻塞主進(jìn)程。(也可以使用 pcntl_waitpid 函數(shù)獲取特定 pid 子進(jìn)程狀態(tài))
在多進(jìn)程中,主進(jìn)程要做的就是管理每個(gè)子進(jìn)程的狀態(tài),否則子進(jìn)程很可能無法退出而變成僵尸進(jìn)程。
關(guān)于多進(jìn)程間的消息通信
這一塊需要涉及具體的業(yè)務(wù)邏輯,所以只能簡(jiǎn)單的提一下。不考慮使用第三方比如 redis 等服務(wù)的情況下,PHP原生可以實(shí)現(xiàn)就是管道通信和共享內(nèi)存等方式。實(shí)現(xiàn)起來都比較簡(jiǎn)單,缺點(diǎn)就是可使用的數(shù)據(jù)容量有限,只能用簡(jiǎn)單文本協(xié)議交換數(shù)據(jù)。
如何手動(dòng)結(jié)束所有進(jìn)程任務(wù)
如果多進(jìn)程處理不當(dāng),很可能導(dǎo)致進(jìn)程任務(wù)卡死,甚至占用過多系統(tǒng)資源,此時(shí)只能手動(dòng)結(jié)束進(jìn)程。
除了一個(gè)個(gè)的根據(jù)進(jìn)程號(hào)來結(jié)束,還有一個(gè)快速的方法是首先在任務(wù)腳本里自定義進(jìn)程名稱,就是調(diào)用cli_set_process_title函數(shù),然后在命令行輸入:ps aux|grep cli_async_worker |grep -v grep|awk "{print $2}"|xargs kill -9 (里面的 cli_async_worker 就是自定義的進(jìn)程名稱),這樣就可以快速結(jié)束多進(jìn)程任務(wù)了。
未完待續(xù)...
以下是完整的任務(wù)執(zhí)行腳本代碼:
可能無法直接使用,需要修改的地方有:
腳本目錄和日志目錄常量
自動(dòng)加載任務(wù)類的方法(默認(rèn)是加載腳本目錄中以Task結(jié)尾的文件)
其他的如:錯(cuò)誤和日志處理方式和文本格式就隨意吧...
如果命名管道文件設(shè)置有錯(cuò)誤,可能導(dǎo)致進(jìn)程假死,你可能需要手動(dòng)刪除進(jìn)程管道通信的代碼。
多進(jìn)程的例子:execAsyncTask("multi", [ "test" => ["a", "b", "c"], "grab" => [["url" => "https://www.baidu.com", "callback" => "http://localhost"]] ]);。執(zhí)行情況可以在日志文件中查看。execAsyncTask函數(shù)參考【__使用popen打開進(jìn)程管道__】。
[%s] %s (%s)". " ". "%s", $time, $e->getMessage(), $e->getCode(), $e->getTraceAsString() ); file_put_contents(TASK_LOGS_PATH ."/exception-".date("Ymd").".log", $msg.PHP_EOL, FILE_APPEND|LOCK_EX); }); set_error_handler(function($errno, $errmsg, $filename, $line) { if (!(error_reporting() & $errno)) return; ob_start(); debug_print_backtrace(); $backtrace = ob_get_contents(); ob_end_clean(); $datetime = date("Y-m-d H:i:s", time()); $msg = <<$header) { if (!is_numeric($_k)) $header = sprintf("%s: %s", $_k, $header); $_headers .= $header . " "; } } $headers = "Connection: close " . $_headers; $opts = array( "http" => array( "method" => strtoupper(@$job["method"] ?: "get"), "content" => @$job["data"] ?: null, "header" => $headers, "user_agent" => @$job["args"]["user_agent"] ?: "HTTPGRAB/1.0 (compatible)", "proxy" => @$job["args"]["proxy"] ?: null, "timeout" => intval(@$job["args"]["timeout"] ?: 120), "protocol_version" => @$job["args"]["protocol_version"] ?: "1.1", "max_redirects" => 3, "ignore_errors" => true ) ); $ret = @file_get_contents($url, false, stream_context_create($opts)); //debug_log($url." -->".strlen($ret)); if ($ret and isset($job["callback"])) { $postdata = http_build_query(array( "msg_id" => @$job["msg_id"] ?: 0, "url" => @$job["url"], "result" => $ret )); $opts = array( "http" => array( "method" => "POST", "header" => "Content-type:application/x-www-form-urlencoded". " ", "content" => $postdata, "timeout" => 30 ) ); file_get_contents($job["callback"], false, stream_context_create($opts)); //debug_log(json_encode(@$http_response_header)); //debug_log($job["callback"]." -->".$ret2); } return $ret; } function clean($tmpdirs, $expires=3600*24*7) { $ret = []; foreach ((array)$tmpdirs as $tmpdir) { $ret[$tmpdir] = 0; foreach (glob($tmpdir.DIRECTORY_SEPARATOR."*") as $_file) { if (fileatime($_file) < (time()-$expires)) { if (@unlink($_file)) $ret[$tmpdir]++; } } } return $ret; } function backup($file, $dest) { $zip = new ipArchive(); if (!$zip->open($file, ipArchive::CREATE)) { return false; } _backup_dir($zip, $dest); $zip->close(); return $file; } function _backup_dir($zip, $dest, $sub="") { $dest = rtrim($dest, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; $sub = rtrim($sub, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; $dir = opendir($dest); if (!$dir) return false; while (false !== ($file = readdir($dir))) { if (is_file($dest . $file)) { $zip->addFile($dest . $file, $sub . $file); } else { if ($file != "." and $file != ".." and is_dir($dest . $file)) { //$zip->addEmptyDir($sub . $file . DIRECTORY_SEPARATOR); _backup_dir($zip, $dest . $file, $file); } } } closedir($dir); return true; } function execute_task($op, $data) { debug_log("Start..."); $t1 = microtime(true); switch($op) { case "call": //執(zhí)行任務(wù)腳本類 $cmd = $data; if (is_string($cmd) and class_exists($cmd)) $cmd = new $cmd; elseif (is_array($cmd)) { if (is_string($cmd[0]) and class_exists($cmd[0])) $cmd[0] = new $cmd[0]; } $ret = call($cmd); break; case "grab": //抓取網(wǎng)頁 if (is_string($data)) $data = ["url" => $data]; if (is_array($data)) $ret = grab($data); else throw new Exception("無效的命令參數(shù)!"); break; case "clean": //清理緩存文件夾:dirs 需要清理的文件夾列表,expires 過期時(shí)間(秒,默認(rèn)7天) if (isset($data["dirs"])) { $ret = clean($data["dirs"], @$data["expires"]); } else { $ret = clean($data); } break; case "backup": //備份文件:zip 備份到哪個(gè)zip文件,dest 需要備份的文件夾 if (isset($data["zip"]) and is_dir($data["dest"])) $ret = backup($data["zip"], $data["dest"]); else throw new Exception("沒有指定需要備份的文件!"); break; case "require": //加載腳本文件 if (is_file($data)) $ret = require($data); else throw new Exception("不是可請(qǐng)求的文件!"); break; case "test": sleep(rand(1, 5)); $ret = ucfirst(strval($data)). ".PID:". getmypid(); break; case "multi": //多進(jìn)程處理模式 $results = $childs = []; $fifo = TASK_LOGS_PATH . DIRECTORY_SEPARATOR . "pipe.". posix_getpid(); if (!file_exists($fifo)) { if (!posix_mkfifo($fifo, 0666)) { //開啟進(jìn)程數(shù)據(jù)通信管道 throw new Exception("make pipe failed!"); } } //$shmid = shmop_open(ftok(__FILE__, "h"), "c", 0644, 4096); //共享內(nèi)存 //shmop_write($shmid, serialize([]), 0); //$data = unserialize(shmop_read($shmid, 0, 4096)); //shmop_delete($shmid); //shmop_close($shmid); foreach($data as $_op => $_datas) { $_datas = (array)$_datas; //data 格式為數(shù)組表示一個(gè) op 有多個(gè)執(zhí)行數(shù)據(jù) foreach($_datas as $_data) { $pid = pcntl_fork(); if ($pid == 0) { //子進(jìn)程中執(zhí)行任務(wù) $_ret = execute_task($_op, $_data); $_pid = getmypid(); $pipe = fopen($fifo, "w"); //寫 //stream_set_blocking($pipe, false); $_ret = serialize(["pid" => $_pid, "op" => $_op, "args" => $_data, "result" => $_ret]); if (strlen($_ret) > 4096) //寫入管道的數(shù)據(jù)最大4K $_ret = serialize(["pid" => $_pid, "op" => $_op, "args" => $_data, "result" => "[RESPONSE_TOO_LONG]"]); //debug_log("write pipe: ".$_ret); fwrite($pipe, $_ret.PHP_EOL); fflush($pipe); fclose($pipe); exit(0); //退出子進(jìn)程 } elseif ($pid > 0) { //主進(jìn)程中記錄任務(wù) $childs[] = $pid; $results[$pid] = 0; debug_log("fork by child: ".$pid); //pcntl_wait($status, WNOHANG); } elseif ($pid == -1) { throw new Exception("could not fork at ". getmygid()); } } } $pipe = fopen($fifo, "r+"); //讀 stream_set_blocking($pipe, true); //阻塞模式,PID與讀取的管道數(shù)據(jù)可能會(huì)不一致。 $n = 0; while(count($childs) > 0) { foreach($childs as $i => $pid) { $res = pcntl_waitpid($pid, $status, WNOHANG); if (-1 == $res || $res > 0) { $_ret = @unserialize(fgets($pipe)); //讀取管道數(shù)據(jù) $results[$pid] = $_ret; unset($childs[$i]); debug_log("read child: ".$pid . " - " . json_encode($_ret, 64|256)); } if ($n > 1000) posix_kill($pid, SIGTERM); //超時(shí)(10分鐘)結(jié)束子進(jìn)程 } usleep(200000); $n++; } debug_log("child process completed."); @fclose($pipe); @unlink($fifo); $ret = json_encode($results, 64|256); break; default: throw new Exception("沒有可執(zhí)行的任務(wù)!"); break; } $t2 = microtime(true); $times = round(($t2 - $t1) * 1000, 2); $log = sprintf("[%s] %s --> (%s) %sms", strtoupper($op), @json_encode($data, 64|256), @strlen($ret)<65?$ret:@strlen($ret), $times); debug_log($log); return $ret; } // 讀取 CLI 命令行參數(shù) $params = getopt("", array("op:", "data:")); $op = $params["op"]; $data = unserialize(base64_decode($params["data"])); // 開始執(zhí)行任務(wù) execute_task($op, $data); function __autoload($classname) { $parts = explode("", ltrim($classname, "")); if (false !== strpos(end($parts), "_")) { array_splice($parts, -1, 1, explode("_", current($parts))); } $filename = implode(DIRECTORY_SEPARATOR, $parts) . ".php"; if ($filename = stream_resolve_include_path($filename)) { include $filename; } else if (preg_match("/.*Task$/", $classname)) { //查找以Task結(jié)尾的任務(wù)腳本類 include TASK_PATH . DIRECTORY_SEPARATOR . $classname . ".php"; } else { return false; } }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/30016.html
摘要:消費(fèi)者開發(fā)本例我們使用的多進(jìn)程開發(fā)工具來完成這個(gè)需求,通常使用常駐進(jìn)程來處理隊(duì)列的消費(fèi),所以我們使用的類型,模式。中進(jìn)程負(fù)責(zé)執(zhí)行郵件發(fā)送任務(wù)。此時(shí)終端將打印成功收到測(cè)試郵件官網(wǎng) 注意:這個(gè)是 MixPHP V1 的范例 郵件發(fā)送是很常見的需求,由于發(fā)送郵件的操作一般是比較耗時(shí)的,所以我們一般采用異步處理來提升用戶體驗(yàn),而異步通常我們使用消息隊(duì)列來實(shí)現(xiàn)。 傳統(tǒng) MVC 框架由于缺少多進(jìn)程...
摘要:管理進(jìn)程會(huì)監(jiān)視所有子進(jìn)程的退出事件,當(dāng)進(jìn)程發(fā)生致命錯(cuò)誤或者運(yùn)行生命周期結(jié)束時(shí),管理進(jìn)程會(huì)回收此進(jìn)程,并創(chuàng)建新的進(jìn)程。換句話也就是說,對(duì)于進(jìn)程的創(chuàng)建回收等操作全權(quán)有保姆進(jìn)程進(jìn)行管理。跟的交互請(qǐng)求到達(dá)實(shí)際上是與進(jìn)程中的某個(gè)線程發(fā)生了連接。 showImg(https://segmentfault.com/img/bVbrhb2?w=600&h=360); 一、進(jìn)程的基本知識(shí) 什么是進(jìn)程,所...
摘要:下文如無特殊聲明將使用進(jìn)程同時(shí)表示進(jìn)程線程。收到數(shù)據(jù)后服務(wù)器程序進(jìn)行處理然后使用向客戶端發(fā)送響應(yīng)?,F(xiàn)在各種高并發(fā)異步的服務(wù)器程序都是基于實(shí)現(xiàn)的,比如。 并發(fā) IO 問題一直是服務(wù)器端編程中的技術(shù)難題,從最早的同步阻塞直接 Fork 進(jìn)程,到 Worker 進(jìn)程池/線程池,到現(xiàn)在的異步IO、協(xié)程。PHP 程序員因?yàn)橛袕?qiáng)大的 LAMP 框架,對(duì)這類底層方面的知識(shí)知之甚少,本文目的就是詳細(xì)介...
摘要:嚴(yán)格來說,并不是單線程的。其他異步和事件驅(qū)動(dòng)相關(guān)的線程通過來實(shí)現(xiàn)內(nèi)部的線程池和線程調(diào)度。線程是最小的進(jìn)程,因此也是單進(jìn)程的。子進(jìn)程中執(zhí)行的是非程序,提供一組參數(shù)后,執(zhí)行的結(jié)果以回調(diào)的形式返回。在子進(jìn)程中通過和的機(jī)制來接收和發(fā)送消息。 ??node遵循的是單線程單進(jìn)程的模式,node的單線程是指js的引擎只有一個(gè)實(shí)例,且在nodejs的主線程中執(zhí)行,同時(shí)node以事件驅(qū)動(dòng)的方式處理IO...
摘要:?jiǎn)?dòng)和如下信息則表示成功查看版本安裝擴(kuò)展從下載最新擴(kuò)展需下載最新源碼包,并解壓縮安裝安裝成功后信息然后,配置文件增加內(nèi)容重啟后,出現(xiàn)如下信息則表示安裝擴(kuò)展成功。 首發(fā)于 樊浩柏科學(xué)院 Gearman 是一個(gè)分布式任務(wù)分發(fā)系統(tǒng),通過程序調(diào)用(API,跨語言)分布式地把工作委派給更適合做某項(xiàng)工作的機(jī)器,且這些機(jī)器可以以并發(fā)的、負(fù)載均衡的形式來共同完成某項(xiàng)工作。當(dāng)計(jì)算密集型場(chǎng)景時(shí),適合在后...
閱讀 1757·2023-04-25 16:28
閱讀 694·2021-11-23 09:51
閱讀 1477·2019-08-30 15:54
閱讀 1162·2019-08-30 15:53
閱讀 2835·2019-08-30 15:53
閱讀 3425·2019-08-30 15:43
閱讀 3267·2019-08-30 11:18
閱讀 3288·2019-08-26 10:25