成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Swoole 源碼分析——Server模塊之Timer模塊與時(shí)間輪算法

qieangel2013 / 3139人閱讀

摘要:當(dāng)其就緒時(shí),會(huì)調(diào)用執(zhí)行定時(shí)函數(shù)。進(jìn)程超時(shí)停止進(jìn)程將要停止時(shí),并不會(huì)立刻停止,而是會(huì)等待事件循環(huán)結(jié)束后停止,這時(shí)為了防止進(jìn)程不退出,還設(shè)置了的延遲,超過就會(huì)停止該進(jìn)程。當(dāng)允許空閑時(shí)間小于時(shí),統(tǒng)一每隔檢測空閑連接。

前言

swooletimer 模塊功能有三個(gè):用戶定時(shí)任務(wù)、剔除空閑連接、更新 server 時(shí)間。timer 模塊的底層有兩種,一種是基于 alarm 信號,一種是基于 timefd。

timer 數(shù)據(jù)結(jié)構(gòu)

timer 數(shù)據(jù)結(jié)構(gòu)是 swTimer。其中 heap 是多個(gè) swTimer_node 類型構(gòu)成的一個(gè)數(shù)據(jù)堆,該數(shù)據(jù)堆按照下一次執(zhí)行時(shí)間來排序,下次執(zhí)行時(shí)間離當(dāng)前時(shí)間越近,元素的位置越靠前;mapswTimer_node 類型的 map,其 keyswTimer_node 類型的 id,該數(shù)據(jù)結(jié)構(gòu)可以通過 id 快速查找對應(yīng)的 swTimer_node 元素;numswTimer_node 元素個(gè)數(shù);use_pipe 標(biāo)志著 worker 進(jìn)程中是否使用管道 pipe 來獲知 alarm 信號已觸發(fā);fd 用于 timefd;_current_id 是當(dāng)前最大 swTimer_nodeid_next_id 就是下一個(gè)新建的 swTimer_nodeid 值,是 _current_id + 1;_next_msec 是下次檢查定時(shí)器的時(shí)間。

_swTimer_nodeheap_node_swTimer 中的數(shù)據(jù)堆元素;data 一般存儲(chǔ) server;callback 是定時(shí)器觸發(fā)后需要執(zhí)行的回調(diào)函數(shù);exec_msec 是該元素應(yīng)該執(zhí)行的時(shí)間;id 是元素在 swTimer 中的 id;type 有三種:SW_TIMER_TYPE_KERNELserver 內(nèi)置定時(shí)函數(shù))、SW_TIMER_TYPE_CORO(協(xié)程定時(shí)函數(shù))、SW_TIMER_TYPE_PHP(PHP 定時(shí)函數(shù))

struct _swTimer
{
    /*--------------timerfd & signal timer--------------*/
    swHeap *heap;
    swHashMap *map;
    int num;
    int use_pipe;
    int lasttime;
    int fd;
    long _next_id;
    long _current_id;
    long _next_msec;
    swPipe pipe;
    /*-----------------for EventTimer-------------------*/
    struct timeval basetime;
    /*--------------------------------------------------*/
    int (*set)(swTimer *timer, long exec_msec);
    swTimer_node* (*add)(swTimer *timer, int _msec, int persistent, void *data, swTimerCallback callback);
};

struct _swTimer_node
{
    swHeap_node *heap_node;
    void *data;
    swTimerCallback callback;
    int64_t exec_msec;
    uint32_t interval;
    long id;
    int type;                 //0 normal node 1 node for client_coro
    uint8_t remove;
};
Timer 定時(shí)器 swTimer_init 創(chuàng)建定時(shí)器

創(chuàng)建定時(shí)器需要給定一個(gè)間隔時(shí)間,每隔這個(gè)時(shí)間就要檢查 swTimer 中的 _swTimer_node 元素,如果時(shí)間已經(jīng)超過了 _swTimer_node 元素的 exec_msec 時(shí)間,就要執(zhí)行定時(shí)函數(shù)。

swTimer_now 函數(shù)初始化 basetimeswTimer_now 函數(shù)可以獲取當(dāng)前時(shí)間,使用的是 clock_gettimeCLOCK_MONOTONIC 獲取絕對時(shí)間,或者使用 gettimeofday 函數(shù)

如果是 worker 進(jìn)程,那么調(diào)用 swSystemTimer_init 函數(shù)對定時(shí)器進(jìn)行初始化;如果是 master 進(jìn)程,那么調(diào)用 swReactorTimer_init 進(jìn)行初始化

int swTimer_now(struct timeval *time)
{
#if defined(SW_USE_MONOTONIC_TIME) && defined(CLOCK_MONOTONIC)
    struct timespec _now;
    if (clock_gettime(CLOCK_MONOTONIC, &_now) < 0)
    {
        swSysError("clock_gettime(CLOCK_MONOTONIC) failed.");
        return SW_ERR;
    }
    time->tv_sec = _now.tv_sec;
    time->tv_usec = _now.tv_nsec / 1000;
#else
    if (gettimeofday(time, NULL) < 0)
    {
        swSysError("gettimeofday() failed.");
        return SW_ERR;
    }
#endif
    return SW_OK;
}

int swTimer_init(long msec)
{
    if (swTimer_now(&SwooleG.timer.basetime) < 0)
    {
        return SW_ERR;
    }


    SwooleG.timer.heap = swHeap_new(1024, SW_MIN_HEAP);
    if (!SwooleG.timer.heap)
    {
        return SW_ERR;
    }

    SwooleG.timer.map = swHashMap_new(SW_HASHMAP_INIT_BUCKET_N, NULL);
    if (!SwooleG.timer.map)
    {
        swHeap_free(SwooleG.timer.heap);
        SwooleG.timer.heap = NULL;
        return SW_ERR;
    }

    SwooleG.timer._current_id = -1;
    SwooleG.timer._next_msec = msec;
    SwooleG.timer._next_id = 1;
    SwooleG.timer.add = swTimer_add;

    if (swIsTaskWorker())
    {
        swSystemTimer_init(msec, SwooleG.use_timer_pipe);
    }
    else
    {
        swReactorTimer_init(msec);
    }

    return SW_OK;
}
swReactorTimer_init 初始化

對于 master 進(jìn)程,只需要設(shè)置 main_reactor 的超時(shí)時(shí)間即可,當(dāng)發(fā)生超時(shí)事件之后,main_reactor 會(huì)調(diào)用 onTimeout 函數(shù);或者一個(gè)事件循環(huán)最后,會(huì)調(diào)用 onFinish 函數(shù);這兩個(gè)函數(shù)都會(huì)最終調(diào)用 swTimer_select,來篩選那些已經(jīng)到了執(zhí)行時(shí)間的元素。

static int swReactorTimer_init(long exec_msec)
{
    SwooleG.main_reactor->check_timer = SW_TRUE;
    SwooleG.main_reactor->timeout_msec = exec_msec;
    SwooleG.timer.set = swReactorTimer_set;
    SwooleG.timer.fd = -1;
    return SW_OK;
}

static int swReactorEpoll_wait(swReactor *reactor, struct timeval *timeo)
{
    ...
    
    if (reactor->timeout_msec == 0)
    {
        if (timeo == NULL)
        {
            reactor->timeout_msec = -1;
        }
        else
        {
            reactor->timeout_msec = timeo->tv_sec * 1000 + timeo->tv_usec / 1000;
        }
    }
    
    while (reactor->running > 0)
    {
        msec = reactor->timeout_msec;
        n = epoll_wait(epoll_fd, events, max_event_num, msec);
        if (n < 0)
        {
           ...
        }
        else if (n == 0)
        {
            if (reactor->onTimeout != NULL)
            {
                reactor->onTimeout(reactor);
            }
            continue;
        }
        
        ...
        
        if (reactor->onFinish != NULL)
        {
            reactor->onFinish(reactor);
        }
        
        ...
    }
    
    ...
}

static void swReactor_onTimeout(swReactor *reactor)
{
    swReactor_onTimeout_and_Finish(reactor);

    if (reactor->disable_accept)
    {
        reactor->enable_accept(reactor);
        reactor->disable_accept = 0;
    }
}

static void swReactor_onFinish(swReactor *reactor)
{
    //check signal
    if (reactor->singal_no)
    {
        swSignal_callback(reactor->singal_no);
        reactor->singal_no = 0;
    }
    swReactor_onTimeout_and_Finish(reactor);
}

static void swReactor_onTimeout_and_Finish(swReactor *reactor)
{
    if (reactor->check_timer)
    {
        swTimer_select(&SwooleG.timer);
    }
    
    ...
}
swSystemTimer_init 初始化

對于 worker 進(jìn)程來說,由于定時(shí)任務(wù)比較多而且復(fù)雜,就不能簡單使用 reactor 超時(shí)來實(shí)現(xiàn)功能。

swSystemTimer_init 采用 SIGALRM 鬧鐘信號或者 timefd 來觸發(fā)中斷 reactor 的等待。

對于 timefd 來說,需要使用 timerfd_settime 系統(tǒng)調(diào)用來設(shè)置超時(shí)時(shí)間,然后將 timefd 加入 workerreactor 監(jiān)控中,將其當(dāng)做文件描述符來監(jiān)控。當(dāng)其就緒時(shí),會(huì)調(diào)用 swTimer_select 執(zhí)行定時(shí)函數(shù)。

對于普通 SIGALRM 信號來說,將 timer->pipe 放入 reactor 的監(jiān)控中,使用 setitimer 來定時(shí)觸發(fā) SIGALRM 信號,設(shè)置信號處理函數(shù)。信號處理函數(shù)中,會(huì)向 timer->pipe 寫入數(shù)據(jù),進(jìn)而觸發(fā) swTimer_select 執(zhí)行定時(shí)函數(shù)。

int swSystemTimer_init(int interval, int use_pipe)
{
    swTimer *timer = &SwooleG.timer;
    timer->lasttime = interval;

#ifndef HAVE_TIMERFD
    SwooleG.use_timerfd = 0;
#endif

    if (SwooleG.use_timerfd)
    {
        if (swSystemTimer_timerfd_set(timer, interval) < 0)
        {
            return SW_ERR;
        }
        timer->use_pipe = 0;
    }
    else
    {
        if (use_pipe)
        {
            if (swPipeNotify_auto(&timer->pipe, 0, 0) < 0)
            {
                return SW_ERR;
            }
            timer->fd = timer->pipe.getFd(&timer->pipe, 0);
            timer->use_pipe = 1;
        }
        else
        {
            timer->fd = 1;
            timer->use_pipe = 0;
        }

        if (swSystemTimer_signal_set(timer, interval) < 0)
        {
            return SW_ERR;
        }
        swSignal_add(SIGALRM, swSystemTimer_signal_handler);
    }

    if (timer->fd > 1)
    {
        SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_TIMER, swSystemTimer_event_handler);
        SwooleG.main_reactor->add(SwooleG.main_reactor, SwooleG.timer.fd, SW_FD_TIMER);
    }
    timer->set = swSystemTimer_set;
    return SW_OK;
}
swSystemTimer_timerfd_set 設(shè)置 timefd

該函數(shù)目的是使用 timerfd_settime 系統(tǒng)調(diào)用,該系統(tǒng)調(diào)用需要 timefditimerspec 類型對象

timefd 可以由 timerfd_create 系統(tǒng)函數(shù)創(chuàng)建

itimerspec 對象需要當(dāng)前時(shí)間和 interval 間隔時(shí)間共同設(shè)置。it_value 是首次超時(shí)時(shí)間,需要填寫當(dāng)前時(shí)間,并加上要超時(shí)的時(shí)間,值得注意的是 tv_nsec 加上去后一定要判斷是否超出1000000000(如果超過要秒加一),否則會(huì)設(shè)置失敗;it_interval 是后續(xù)周期性超時(shí)時(shí)間。

static int swSystemTimer_timerfd_set(swTimer *timer, long interval)
{

    struct timeval now;
    int sec = interval / 1000;
    int msec = (((float) interval / 1000) - sec) * 1000;

    if (gettimeofday(&now, NULL) < 0)
    {
        swWarn("gettimeofday() failed. Error: %s[%d]", strerror(errno), errno);
        return SW_ERR;
    }

    struct itimerspec timer_set;
    bzero(&timer_set, sizeof(timer_set));

    if (interval < 0)
    {
        if (timer->fd == 0)
        {
            return SW_OK;
        }
    }
    else
    {
        timer_set.it_interval.tv_sec = sec;
        timer_set.it_interval.tv_nsec = msec * 1000 * 1000;

        timer_set.it_value.tv_sec = now.tv_sec + sec;
        timer_set.it_value.tv_nsec = (now.tv_usec * 1000) + timer_set.it_interval.tv_nsec;

        if (timer_set.it_value.tv_nsec > 1e9)
        {
            timer_set.it_value.tv_nsec = timer_set.it_value.tv_nsec - 1e9;
            timer_set.it_value.tv_sec += 1;
        }

        if (timer->fd == 0)
        {
            timer->fd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK | TFD_CLOEXEC);
            if (timer->fd < 0)
            {
                swWarn("timerfd_create() failed. Error: %s[%d]", strerror(errno), errno);
                return SW_ERR;
            }
        }
    }

    if (timerfd_settime(timer->fd, TFD_TIMER_ABSTIME, &timer_set, NULL) == -1)
    {
        swWarn("timerfd_settime() failed. Error: %s[%d]", strerror(errno), errno);
        return SW_ERR;
    }
    return SW_OK;
#else
    swWarn("kernel not support timerfd.");
    return SW_ERR;
#endif
}
swSystemTimer_signal_set 設(shè)置信號超時(shí)時(shí)間

setitimer 是一個(gè)比較常用的函數(shù),可用來實(shí)現(xiàn)延時(shí)和定時(shí)的功能。

ITIMER_REAL:以系統(tǒng)真實(shí)的時(shí)間來計(jì)算,它送出 SIGALRM 信號。

ITIMER_VIRTUAL:以該進(jìn)程在用戶態(tài)下花費(fèi)的時(shí)間來計(jì)算,它送出 SIGVTALRM 信號。

ITIMER_PROF:以該進(jìn)程在用戶態(tài)下和內(nèi)核態(tài)下所費(fèi)的時(shí)間來計(jì)算,它送出 SIGPROF 信號。

it_interval 為計(jì)時(shí)間隔,it_value 為延時(shí)時(shí)長,也就是距離現(xiàn)有時(shí)間第一次延遲觸發(fā)的相對時(shí)間,而不是絕對時(shí)間。(所以我認(rèn)為代碼中 gettimeofday 函數(shù)是多余的,并不需要獲取當(dāng)前時(shí)間)

 */
static int swSystemTimer_signal_set(swTimer *timer, long interval)
{
    struct itimerval timer_set;
    int sec = interval / 1000;
    int msec = (((float) interval / 1000) - sec) * 1000;

    struct timeval now;
    if (gettimeofday(&now, NULL) < 0)
    {
        swWarn("gettimeofday() failed. Error: %s[%d]", strerror(errno), errno);
        return SW_ERR;
    }
    bzero(&timer_set, sizeof(timer_set));

    if (interval > 0)
    {
        timer_set.it_interval.tv_sec = sec;
        timer_set.it_interval.tv_usec = msec * 1000;

        timer_set.it_value.tv_sec = sec;
        timer_set.it_value.tv_usec = timer_set.it_interval.tv_usec;

        if (timer_set.it_value.tv_usec > 1e6)
        {
            timer_set.it_value.tv_usec = timer_set.it_value.tv_usec - 1e6;
            timer_set.it_value.tv_sec += 1;
        }
    }

    if (setitimer(ITIMER_REAL, &timer_set, NULL) < 0)
    {
        swWarn("setitimer() failed. Error: %s[%d]", strerror(errno), errno);
        return SW_ERR;
    }
    return SW_OK;
}
swSystemTimer_signal_handler 超時(shí)信號處理函數(shù)

swSystemTimer_signal_handler 函數(shù)是 SIGALARM 信號的處理函數(shù),該函數(shù)被觸發(fā)說明 epoll_wait 函數(shù)被鬧鐘信號中斷,此時(shí)本函數(shù)向 timer.pipe 寫入數(shù)據(jù),然后即返回。reactor 會(huì)檢測到 timer.pipe 的寫就緒,進(jìn)而調(diào)用對應(yīng)的回調(diào)函數(shù) swSystemTimer_event_handler

void swSystemTimer_signal_handler(int sig)
{
    SwooleG.signal_alarm = 1;
    uint64_t flag = 1;

    if (SwooleG.timer.use_pipe)
    {
        SwooleG.timer.pipe.write(&SwooleG.timer.pipe, &flag, sizeof(flag));
    }
}
swSystemTimer_event_handler 寫就緒回調(diào)函數(shù)

寫就緒回調(diào)函數(shù)可能是由 timer.pipe 的寫就緒觸發(fā),也可能是 timefd 的寫就緒觸發(fā),無論哪個(gè)都會(huì)調(diào)用 swTimer_select 函數(shù)執(zhí)行對應(yīng)的定時(shí)函數(shù)。

int swSystemTimer_event_handler(swReactor *reactor, swEvent *event)
{
    uint64_t exp;
    swTimer *timer = &SwooleG.timer;

    if (read(timer->fd, &exp, sizeof(uint64_t)) != sizeof(uint64_t))
    {
        return SW_ERR;
    }
    SwooleG.signal_alarm = 0;
    return swTimer_select(timer);
}
swTimer_add 添加元素

swTimer_add 用于添加定時(shí)函數(shù)元素。本函數(shù)邏輯比較簡單,新建一個(gè) swTimer_node 對象,初始化賦值之后加入到 timer->heap 中,程序會(huì)自動(dòng)根據(jù)其 exec_msec 進(jìn)行有小到大的排序,然后再更新 timer->map 哈希表。

值得注意的是,當(dāng)新添加的定時(shí)函數(shù)需要執(zhí)行的時(shí)間小于當(dāng)前 timer 下次執(zhí)行時(shí)間的時(shí)候,我們需要調(diào)用 timer->set 函數(shù)更新 time 的間隔時(shí)間。在 master 進(jìn)程中,這個(gè) set 函數(shù)是 swReactorTimer_set,用于設(shè)置 reactor 的超時(shí)時(shí)間;在 worker 進(jìn)程中,set 函數(shù)是 swSystemTimer_set,用于更新 timerfd_settimesetitimer 函數(shù)。

static swTimer_node* swTimer_add(swTimer *timer, int _msec, int interval, void *data, swTimerCallback callback)
{
    swTimer_node *tnode = sw_malloc(sizeof(swTimer_node));
    if (!tnode)
    {
        swSysError("malloc(%ld) failed.", sizeof(swTimer_node));
        return NULL;
    }

    int64_t now_msec = swTimer_get_relative_msec();
    if (now_msec < 0)
    {
        sw_free(tnode);
        return NULL;
    }

    tnode->data = data;
    tnode->type = SW_TIMER_TYPE_KERNEL;
    tnode->exec_msec = now_msec + _msec;
    tnode->interval = interval ? _msec : 0;
    tnode->remove = 0;
    tnode->callback = callback;

    if (timer->_next_msec < 0 || timer->_next_msec > _msec)
    {
        timer->set(timer, _msec);
        timer->_next_msec = _msec;
    }

    tnode->id = timer->_next_id++;
    if (unlikely(tnode->id < 0))
    {
        tnode->id = 1;
        timer->_next_id = 2;
    }
    timer->num++;

    tnode->heap_node = swHeap_push(timer->heap, tnode->exec_msec, tnode);
    if (tnode->heap_node == NULL)
    {
        sw_free(tnode);
        return NULL;
    }
    swHashMap_add_int(timer->map, tnode->id, tnode);
    return tnode;
}

static int swSystemTimer_set(swTimer *timer, long new_interval)
{
    if (new_interval == current_interval)
    {
        return SW_OK;
    }
    current_interval = new_interval;
    if (SwooleG.use_timerfd)
    {
        return swSystemTimer_timerfd_set(timer, new_interval);
    }
    else
    {
        return swSystemTimer_signal_set(timer, new_interval);
    }
}
swTimer_del 刪除元素
int swTimer_del(swTimer *timer, swTimer_node *tnode)
{
    if (tnode->remove)
    {
        return SW_FALSE;
    }
    if (SwooleG.timer._current_id > 0 && tnode->id == SwooleG.timer._current_id)
    {
        tnode->remove = 1;
        return SW_TRUE;
    }
    if (swHashMap_del_int(timer->map, tnode->id) < 0)
    {
        return SW_ERR;
    }
    if (tnode->heap_node)
    {
        //remove from min-heap
        swHeap_remove(timer->heap, tnode->heap_node);
        sw_free(tnode->heap_node);
    }
    sw_free(tnode);
    timer->num --;
    return SW_TRUE;
}
swTimer_select 篩選定時(shí)函數(shù)

swTimer_select 函數(shù)的篩選原理是從 timer->heap 中不斷 pop 出定時(shí)元素,比較它們的 exec_msec 是否超過了當(dāng)前時(shí)間,如果超過了時(shí)間,就執(zhí)行對應(yīng)的定時(shí)函數(shù);如果沒有超過,由于 timer->heap 是排序過后的數(shù)據(jù)堆,因此當(dāng)前定時(shí)元素之后的都不會(huì)超過當(dāng)前時(shí)間,也就是還沒有到執(zhí)行的時(shí)間。

如果當(dāng)前的定時(shí)元素超過了當(dāng)前時(shí)間,說明該元素應(yīng)該執(zhí)行定時(shí)函數(shù)。設(shè)置 timer->_current_id 為當(dāng)前的 id 后,執(zhí)行 tnode->callback 回調(diào)函數(shù);如果當(dāng)前定時(shí)元素不是一次執(zhí)行的任務(wù),而是需要每隔一段時(shí)間定時(shí)的任務(wù),就要再次將元素放入 timer->heap 中;如果當(dāng)前定時(shí)元素是一次執(zhí)行的任務(wù),就要將元素從 timer->map、timer->map 中刪除

循環(huán)結(jié)束后,tnode 就是下一個(gè)要執(zhí)行的定時(shí)元素,我們需要調(diào)用 timer->set 函數(shù)設(shè)置鬧鐘信號(worker 進(jìn)程)或者 reactor 超時(shí)時(shí)間(master 進(jìn)程)。

int swTimer_select(swTimer *timer)
{
    int64_t now_msec = swTimer_get_relative_msec();
    if (now_msec < 0)
    {
        return SW_ERR;
    }

    swTimer_node *tnode = NULL;
    swHeap_node *tmp;
    long timer_id;

    while ((tmp = swHeap_top(timer->heap)))
    {
        tnode = tmp->data;
        if (tnode->exec_msec > now_msec)
        {
            break;
        }

        timer_id = timer->_current_id = tnode->id;
        if (!tnode->remove)
        {
            tnode->callback(timer, tnode);
        }
        timer->_current_id = -1;

        //persistent timer
        if (tnode->interval > 0 && !tnode->remove)
        {
            while (tnode->exec_msec <= now_msec)
            {
                tnode->exec_msec += tnode->interval;
            }
            swHeap_change_priority(timer->heap, tnode->exec_msec, tmp);
            continue;
        }

        timer->num--;
        swHeap_pop(timer->heap);
        swHashMap_del_int(timer->map, timer_id);
        sw_free(tnode);
    }

    if (!tnode || !tmp)
    {
        timer->_next_msec = -1;
        timer->set(timer, -1);
    }
    else
    {
        timer->set(timer, tnode->exec_msec - now_msec);
    }
    return SW_OK;
}
Timer 定時(shí)器的使用 master 進(jìn)程 swServer_start_proxy

timer 模塊在 master 進(jìn)程中最重要的作用是每隔一秒更新 serv->gs->now 的值。除此之外,當(dāng) reactor 線程調(diào)度 worker 進(jìn)程時(shí),如果一段時(shí)間內(nèi)沒有任何空閑的 worker 進(jìn)程空閑,timer 模塊還負(fù)責(zé)寫入錯(cuò)誤日志。

static int swServer_start_proxy(swServer *serv)
{
    ...
    if (swTimer_init(1000) < 0)
    {
        return SW_ERR;
    }
    
    if (SwooleG.timer.add(&SwooleG.timer, 1000, 1, serv, swServer_master_onTimer) == NULL)
    {
        return SW_ERR;
    }
    ...
}

void swServer_master_onTimer(swTimer *timer, swTimer_node *tnode)
{
    swServer *serv = (swServer *) tnode->data;
    swServer_update_time(serv);
    if (serv->scheduler_warning && serv->warning_time < serv->gs->now)
    {
        serv->scheduler_warning = 0;
        serv->warning_time = serv->gs->now;
        swoole_error_log(SW_LOG_WARNING, SW_ERROR_SERVER_NO_IDLE_WORKER, "No idle worker is available.");
    }

    if (serv->hooks[SW_SERVER_HOOK_MASTER_TIMER])
    {
        swServer_call_hook(serv, SW_SERVER_HOOK_MASTER_TIMER, serv);
    }
}

void swServer_update_time(swServer *serv)
{
    time_t now = time(NULL);
    if (now < 0)
    {
        swWarn("get time failed. Error: %s[%d]", strerror(errno), errno);
    }
    else
    {
        serv->gs->now = now;
    }
}
worker 進(jìn)程超時(shí)停止

worker 進(jìn)程將要停止時(shí),并不會(huì)立刻停止,而是會(huì)等待事件循環(huán)結(jié)束后停止,這時(shí)為了防止 worker 進(jìn)程不退出,還設(shè)置了 30s 的延遲,超過 30s 就會(huì)停止該進(jìn)程。

static void swWorker_stop()
{
    swWorker *worker = SwooleWG.worker;
    swServer *serv = SwooleG.serv;
    worker->status = SW_WORKER_BUSY;
    
    ...

    try_to_exit: SwooleWG.wait_exit = 1;
    if (SwooleG.timer.fd == 0)
    {
        swTimer_init(serv->max_wait_time * 1000);
    }
    SwooleG.timer.add(&SwooleG.timer, serv->max_wait_time * 1000, 0, NULL, swWorker_onTimeout);

    swWorker_try_to_exit();
}

static void swWorker_onTimeout(swTimer *timer, swTimer_node *tnode)
{
    SwooleG.running = 0;
    SwooleG.main_reactor->running = 0;
    swoole_error_log(SW_LOG_WARNING, SW_ERROR_SERVER_WORKER_EXIT_TIMEOUT, "worker exit timeout, forced to terminate.");
}
swoole_timer_tick 添加定時(shí)任務(wù)

timer 模塊另一個(gè)非常重要的功能是添加定時(shí)任務(wù),一般是使用 swoole_timer_tick 函數(shù)、swoole_timer_after 函數(shù)、swoole_server->tick 函數(shù)、swoole_server->after 函數(shù):

PHP_FUNCTION(swoole_timer_tick)
{
    long after_ms;
    zval *callback;
    zval *param = NULL;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "lz|z", &after_ms, &callback, ¶m) == FAILURE)
    {
        return;
    }

    long timer_id = php_swoole_add_timer(after_ms, callback, param, 1 TSRMLS_CC);
    if (timer_id < 0)
    {
        RETURN_FALSE;
    }
    else
    {
        RETURN_LONG(timer_id);
    }
}

PHP_FUNCTION(swoole_timer_after)
{
    long after_ms;
    zval *callback;
    zval *param = NULL;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "lz|z", &after_ms, &callback, ¶m) == FAILURE)
    {
        return;
    }

    long timer_id = php_swoole_add_timer(after_ms, callback, param, 0 TSRMLS_CC);
    if (timer_id < 0)
    {
        RETURN_FALSE;
    }
    else
    {
        RETURN_LONG(timer_id);
    }
}
php_swoole_add_timer 函數(shù)

本函數(shù)主要調(diào)用 SwooleG.timer.add 函數(shù)將添加新的定時(shí)任務(wù),值得注意的是 swTimer_callback 類型的對象 cb 和兩個(gè)回調(diào)函數(shù) php_swoole_onIntervalphp_swoole_onTimeout,真正的回調(diào)函數(shù)存放在了 swTimer_callback 對象中,如果用戶有參數(shù)設(shè)置,也會(huì)放入 cb->data 中。

long php_swoole_add_timer(int ms, zval *callback, zval *param, int persistent TSRMLS_DC)
{
    char *func_name = NULL;

    if (!swIsTaskWorker())
    {
        php_swoole_check_reactor();
    }

    php_swoole_check_timer(ms);
    swTimer_callback *cb = emalloc(sizeof(swTimer_callback));

    cb->data = &cb->_data;
    cb->callback = &cb->_callback;
    memcpy(cb->callback, callback, sizeof(zval));
    if (param)
    {
        memcpy(cb->data, param, sizeof(zval));
    }
    else
    {
        cb->data = NULL;
    }

    swTimerCallback timer_func;
    if (persistent)
    {
        cb->type = SW_TIMER_TICK;
        timer_func = php_swoole_onInterval;
    }
    else
    {
        cb->type = SW_TIMER_AFTER;
        timer_func = php_swoole_onTimeout;
    }

    sw_zval_add_ref(&cb->callback);
    if (cb->data)
    {
        sw_zval_add_ref(&cb->data);
    }

    swTimer_node *tnode = SwooleG.timer.add(&SwooleG.timer, ms, persistent, cb, timer_func);
    {
        tnode->type = SW_TIMER_TYPE_PHP;
        return tnode->id;
    }
}

void php_swoole_check_timer(int msec)
{
    if (unlikely(SwooleG.timer.fd == 0))
    {
        swTimer_init(msec);
    }
}
php_swoole_onInterval 函數(shù)

本函數(shù)主要調(diào)用 cb->callback,如果有用戶參數(shù),還要將 cb->data 放入調(diào)用函數(shù)中。

void php_swoole_onInterval(swTimer *timer, swTimer_node *tnode)
{
    zval *retval = NULL;
    int argc = 1;

    zval *ztimer_id;

    swTimer_callback *cb = tnode->data;

    SW_MAKE_STD_ZVAL(ztimer_id);
    ZVAL_LONG(ztimer_id, tnode->id);

    {
        zval **args[2];
        if (cb->data)
        {
            argc = 2;
            sw_zval_add_ref(&cb->data);
            args[1] = &cb->data;
        }
        args[0] = &ztimer_id;

        if (sw_call_user_function_ex(EG(function_table), NULL, cb->callback, &retval, argc, args, 0, NULL TSRMLS_CC) == FAILURE)
        {
            swoole_php_fatal_error(E_WARNING, "swoole_timer: onTimerCallback handler error");
            return;
        }
    }

    if (tnode->remove)
    {
        php_swoole_del_timer(tnode TSRMLS_CC);
    }
}

php_swoole_onTimeout 函數(shù)

與上一個(gè)函數(shù)類似,只是這次直接從 timer 中刪除對應(yīng)的元素。

void php_swoole_onTimeout(swTimer *timer, swTimer_node *tnode)
{
    {
        swTimer_callback *cb = tnode->data;
        zval *retval = NULL;

        {
            zval **args[2];
            int argc;

            if (NULL == cb->data)
            {
                argc = 0;
                args[0] = NULL;
            }
            else
            {
                argc = 1;
                args[0] = &cb->data;
            }

            if (sw_call_user_function_ex(EG(function_table), NULL, cb->callback, &retval, argc, args, 0, NULL TSRMLS_CC) == FAILURE)
            {
                swoole_php_fatal_error(E_WARNING, "swoole_timer: onTimeout handler error");
                return;
            }
        }

        php_swoole_del_timer(tnode TSRMLS_CC);
    }
}
Timer 模塊時(shí)間輪算法

時(shí)間輪算法是各大網(wǎng)絡(luò)模塊采用的剔除空閑連接的方法,原理是構(gòu)建一個(gè)首尾相連的循環(huán)數(shù)組,每隔數(shù)組元素中有若干個(gè)連接。如果某個(gè)連接有數(shù)據(jù)發(fā)送過來,將連接從所在的數(shù)組元素中刪除,將連接放入最新的數(shù)組元素中,這樣有數(shù)據(jù)來往的連接會(huì)一直在新數(shù)組元素中,空閑的連接所在的數(shù)組元素漸漸的變成了舊數(shù)組元素。每隔一段時(shí)間就按順序清空舊數(shù)組元素的全部連接。

swTimeWheel_new 創(chuàng)建時(shí)間輪

時(shí)間輪的數(shù)據(jù)結(jié)構(gòu)比較簡單,由哈希表、size(循環(huán)數(shù)組總數(shù)量),current (循環(huán)數(shù)組當(dāng)前最舊的數(shù)組元素,current-1 是循環(huán)數(shù)組中最新的數(shù)組元素)。swTimeWheel_new 函數(shù)很簡單,就是創(chuàng)建這三個(gè)屬性。

typedef struct
{
    uint16_t current;
    uint16_t size;
    swHashMap **wheel;

} swTimeWheel;

swTimeWheel* swTimeWheel_new(uint16_t size)
{
    swTimeWheel *tw = sw_malloc(sizeof(swTimeWheel));
    if (!tw)
    {
        swWarn("malloc(%ld) failed.", sizeof(swTimeWheel));
        return NULL;
    }

    tw->size = size;
    tw->current = 0;
    tw->wheel = sw_calloc(size, sizeof(void*));
    if (tw->wheel == NULL)
    {
        swWarn("malloc(%ld) failed.", sizeof(void*) * size);
        sw_free(tw);
        return NULL;
    }

    int i;
    for (i = 0; i < size; i++)
    {
        tw->wheel[i] = swHashMap_new(16, NULL);
        if (tw->wheel[i] == NULL)
        {
            swTimeWheel_free(tw);
            return NULL;
        }
    }
    return tw;
}
swTimeWheel_add 添加連接

當(dāng) main_reactor 有新連接進(jìn)入的時(shí)候,需要將新的連接添加到時(shí)間輪中,新的連接會(huì)被放到最新的數(shù)組元素中,也就是 current-1 的元素中,然后設(shè)置 swConnection 中的 timewheel_index。

void swTimeWheel_add(swTimeWheel *tw, swConnection *conn)
{
    uint16_t index = tw->current == 0 ? tw->size - 1 : tw->current - 1;
    swHashMap *new_set = tw->wheel[index];
    swHashMap_add_int(new_set, conn->fd, conn);

    conn->timewheel_index = index;

    swTraceLog(SW_TRACE_REACTOR, "current=%d, fd=%d, index=%d.", tw->current, conn->fd, index);
}
swTimeWheel_update 函數(shù)

當(dāng)連接有數(shù)據(jù)傳輸?shù)臅r(shí)候,需要更新該連接在時(shí)間輪中的位置,將該連接從原有的數(shù)組元素中刪除,然后添加到最新的數(shù)組元素中,也就是 current-1 中,然后更新 swConnection 中的 timewheel_index。

#define swTimeWheel_new_index(tw)   (tw->current == 0 ? tw->size - 1 : tw->current - 1)

void swTimeWheel_update(swTimeWheel *tw, swConnection *conn)
{
    uint16_t new_index = swTimeWheel_new_index(tw);
    swHashMap *new_set = tw->wheel[new_index];
    swHashMap_add_int(new_set, conn->fd, conn);

    swHashMap *old_set = tw->wheel[conn->timewheel_index];
    swHashMap_del_int(old_set, conn->fd);

    swTraceLog(SW_TRACE_REACTOR, "current=%d, fd=%d, old_index=%d, new_index=%d.", tw->current, conn->fd, new_index, conn->timewheel_index);

    conn->timewheel_index = new_index;
}
swTimeWheel_remove 函數(shù)

在時(shí)間輪中刪除該連接,

void swTimeWheel_remove(swTimeWheel *tw, swConnection *conn)
{
    swHashMap *set = tw->wheel[conn->timewheel_index];
    swHashMap_del_int(set, conn->fd);
    swTraceLog(SW_TRACE_REACTOR, "current=%d, fd=%d.", tw->current, conn->fd);
}
swTimeWheel_forward 刪除空閑連接

swTimeWheel_forward 將最舊的數(shù)組元素 current 中所有連接都關(guān)閉掉,然后將 current 遞增。

void swTimeWheel_forward(swTimeWheel *tw, swReactor *reactor)
{
    swHashMap *set = tw->wheel[tw->current];
    tw->current = tw->current == tw->size - 1 ? 0 : tw->current + 1;

    swTraceLog(SW_TRACE_REACTOR, "current=%d.", tw->current);

    swConnection *conn;
    uint64_t fd;

    while (1)
    {
        conn = swHashMap_each_int(set, &fd);
        if (conn == NULL)
        {
            break;
        }

        conn->close_force = 1;
        conn->close_notify = 1;
        conn->close_wait = 1;
        conn->close_actively = 1;

        //notify to reactor thread
        if (conn->removed)
        {
            reactor->close(reactor, (int) fd);
        }
        else
        {
            reactor->set(reactor, fd, SW_FD_TCP | SW_EVENT_WRITE);
        }
    }
}
reactor 線程中時(shí)間輪的創(chuàng)建

時(shí)間輪的創(chuàng)建在 reactor 線程進(jìn)行事件循環(huán)之前,按照用戶設(shè)置的連接最大空閑時(shí)間設(shè)置不同大小的時(shí)間輪,值得注意的是,時(shí)間輪最大是 SW_TIMEWHEEL_SIZE,也就是循環(huán)數(shù)組大小最大是 60。如果超過 60s 空閑時(shí)間,也僅僅建立 60 個(gè)元素的數(shù)組,但是這樣會(huì)造成每個(gè)數(shù)組元素存放更多的連接。

值得注意的是,當(dāng)允許空閑時(shí)間超過 60s 時(shí),heartbeat_interval * 1000reactor 的超時(shí)時(shí)間,例如空閑時(shí)間是 60s,那么每隔 6s,reactor 都會(huì)超時(shí)來檢測空閑連接。當(dāng)允許空閑時(shí)間小于 60s 時(shí),reactor 統(tǒng)一每隔 1s 檢測空閑連接。

不同于 master 進(jìn)程和 worker 線程,reactoronFinishonTimeout 不再采用默認(rèn)的 swReactor_onTimeoutswReactor_onFinish 函數(shù),而是采用空閑連接檢測的 swReactorThread_onReactorCompleted 函數(shù),該函數(shù)會(huì)調(diào)用 swTimeWheel_forward 來剔除空閑連接。

#define SW_TIMEWHEEL_SIZE          60

static int swReactorThread_loop(swThreadParam *param)
{
    ...
    
    if (serv->heartbeat_idle_time > 0)
    {
        if (serv->heartbeat_idle_time < SW_TIMEWHEEL_SIZE)
        {
            reactor->timewheel = swTimeWheel_new(serv->heartbeat_idle_time);
            reactor->heartbeat_interval = 1;
        }
        else
        {
            reactor->timewheel = swTimeWheel_new(SW_TIMEWHEEL_SIZE);
            reactor->heartbeat_interval = serv->heartbeat_idle_time / SW_TIMEWHEEL_SIZE;
        }
        reactor->last_heartbeat_time = 0;
        if (reactor->timewheel == NULL)
        {
            swSysError("thread->timewheel create failed.");
            return SW_ERR;
        }
        reactor->timeout_msec = reactor->heartbeat_interval * 1000;
        reactor->onFinish = swReactorThread_onReactorCompleted;
        reactor->onTimeout = swReactorThread_onReactorCompleted;
    }
    
    reactor->wait(reactor, NULL);
}
reactor 線程中時(shí)間輪的添加

當(dāng)有新連接的時(shí)候,conn->connect_notify 會(huì)被置為 1,此時(shí)該連接文件描述符寫就緒,然后就會(huì)調(diào)用 swReactorThread_onWrite,此時(shí) reactor 線程將該連接添加到時(shí)間輪中。

static int swReactorThread_onWrite(swReactor *reactor, swEvent *ev)
{
    ...
    
    if (conn->connect_notify)
    {
        conn->connect_notify = 0;
        if (reactor->timewheel)
        {
            swTimeWheel_add(reactor->timewheel, conn);
        }
        
        ...
    }
    ...
}
reactor 線程中時(shí)間輪的更新
static int swReactorThread_onRead(swReactor *reactor, swEvent *event)
{
    ...
    if (reactor->timewheel && swTimeWheel_new_index(reactor->timewheel) != event->socket->timewheel_index)
    {
        swTimeWheel_update(reactor->timewheel, event->socket);
    }
    ...
}

reactor 線程中時(shí)間輪的剔除

當(dāng)連接在允許的空閑時(shí)間之內(nèi)沒有任何數(shù)據(jù)發(fā)送,那么時(shí)間輪算法就要關(guān)閉該連接。關(guān)閉連接并不是直接 close 套接字,而是需要通知對應(yīng)的 worker 進(jìn)程調(diào)用 onClose 函數(shù),然后才能關(guān)閉。具體的做法是設(shè)置 swConnectionclose_force、close_notify 等成員變量為 1,并且關(guān)閉該連接的讀就緒監(jiān)聽事件。

static void swReactorThread_onReactorCompleted(swReactor *reactor)
{
    swServer *serv = reactor->ptr;
    if (reactor->heartbeat_interval > 0 && reactor->last_heartbeat_time < serv->gs->now - reactor->heartbeat_interval)
    {
        swTimeWheel_forward(reactor->timewheel, reactor);
        reactor->last_heartbeat_time = serv->gs->now;
    }
}

void swTimeWheel_forward(swTimeWheel *tw, swReactor *reactor)
{
    ...
    
    conn->close_force = 1;
    conn->close_notify = 1;
    conn->close_wait = 1;
    conn->close_actively = 1;
    
    if (conn->removed)
    {
        reactor->close(reactor, (int) fd);
    }
    else
    {
        reactor->set(reactor, fd, SW_FD_TCP | SW_EVENT_WRITE);
    }
    ...
}

當(dāng)該連接寫就緒的時(shí)候,會(huì)調(diào)用 swReactorThread_onWrite 函數(shù)。這個(gè)時(shí)候就會(huì)調(diào)用 swServer_tcp_notify 函數(shù),進(jìn)而調(diào)用 swFactoryProcess_notifyswFactoryProcess_dispatch,最后調(diào)用 swReactorThread_send2worker 發(fā)送給了 worker 進(jìn)程。

由于 reactor 啟用的是水平觸發(fā),由于并未向該連接寫入數(shù)據(jù),因此很快又會(huì)觸發(fā)寫就緒事件調(diào)用 swReactorThread_onWrite 函數(shù),這時(shí)如果 disable_notify 為 1(dispatch_mode 為 1 或 3),會(huì)直接執(zhí)行 swReactorThread_close 函數(shù)關(guān)閉連接,假如此時(shí) conn->out_buffer 中還有數(shù)據(jù)未發(fā)送,也會(huì)被拋棄。如果 disable_notify 為 0,則會(huì)繼續(xù)向?qū)⒁P(guān)閉的連接發(fā)送數(shù)據(jù),直到接收到 SW_CHUNK_CLOSE 類型的消息。

static int swReactorThread_onWrite(swReactor *reactor, swEvent *ev)
{
    ...
    else if (conn->close_notify)
    {
        swServer_tcp_notify(serv, conn, SW_EVENT_CLOSE);
        conn->close_notify = 0;
        return SW_OK;
    }
    else if (serv->disable_notify && conn->close_force)
    {
        return swReactorThread_close(reactor, fd);
    }
    ...
}

int swServer_tcp_notify(swServer *serv, swConnection *conn, int event)
{
    swDataHead notify_event;
    notify_event.type = event;
    notify_event.from_id = conn->from_id;
    notify_event.fd = conn->fd;
    notify_event.from_fd = conn->from_fd;
    notify_event.len = 0;
    return serv->factory.notify(&serv->factory, ¬ify_event);
}

static int swFactoryProcess_notify(swFactory *factory, swDataHead *ev)
{
    memcpy(&sw_notify_data._send, ev, sizeof(swDataHead));
    sw_notify_data._send.len = 0;
    sw_notify_data.target_worker_id = -1;
    return factory->dispatch(factory, (swDispatchData *) &sw_notify_data);
}

static int swFactoryProcess_dispatch(swFactory *factory, swDispatchData *task)
{ 
   ...
   
   if (swEventData_is_stream(task->data.info.type))
   {
       swConnection *conn = swServer_connection_get(serv, fd);
       if (conn->closed)
        {
            //Connection has been clsoed by server
            if (!(task->data.info.type == SW_EVENT_CLOSE && conn->close_force))
            {
                return SW_OK;
            }
        }
        //converted fd to session_id
        task->data.info.fd = conn->session_id;
        task->data.info.from_fd = conn->from_fd;
   }
   
   return swReactorThread_send2worker((void *) &(task->data), send_len, target_worker_id);
}

worker 進(jìn)程收到消息后會(huì)調(diào)用 swWorker_onTask 函數(shù),進(jìn)而調(diào)用 swFactoryProcess_end 函數(shù),調(diào)用 serv->onClose 函數(shù),并設(shè)置 swConnection 對象的 closed 為 1,然后調(diào)用 swFactoryProcess_finish 函數(shù)將數(shù)據(jù)包發(fā)送給 reactor 線程。

int swWorker_onTask(swFactory *factory, swEventData *task)
{
    switch (task->info.type)
    {
        ... 
        factory->end(factory, task->info.fd);
        break;
        ...
    }
}

static int swFactoryProcess_end(swFactory *factory, int fd)
{
    bzero(&_send, sizeof(_send));
    _send.info.fd = fd;
    _send.info.len = 0;
    _send.info.type = SW_EVENT_CLOSE;
   
   swConnection *conn = swWorker_get_connection(serv, fd);
   
   if (conn->close_force)
   {
       goto do_close;
   }
   else if (conn->closing)
   {
       swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SESSION_CLOSING, "The connection[%d] is closing.", fd);
       return SW_ERR;
   }
   else if (conn->closed)
   {
       return SW_ERR;
   }
   else
   {
        do_close:
        conn->closing = 1;
        if (serv->onClose != NULL)
        {
            info.fd = fd;
            if (conn->close_actively)
            {
                info.from_id = -1;
            }
            else
            {
                info.from_id = conn->from_id;
            }
            info.from_fd = conn->from_fd;
            serv->onClose(serv, &info);
        }
        conn->closing = 0;
        conn->closed = 1;
        conn->close_errno = 0;
        return factory->finish(factory, &_send);
   }

}

reactor 通過 swReactorThread_onPipeReceive 收到 worker 進(jìn)程的連接關(guān)閉通知后,調(diào)用 swReactorThread_send 函數(shù)。如果連接已經(jīng)被關(guān)閉,或者緩沖區(qū)中沒有任何數(shù)據(jù)的時(shí)候,直接調(diào)用 reactor->close 函數(shù),也就是 swReactorThread_close 函數(shù);如果緩沖區(qū)還有數(shù)據(jù),那么需要將消息放到 conn->out_buffer 中等待著該連接寫就緒回調(diào) swReactorThread_close 函數(shù)(此時(shí) close_notify 已經(jīng)為 0)。

int swReactorThread_send(swSendData *_send)
{
    ...
    if (_send->info.type == SW_EVENT_CLOSE && (conn->close_reset || conn->removed))
    {
        goto close_fd;
    }
    
    ...
    if (swBuffer_empty(conn->out_buffer))
    {
        if (_send->info.type == SW_EVENT_CLOSE)
        {
            close_fd:
            reactor->close(reactor, fd);
            return SW_OK;
        }
    }
    
    swBuffer_chunk *chunk;
    //close connection
    if (_send->info.type == SW_EVENT_CLOSE)
    {
        chunk = swBuffer_new_chunk(conn->out_buffer, SW_CHUNK_CLOSE, 0);
        chunk->store.data.val1 = _send->info.type;
    }
    
    if (reactor->set(reactor, fd, SW_EVENT_TCP | SW_EVENT_WRITE | SW_EVENT_READ) < 0
            && (errno == EBADF || errno == ENOENT))
    {
        goto close_fd;
    }

    ...
    close_fd:
        reactor->close(reactor, fd);
        return SW_OK;
}

static int swReactorThread_onWrite(swReactor *reactor, swEvent *ev)
{
    ...
    else if (conn->close_notify)
    {
        swServer_tcp_notify(serv, conn, SW_EVENT_CLOSE);
        conn->close_notify = 0;
        return SW_OK;
    }
    else if (serv->disable_notify && conn->close_force)
    {
        return swReactorThread_close(reactor, fd);
    }

    _pop_chunk: while (!swBuffer_empty(conn->out_buffer))
    {
        chunk = swBuffer_get_chunk(conn->out_buffer);
        if (chunk->type == SW_CHUNK_CLOSE)
        {
            close_fd: reactor->close(reactor, fd);
            return SW_OK;
        }
        ...
    }
    ...
}

swReactorThread_close 函數(shù)會(huì)刪除 swConnectionserver 中的所有痕跡,包括 reactor 中的監(jiān)控,serv->stats 的成員變量,port->connection_num 遞減,從時(shí)間輪中刪除、sessionfd 置空等等工作。而且,還要清空套接字緩存中的所有數(shù)據(jù),直接向客戶端發(fā)送關(guān)閉請求。swReactor_close 函數(shù)釋放內(nèi)存,關(guān)閉套接字文件描述符。

int swReactorThread_close(swReactor *reactor, int fd)
{
    swServer *serv = SwooleG.serv;

    if (conn->removed == 0 && reactor->del(reactor, fd) < 0)
    {
        return SW_ERR;
    }

    sw_atomic_fetch_add(&serv->stats->close_count, 1);
    sw_atomic_fetch_sub(&serv->stats->connection_num, 1);

    swTrace("Close Event.fd=%d|from=%d", fd, reactor->id);

    //free the receive memory buffer
    swServer_free_buffer(serv, fd);

    swListenPort *port = swServer_get_port(serv, fd);
    sw_atomic_fetch_sub(&port->connection_num, 1);

#ifdef SW_USE_SOCKET_LINGER
    if (conn->close_force)
    {
        struct linger linger;
        linger.l_onoff = 1;
        linger.l_linger = 0;
        if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &linger, sizeof(struct linger)) == -1)
        {
            swWarn("setsockopt(SO_LINGER) failed. Error: %s[%d]", strerror(errno), errno);
        }
    }
#endif

#ifdef SW_REACTOR_USE_SESSION
    swSession *session = swServer_get_session(serv, conn->session_id);
    session->fd = 0;
#endif

#ifdef SW_USE_TIMEWHEEL
    if (reactor->timewheel)
    {
        swTimeWheel_remove(reactor->timewheel, conn);
    }
#endif

    return swReactor_close(reactor, fd);
}

int swReactor_close(swReactor *reactor, int fd)
{
    swConnection *socket = swReactor_get(reactor, fd);
    if (socket->out_buffer)
    {
        swBuffer_free(socket->out_buffer);
    }
    if (socket->in_buffer)
    {
        swBuffer_free(socket->in_buffer);
    }
    if (socket->websocket_buffer)
    {
        swString_free(socket->websocket_buffer);
    }
    bzero(socket, sizeof(swConnection));
    socket->removed = 1;
    swTraceLog(SW_TRACE_CLOSE, "fd=%d.", fd);
    return close(fd);
}

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/29353.html

相關(guān)文章

  • Swoole 源碼分析——Reactor模塊ReactorBase

    前言 作為一個(gè)網(wǎng)絡(luò)框架,最為核心的就是消息的接受與發(fā)送。高效的 reactor 模式一直是眾多網(wǎng)絡(luò)框架的首要選擇,本節(jié)主要講解 swoole 中的 reactor 模塊。 UNP 學(xué)習(xí)筆記——IO 復(fù)用 Reactor 的數(shù)據(jù)結(jié)構(gòu) Reactor 的數(shù)據(jù)結(jié)構(gòu)比較復(fù)雜,首先 object 是具體 Reactor 對象的首地址,ptr 是擁有 Reactor 對象的類的指針, event_nu...

    baukh789 評論0 收藏0
  • Swoole 源碼分析——Server模塊Start

    摘要:是緩存區(qū)高水位線,達(dá)到了說明緩沖區(qū)即將滿了創(chuàng)建線程函數(shù)用于將監(jiān)控的存放于中向中添加監(jiān)聽的文件描述符等待所有的線程開啟事件循環(huán)利用創(chuàng)建線程,線程啟動(dòng)函數(shù)是保存監(jiān)聽本函數(shù)將用于監(jiān)聽的存放到當(dāng)中,并設(shè)置相應(yīng)的屬性 Server 的啟動(dòng) 在 server 啟動(dòng)之前,swoole 首先要調(diào)用 php_swoole_register_callback 將 PHP 的回調(diào)函數(shù)注冊到 server...

    3fuyu 評論0 收藏0
  • Swoole筆記(一)

    摘要:修復(fù)添加超過萬個(gè)以上定時(shí)器時(shí)發(fā)生崩潰的問題增加模塊,下高性能序列化庫修復(fù)監(jiān)聽端口設(shè)置無效的問題等。線程來處理網(wǎng)絡(luò)事件輪詢,讀取數(shù)據(jù)。當(dāng)?shù)娜挝帐殖晒α艘院?,由這個(gè)線程將連接成功的消息告訴進(jìn)程,再由進(jìn)程轉(zhuǎn)交給進(jìn)程。此時(shí)進(jìn)程觸發(fā)事件。 本文示例代碼詳見:https://github.com/52fhy/swoo...。 簡介 Swoole是一個(gè)PHP擴(kuò)展,提供了PHP語言的異步多線程服務(wù)器...

    SHERlocked93 評論0 收藏0
  • Swoole 源碼分析——Client模塊Send

    摘要:當(dāng)此時(shí)的套接字不可寫的時(shí)候,會(huì)自動(dòng)放入緩沖區(qū)中。當(dāng)大于高水線時(shí),會(huì)自動(dòng)調(diào)用回調(diào)函數(shù)。寫就緒狀態(tài)當(dāng)監(jiān)控到套接字進(jìn)入了寫就緒狀態(tài)時(shí),就會(huì)調(diào)用函數(shù)。如果為,說明此時(shí)異步客戶端雖然建立了連接,但是還沒有調(diào)用回調(diào)函數(shù),因此這時(shí)要調(diào)用函數(shù)。 前言 上一章我們說了客戶端的連接 connect,對于同步客戶端來說,連接已經(jīng)建立成功;但是對于異步客戶端來說,此時(shí)可能還在進(jìn)行 DNS 的解析,on...

    caozhijian 評論0 收藏0
  • Swoole 源碼分析——Server模塊Signal信號處理

    摘要:在創(chuàng)建進(jìn)程和線程之間,主線程開始進(jìn)行信號處理函數(shù)的設(shè)置。事件循環(huán)結(jié)束前會(huì)調(diào)用函數(shù),該函數(shù)會(huì)檢查并執(zhí)行相應(yīng)的信號處理函數(shù)。 前言 信號處理是網(wǎng)絡(luò)庫不可或缺的一部分,不論是 ALARM、SIGTERM、SIGUSR1、SIGUSR2、SIGPIPE 等信號對程序的控制,還是 reactor、read、write 等操作被信號中斷的處理,都關(guān)系著整個(gè)框架程序的正常運(yùn)行。 Signal 數(shù)據(jù)...

    Nosee 評論0 收藏0

發(fā)表評論

0條評論

閱讀需要支付1元查看
<