成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

Swoole 源碼分析——Server模塊之Signal信號(hào)處理

Nosee / 768人閱讀

摘要:在創(chuàng)建進(jìn)程和線(xiàn)程之間,主線(xiàn)程開(kāi)始進(jìn)行信號(hào)處理函數(shù)的設(shè)置。事件循環(huán)結(jié)束前會(huì)調(diào)用函數(shù),該函數(shù)會(huì)檢查并執(zhí)行相應(yīng)的信號(hào)處理函數(shù)。

前言

信號(hào)處理是網(wǎng)絡(luò)庫(kù)不可或缺的一部分,不論是 ALARMSIGTERM、SIGUSR1SIGUSR2、SIGPIPE 等信號(hào)對(duì)程序的控制,還是 reactor、readwrite 等操作被信號(hào)中斷的處理,都關(guān)系著整個(gè)框架程序的正常運(yùn)行。

Signal 數(shù)據(jù)結(jié)構(gòu)

Signal 模塊的數(shù)據(jù)結(jié)構(gòu)很簡(jiǎn)單,就是一個(gè) swSignal 類(lèi)型的數(shù)組,數(shù)組大小是 128。swSignal 中存放著信號(hào)的回調(diào)函數(shù) callback,信號(hào) signo,是否啟用 active。

typedef void (*swSignalHander)(int);
#define SW_SIGNO_MAX      128

typedef struct
{
    swSignalHander callback;
    uint16_t signo;
    uint16_t active;
} swSignal;

static swSignal signals[SW_SIGNO_MAX];
Signal 函數(shù) swSignal_none 屏蔽所有信號(hào)

如果當(dāng)前的線(xiàn)程不想要被信號(hào)中斷,那么就可以使用 swSignal_none 函數(shù)屏蔽所有的信號(hào),這樣該進(jìn)程所有的函數(shù)都不會(huì)被信號(hào)中斷,編寫(xiě)函數(shù)的時(shí)候就不用考慮被信號(hào)打斷的情況。

值得注意的是處理的信號(hào) SIGKILLSIGSTOP 無(wú)法被阻塞。

void swSignal_none(void)
{
    sigset_t mask;
    sigfillset(&mask);
    int ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
    if (ret < 0)
    {
        swWarn("pthread_sigmask() failed. Error: %s[%d]", strerror(ret), ret);
    }
}
swSignal_add 添加信號(hào)

添加信號(hào)就是向 signals 數(shù)組添加一個(gè)新的信號(hào)元素,然后調(diào)用 swSignal_set 函數(shù)進(jìn)行信號(hào)處理函數(shù)的注冊(cè)。如果使用的是 signalfd,那么使用的是 swSignalfd_set 函數(shù)。

void swSignal_add(int signo, swSignalHander func)
{
#ifdef HAVE_SIGNALFD
    if (SwooleG.use_signalfd)
    {
        swSignalfd_set(signo, func);
    }
    else
#endif
    {
        {
            signals[signo].callback = func;
            signals[signo].active = 1;
            signals[signo].signo = signo;
            swSignal_set(signo, swSignal_async_handler, 1, 0);
        }
    }
}
swSignal_set 設(shè)置信號(hào)處理函數(shù)

swSignal_set 函數(shù)主要是調(diào)用 sigaction 為整個(gè)進(jìn)程設(shè)置信號(hào)處理函數(shù)。如果設(shè)置 func-1,信號(hào)處理函數(shù)是系統(tǒng)默認(rèn),如果 funcnull,就會(huì)忽略該信號(hào)。如果 mask 為 1,那么在處理該信號(hào)的時(shí)候會(huì)阻塞所有信號(hào),如果 mask 為 0,那么在處理該信號(hào)的時(shí)候就不會(huì)阻塞任何信號(hào)。

swSignalHander swSignal_set(int sig, swSignalHander func, int restart, int mask)
{
    //ignore
    if (func == NULL)
    {
        func = SIG_IGN;
    }
    //clear
    else if ((long) func == -1)
    {
        func = SIG_DFL;
    }

    struct sigaction act, oact;
    act.sa_handler = func;
    if (mask)
    {
        sigfillset(&act.sa_mask);
    }
    else
    {
        sigemptyset(&act.sa_mask);
    }
    act.sa_flags = 0;
    if (sigaction(sig, &act, &oact) < 0)
    {
        return NULL;
    }
    return oact.sa_handler;
}
swSignal_async_handler 信號(hào)處理函數(shù)

Signal 模塊所有的信號(hào)處理函數(shù)都是 swSignal_async_handler,該函數(shù)會(huì)調(diào)用 signals 數(shù)組中信號(hào)元素的回調(diào)函數(shù)。對(duì)于進(jìn)程中存在 reactor(例如主線(xiàn)程或者 worker 進(jìn)程),只需設(shè)置 main_reactor->singal_no,等待 reactor 回調(diào)即可(一般是 swReactor_error 函數(shù)和 swReactor_onFinish 函數(shù))。對(duì)于沒(méi)有 reactor 的進(jìn)程,例如 manager 進(jìn)程,會(huì)直接調(diào)用回調(diào)函數(shù)。

值得注意的是,這種異步信號(hào)處理函數(shù)代碼一定要簡(jiǎn)單,一定要是信號(hào)安全函數(shù),例如本例中只設(shè)置 SwooleG.main_reactor->singal_no,等待著返回主流程后再具體執(zhí)行回調(diào)函數(shù);而沒(méi)有 main_reactor 的進(jìn)程,就要著重注意回調(diào)函數(shù)是否是信號(hào)安全函數(shù)。因此從這方面來(lái)說(shuō),signalfd 有著天然的優(yōu)勢(shì),它是文件描述符,由 epoll 統(tǒng)一管理,回調(diào)函數(shù)并不需要異步信號(hào)安全。

static void swSignal_async_handler(int signo)
{
    if (SwooleG.main_reactor)
    {
        SwooleG.main_reactor->singal_no = signo;
    }
    else
    {
        //discard signal
        if (_lock)
        {
            return;
        }
        _lock = 1;
        swSignal_callback(signo);
        _lock = 0;
    }
}

void swSignal_callback(int signo)
{
    if (signo >= SW_SIGNO_MAX)
    {
        swWarn("signal[%d] numberis invalid.", signo);
        return;
    }
    swSignalHander callback = signals[signo].callback;
    if (!callback)
    {
        swWarn("signal[%d] callback is null.", signo);
        return;
    }
    callback(signo);
}
swSignal_clear 清除所有信號(hào)

清除信號(hào)就是遍歷 signals 數(shù)組,將所有的有效信號(hào)元素的信號(hào)處理函數(shù)設(shè)置為系統(tǒng)默認(rèn)。如果使用的是 signalfd,那么調(diào)用 swSignalfd_clear 函數(shù)。

void swSignal_clear(void)
{
#ifdef HAVE_SIGNALFD
    if (SwooleG.use_signalfd)
    {
        swSignalfd_clear();
    }
    else
#endif
    {
        int i;
        for (i = 0; i < SW_SIGNO_MAX; i++)
        {
            if (signals[i].active)
            {
                {
                    swSignal_set(signals[i].signo, (swSignalHander) -1, 1, 0);
                }
            }
        }
    }
    bzero(&signals, sizeof(signals));
}
swSignalfd_initsignalfd 信號(hào)初始化

使用 signalfd 之前需要將 signalfd_mask、signals 重置。

static sigset_t signalfd_mask;
static int signal_fd = 0;

void swSignalfd_init()
{
    sigemptyset(&signalfd_mask);
    bzero(&signals, sizeof(signals));
}
swSignalfd_setup——signalfd 信號(hào)啟用

signalfd 信號(hào)啟用需要兩個(gè)步驟,調(diào)用 signalfd 函數(shù)創(chuàng)建信號(hào)描述符,reactor->add 添加到 reactor 事件循環(huán)中。

int swSignalfd_setup(swReactor *reactor)
{
    if (signal_fd == 0)
    {
        signal_fd = signalfd(-1, &signalfd_mask, SFD_NONBLOCK | SFD_CLOEXEC);
        if (signal_fd < 0)
        {
            swWarn("signalfd() failed. Error: %s[%d]", strerror(errno), errno);
            return SW_ERR;
        }
        SwooleG.signal_fd = signal_fd;
        if (sigprocmask(SIG_BLOCK, &signalfd_mask, NULL) == -1)
        {
            swWarn("sigprocmask() failed. Error: %s[%d]", strerror(errno), errno);
            return SW_ERR;
        }
        reactor->setHandle(reactor, SW_FD_SIGNAL, swSignalfd_onSignal);
        reactor->add(reactor, signal_fd, SW_FD_SIGNAL);
        return SW_OK;
    }
    else
    {
        swWarn("signalfd has been created");
        return SW_ERR;
    }
}
swSignalfd_set——signalfd 信號(hào)處理函數(shù)的設(shè)置

使用 signalfd 函數(shù)對(duì) signal_fd 設(shè)置信號(hào)處理函數(shù)的時(shí)候,要先將對(duì)應(yīng)的信號(hào)進(jìn)行屏蔽 sigprocmask,否則很可能會(huì)額外執(zhí)行系統(tǒng)的默認(rèn)信號(hào)處理函數(shù)。

static void swSignalfd_set(int signo, swSignalHander callback)
{
    if (callback == NULL && signals[signo].active)
    {
        sigdelset(&signalfd_mask, signo);
        bzero(&signals[signo], sizeof(swSignal));
    }
    else
    {
        sigaddset(&signalfd_mask, signo);
        signals[signo].callback = callback;
        signals[signo].signo = signo;
        signals[signo].active = 1;
    }
    if (signal_fd > 0)
    {
        sigprocmask(SIG_BLOCK, &signalfd_mask, NULL);
        signalfd(signal_fd, &signalfd_mask, SFD_NONBLOCK | SFD_CLOEXEC);
    }
}
swSignalfd_onSignal——signalfd 信號(hào)處理函數(shù)

swSignalfd_onSignal 函數(shù)由 reactor 事件循環(huán)直接調(diào)用。

static int swSignalfd_onSignal(swReactor *reactor, swEvent *event)
{
    int n;
    struct signalfd_siginfo siginfo;
    n = read(event->fd, &siginfo, sizeof(siginfo));
    if (n < 0)
    {
        swWarn("read from signalfd failed. Error: %s[%d]", strerror(errno), errno);
        return SW_OK;
    }
    if (siginfo.ssi_signo >=  SW_SIGNO_MAX)
    {
        swWarn("unknown signal[%d].", siginfo.ssi_signo);
        return SW_OK;
    }
    if (signals[siginfo.ssi_signo].active)
    {
        if (signals[siginfo.ssi_signo].callback)
        {
            signals[siginfo.ssi_signo].callback(siginfo.ssi_signo);
        }
        else
        {
            swWarn("signal[%d] callback is null.", siginfo.ssi_signo);
        }
    }

    return SW_OK;
}
swSignalfd_clear——signalfd 信號(hào)處理函數(shù)的清除
static void swSignalfd_clear()
{
    if (signal_fd)
    {
        if (sigprocmask(SIG_UNBLOCK, &signalfd_mask, NULL) < 0)
        {
            swSysError("sigprocmask(SIG_UNBLOCK) failed.");
        }
        close(signal_fd);
        bzero(&signalfd_mask, sizeof(signalfd_mask));
    }
    signal_fd = 0;
}
Signal 信號(hào)的應(yīng)用 master 線(xiàn)程信號(hào)

在調(diào)用 swoole_server->start 函數(shù)之后,master 主進(jìn)程開(kāi)始創(chuàng)建 manager 進(jìn)程與 reactor 線(xiàn)程。在創(chuàng)建 manager 進(jìn)程和 reactor 線(xiàn)程之間,master 主線(xiàn)程開(kāi)始進(jìn)行信號(hào)處理函數(shù)的設(shè)置。

可以看到,master 進(jìn)程的信號(hào)處理函數(shù)是 swServer_signal_hanlder,并設(shè)置忽略了 SIGPIPE、SIGHUP 函數(shù)。

SIGPIPE 一般發(fā)生于對(duì)端連接已關(guān)閉,服務(wù)端仍然在發(fā)送數(shù)據(jù)的情況,如果沒(méi)有忽略該信號(hào),很可能主進(jìn)程會(huì)異常終止。

SIGHUP 信號(hào)一般發(fā)生于終端關(guān)閉時(shí),該信號(hào)被發(fā)送到 session 首進(jìn)程,也就是 master 主進(jìn)程。如果不忽略該信號(hào),關(guān)閉終端的時(shí)候,主進(jìn)程會(huì)默認(rèn)異常退出。

SIGHUP會(huì)在以下3種情況下被發(fā)送給相應(yīng)的進(jìn)程:

1、終端關(guān)閉時(shí),該信號(hào)被發(fā)送到session首進(jìn)程以及作為job提交的進(jìn)程(即用 & 符號(hào)提交的進(jìn)程)

2、session首進(jìn)程退出時(shí),該信號(hào)被發(fā)送到該session中的前臺(tái)進(jìn)程組中的每一個(gè)進(jìn)程

3、若父進(jìn)程退出導(dǎo)致進(jìn)程組成為孤兒進(jìn)程組,且該進(jìn)程組中有進(jìn)程處于停止?fàn)顟B(tài)(收到SIGSTOP或SIGTSTP信號(hào)),該信號(hào)會(huì)被發(fā)送到該進(jìn)程組中的每一個(gè)進(jìn)程。

int swServer_start(swServer *serv)
{
    ...
    if (factory->start(factory) < 0)//創(chuàng)建 manager 進(jìn)程
    {
        return SW_ERR;
    }
    //signal Init
    swServer_signal_init(serv);
    
    ret = swServer_start_proxy(serv);
    ...
}

void swServer_signal_init(swServer *serv)
{
    swSignal_add(SIGPIPE, NULL);
    swSignal_add(SIGHUP, NULL);
    if (serv->factory_mode != SW_MODE_BASE)
    {
        swSignal_add(SIGCHLD, swServer_signal_hanlder);
    }
    swSignal_add(SIGUSR1, swServer_signal_hanlder);
    swSignal_add(SIGUSR2, swServer_signal_hanlder);
    swSignal_add(SIGTERM, swServer_signal_hanlder);
#ifdef SIGRTMIN
    swSignal_add(SIGRTMIN, swServer_signal_hanlder);
#endif
    swSignal_add(SIGALRM, swSystemTimer_signal_handler);
    //for test
    swSignal_add(SIGVTALRM, swServer_signal_hanlder);
    swServer_set_minfd(SwooleG.serv, SwooleG.signal_fd);
}

swServer_signal_hanlder 函數(shù)中是對(duì)其他信號(hào)函數(shù)的處理,

SIGTERM 是終止信號(hào),用于終止 master 線(xiàn)程的 reactor 線(xiàn)程。

SIGALRM 是鬧鐘信號(hào),我們?cè)谏弦黄幸呀?jīng)詳細(xì)介紹過(guò)。

SIGCHLD 是子進(jìn)程退出信號(hào),如果調(diào)用 waitpid 之后,得到的是 manager 進(jìn)程的進(jìn)程 id,說(shuō)明 manager 進(jìn)程無(wú)故退出。

SIGVTALRM 信號(hào)也是鬧鐘信號(hào),是 setitimer 函數(shù)以 ITIMER_VIRTUAL 進(jìn)程在用戶(hù)態(tài)下花費(fèi)的時(shí)間進(jìn)行鬧鐘設(shè)置的時(shí)候觸發(fā),swoole 里均以 ITIMER_REAL 系統(tǒng)真實(shí)的時(shí)間來(lái)計(jì)算,因此理論上并不會(huì)有此信號(hào),

SIGUSR1、SIGUSR2manager 進(jìn)程默認(rèn)重啟 worker 進(jìn)程的信號(hào),只重啟 task 進(jìn)程使用 SIGUSR2 信號(hào),重啟所有 worker 進(jìn)程使用 SIGUSR1,該信號(hào)也是 swoole_server->reload 函數(shù)發(fā)送給 manager 的信號(hào)。

SIGRTMIN 信號(hào)被用于實(shí)現(xiàn)重新打開(kāi)日志文件。在服務(wù)器程序運(yùn)行期間日志文件被 mv 移動(dòng)或 unlink 刪除后,日志信息將無(wú)法正常寫(xiě)入,這時(shí)可以向 Server 發(fā)送 SIGRTMIN 信號(hào)

static void swServer_signal_hanlder(int sig)
{
    swTraceLog(SW_TRACE_SERVER, "signal[%d] triggered.", sig);

    swServer *serv = SwooleG.serv;
    int status;
    pid_t pid;
    switch (sig)
    {
    case SIGTERM:
        if (SwooleG.main_reactor)
        {
            SwooleG.main_reactor->running = 0;
        }
        else
        {
            SwooleG.running = 0;
        }
        swNotice("Server is shutdown now.");
        break;
    case SIGALRM:
        swSystemTimer_signal_handler(SIGALRM);
        break;
    case SIGCHLD:
        if (!SwooleG.running)
        {
            break;
        }
        if (SwooleG.serv->factory_mode == SW_MODE_SINGLE)
        {
            break;
        }
        pid = waitpid(-1, &status, WNOHANG);
        if (pid > 0 && pid == serv->gs->manager_pid)
        {
            swWarn("Fatal Error: manager process exit. status=%d, signal=%d.", WEXITSTATUS(status), WTERMSIG(status));
        }
        break;
        /**
         * for test
         */
    case SIGVTALRM:
        swWarn("SIGVTALRM coming");
        break;
        /**
         * proxy the restart signal
         */
    case SIGUSR1:
    case SIGUSR2:
        if (SwooleG.serv->factory_mode == SW_MODE_SINGLE)
        {
            if (serv->gs->event_workers.reloading)
            {
                break;
            }
            serv->gs->event_workers.reloading = 1;
            serv->gs->event_workers.reload_init = 0;
        }
        else
        {
            kill(serv->gs->manager_pid, sig);
        }
        break;
    default:
#ifdef SIGRTMIN
        if (sig == SIGRTMIN)
        {
            int i;
            swWorker *worker;
            for (i = 0; i < SwooleG.serv->worker_num + serv->task_worker_num + SwooleG.serv->user_worker_num; i++)
            {
                worker = swServer_get_worker(SwooleG.serv, i);
                kill(worker->pid, SIGRTMIN);
            }
            if (SwooleG.serv->factory_mode == SW_MODE_PROCESS)
            {
                kill(serv->gs->manager_pid, SIGRTMIN);
            }
            swServer_reopen_log_file(SwooleG.serv);
        }
#endif
        break;
    }
}

master 線(xiàn)程在 reactor 事件循環(huán)中負(fù)責(zé)接受客戶(hù)端的請(qǐng)求,在 reactor 事件循環(huán)中 epoll_wait 函數(shù)可能會(huì)被信號(hào)中斷,這時(shí)程序會(huì)首先調(diào)用 swSignal_async_handler 設(shè)置 reactor->singal_no,然后返回 n < 0,執(zhí)行 swSignal_callback 對(duì)應(yīng)的信號(hào)處理函數(shù)。

static int swServer_start_proxy(swServer *serv)
{

    main_reactor->setHandle(main_reactor, SW_FD_LISTEN, swServer_master_onAccept);
    ...
    
    return main_reactor->wait(main_reactor, NULL);
}

static int swReactorEpoll_wait(swReactor *reactor, struct timeval *timeo)
{
    ...
    while (reactor->running > 0)
    {
        n = epoll_wait(epoll_fd, events, max_event_num, msec);
        if (n < 0)
        {
            if (swReactor_error(reactor) < 0)
            {
                swWarn("[Reactor#%d] epoll_wait failed. Error: %s[%d]", reactor_id, strerror(errno), errno);
                return SW_ERR;
            }
            else
            {
                continue;
            }
        }
        
        ...
        
        handle = swReactor_getHandle(reactor, SW_EVENT_READ, event.type);
        ret = handle(reactor, &event);
                
        ...
        
        
        if (reactor->onFinish != NULL)
        {
            reactor->onFinish(reactor);
        }
    }
    ...
}

static sw_inline int swReactor_error(swReactor *reactor)
{
    switch (errno)
    {
    case EINTR:
        if (reactor->singal_no)
        {
            swSignal_callback(reactor->singal_no);
            reactor->singal_no = 0;
        }
        return SW_OK;
    }
    return SW_ERR;
}

而在 reactor 事件循環(huán)中,reactor 中讀寫(xiě)就緒的回調(diào)函數(shù)中仍然可能被信號(hào)中斷,例如 accept 函數(shù),即使 采用非阻塞也有可能被信號(hào)中斷,這個(gè)時(shí)候需要忽略這種錯(cuò)誤,繼續(xù)進(jìn)行 accept 直到 EAGAIN 錯(cuò)誤。事件循環(huán)結(jié)束前會(huì)調(diào)用 onFinish 函數(shù),該函數(shù)會(huì)檢查 reactor->singal_no 并執(zhí)行相應(yīng)的信號(hào)處理函數(shù)。

int swServer_master_onAccept(swReactor *reactor, swEvent *event)
{
    ...
    for (i = 0; i < SW_ACCEPT_MAX_COUNT; i++) 
    {
        new_fd = accept(event->fd, (struct sockaddr *) &client_addr, &client_addrlen);
    
        if (new_fd < 0)
        {
            switch (errno)
            {
            case EAGAIN:
                return SW_OK;
            case EINTR:
                continue;
            ...
        }
    }
    ...
}

static void swReactor_onFinish(swReactor *reactor)
{
    //check signal
    if (reactor-singal_no)
    {
        swSignal_callback(reactor->singal_no);
        reactor->singal_no = 0;
    }
    swReactor_onTimeout_and_Finish(reactor);
}
reactor 線(xiàn)程信號(hào)

對(duì)于 reactor 線(xiàn)程來(lái)說(shuō),承擔(dān)了大量 socket 流量消息的收發(fā),因此 reactor 不應(yīng)該頻繁的被信號(hào)中斷影響 reactor 事件循環(huán)的效率。因此,在初始化截?cái)啵绦蚓驼{(diào)用 swSignal_none 阻塞了所有的信號(hào),所有的信號(hào)處理都由 master 主線(xiàn)程來(lái)處理。當(dāng)然 SIGTERM、SIGSTOP 等信號(hào)無(wú)法屏蔽。

static int swReactorThread_loop(swThreadParam *param)
{
   ...
   
   swSignal_none();
   
   ...

}
manager 進(jìn)程中信號(hào)的應(yīng)用

manager 進(jìn)程大部分信號(hào)的處理與 master 線(xiàn)程類(lèi)似,唯一不同的是多了 SIGID 信號(hào)的處理,該信號(hào)是由 worker 進(jìn)程發(fā)送給 manager 進(jìn)程通知重啟服務(wù)時(shí)使用的。

當(dāng)發(fā)生信號(hào)時(shí),wait 函數(shù)將會(huì)被中斷,返回的 pid 小于 0,此時(shí)檢查被中斷的信號(hào)并相應(yīng)進(jìn)行操作,

static int swManager_loop(swFactory *factory)
{
   ...
   
    swSignal_add(SIGHUP, NULL);
    swSignal_add(SIGTERM, swManager_signal_handle);
    swSignal_add(SIGUSR1, swManager_signal_handle);
    swSignal_add(SIGUSR2, swManager_signal_handle);
    swSignal_add(SIGIO, swManager_signal_handle);
#ifdef SIGRTMIN
    swSignal_add(SIGRTMIN, swManager_signal_handle);
    
    if (serv->manager_alarm > 0)
    {
        alarm(serv->manager_alarm);
        swSignal_add(SIGALRM, swManager_signal_handle);
    }
    
    SwooleG.main_reactor = NULL;
    
    ...

    while (SwooleG.running > 0)
    {
        _wait: pid = wait(&status);
        
        if (ManagerProcess.read_message) {...}
     
        if (pid < 0) 
        {
            if (ManagerProcess.alarm == 1) {}
        
            if (ManagerProcess.reloading == 0) 
            {
                error: if (errno != EINTR)
                {
                    swSysError("wait() failed.");
                }
                continue;
            }
            else if (ManagerProcess.reload_all_worker == 1) {...}
            else if (ManagerProcess.reload_task_worker == 1) {...}
            else
            {
                goto error;
            }
        }
    }
    
    swSignal_none();
}

static void swManager_signal_handle(int sig)
{
    switch (sig)
    {
    case SIGTERM:
        SwooleG.running = 0;
        break;
        /**
         * reload all workers
         */
    case SIGUSR1:
        if (ManagerProcess.reloading == 0)
        {
            ManagerProcess.reloading = 1;
            ManagerProcess.reload_all_worker = 1;
        }
        break;
        /**
         * only reload task workers
         */
    case SIGUSR2:
        if (ManagerProcess.reloading == 0)
        {
            ManagerProcess.reloading = 1;
            ManagerProcess.reload_task_worker = 1;
        }
        break;
    case SIGIO:
        ManagerProcess.read_message = 1;
        break;
    case SIGALRM:
        ManagerProcess.alarm = 1;
        break;
    default:
#ifdef SIGRTMIN
        if (sig == SIGRTMIN)
        {
            swServer_reopen_log_file(SwooleG.serv);
        }
#endif
        break;
    }
}
worker 進(jìn)程信號(hào)

master 進(jìn)程類(lèi)似,也要忽略 SIGHUP、SIGPIPE 信號(hào),不同的是忽略了 SIGUSR1、SIGUSR2 信號(hào)。對(duì)于 SIGTERM 信號(hào),worker 進(jìn)程采取了異步關(guān)閉的措施,并不會(huì)強(qiáng)硬終止進(jìn)程,而是要等到 reactor 事件循環(huán)完畢。

void swWorker_signal_init(void)
{
    swSignal_clear();
    /**
     * use user settings
     */
    SwooleG.use_signalfd = SwooleG.enable_signalfd;

    swSignal_add(SIGHUP, NULL);
    swSignal_add(SIGPIPE, NULL);
    swSignal_add(SIGUSR1, NULL);
    swSignal_add(SIGUSR2, NULL);
    //swSignal_add(SIGINT, swWorker_signal_handler);
    swSignal_add(SIGTERM, swWorker_signal_handler);
    swSignal_add(SIGALRM, swSystemTimer_signal_handler);
    //for test
    swSignal_add(SIGVTALRM, swWorker_signal_handler);
#ifdef SIGRTMIN
    swSignal_add(SIGRTMIN, swWorker_signal_handler);
#endif
}

void swWorker_signal_handler(int signo)
{
    switch (signo)
    {
    case SIGTERM:
        /**
         * Event worker
         */
        if (SwooleG.main_reactor)
        {
            swWorker_stop();
        }
        /**
         * Task worker
         */
        else
        {
            SwooleG.running = 0;
        }
        break;
    case SIGALRM:
        swSystemTimer_signal_handler(SIGALRM);
        break;
    /**
     * for test
     */
    case SIGVTALRM:
        swWarn("SIGVTALRM coming");
        break;
    case SIGUSR1:
        break;
    case SIGUSR2:
        break;
    default:
#ifdef SIGRTMIN
        if (signo == SIGRTMIN)
        {
            swServer_reopen_log_file(SwooleG.serv);
        }
#endif
        break;
    }
}
task 進(jìn)程信號(hào)

task 進(jìn)程不同于 worker 進(jìn)程,使用的并不是 reactor + 非阻塞文件描述符,而是阻塞式描述符,并沒(méi)有 reactor 來(lái)告知消息的到來(lái)。因此 task 進(jìn)程的循環(huán)是阻塞在各個(gè)函數(shù)當(dāng)中的。只有文件描述符可讀,或者信號(hào)到來(lái),才會(huì)從阻塞中返回。

當(dāng) swMsgQueue_pop、accept、read 等函數(shù)被信號(hào)中斷后,信號(hào)處理函數(shù)會(huì)被執(zhí)行,之后會(huì)返回 n < 0,這個(gè)時(shí)候由于信號(hào)處理函數(shù)已經(jīng)被執(zhí)行,因此只需要 continue 即可。對(duì)于鬧鐘信號(hào),信號(hào)到來(lái),還需要調(diào)用 swTimer_select 來(lái)篩選已經(jīng)到時(shí)間的任務(wù)。

static void swTaskWorker_signal_init(void)
{
    swSignal_set(SIGHUP, NULL, 1, 0);
    swSignal_set(SIGPIPE, NULL, 1, 0);
    swSignal_set(SIGUSR1, swWorker_signal_handler, 1, 0);
    swSignal_set(SIGUSR2, NULL, 1, 0);
    swSignal_set(SIGTERM, swWorker_signal_handler, 1, 0);
    swSignal_set(SIGALRM, swSystemTimer_signal_handler, 1, 0);
#ifdef SIGRTMIN
    swSignal_set(SIGRTMIN, swWorker_signal_handler, 1, 0);
#endif
}

static int swProcessPool_worker_loop(swProcessPool *pool, swWorker *worker)
{
   ...
   
   while (SwooleG.running > 0 && task_n > 0)
   {
       if (pool->use_msgqueue)
        {
            n = swMsgQueue_pop(pool->queue, (swQueue_data *) &out, sizeof(out.buf));
            if (n < 0 && errno != EINTR)
            {
                swSysError("[Worker#%d] msgrcv() failed.", worker->id);
                break;
            }
        }
        else if (pool->use_socket)
        {
            int fd = accept(pool->stream->socket, NULL, NULL);
            if (fd < 0)
            {
                if (errno == EAGAIN || errno == EINTR)
                {
                    continue;
                }
                else
                {
                    swSysError("accept(%d) failed.", pool->stream->socket);
                    break;
                }
            }

            n = swStream_recv_blocking(fd, (void*) &out.buf, sizeof(out.buf));
            if (n == SW_CLOSE)
            {
                close(fd);
                continue;
            }
            pool->stream->last_connection = fd;
        }
        else
        {
            n = read(worker->pipe_worker, &out.buf, sizeof(out.buf));
            if (n < 0 && errno != EINTR)
            {
                swSysError("[Worker#%d] read(%d) failed.", worker->id, worker->pipe_worker);
            }
        }

        /**
         * timer
         */
        if (n < 0)
        {
            if (errno == EINTR && SwooleG.signal_alarm)
            {
                alarm_handler: SwooleG.signal_alarm = 0;
                swTimer_select(&SwooleG.timer);
            }
            continue;
        }

        /**
         * do task
         */
        worker->status = SW_WORKER_BUSY;
        worker->request_time = time(NULL);
        ret = pool->onTask(pool, &out.buf);
        worker->status = SW_WORKER_IDLE;
        worker->request_time = 0;
        worker->traced = 0;

        if (pool->use_socket && pool->stream->last_connection > 0)
        {
            int _end = 0;
            swSocket_write_blocking(pool->stream->last_connection, (void *) &_end, sizeof(_end));
            close(pool->stream->last_connection);
            pool->stream->last_connection = 0;
        }

        /**
         * timer
         */
        if (SwooleG.signal_alarm)
        {
            goto alarm_handler;
        }

        if (ret >= 0 && !worker_task_always)
        {
            task_n--;
        }
   
   }

}

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/29380.html

相關(guān)文章

  • Swoole 源碼分析——Server模塊Timer模塊與時(shí)間輪算法

    摘要:當(dāng)其就緒時(shí),會(huì)調(diào)用執(zhí)行定時(shí)函數(shù)。進(jìn)程超時(shí)停止進(jìn)程將要停止時(shí),并不會(huì)立刻停止,而是會(huì)等待事件循環(huán)結(jié)束后停止,這時(shí)為了防止進(jìn)程不退出,還設(shè)置了的延遲,超過(guò)就會(huì)停止該進(jìn)程。當(dāng)允許空閑時(shí)間小于時(shí),統(tǒng)一每隔檢測(cè)空閑連接。 前言 swoole 的 timer 模塊功能有三個(gè):用戶(hù)定時(shí)任務(wù)、剔除空閑連接、更新 server 時(shí)間。timer 模塊的底層有兩種,一種是基于 alarm 信號(hào),一種是基于...

    qieangel2013 評(píng)論0 收藏0
  • Swoole 源碼分析——Server模塊Worker事件循環(huán)

    摘要:如果為,就不斷循環(huán),殺死或者啟動(dòng)相應(yīng)的進(jìn)程,如果為,那么就關(guān)閉所有的進(jìn)程,調(diào)用函數(shù)退出程序。調(diào)用函數(shù),監(jiān)控已結(jié)束的進(jìn)程如果函數(shù)返回異常,很有可能是被信號(hào)打斷。函數(shù)主要用于調(diào)用函數(shù),進(jìn)而調(diào)用函數(shù) swManager_loop 函數(shù) manager 進(jìn)程管理 manager 進(jìn)程開(kāi)啟的時(shí)候,首先要調(diào)用 onManagerStart 回調(diào) 添加信號(hào)處理函數(shù) swSignal_add,S...

    BDEEFE 評(píng)論0 收藏0
  • Swoole 源碼分析——進(jìn)程管理 Swoole_Process

    摘要:清空主進(jìn)程殘留的定時(shí)器與信號(hào)。設(shè)定為執(zhí)行回調(diào)函數(shù)如果在回調(diào)函數(shù)中調(diào)用了異步系統(tǒng),啟動(dòng)函數(shù)進(jìn)行事件循環(huán)。因此為了區(qū)分兩者,規(guī)定并不允許兩者同時(shí)存在。 前言 swoole-1.7.2 增加了一個(gè)進(jìn)程管理模塊,用來(lái)替代 PHP 的 pcntl 擴(kuò)展。 PHP自帶的pcntl,存在很多不足,如 pcntl 沒(méi)有提供進(jìn)程間通信的功能 pcntl 不支持重定向標(biāo)準(zhǔn)輸入和輸出 pcntl 只...

    pepperwang 評(píng)論0 收藏0
  • Swoole 源碼分析——Reactor模塊ReactorBase

    前言 作為一個(gè)網(wǎng)絡(luò)框架,最為核心的就是消息的接受與發(fā)送。高效的 reactor 模式一直是眾多網(wǎng)絡(luò)框架的首要選擇,本節(jié)主要講解 swoole 中的 reactor 模塊。 UNP 學(xué)習(xí)筆記——IO 復(fù)用 Reactor 的數(shù)據(jù)結(jié)構(gòu) Reactor 的數(shù)據(jù)結(jié)構(gòu)比較復(fù)雜,首先 object 是具體 Reactor 對(duì)象的首地址,ptr 是擁有 Reactor 對(duì)象的類(lèi)的指針, event_nu...

    baukh789 評(píng)論0 收藏0
  • Swoole 源碼分析——Server模塊TaskWorker事件循環(huán)

    摘要:函數(shù)事件循環(huán)在事件循環(huán)時(shí),如果使用的是消息隊(duì)列,那么就不斷的調(diào)用從消息隊(duì)列中取出數(shù)據(jù)。獲取后的數(shù)據(jù)調(diào)用回調(diào)函數(shù)消費(fèi)消息之后,向中發(fā)送空數(shù)據(jù),告知進(jìn)程已消費(fèi),并且關(guān)閉新連接。 swManager_start 創(chuàng)建進(jìn)程流程 task_worker 進(jìn)程的創(chuàng)建可以分為三個(gè)步驟:swServer_create_task_worker 申請(qǐng)所需的內(nèi)存、swTaskWorker_init 初始化...

    用戶(hù)83 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<