摘要:后面每開啟一個子進(jìn)程,會將子進(jìn)程的存儲到中,用來后面主進(jìn)程監(jiān)控子進(jìn)程,如果子進(jìn)程意外終止,主進(jìn)程可以重新佛。將當(dāng)前子進(jìn)程設(shè)置為會話組再次創(chuàng)建子進(jìn)程,為了防止在的系統(tǒng)下重新打開控制終端。
wokerman 啟動分析
@(學(xué)習(xí))[workerman, php]
前期想說的也是最近才看的代碼,遇到不懂得地方就去google,所以這篇文章里面穿插了很多參考資料,可以直接點(diǎn)擊閱覽。
需要了解一些知識pcntl、posix、libevent,然后我們從服務(wù)的啟動開始來看。
啟動runAll顧名思義,運(yùn)行所有的,注釋中也寫了,Run all worker instances,運(yùn)行所有的實(shí)例,也就是說腳本中可以同時(shí)new多個worker服務(wù),這也是后面一個重要的$workers包含了每一個$worker都是一個服務(wù)實(shí)例。然后會根據(jù)每一個實(shí)例初始化count個子進(jìn)程。
/** * Run all worker instances. * * @return void */ public static function runAll() { // 判斷是否命令行模式 self::checkSapiEnv(); // 創(chuàng)建目錄、設(shè)置權(quán)限、綁定時(shí)鐘信號腳本 self::init(); // 解析cli的命令,完成start、reload、restart、kill、stop等 self::parseCommand(); // 是否開啟守護(hù)進(jìn)程 self::daemonize(); // 對socket進(jìn)行了一系列的配置 self::initWorkers(); // 注冊信號處理器 self::installSignal(); // 保存主進(jìn)程的pid self::saveMasterPid(); // 每個$worker服務(wù)fork出count個子進(jìn)程,然后給每個子進(jìn)程綁定loop循環(huán)監(jiān)聽事件tcp self::forkWorkers(); self::displayUI(); self::resetStd(); // 主進(jìn)程來監(jiān)聽子進(jìn)程狀態(tài) self::monitorWorkers(); }生成實(shí)例的id
根據(jù)對象生成hashid,并且同一個對象生成的id是一樣的。
$this->workerId = spl_object_hash($this);
并且將當(dāng)前對象$this存儲到靜態(tài)數(shù)組中self::$_workers
創(chuàng)建資源環(huán)境$this->_context = stream_context_create($context_option);校驗(yàn)是否命令行模式
只有命令行模式可以執(zhí)行
if(php_sapi_name() != "cli") {}獲取初始化運(yùn)行的腳本文件的絕對路徑
debug_backtrace返回的數(shù)組的最后一個元素就是初始化的文件,第一個元素就是當(dāng)前執(zhí)行該命令的文件。
$backtrace = debug_backtrace(); $self::startFile = $backtrace[count($backtrace) -1 ]["file"]生成pid的存儲文件
將debug_backtrace獲取到的絕對路徑通過_進(jìn)行分割作為pid的文件名。
創(chuàng)建日志文件如果路徑?jīng)]定義直接指定../workerman.log
touch(self::$logFile); chmod(self::$logFile,0622); // 可讀寫、寫、寫初始化當(dāng)前主進(jìn)程狀態(tài)
self:$_status = self::STATUS_STARTING;
腳本使用self::$_status來存儲當(dāng)前運(yùn)行的狀態(tài)(current status),一共有四種狀態(tài)
// 開始 const STATUS_STARTING = 1; // 運(yùn)行 const STATUS_RUNNING = 2; // 關(guān)閉 const STATUS_SHUTDOWN=4; // 重新加載 const STATUS_RELOADING=8;臨時(shí)文件
slef::$_statisticsFile = sys_get_temp_dir()."/workerman.status";設(shè)置進(jìn)程標(biāo)題
// 設(shè)置當(dāng)前進(jìn)程的標(biāo)題 cli_set_process_title($title); // 如果不存在上面的方法,那么使用proc_title擴(kuò)展 if(extension_loaded("proctitle") && function_exists("setproctitle")) { setproctitle($title); }填充idMap
每個$worker_id都是當(dāng)前腳本中要初始化的實(shí)例,每個服務(wù)要開啟$worker->count個子進(jìn)程,先用0來填充數(shù)組。后面每開啟一個子進(jìn)程,會將子進(jìn)程的pid存儲到idMap中,用來后面主進(jìn)程監(jiān)控子進(jìn)程,如果子進(jìn)程意外終止,主進(jìn)程可以重新佛。
self::$_idMap[$worker_id] = array_fill(0, $worker->count, 0);進(jìn)程信號處理器
pcntl_signal安裝一個信號處理器,用來監(jiān)聽信號SIGALRM。如果不安裝SIGALRM信號,當(dāng)進(jìn)程接收到SIGALRM信號時(shí)會默認(rèn)終止進(jìn)程。
pcntl_signal(SIGALRM, ["WorkermanLibTimer","signalHandle"],false);解析cli參數(shù)
允許進(jìn)程腳本接受參數(shù),用來完成相應(yīng)的指令。
xxx.php start -d 會開啟守護(hù)進(jìn)程 xxx.php start 會進(jìn)入debug模式判斷進(jìn)程是否已經(jīng)存在
從pid文件中讀取存在的進(jìn)程號,如果進(jìn)程號存在,并且進(jìn)程也存活。如果傳入的命令是start并且pid文件中的進(jìn)程id和當(dāng)前腳本執(zhí)行的不一樣,那么說明腳本重復(fù)執(zhí)行了,報(bào)錯。
posix_kill($master_pid,0)用來判斷進(jìn)程是否存在,posix_kill本意是向該進(jìn)程發(fā)送信號。
// Get master process PID. $master_pid = @file_get_contents(self::$pidFile); $master_is_alive = $master_pid && @posix_kill($master_pid, 0); // Master is still alive? if ($master_is_alive) { // 如果已經(jīng)有進(jìn)程存在,并且當(dāng)前執(zhí)行的進(jìn)程和已存在的進(jìn)程不一樣,報(bào)錯。 if ($command === "start" && posix_getpid() != $master_pid) { self::log("Workerman[$start_file] already running"); exit; } }命令處理
kill使用pcntl_signal注冊了信號,然后使用posix_kill發(fā)送信號,使用pcntl_signal_dispatch來處理信號。
先發(fā)送kill -SIGINT ,千分之一毫秒之后發(fā)送kill -SIGKILL
status刪掉臨時(shí)文件
向進(jìn)程發(fā)送SIGUSR2信號
從臨時(shí)文件中讀取內(nèi)容
結(jié)束腳本
restart、stop向進(jìn)程發(fā)送終止信號
$master_pid && posix_kill($master_pid, SIGINT);
while(1)循環(huán)判斷是否進(jìn)程已經(jīng)被終止,如果超過時(shí)間5s,那么寫入日志終止失敗,并且結(jié)束當(dāng)前腳本。(沒有結(jié)束進(jìn)程,只是結(jié)束了當(dāng)前的命令腳本)
reload發(fā)送特殊信號SIGUSR1,但是這個信號不會被立即執(zhí)行,而是需要等待pcntl_signal_dispatch來進(jìn)行信號分發(fā)。
posix_kill($master_pid,SIGUSR1);
例如用戶現(xiàn)在輸入了php worker.php reload,那么會使所有的進(jìn)程重新進(jìn)行加載配置。
當(dāng)前腳本解析參數(shù)
判斷進(jìn)程是否存活,如果進(jìn)程pid存在,但是進(jìn)程沒有存活,那么報(bào)錯,說not run
進(jìn)程存活,注冊信號SIGUSR1
退出當(dāng)前腳本
運(yùn)行中的父進(jìn)程開始觸發(fā)所有的信號,由于之前已經(jīng)安裝了信號處理方法,所以會觸發(fā)self::reload()方法。
啟動守護(hù)進(jìn)程fork兩次并不是為了避免僵尸進(jìn)程,而是為了避免svr4系統(tǒng)重新打開終端
守護(hù)進(jìn)程詳解
守護(hù)進(jìn)程為什么要fork兩次?
方法通用,可以稍作修改,用到別的地方。用這個方法做過一個隊(duì)列監(jiān)聽,redis的list啟動一個rpop。
umask(0)將默認(rèn)權(quán)限的掩碼修改為0,即將要創(chuàng)建的所有的文佳你的權(quán)限都是777
$pid = pcntl_fork()啟動子進(jìn)程,判斷$pid是否存在,只有在父進(jìn)程中pcntl_fork()才會返回id,我們要將父進(jìn)程kill掉。
posix_setsid()將當(dāng)前子進(jìn)程設(shè)置為會話組leader
再次創(chuàng)建子進(jìn)程,為了防止在SVR4的系統(tǒng)下重新打開控制終端。
protected static function daemonize() { if (!self::$daemonize) { return; } // 將默認(rèn)權(quán)限掩碼修改為0,意味著即將要創(chuàng)建的文件的權(quán)限都是777 umask(0); // 子進(jìn)程 $pid = pcntl_fork(); // 子進(jìn)程創(chuàng)建失敗 if (-1 === $pid) { throw new Exception("fork fail"); } elseif ($pid > 0) { //說明當(dāng)前進(jìn)程是父進(jìn)程,只有在父進(jìn)程中fork才會返回pid // 關(guān)閉父進(jìn)程,讓子進(jìn)程成為孤兒進(jìn)程被init進(jìn)程收養(yǎng) exit(0); } // 將子進(jìn)程作為進(jìn)程組的leader,開啟一個新的會話,脫離之前的會話和進(jìn)程組。即使用戶logout也不會終止 if (-1 === posix_setsid()) { throw new Exception("setsid fail"); } // Fork again avoid SVR4 system regain the control of terminal. // 避免svr4系統(tǒng)重新獲取控制終端 $pid = pcntl_fork(); if (-1 === $pid) { throw new Exception("fork fail"); } elseif (0 !== $pid) { // 如果不是孫子進(jìn)程,直接干掉。讓孫子進(jìn)程成為孤兒進(jìn)程被init進(jìn)程(1號進(jìn)程收養(yǎng)) exit(0); } }初始化所有的woker實(shí)例 獲取進(jìn)程的當(dāng)前用戶信息
$user_info = posix_getpwuid(posix_getuid());開啟監(jiān)聽
UNIX域套接字相關(guān)知識
創(chuàng)建socket服務(wù),啟動一個本地的socket服務(wù)。如果是unix域socket的話,$local_socket需要是本地的文件。個人理解就是通過某個端口來監(jiān)聽某個協(xié)議。
$this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);將包含socket的流導(dǎo)入到socket的擴(kuò)展資源中
Imports a stream that encapsulates a socket into a socket extension resource.
關(guān)于為什么要使用socket_import_stream?
設(shè)置socketphp提供兩種socket:php提供了兩種類型的socket,stream_socket 和 sockets,二者api不兼容。stream_socket是php內(nèi)置的,可以直接使用,并且api和stream 的api通用(可以調(diào)用fread fwrite...)。sockets需要php安裝sockets擴(kuò)展才能使用。
設(shè)置心跳檢測,減少傳輸延遲,設(shè)置為非阻塞模式。
SO_KEEPALIVE
TCP/IP Socket心跳機(jī)制so_keepalive的三個參數(shù)詳解
TCP Keepalive HOWTO
設(shè)置心跳
SO_KEEPALIVE 保持連接檢測對方主機(jī)是否崩潰,避免(服務(wù)器)永遠(yuǎn)阻塞于TCP連接的輸入
@socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
設(shè)置最小化傳輸延遲
提高linux上socket性能
// 最小化傳輸延遲,而不是追求最小化報(bào)文數(shù)量 @socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
設(shè)置為非阻塞模式
在非阻塞模式下,調(diào)用 fgets() 總是會立即返回;而在阻塞模式下,將會一直等到從資源流里面獲取到數(shù)據(jù)才能返回。
stream_set_blocking($this->_mainSocket,0);注冊信號處理器
// stop pcntl_signal(SIGINT, array("WorkermanWorker", "signalHandler"), false); // reload 重新加載的時(shí)候發(fā)送R1信號 pcntl_signal(SIGUSR1, array("WorkermanWorker", "signalHandler"), false); // status 查看狀態(tài)的時(shí)候發(fā)送R2信號 pcntl_signal(SIGUSR2, array("WorkermanWorker", "signalHandler"), false); // ignore pcntl_signal(SIGPIPE, SIG_IGN, false);保存當(dāng)前進(jìn)程的pid
如果已經(jīng)開啟了守護(hù)進(jìn)程,那么獲取的是當(dāng)前孫子進(jìn)程(守護(hù)進(jìn)程)的pid,將其寫入到pid文件中。
將每個woker實(shí)例(服務(wù))創(chuàng)建count個子進(jìn)程遍歷整個$workers,將進(jìn)程按照里面的每一個實(shí)例的參數(shù)fork count個數(shù)的子進(jìn)程。并且將子進(jìn)程的號碼,記錄到父進(jìn)程中
// 創(chuàng)建子進(jìn)程 $pid = pcntl_fork(); // Get available worker id. $id = self::getId($worker->workerId, 0); // For master process. // 如果當(dāng)前進(jìn)程是父進(jìn)程 if ($pid > 0) { // 將子進(jìn)程的號碼,記錄到父進(jìn)程中(重要) self::$_pidMap[$worker->workerId][$pid] = $pid; // 父進(jìn)程將自己的pid寫入到idMap的第一位(非常重要),在此之后,同一個實(shí)例下每個進(jìn)程的id都不一樣。 self::$_idMap[$worker->workerId][$id] = $pid; } // For child processes.讓每個子進(jìn)程下都開始啟動對應(yīng)worker的服務(wù)
$worker->run();設(shè)置當(dāng)前狀態(tài)為運(yùn)行中
self::$_status = self::STATUS_RUNNING;設(shè)置globalEvent
self::getEventLoopName從三個事件擴(kuò)展中選擇一個libevent,event,ev
事件監(jiān)聽向socket添加事件監(jiān)聽回調(diào)函數(shù)
self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, "acceptUdpConnection"));displayUI
終端下打印UI界面
再次重置標(biāo)準(zhǔn)化輸入輸出self::resetStd();監(jiān)控所有子進(jìn)程
PHP多進(jìn)程編程初步
PHP通過PCNTL擴(kuò)展實(shí)現(xiàn)進(jìn)程控制
pcntl_signal()函數(shù)僅僅是注冊信號和它的處理方法,真正接收到信號并調(diào)用其處理方法的是pcntl_signal_dispatch()函數(shù),而posix_kill會發(fā)送信號。
只有父進(jìn)程才會監(jiān)控所有的子進(jìn)程,因?yàn)樽舆M(jìn)程運(yùn)行的是run方法
觸發(fā)所有的信號
等待子進(jìn)程退出
將退出的子進(jìn)程從實(shí)例維護(hù)的pidMap中移除
將退出的子進(jìn)程從對應(yīng)的idMap中進(jìn)行重置為0,取消占取的位置
如果腳本仍然在執(zhí)行,但是子進(jìn)程退出了,那么重啟子進(jìn)程
獲取所有的子進(jìn)程號,如果子進(jìn)程都退出了,那么結(jié)束父進(jìn)程
關(guān)于pcntl_waitpcntl_signal(SIGUSR1,"signalHandle1"); 用來注冊信號;posix_kill($pid,SIGUSR1)向指定進(jìn)程發(fā)送信號(返回結(jié)果是即時(shí)的,但是并不會觸發(fā)信號所綁定的行為);pcntl_signal_dispatch()用來觸發(fā)收到的信號的回調(diào)函數(shù)。
pcntl_wait($status,WUNTRACED)開啟阻塞模式來監(jiān)控子進(jìn)程是否退出,之后子進(jìn)程退出之后,才會執(zhí)行后面的操作。
0) { $status = 0; // 等待子進(jìn)程終止 pcntl_wait($status,WUNTRACED); echo "wait已執(zhí)行 {$pid}? "; // 執(zhí)行信號的回調(diào)函數(shù) pcntl_signal_dispatch(); }else { // 發(fā)送信號2 posix_kill($pid,SIGUSR2); // 處理信號2的回調(diào)函數(shù) pcntl_signal_dispatch(); // 父進(jìn)程處理信號2 sleep(3); // 子進(jìn)程發(fā)送信號,看是否pcntl_wait會執(zhí)行(結(jié)果,wait沒有執(zhí)行) posix_kill($pid,SIGUSR1); // 繼續(xù)等待,觀察是否pcntl_wait等子進(jìn)程結(jié)束后執(zhí)行 sleep(3); echo "子進(jìn)程終止 "; // 這個時(shí)候wait往下執(zhí)行了,看來wait等待子進(jìn)程讓父進(jìn)程進(jìn)行了掛起操作 }
輸出結(jié)果
信號2回調(diào) 子進(jìn)程終止 wait已執(zhí)行 18425? 信號2回調(diào) 信號1回調(diào)
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/22028.html
摘要:即時(shí)通訊中,最重要的是響應(yīng)速度,我們需要展示消息列表那么這時(shí)會有未讀消息,未讀數(shù)量,最后一條消息內(nèi)容,時(shí)間等等。目前設(shè)計(jì)是單表單庫。這里只是對即時(shí)通訊設(shè)計(jì)上做了一些簡要的闡述,如有疑問和建議,請?jiān)谠u論區(qū)回復(fù)。 詳解即時(shí)通訊設(shè)計(jì)實(shí)現(xiàn)(PHP+GatewayWorker+Redis) 需要實(shí)現(xiàn)的功能 一對一聊天(私聊) 一對多聊天(群聊) 類似QQ,微信等聊天列表 實(shí)時(shí)消息 顯示 工具...
摘要:即時(shí)通訊中,最重要的是響應(yīng)速度,我們需要展示消息列表那么這時(shí)會有未讀消息,未讀數(shù)量,最后一條消息內(nèi)容,時(shí)間等等。目前設(shè)計(jì)是單表單庫。這里只是對即時(shí)通訊設(shè)計(jì)上做了一些簡要的闡述,如有疑問和建議,請?jiān)谠u論區(qū)回復(fù)。 詳解即時(shí)通訊設(shè)計(jì)實(shí)現(xiàn)(PHP+GatewayWorker+Redis) 需要實(shí)現(xiàn)的功能 一對一聊天(私聊) 一對多聊天(群聊) 類似QQ,微信等聊天列表 實(shí)時(shí)消息 顯示 工具...
摘要:即時(shí)通訊中,最重要的是響應(yīng)速度,我們需要展示消息列表那么這時(shí)會有未讀消息,未讀數(shù)量,最后一條消息內(nèi)容,時(shí)間等等。目前設(shè)計(jì)是單表單庫。這里只是對即時(shí)通訊設(shè)計(jì)上做了一些簡要的闡述,如有疑問和建議,請?jiān)谠u論區(qū)回復(fù)。 詳解即時(shí)通訊設(shè)計(jì)實(shí)現(xiàn)(PHP+GatewayWorker+Redis) 需要實(shí)現(xiàn)的功能 一對一聊天(私聊) 一對多聊天(群聊) 類似QQ,微信等聊天列表 實(shí)時(shí)消息 顯示 工具...
閱讀 2234·2021-11-22 15:29
閱讀 4114·2021-11-04 16:13
閱讀 1000·2019-08-29 16:58
閱讀 346·2019-08-29 16:08
閱讀 1467·2019-08-23 17:56
閱讀 2393·2019-08-23 17:06
閱讀 3172·2019-08-23 16:55
閱讀 2068·2019-08-23 16:22