摘要:當(dāng)其就緒時(shí),會(huì)調(diào)用執(zhí)行定時(shí)函數(shù)。進(jìn)程超時(shí)停止進(jìn)程將要停止時(shí),并不會(huì)立刻停止,而是會(huì)等待事件循環(huán)結(jié)束后停止,這時(shí)為了防止進(jìn)程不退出,還設(shè)置了的延遲,超過就會(huì)停止該進(jìn)程。當(dāng)允許空閑時(shí)間小于時(shí),統(tǒng)一每隔檢測空閑連接。
前言
swoole 的 timer 模塊功能有三個(gè):用戶定時(shí)任務(wù)、剔除空閑連接、更新 server 時(shí)間。timer 模塊的底層有兩種,一種是基于 alarm 信號,一種是基于 timefd。
timer 數(shù)據(jù)結(jié)構(gòu)timer 數(shù)據(jù)結(jié)構(gòu)是 swTimer。其中 heap 是多個(gè) swTimer_node 類型構(gòu)成的一個(gè)數(shù)據(jù)堆,該數(shù)據(jù)堆按照下一次執(zhí)行時(shí)間來排序,下次執(zhí)行時(shí)間離當(dāng)前時(shí)間越近,元素的位置越靠前;map 是 swTimer_node 類型的 map,其 key 是 swTimer_node 類型的 id,該數(shù)據(jù)結(jié)構(gòu)可以通過 id 快速查找對應(yīng)的 swTimer_node 元素;num 是 swTimer_node 元素個(gè)數(shù);use_pipe 標(biāo)志著 worker 進(jìn)程中是否使用管道 pipe 來獲知 alarm 信號已觸發(fā);fd 用于 timefd;_current_id 是當(dāng)前最大 swTimer_node 的 id;_next_id 就是下一個(gè)新建的 swTimer_node 的 id 值,是 _current_id + 1;_next_msec 是下次檢查定時(shí)器的時(shí)間。
_swTimer_node 中 heap_node 是 _swTimer 中的數(shù)據(jù)堆元素;data 一般存儲(chǔ) server;callback 是定時(shí)器觸發(fā)后需要執(zhí)行的回調(diào)函數(shù);exec_msec 是該元素應(yīng)該執(zhí)行的時(shí)間;id 是元素在 swTimer 中的 id;type 有三種:SW_TIMER_TYPE_KERNEL(server 內(nèi)置定時(shí)函數(shù))、SW_TIMER_TYPE_CORO(協(xié)程定時(shí)函數(shù))、SW_TIMER_TYPE_PHP(PHP 定時(shí)函數(shù))
struct _swTimer { /*--------------timerfd & signal timer--------------*/ swHeap *heap; swHashMap *map; int num; int use_pipe; int lasttime; int fd; long _next_id; long _current_id; long _next_msec; swPipe pipe; /*-----------------for EventTimer-------------------*/ struct timeval basetime; /*--------------------------------------------------*/ int (*set)(swTimer *timer, long exec_msec); swTimer_node* (*add)(swTimer *timer, int _msec, int persistent, void *data, swTimerCallback callback); }; struct _swTimer_node { swHeap_node *heap_node; void *data; swTimerCallback callback; int64_t exec_msec; uint32_t interval; long id; int type; //0 normal node 1 node for client_coro uint8_t remove; };Timer 定時(shí)器 swTimer_init 創(chuàng)建定時(shí)器
創(chuàng)建定時(shí)器需要給定一個(gè)間隔時(shí)間,每隔這個(gè)時(shí)間就要檢查 swTimer 中的 _swTimer_node 元素,如果時(shí)間已經(jīng)超過了 _swTimer_node 元素的 exec_msec 時(shí)間,就要執(zhí)行定時(shí)函數(shù)。
swTimer_now 函數(shù)初始化 basetime:swTimer_now 函數(shù)可以獲取當(dāng)前時(shí)間,使用的是 clock_gettime 與 CLOCK_MONOTONIC 獲取絕對時(shí)間,或者使用 gettimeofday 函數(shù)
如果是 worker 進(jìn)程,那么調(diào)用 swSystemTimer_init 函數(shù)對定時(shí)器進(jìn)行初始化;如果是 master 進(jìn)程,那么調(diào)用 swReactorTimer_init 進(jìn)行初始化
int swTimer_now(struct timeval *time) { #if defined(SW_USE_MONOTONIC_TIME) && defined(CLOCK_MONOTONIC) struct timespec _now; if (clock_gettime(CLOCK_MONOTONIC, &_now) < 0) { swSysError("clock_gettime(CLOCK_MONOTONIC) failed."); return SW_ERR; } time->tv_sec = _now.tv_sec; time->tv_usec = _now.tv_nsec / 1000; #else if (gettimeofday(time, NULL) < 0) { swSysError("gettimeofday() failed."); return SW_ERR; } #endif return SW_OK; } int swTimer_init(long msec) { if (swTimer_now(&SwooleG.timer.basetime) < 0) { return SW_ERR; } SwooleG.timer.heap = swHeap_new(1024, SW_MIN_HEAP); if (!SwooleG.timer.heap) { return SW_ERR; } SwooleG.timer.map = swHashMap_new(SW_HASHMAP_INIT_BUCKET_N, NULL); if (!SwooleG.timer.map) { swHeap_free(SwooleG.timer.heap); SwooleG.timer.heap = NULL; return SW_ERR; } SwooleG.timer._current_id = -1; SwooleG.timer._next_msec = msec; SwooleG.timer._next_id = 1; SwooleG.timer.add = swTimer_add; if (swIsTaskWorker()) { swSystemTimer_init(msec, SwooleG.use_timer_pipe); } else { swReactorTimer_init(msec); } return SW_OK; }swReactorTimer_init 初始化
對于 master 進(jìn)程,只需要設(shè)置 main_reactor 的超時(shí)時(shí)間即可,當(dāng)發(fā)生超時(shí)事件之后,main_reactor 會(huì)調(diào)用 onTimeout 函數(shù);或者一個(gè)事件循環(huán)最后,會(huì)調(diào)用 onFinish 函數(shù);這兩個(gè)函數(shù)都會(huì)最終調(diào)用 swTimer_select,來篩選那些已經(jīng)到了執(zhí)行時(shí)間的元素。
static int swReactorTimer_init(long exec_msec) { SwooleG.main_reactor->check_timer = SW_TRUE; SwooleG.main_reactor->timeout_msec = exec_msec; SwooleG.timer.set = swReactorTimer_set; SwooleG.timer.fd = -1; return SW_OK; } static int swReactorEpoll_wait(swReactor *reactor, struct timeval *timeo) { ... if (reactor->timeout_msec == 0) { if (timeo == NULL) { reactor->timeout_msec = -1; } else { reactor->timeout_msec = timeo->tv_sec * 1000 + timeo->tv_usec / 1000; } } while (reactor->running > 0) { msec = reactor->timeout_msec; n = epoll_wait(epoll_fd, events, max_event_num, msec); if (n < 0) { ... } else if (n == 0) { if (reactor->onTimeout != NULL) { reactor->onTimeout(reactor); } continue; } ... if (reactor->onFinish != NULL) { reactor->onFinish(reactor); } ... } ... } static void swReactor_onTimeout(swReactor *reactor) { swReactor_onTimeout_and_Finish(reactor); if (reactor->disable_accept) { reactor->enable_accept(reactor); reactor->disable_accept = 0; } } static void swReactor_onFinish(swReactor *reactor) { //check signal if (reactor->singal_no) { swSignal_callback(reactor->singal_no); reactor->singal_no = 0; } swReactor_onTimeout_and_Finish(reactor); } static void swReactor_onTimeout_and_Finish(swReactor *reactor) { if (reactor->check_timer) { swTimer_select(&SwooleG.timer); } ... }swSystemTimer_init 初始化
對于 worker 進(jìn)程來說,由于定時(shí)任務(wù)比較多而且復(fù)雜,就不能簡單使用 reactor 超時(shí)來實(shí)現(xiàn)功能。
swSystemTimer_init 采用 SIGALRM 鬧鐘信號或者 timefd 來觸發(fā)中斷 reactor 的等待。
對于 timefd 來說,需要使用 timerfd_settime 系統(tǒng)調(diào)用來設(shè)置超時(shí)時(shí)間,然后將 timefd 加入 worker 的 reactor 監(jiān)控中,將其當(dāng)做文件描述符來監(jiān)控。當(dāng)其就緒時(shí),會(huì)調(diào)用 swTimer_select 執(zhí)行定時(shí)函數(shù)。
對于普通 SIGALRM 信號來說,將 timer->pipe 放入 reactor 的監(jiān)控中,使用 setitimer 來定時(shí)觸發(fā) SIGALRM 信號,設(shè)置信號處理函數(shù)。信號處理函數(shù)中,會(huì)向 timer->pipe 寫入數(shù)據(jù),進(jìn)而觸發(fā) swTimer_select 執(zhí)行定時(shí)函數(shù)。
int swSystemTimer_init(int interval, int use_pipe) { swTimer *timer = &SwooleG.timer; timer->lasttime = interval; #ifndef HAVE_TIMERFD SwooleG.use_timerfd = 0; #endif if (SwooleG.use_timerfd) { if (swSystemTimer_timerfd_set(timer, interval) < 0) { return SW_ERR; } timer->use_pipe = 0; } else { if (use_pipe) { if (swPipeNotify_auto(&timer->pipe, 0, 0) < 0) { return SW_ERR; } timer->fd = timer->pipe.getFd(&timer->pipe, 0); timer->use_pipe = 1; } else { timer->fd = 1; timer->use_pipe = 0; } if (swSystemTimer_signal_set(timer, interval) < 0) { return SW_ERR; } swSignal_add(SIGALRM, swSystemTimer_signal_handler); } if (timer->fd > 1) { SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_TIMER, swSystemTimer_event_handler); SwooleG.main_reactor->add(SwooleG.main_reactor, SwooleG.timer.fd, SW_FD_TIMER); } timer->set = swSystemTimer_set; return SW_OK; }swSystemTimer_timerfd_set 設(shè)置 timefd
該函數(shù)目的是使用 timerfd_settime 系統(tǒng)調(diào)用,該系統(tǒng)調(diào)用需要 timefd 和 itimerspec 類型對象
timefd 可以由 timerfd_create 系統(tǒng)函數(shù)創(chuàng)建
itimerspec 對象需要當(dāng)前時(shí)間和 interval 間隔時(shí)間共同設(shè)置。it_value 是首次超時(shí)時(shí)間,需要填寫當(dāng)前時(shí)間,并加上要超時(shí)的時(shí)間,值得注意的是 tv_nsec 加上去后一定要判斷是否超出1000000000(如果超過要秒加一),否則會(huì)設(shè)置失敗;it_interval 是后續(xù)周期性超時(shí)時(shí)間。
static int swSystemTimer_timerfd_set(swTimer *timer, long interval) { struct timeval now; int sec = interval / 1000; int msec = (((float) interval / 1000) - sec) * 1000; if (gettimeofday(&now, NULL) < 0) { swWarn("gettimeofday() failed. Error: %s[%d]", strerror(errno), errno); return SW_ERR; } struct itimerspec timer_set; bzero(&timer_set, sizeof(timer_set)); if (interval < 0) { if (timer->fd == 0) { return SW_OK; } } else { timer_set.it_interval.tv_sec = sec; timer_set.it_interval.tv_nsec = msec * 1000 * 1000; timer_set.it_value.tv_sec = now.tv_sec + sec; timer_set.it_value.tv_nsec = (now.tv_usec * 1000) + timer_set.it_interval.tv_nsec; if (timer_set.it_value.tv_nsec > 1e9) { timer_set.it_value.tv_nsec = timer_set.it_value.tv_nsec - 1e9; timer_set.it_value.tv_sec += 1; } if (timer->fd == 0) { timer->fd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK | TFD_CLOEXEC); if (timer->fd < 0) { swWarn("timerfd_create() failed. Error: %s[%d]", strerror(errno), errno); return SW_ERR; } } } if (timerfd_settime(timer->fd, TFD_TIMER_ABSTIME, &timer_set, NULL) == -1) { swWarn("timerfd_settime() failed. Error: %s[%d]", strerror(errno), errno); return SW_ERR; } return SW_OK; #else swWarn("kernel not support timerfd."); return SW_ERR; #endif }swSystemTimer_signal_set 設(shè)置信號超時(shí)時(shí)間
setitimer 是一個(gè)比較常用的函數(shù),可用來實(shí)現(xiàn)延時(shí)和定時(shí)的功能。
ITIMER_REAL:以系統(tǒng)真實(shí)的時(shí)間來計(jì)算,它送出 SIGALRM 信號。
ITIMER_VIRTUAL:以該進(jìn)程在用戶態(tài)下花費(fèi)的時(shí)間來計(jì)算,它送出 SIGVTALRM 信號。
ITIMER_PROF:以該進(jìn)程在用戶態(tài)下和內(nèi)核態(tài)下所費(fèi)的時(shí)間來計(jì)算,它送出 SIGPROF 信號。
it_interval 為計(jì)時(shí)間隔,it_value 為延時(shí)時(shí)長,也就是距離現(xiàn)有時(shí)間第一次延遲觸發(fā)的相對時(shí)間,而不是絕對時(shí)間。(所以我認(rèn)為代碼中 gettimeofday 函數(shù)是多余的,并不需要獲取當(dāng)前時(shí)間)
*/ static int swSystemTimer_signal_set(swTimer *timer, long interval) { struct itimerval timer_set; int sec = interval / 1000; int msec = (((float) interval / 1000) - sec) * 1000; struct timeval now; if (gettimeofday(&now, NULL) < 0) { swWarn("gettimeofday() failed. Error: %s[%d]", strerror(errno), errno); return SW_ERR; } bzero(&timer_set, sizeof(timer_set)); if (interval > 0) { timer_set.it_interval.tv_sec = sec; timer_set.it_interval.tv_usec = msec * 1000; timer_set.it_value.tv_sec = sec; timer_set.it_value.tv_usec = timer_set.it_interval.tv_usec; if (timer_set.it_value.tv_usec > 1e6) { timer_set.it_value.tv_usec = timer_set.it_value.tv_usec - 1e6; timer_set.it_value.tv_sec += 1; } } if (setitimer(ITIMER_REAL, &timer_set, NULL) < 0) { swWarn("setitimer() failed. Error: %s[%d]", strerror(errno), errno); return SW_ERR; } return SW_OK; }swSystemTimer_signal_handler 超時(shí)信號處理函數(shù)
swSystemTimer_signal_handler 函數(shù)是 SIGALARM 信號的處理函數(shù),該函數(shù)被觸發(fā)說明 epoll_wait 函數(shù)被鬧鐘信號中斷,此時(shí)本函數(shù)向 timer.pipe 寫入數(shù)據(jù),然后即返回。reactor 會(huì)檢測到 timer.pipe 的寫就緒,進(jìn)而調(diào)用對應(yīng)的回調(diào)函數(shù) swSystemTimer_event_handler
void swSystemTimer_signal_handler(int sig) { SwooleG.signal_alarm = 1; uint64_t flag = 1; if (SwooleG.timer.use_pipe) { SwooleG.timer.pipe.write(&SwooleG.timer.pipe, &flag, sizeof(flag)); } }swSystemTimer_event_handler 寫就緒回調(diào)函數(shù)
寫就緒回調(diào)函數(shù)可能是由 timer.pipe 的寫就緒觸發(fā),也可能是 timefd 的寫就緒觸發(fā),無論哪個(gè)都會(huì)調(diào)用 swTimer_select 函數(shù)執(zhí)行對應(yīng)的定時(shí)函數(shù)。
int swSystemTimer_event_handler(swReactor *reactor, swEvent *event) { uint64_t exp; swTimer *timer = &SwooleG.timer; if (read(timer->fd, &exp, sizeof(uint64_t)) != sizeof(uint64_t)) { return SW_ERR; } SwooleG.signal_alarm = 0; return swTimer_select(timer); }swTimer_add 添加元素
swTimer_add 用于添加定時(shí)函數(shù)元素。本函數(shù)邏輯比較簡單,新建一個(gè) swTimer_node 對象,初始化賦值之后加入到 timer->heap 中,程序會(huì)自動(dòng)根據(jù)其 exec_msec 進(jìn)行有小到大的排序,然后再更新 timer->map 哈希表。
值得注意的是,當(dāng)新添加的定時(shí)函數(shù)需要執(zhí)行的時(shí)間小于當(dāng)前 timer 下次執(zhí)行時(shí)間的時(shí)候,我們需要調(diào)用 timer->set 函數(shù)更新 time 的間隔時(shí)間。在 master 進(jìn)程中,這個(gè) set 函數(shù)是 swReactorTimer_set,用于設(shè)置 reactor 的超時(shí)時(shí)間;在 worker 進(jìn)程中,set 函數(shù)是 swSystemTimer_set,用于更新 timerfd_settime 或 setitimer 函數(shù)。
static swTimer_node* swTimer_add(swTimer *timer, int _msec, int interval, void *data, swTimerCallback callback) { swTimer_node *tnode = sw_malloc(sizeof(swTimer_node)); if (!tnode) { swSysError("malloc(%ld) failed.", sizeof(swTimer_node)); return NULL; } int64_t now_msec = swTimer_get_relative_msec(); if (now_msec < 0) { sw_free(tnode); return NULL; } tnode->data = data; tnode->type = SW_TIMER_TYPE_KERNEL; tnode->exec_msec = now_msec + _msec; tnode->interval = interval ? _msec : 0; tnode->remove = 0; tnode->callback = callback; if (timer->_next_msec < 0 || timer->_next_msec > _msec) { timer->set(timer, _msec); timer->_next_msec = _msec; } tnode->id = timer->_next_id++; if (unlikely(tnode->id < 0)) { tnode->id = 1; timer->_next_id = 2; } timer->num++; tnode->heap_node = swHeap_push(timer->heap, tnode->exec_msec, tnode); if (tnode->heap_node == NULL) { sw_free(tnode); return NULL; } swHashMap_add_int(timer->map, tnode->id, tnode); return tnode; } static int swSystemTimer_set(swTimer *timer, long new_interval) { if (new_interval == current_interval) { return SW_OK; } current_interval = new_interval; if (SwooleG.use_timerfd) { return swSystemTimer_timerfd_set(timer, new_interval); } else { return swSystemTimer_signal_set(timer, new_interval); } }swTimer_del 刪除元素
int swTimer_del(swTimer *timer, swTimer_node *tnode) { if (tnode->remove) { return SW_FALSE; } if (SwooleG.timer._current_id > 0 && tnode->id == SwooleG.timer._current_id) { tnode->remove = 1; return SW_TRUE; } if (swHashMap_del_int(timer->map, tnode->id) < 0) { return SW_ERR; } if (tnode->heap_node) { //remove from min-heap swHeap_remove(timer->heap, tnode->heap_node); sw_free(tnode->heap_node); } sw_free(tnode); timer->num --; return SW_TRUE; }swTimer_select 篩選定時(shí)函數(shù)
swTimer_select 函數(shù)的篩選原理是從 timer->heap 中不斷 pop 出定時(shí)元素,比較它們的 exec_msec 是否超過了當(dāng)前時(shí)間,如果超過了時(shí)間,就執(zhí)行對應(yīng)的定時(shí)函數(shù);如果沒有超過,由于 timer->heap 是排序過后的數(shù)據(jù)堆,因此當(dāng)前定時(shí)元素之后的都不會(huì)超過當(dāng)前時(shí)間,也就是還沒有到執(zhí)行的時(shí)間。
如果當(dāng)前的定時(shí)元素超過了當(dāng)前時(shí)間,說明該元素應(yīng)該執(zhí)行定時(shí)函數(shù)。設(shè)置 timer->_current_id 為當(dāng)前的 id 后,執(zhí)行 tnode->callback 回調(diào)函數(shù);如果當(dāng)前定時(shí)元素不是一次執(zhí)行的任務(wù),而是需要每隔一段時(shí)間定時(shí)的任務(wù),就要再次將元素放入 timer->heap 中;如果當(dāng)前定時(shí)元素是一次執(zhí)行的任務(wù),就要將元素從 timer->map、timer->map 中刪除
循環(huán)結(jié)束后,tnode 就是下一個(gè)要執(zhí)行的定時(shí)元素,我們需要調(diào)用 timer->set 函數(shù)設(shè)置鬧鐘信號(worker 進(jìn)程)或者 reactor 超時(shí)時(shí)間(master 進(jìn)程)。
int swTimer_select(swTimer *timer) { int64_t now_msec = swTimer_get_relative_msec(); if (now_msec < 0) { return SW_ERR; } swTimer_node *tnode = NULL; swHeap_node *tmp; long timer_id; while ((tmp = swHeap_top(timer->heap))) { tnode = tmp->data; if (tnode->exec_msec > now_msec) { break; } timer_id = timer->_current_id = tnode->id; if (!tnode->remove) { tnode->callback(timer, tnode); } timer->_current_id = -1; //persistent timer if (tnode->interval > 0 && !tnode->remove) { while (tnode->exec_msec <= now_msec) { tnode->exec_msec += tnode->interval; } swHeap_change_priority(timer->heap, tnode->exec_msec, tmp); continue; } timer->num--; swHeap_pop(timer->heap); swHashMap_del_int(timer->map, timer_id); sw_free(tnode); } if (!tnode || !tmp) { timer->_next_msec = -1; timer->set(timer, -1); } else { timer->set(timer, tnode->exec_msec - now_msec); } return SW_OK; }Timer 定時(shí)器的使用 master 進(jìn)程 swServer_start_proxy
timer 模塊在 master 進(jìn)程中最重要的作用是每隔一秒更新 serv->gs->now 的值。除此之外,當(dāng) reactor 線程調(diào)度 worker 進(jìn)程時(shí),如果一段時(shí)間內(nèi)沒有任何空閑的 worker 進(jìn)程空閑,timer 模塊還負(fù)責(zé)寫入錯(cuò)誤日志。
static int swServer_start_proxy(swServer *serv) { ... if (swTimer_init(1000) < 0) { return SW_ERR; } if (SwooleG.timer.add(&SwooleG.timer, 1000, 1, serv, swServer_master_onTimer) == NULL) { return SW_ERR; } ... } void swServer_master_onTimer(swTimer *timer, swTimer_node *tnode) { swServer *serv = (swServer *) tnode->data; swServer_update_time(serv); if (serv->scheduler_warning && serv->warning_time < serv->gs->now) { serv->scheduler_warning = 0; serv->warning_time = serv->gs->now; swoole_error_log(SW_LOG_WARNING, SW_ERROR_SERVER_NO_IDLE_WORKER, "No idle worker is available."); } if (serv->hooks[SW_SERVER_HOOK_MASTER_TIMER]) { swServer_call_hook(serv, SW_SERVER_HOOK_MASTER_TIMER, serv); } } void swServer_update_time(swServer *serv) { time_t now = time(NULL); if (now < 0) { swWarn("get time failed. Error: %s[%d]", strerror(errno), errno); } else { serv->gs->now = now; } }worker 進(jìn)程超時(shí)停止
worker 進(jìn)程將要停止時(shí),并不會(huì)立刻停止,而是會(huì)等待事件循環(huán)結(jié)束后停止,這時(shí)為了防止 worker 進(jìn)程不退出,還設(shè)置了 30s 的延遲,超過 30s 就會(huì)停止該進(jìn)程。
static void swWorker_stop() { swWorker *worker = SwooleWG.worker; swServer *serv = SwooleG.serv; worker->status = SW_WORKER_BUSY; ... try_to_exit: SwooleWG.wait_exit = 1; if (SwooleG.timer.fd == 0) { swTimer_init(serv->max_wait_time * 1000); } SwooleG.timer.add(&SwooleG.timer, serv->max_wait_time * 1000, 0, NULL, swWorker_onTimeout); swWorker_try_to_exit(); } static void swWorker_onTimeout(swTimer *timer, swTimer_node *tnode) { SwooleG.running = 0; SwooleG.main_reactor->running = 0; swoole_error_log(SW_LOG_WARNING, SW_ERROR_SERVER_WORKER_EXIT_TIMEOUT, "worker exit timeout, forced to terminate."); }swoole_timer_tick 添加定時(shí)任務(wù)
timer 模塊另一個(gè)非常重要的功能是添加定時(shí)任務(wù),一般是使用 swoole_timer_tick 函數(shù)、swoole_timer_after 函數(shù)、swoole_server->tick 函數(shù)、swoole_server->after 函數(shù):
PHP_FUNCTION(swoole_timer_tick) { long after_ms; zval *callback; zval *param = NULL; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "lz|z", &after_ms, &callback, ¶m) == FAILURE) { return; } long timer_id = php_swoole_add_timer(after_ms, callback, param, 1 TSRMLS_CC); if (timer_id < 0) { RETURN_FALSE; } else { RETURN_LONG(timer_id); } } PHP_FUNCTION(swoole_timer_after) { long after_ms; zval *callback; zval *param = NULL; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "lz|z", &after_ms, &callback, ¶m) == FAILURE) { return; } long timer_id = php_swoole_add_timer(after_ms, callback, param, 0 TSRMLS_CC); if (timer_id < 0) { RETURN_FALSE; } else { RETURN_LONG(timer_id); } }php_swoole_add_timer 函數(shù)
本函數(shù)主要調(diào)用 SwooleG.timer.add 函數(shù)將添加新的定時(shí)任務(wù),值得注意的是 swTimer_callback 類型的對象 cb 和兩個(gè)回調(diào)函數(shù) php_swoole_onInterval、php_swoole_onTimeout,真正的回調(diào)函數(shù)存放在了 swTimer_callback 對象中,如果用戶有參數(shù)設(shè)置,也會(huì)放入 cb->data 中。
long php_swoole_add_timer(int ms, zval *callback, zval *param, int persistent TSRMLS_DC) { char *func_name = NULL; if (!swIsTaskWorker()) { php_swoole_check_reactor(); } php_swoole_check_timer(ms); swTimer_callback *cb = emalloc(sizeof(swTimer_callback)); cb->data = &cb->_data; cb->callback = &cb->_callback; memcpy(cb->callback, callback, sizeof(zval)); if (param) { memcpy(cb->data, param, sizeof(zval)); } else { cb->data = NULL; } swTimerCallback timer_func; if (persistent) { cb->type = SW_TIMER_TICK; timer_func = php_swoole_onInterval; } else { cb->type = SW_TIMER_AFTER; timer_func = php_swoole_onTimeout; } sw_zval_add_ref(&cb->callback); if (cb->data) { sw_zval_add_ref(&cb->data); } swTimer_node *tnode = SwooleG.timer.add(&SwooleG.timer, ms, persistent, cb, timer_func); { tnode->type = SW_TIMER_TYPE_PHP; return tnode->id; } } void php_swoole_check_timer(int msec) { if (unlikely(SwooleG.timer.fd == 0)) { swTimer_init(msec); } }php_swoole_onInterval 函數(shù)
本函數(shù)主要調(diào)用 cb->callback,如果有用戶參數(shù),還要將 cb->data 放入調(diào)用函數(shù)中。
void php_swoole_onInterval(swTimer *timer, swTimer_node *tnode) { zval *retval = NULL; int argc = 1; zval *ztimer_id; swTimer_callback *cb = tnode->data; SW_MAKE_STD_ZVAL(ztimer_id); ZVAL_LONG(ztimer_id, tnode->id); { zval **args[2]; if (cb->data) { argc = 2; sw_zval_add_ref(&cb->data); args[1] = &cb->data; } args[0] = &ztimer_id; if (sw_call_user_function_ex(EG(function_table), NULL, cb->callback, &retval, argc, args, 0, NULL TSRMLS_CC) == FAILURE) { swoole_php_fatal_error(E_WARNING, "swoole_timer: onTimerCallback handler error"); return; } } if (tnode->remove) { php_swoole_del_timer(tnode TSRMLS_CC); } }php_swoole_onTimeout 函數(shù)
與上一個(gè)函數(shù)類似,只是這次直接從 timer 中刪除對應(yīng)的元素。
void php_swoole_onTimeout(swTimer *timer, swTimer_node *tnode) { { swTimer_callback *cb = tnode->data; zval *retval = NULL; { zval **args[2]; int argc; if (NULL == cb->data) { argc = 0; args[0] = NULL; } else { argc = 1; args[0] = &cb->data; } if (sw_call_user_function_ex(EG(function_table), NULL, cb->callback, &retval, argc, args, 0, NULL TSRMLS_CC) == FAILURE) { swoole_php_fatal_error(E_WARNING, "swoole_timer: onTimeout handler error"); return; } } php_swoole_del_timer(tnode TSRMLS_CC); } }Timer 模塊時(shí)間輪算法
時(shí)間輪算法是各大網(wǎng)絡(luò)模塊采用的剔除空閑連接的方法,原理是構(gòu)建一個(gè)首尾相連的循環(huán)數(shù)組,每隔數(shù)組元素中有若干個(gè)連接。如果某個(gè)連接有數(shù)據(jù)發(fā)送過來,將連接從所在的數(shù)組元素中刪除,將連接放入最新的數(shù)組元素中,這樣有數(shù)據(jù)來往的連接會(huì)一直在新數(shù)組元素中,空閑的連接所在的數(shù)組元素漸漸的變成了舊數(shù)組元素。每隔一段時(shí)間就按順序清空舊數(shù)組元素的全部連接。
swTimeWheel_new 創(chuàng)建時(shí)間輪時(shí)間輪的數(shù)據(jù)結(jié)構(gòu)比較簡單,由哈希表、size(循環(huán)數(shù)組總數(shù)量),current (循環(huán)數(shù)組當(dāng)前最舊的數(shù)組元素,current-1 是循環(huán)數(shù)組中最新的數(shù)組元素)。swTimeWheel_new 函數(shù)很簡單,就是創(chuàng)建這三個(gè)屬性。
typedef struct { uint16_t current; uint16_t size; swHashMap **wheel; } swTimeWheel; swTimeWheel* swTimeWheel_new(uint16_t size) { swTimeWheel *tw = sw_malloc(sizeof(swTimeWheel)); if (!tw) { swWarn("malloc(%ld) failed.", sizeof(swTimeWheel)); return NULL; } tw->size = size; tw->current = 0; tw->wheel = sw_calloc(size, sizeof(void*)); if (tw->wheel == NULL) { swWarn("malloc(%ld) failed.", sizeof(void*) * size); sw_free(tw); return NULL; } int i; for (i = 0; i < size; i++) { tw->wheel[i] = swHashMap_new(16, NULL); if (tw->wheel[i] == NULL) { swTimeWheel_free(tw); return NULL; } } return tw; }swTimeWheel_add 添加連接
當(dāng) main_reactor 有新連接進(jìn)入的時(shí)候,需要將新的連接添加到時(shí)間輪中,新的連接會(huì)被放到最新的數(shù)組元素中,也就是 current-1 的元素中,然后設(shè)置 swConnection 中的 timewheel_index。
void swTimeWheel_add(swTimeWheel *tw, swConnection *conn) { uint16_t index = tw->current == 0 ? tw->size - 1 : tw->current - 1; swHashMap *new_set = tw->wheel[index]; swHashMap_add_int(new_set, conn->fd, conn); conn->timewheel_index = index; swTraceLog(SW_TRACE_REACTOR, "current=%d, fd=%d, index=%d.", tw->current, conn->fd, index); }swTimeWheel_update 函數(shù)
當(dāng)連接有數(shù)據(jù)傳輸?shù)臅r(shí)候,需要更新該連接在時(shí)間輪中的位置,將該連接從原有的數(shù)組元素中刪除,然后添加到最新的數(shù)組元素中,也就是 current-1 中,然后更新 swConnection 中的 timewheel_index。
#define swTimeWheel_new_index(tw) (tw->current == 0 ? tw->size - 1 : tw->current - 1) void swTimeWheel_update(swTimeWheel *tw, swConnection *conn) { uint16_t new_index = swTimeWheel_new_index(tw); swHashMap *new_set = tw->wheel[new_index]; swHashMap_add_int(new_set, conn->fd, conn); swHashMap *old_set = tw->wheel[conn->timewheel_index]; swHashMap_del_int(old_set, conn->fd); swTraceLog(SW_TRACE_REACTOR, "current=%d, fd=%d, old_index=%d, new_index=%d.", tw->current, conn->fd, new_index, conn->timewheel_index); conn->timewheel_index = new_index; }swTimeWheel_remove 函數(shù)
在時(shí)間輪中刪除該連接,
void swTimeWheel_remove(swTimeWheel *tw, swConnection *conn) { swHashMap *set = tw->wheel[conn->timewheel_index]; swHashMap_del_int(set, conn->fd); swTraceLog(SW_TRACE_REACTOR, "current=%d, fd=%d.", tw->current, conn->fd); }swTimeWheel_forward 刪除空閑連接
swTimeWheel_forward 將最舊的數(shù)組元素 current 中所有連接都關(guān)閉掉,然后將 current 遞增。
void swTimeWheel_forward(swTimeWheel *tw, swReactor *reactor) { swHashMap *set = tw->wheel[tw->current]; tw->current = tw->current == tw->size - 1 ? 0 : tw->current + 1; swTraceLog(SW_TRACE_REACTOR, "current=%d.", tw->current); swConnection *conn; uint64_t fd; while (1) { conn = swHashMap_each_int(set, &fd); if (conn == NULL) { break; } conn->close_force = 1; conn->close_notify = 1; conn->close_wait = 1; conn->close_actively = 1; //notify to reactor thread if (conn->removed) { reactor->close(reactor, (int) fd); } else { reactor->set(reactor, fd, SW_FD_TCP | SW_EVENT_WRITE); } } }reactor 線程中時(shí)間輪的創(chuàng)建
時(shí)間輪的創(chuàng)建在 reactor 線程進(jìn)行事件循環(huán)之前,按照用戶設(shè)置的連接最大空閑時(shí)間設(shè)置不同大小的時(shí)間輪,值得注意的是,時(shí)間輪最大是 SW_TIMEWHEEL_SIZE,也就是循環(huán)數(shù)組大小最大是 60。如果超過 60s 空閑時(shí)間,也僅僅建立 60 個(gè)元素的數(shù)組,但是這樣會(huì)造成每個(gè)數(shù)組元素存放更多的連接。
值得注意的是,當(dāng)允許空閑時(shí)間超過 60s 時(shí),heartbeat_interval * 1000 是 reactor 的超時(shí)時(shí)間,例如空閑時(shí)間是 60s,那么每隔 6s,reactor 都會(huì)超時(shí)來檢測空閑連接。當(dāng)允許空閑時(shí)間小于 60s 時(shí),reactor 統(tǒng)一每隔 1s 檢測空閑連接。
不同于 master 進(jìn)程和 worker 線程,reactor 的 onFinish 和 onTimeout 不再采用默認(rèn)的 swReactor_onTimeout 與 swReactor_onFinish 函數(shù),而是采用空閑連接檢測的 swReactorThread_onReactorCompleted 函數(shù),該函數(shù)會(huì)調(diào)用 swTimeWheel_forward 來剔除空閑連接。
#define SW_TIMEWHEEL_SIZE 60 static int swReactorThread_loop(swThreadParam *param) { ... if (serv->heartbeat_idle_time > 0) { if (serv->heartbeat_idle_time < SW_TIMEWHEEL_SIZE) { reactor->timewheel = swTimeWheel_new(serv->heartbeat_idle_time); reactor->heartbeat_interval = 1; } else { reactor->timewheel = swTimeWheel_new(SW_TIMEWHEEL_SIZE); reactor->heartbeat_interval = serv->heartbeat_idle_time / SW_TIMEWHEEL_SIZE; } reactor->last_heartbeat_time = 0; if (reactor->timewheel == NULL) { swSysError("thread->timewheel create failed."); return SW_ERR; } reactor->timeout_msec = reactor->heartbeat_interval * 1000; reactor->onFinish = swReactorThread_onReactorCompleted; reactor->onTimeout = swReactorThread_onReactorCompleted; } reactor->wait(reactor, NULL); }reactor 線程中時(shí)間輪的添加
當(dāng)有新連接的時(shí)候,conn->connect_notify 會(huì)被置為 1,此時(shí)該連接文件描述符寫就緒,然后就會(huì)調(diào)用 swReactorThread_onWrite,此時(shí) reactor 線程將該連接添加到時(shí)間輪中。
static int swReactorThread_onWrite(swReactor *reactor, swEvent *ev) { ... if (conn->connect_notify) { conn->connect_notify = 0; if (reactor->timewheel) { swTimeWheel_add(reactor->timewheel, conn); } ... } ... }reactor 線程中時(shí)間輪的更新
static int swReactorThread_onRead(swReactor *reactor, swEvent *event) { ... if (reactor->timewheel && swTimeWheel_new_index(reactor->timewheel) != event->socket->timewheel_index) { swTimeWheel_update(reactor->timewheel, event->socket); } ... }reactor 線程中時(shí)間輪的剔除
當(dāng)連接在允許的空閑時(shí)間之內(nèi)沒有任何數(shù)據(jù)發(fā)送,那么時(shí)間輪算法就要關(guān)閉該連接。關(guān)閉連接并不是直接 close 套接字,而是需要通知對應(yīng)的 worker 進(jìn)程調(diào)用 onClose 函數(shù),然后才能關(guān)閉。具體的做法是設(shè)置 swConnection 的 close_force、close_notify 等成員變量為 1,并且關(guān)閉該連接的讀就緒監(jiān)聽事件。
static void swReactorThread_onReactorCompleted(swReactor *reactor) { swServer *serv = reactor->ptr; if (reactor->heartbeat_interval > 0 && reactor->last_heartbeat_time < serv->gs->now - reactor->heartbeat_interval) { swTimeWheel_forward(reactor->timewheel, reactor); reactor->last_heartbeat_time = serv->gs->now; } } void swTimeWheel_forward(swTimeWheel *tw, swReactor *reactor) { ... conn->close_force = 1; conn->close_notify = 1; conn->close_wait = 1; conn->close_actively = 1; if (conn->removed) { reactor->close(reactor, (int) fd); } else { reactor->set(reactor, fd, SW_FD_TCP | SW_EVENT_WRITE); } ... }
當(dāng)該連接寫就緒的時(shí)候,會(huì)調(diào)用 swReactorThread_onWrite 函數(shù)。這個(gè)時(shí)候就會(huì)調(diào)用 swServer_tcp_notify 函數(shù),進(jìn)而調(diào)用 swFactoryProcess_notify、swFactoryProcess_dispatch,最后調(diào)用 swReactorThread_send2worker 發(fā)送給了 worker 進(jìn)程。
由于 reactor 啟用的是水平觸發(fā),由于并未向該連接寫入數(shù)據(jù),因此很快又會(huì)觸發(fā)寫就緒事件調(diào)用 swReactorThread_onWrite 函數(shù),這時(shí)如果 disable_notify 為 1(dispatch_mode 為 1 或 3),會(huì)直接執(zhí)行 swReactorThread_close 函數(shù)關(guān)閉連接,假如此時(shí) conn->out_buffer 中還有數(shù)據(jù)未發(fā)送,也會(huì)被拋棄。如果 disable_notify 為 0,則會(huì)繼續(xù)向?qū)⒁P(guān)閉的連接發(fā)送數(shù)據(jù),直到接收到 SW_CHUNK_CLOSE 類型的消息。
static int swReactorThread_onWrite(swReactor *reactor, swEvent *ev) { ... else if (conn->close_notify) { swServer_tcp_notify(serv, conn, SW_EVENT_CLOSE); conn->close_notify = 0; return SW_OK; } else if (serv->disable_notify && conn->close_force) { return swReactorThread_close(reactor, fd); } ... } int swServer_tcp_notify(swServer *serv, swConnection *conn, int event) { swDataHead notify_event; notify_event.type = event; notify_event.from_id = conn->from_id; notify_event.fd = conn->fd; notify_event.from_fd = conn->from_fd; notify_event.len = 0; return serv->factory.notify(&serv->factory, ¬ify_event); } static int swFactoryProcess_notify(swFactory *factory, swDataHead *ev) { memcpy(&sw_notify_data._send, ev, sizeof(swDataHead)); sw_notify_data._send.len = 0; sw_notify_data.target_worker_id = -1; return factory->dispatch(factory, (swDispatchData *) &sw_notify_data); } static int swFactoryProcess_dispatch(swFactory *factory, swDispatchData *task) { ... if (swEventData_is_stream(task->data.info.type)) { swConnection *conn = swServer_connection_get(serv, fd); if (conn->closed) { //Connection has been clsoed by server if (!(task->data.info.type == SW_EVENT_CLOSE && conn->close_force)) { return SW_OK; } } //converted fd to session_id task->data.info.fd = conn->session_id; task->data.info.from_fd = conn->from_fd; } return swReactorThread_send2worker((void *) &(task->data), send_len, target_worker_id); }
worker 進(jìn)程收到消息后會(huì)調(diào)用 swWorker_onTask 函數(shù),進(jìn)而調(diào)用 swFactoryProcess_end 函數(shù),調(diào)用 serv->onClose 函數(shù),并設(shè)置 swConnection 對象的 closed 為 1,然后調(diào)用 swFactoryProcess_finish 函數(shù)將數(shù)據(jù)包發(fā)送給 reactor 線程。
int swWorker_onTask(swFactory *factory, swEventData *task) { switch (task->info.type) { ... factory->end(factory, task->info.fd); break; ... } } static int swFactoryProcess_end(swFactory *factory, int fd) { bzero(&_send, sizeof(_send)); _send.info.fd = fd; _send.info.len = 0; _send.info.type = SW_EVENT_CLOSE; swConnection *conn = swWorker_get_connection(serv, fd); if (conn->close_force) { goto do_close; } else if (conn->closing) { swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SESSION_CLOSING, "The connection[%d] is closing.", fd); return SW_ERR; } else if (conn->closed) { return SW_ERR; } else { do_close: conn->closing = 1; if (serv->onClose != NULL) { info.fd = fd; if (conn->close_actively) { info.from_id = -1; } else { info.from_id = conn->from_id; } info.from_fd = conn->from_fd; serv->onClose(serv, &info); } conn->closing = 0; conn->closed = 1; conn->close_errno = 0; return factory->finish(factory, &_send); } }
reactor 通過 swReactorThread_onPipeReceive 收到 worker 進(jìn)程的連接關(guān)閉通知后,調(diào)用 swReactorThread_send 函數(shù)。如果連接已經(jīng)被關(guān)閉,或者緩沖區(qū)中沒有任何數(shù)據(jù)的時(shí)候,直接調(diào)用 reactor->close 函數(shù),也就是 swReactorThread_close 函數(shù);如果緩沖區(qū)還有數(shù)據(jù),那么需要將消息放到 conn->out_buffer 中等待著該連接寫就緒回調(diào) swReactorThread_close 函數(shù)(此時(shí) close_notify 已經(jīng)為 0)。
int swReactorThread_send(swSendData *_send) { ... if (_send->info.type == SW_EVENT_CLOSE && (conn->close_reset || conn->removed)) { goto close_fd; } ... if (swBuffer_empty(conn->out_buffer)) { if (_send->info.type == SW_EVENT_CLOSE) { close_fd: reactor->close(reactor, fd); return SW_OK; } } swBuffer_chunk *chunk; //close connection if (_send->info.type == SW_EVENT_CLOSE) { chunk = swBuffer_new_chunk(conn->out_buffer, SW_CHUNK_CLOSE, 0); chunk->store.data.val1 = _send->info.type; } if (reactor->set(reactor, fd, SW_EVENT_TCP | SW_EVENT_WRITE | SW_EVENT_READ) < 0 && (errno == EBADF || errno == ENOENT)) { goto close_fd; } ... close_fd: reactor->close(reactor, fd); return SW_OK; } static int swReactorThread_onWrite(swReactor *reactor, swEvent *ev) { ... else if (conn->close_notify) { swServer_tcp_notify(serv, conn, SW_EVENT_CLOSE); conn->close_notify = 0; return SW_OK; } else if (serv->disable_notify && conn->close_force) { return swReactorThread_close(reactor, fd); } _pop_chunk: while (!swBuffer_empty(conn->out_buffer)) { chunk = swBuffer_get_chunk(conn->out_buffer); if (chunk->type == SW_CHUNK_CLOSE) { close_fd: reactor->close(reactor, fd); return SW_OK; } ... } ... }
swReactorThread_close 函數(shù)會(huì)刪除 swConnection 在 server 中的所有痕跡,包括 reactor 中的監(jiān)控,serv->stats 的成員變量,port->connection_num 遞減,從時(shí)間輪中刪除、session 中 fd 置空等等工作。而且,還要清空套接字緩存中的所有數(shù)據(jù),直接向客戶端發(fā)送關(guān)閉請求。swReactor_close 函數(shù)釋放內(nèi)存,關(guān)閉套接字文件描述符。
int swReactorThread_close(swReactor *reactor, int fd) { swServer *serv = SwooleG.serv; if (conn->removed == 0 && reactor->del(reactor, fd) < 0) { return SW_ERR; } sw_atomic_fetch_add(&serv->stats->close_count, 1); sw_atomic_fetch_sub(&serv->stats->connection_num, 1); swTrace("Close Event.fd=%d|from=%d", fd, reactor->id); //free the receive memory buffer swServer_free_buffer(serv, fd); swListenPort *port = swServer_get_port(serv, fd); sw_atomic_fetch_sub(&port->connection_num, 1); #ifdef SW_USE_SOCKET_LINGER if (conn->close_force) { struct linger linger; linger.l_onoff = 1; linger.l_linger = 0; if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &linger, sizeof(struct linger)) == -1) { swWarn("setsockopt(SO_LINGER) failed. Error: %s[%d]", strerror(errno), errno); } } #endif #ifdef SW_REACTOR_USE_SESSION swSession *session = swServer_get_session(serv, conn->session_id); session->fd = 0; #endif #ifdef SW_USE_TIMEWHEEL if (reactor->timewheel) { swTimeWheel_remove(reactor->timewheel, conn); } #endif return swReactor_close(reactor, fd); } int swReactor_close(swReactor *reactor, int fd) { swConnection *socket = swReactor_get(reactor, fd); if (socket->out_buffer) { swBuffer_free(socket->out_buffer); } if (socket->in_buffer) { swBuffer_free(socket->in_buffer); } if (socket->websocket_buffer) { swString_free(socket->websocket_buffer); } bzero(socket, sizeof(swConnection)); socket->removed = 1; swTraceLog(SW_TRACE_CLOSE, "fd=%d.", fd); return close(fd); }
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/29353.html
前言 作為一個(gè)網(wǎng)絡(luò)框架,最為核心的就是消息的接受與發(fā)送。高效的 reactor 模式一直是眾多網(wǎng)絡(luò)框架的首要選擇,本節(jié)主要講解 swoole 中的 reactor 模塊。 UNP 學(xué)習(xí)筆記——IO 復(fù)用 Reactor 的數(shù)據(jù)結(jié)構(gòu) Reactor 的數(shù)據(jù)結(jié)構(gòu)比較復(fù)雜,首先 object 是具體 Reactor 對象的首地址,ptr 是擁有 Reactor 對象的類的指針, event_nu...
摘要:是緩存區(qū)高水位線,達(dá)到了說明緩沖區(qū)即將滿了創(chuàng)建線程函數(shù)用于將監(jiān)控的存放于中向中添加監(jiān)聽的文件描述符等待所有的線程開啟事件循環(huán)利用創(chuàng)建線程,線程啟動(dòng)函數(shù)是保存監(jiān)聽本函數(shù)將用于監(jiān)聽的存放到當(dāng)中,并設(shè)置相應(yīng)的屬性 Server 的啟動(dòng) 在 server 啟動(dòng)之前,swoole 首先要調(diào)用 php_swoole_register_callback 將 PHP 的回調(diào)函數(shù)注冊到 server...
摘要:修復(fù)添加超過萬個(gè)以上定時(shí)器時(shí)發(fā)生崩潰的問題增加模塊,下高性能序列化庫修復(fù)監(jiān)聽端口設(shè)置無效的問題等。線程來處理網(wǎng)絡(luò)事件輪詢,讀取數(shù)據(jù)。當(dāng)?shù)娜挝帐殖晒α艘院?,由這個(gè)線程將連接成功的消息告訴進(jìn)程,再由進(jìn)程轉(zhuǎn)交給進(jìn)程。此時(shí)進(jìn)程觸發(fā)事件。 本文示例代碼詳見:https://github.com/52fhy/swoo...。 簡介 Swoole是一個(gè)PHP擴(kuò)展,提供了PHP語言的異步多線程服務(wù)器...
摘要:當(dāng)此時(shí)的套接字不可寫的時(shí)候,會(huì)自動(dòng)放入緩沖區(qū)中。當(dāng)大于高水線時(shí),會(huì)自動(dòng)調(diào)用回調(diào)函數(shù)。寫就緒狀態(tài)當(dāng)監(jiān)控到套接字進(jìn)入了寫就緒狀態(tài)時(shí),就會(huì)調(diào)用函數(shù)。如果為,說明此時(shí)異步客戶端雖然建立了連接,但是還沒有調(diào)用回調(diào)函數(shù),因此這時(shí)要調(diào)用函數(shù)。 前言 上一章我們說了客戶端的連接 connect,對于同步客戶端來說,連接已經(jīng)建立成功;但是對于異步客戶端來說,此時(shí)可能還在進(jìn)行 DNS 的解析,on...
摘要:在創(chuàng)建進(jìn)程和線程之間,主線程開始進(jìn)行信號處理函數(shù)的設(shè)置。事件循環(huán)結(jié)束前會(huì)調(diào)用函數(shù),該函數(shù)會(huì)檢查并執(zhí)行相應(yīng)的信號處理函數(shù)。 前言 信號處理是網(wǎng)絡(luò)庫不可或缺的一部分,不論是 ALARM、SIGTERM、SIGUSR1、SIGUSR2、SIGPIPE 等信號對程序的控制,還是 reactor、read、write 等操作被信號中斷的處理,都關(guān)系著整個(gè)框架程序的正常運(yùn)行。 Signal 數(shù)據(jù)...
閱讀 3764·2021-08-11 11:16
閱讀 1633·2019-08-30 15:44
閱讀 2003·2019-08-29 18:45
閱讀 2283·2019-08-26 18:18
閱讀 1013·2019-08-26 13:37
閱讀 1578·2019-08-26 11:43
閱讀 2128·2019-08-26 11:34
閱讀 386·2019-08-26 10:59