成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Swoole 源碼分析——Server模塊之TaskWorker事件循環

用戶83 / 4041人閱讀

摘要:swProcessPool_worker_loop 函數事件循環:在事件循環時,如果使用的是消息隊列,那么就不斷的調用 swMsgQueue_pop 從消息隊列中取出數據。獲取后的數據調用 onTask 回調函數。消費消息之后,向 stream 中發送空數據,告知 worker 進程已消費,并且關閉新連接。

swManager_start 創(chuàng)建進程流程

task_worker 進程的創建可以分為三個步驟:swServer_create_task_worker 申請所需的內存、swTaskWorker_init 初始化各個屬性、swProcessPool_start 創建進程

/**
 * Prepare the task-worker pool (when task workers are configured) and fork
 * the manager process.
 *
 * In the forked child the task-worker pool is started; in the parent the
 * child's pid is recorded as serv->gs->manager_pid.
 *
 * @param factory  factory whose ->ptr is the swServer being started
 * @return SW_OK on success, SW_ERR if pool/worker creation or fork() fails
 */
int swManager_start(swFactory *factory)
{
    swFactoryProcess *object = factory->object;
    swServer *serv = factory->ptr;

    if (serv->task_worker_num > 0)
    {
        if (swServer_create_task_worker(serv) < 0)
        {
            return SW_ERR;
        }

        swProcessPool *task_pool = &serv->gs->task_workers;
        swTaskWorker_init(task_pool);

        int idx = 0;
        while (idx < serv->task_worker_num)
        {
            swWorker *task_worker = &task_pool->workers[idx];
            if (swWorker_create(task_worker) < 0)
            {
                return SW_ERR;
            }
            //only the unix-socket mode has a per-worker pipe to register
            if (serv->task_ipc_mode == SW_TASK_IPC_UNIXSOCK)
            {
                swServer_store_pipe_fd(SwooleG.serv, task_worker->pipe_object);
            }
            idx++;
        }
    }

    pid_t manager = fork();

    if (manager == -1)
    {
        swError("fork() failed.");
        return SW_ERR;
    }

    if (manager == 0)
    {
        //manager process: spawn the task workers
        if (serv->task_worker_num > 0)
        {
            swProcessPool_start(&serv->gs->task_workers);
        }
    }
    else
    {
        //master process: remember who the manager is
        serv->gs->manager_pid = manager;
    }

    return SW_OK;
}

swServer_create_task_worker 創(chuàng)建 task 進程

task 進程的調(diào)度有四種: 使用unix socket通信,默認模式;使用消息隊列通信; 使用消息隊列通信,并設置為爭搶模式;stream 模式

不同于 worker 進程,tasker 進程由 swProcessPool_create 創(chuàng)建

如果是 stream 模式,程序還要調(diào)用 swProcessPool_create_unix_socket 創(chuàng)建一個監(jiān)聽的 socket

/**
 * Create the task-worker process pool for a server.
 *
 * Maps serv->task_ipc_mode onto the pool IPC mode (message queue, stream
 * socket, or unix socket as the fallback) and, for stream mode, also creates
 * the listening unix socket at /tmp/swoole.task.<master_pid>.sock.
 *
 * @return SW_OK on success, SW_ERR on failure
 */
int swServer_create_task_worker(swServer *serv)
{
    //unix socket is the fallback when no other mode matches
    int pool_ipc = SW_IPC_UNIXSOCK;
    key_t queue_key = 0;

    if (serv->task_ipc_mode == SW_TASK_IPC_MSGQUEUE || serv->task_ipc_mode == SW_TASK_IPC_PREEMPTIVE)
    {
        pool_ipc = SW_IPC_MSGQUEUE;
        queue_key = serv->message_queue_key;
    }
    else if (serv->task_ipc_mode == SW_TASK_IPC_STREAM)
    {
        pool_ipc = SW_IPC_SOCKET;
    }

    if (swProcessPool_create(&serv->gs->task_workers, serv->task_worker_num, serv->task_max_request, queue_key, pool_ipc) < 0)
    {
        swWarn("[Master] create task_workers failed.");
        return SW_ERR;
    }

    if (pool_ipc != SW_IPC_SOCKET)
    {
        return SW_OK;
    }

    //stream mode: the pool additionally needs a listening socket
    char sockfile[sizeof(struct sockaddr_un)];
    snprintf(sockfile, sizeof(sockfile), "/tmp/swoole.task.%d.sock", serv->gs->master_pid);

    return swProcessPool_create_unix_socket(&serv->gs->task_workers, sockfile, 2048) < 0 ? SW_ERR : SW_OK;
}
swProcessPool_create 函數(shù)

swProcessPool_create 函數主要為 task 進程申請內存、初始化變量。首先要申請 worker_num 個 worker 的內存。

如果調(diào)度采用的是消息隊列通信,那么首先就要創(chuàng)建消息隊列,初始化 pool->queue,相關(guān)函數(shù)是 swMsgQueue_create

如果調(diào)度采用 stream 模式,那么就要初始化 pool->stream

如果調度采用的是 unixsock 模式,那么就要創建各個 worker 的 pipe

創建 pool->map,并設置 main_loop

/**
 * Initialise a process pool: zero it, allocate the worker table and set up
 * the IPC channel selected by ipc_mode.
 *
 *  - SW_IPC_MSGQUEUE: allocate and create pool->queue
 *  - SW_IPC_SOCKET:   allocate a zeroed pool->stream
 *  - SW_IPC_UNIXSOCK: create one SOCK_DGRAM pipe per worker
 *  - anything else:   treated as SW_IPC_NONE
 *
 * pool->main_loop is installed only when the resulting mode is above
 * SW_IPC_NONE.
 *
 * @return SW_OK on success, SW_ERR on any allocation/creation failure
 */
int swProcessPool_create(swProcessPool *pool, int worker_num, int max_request, key_t msgqueue_key, int ipc_mode)
{
    bzero(pool, sizeof(swProcessPool));

    pool->max_request = max_request;
    pool->worker_num = worker_num;

    pool->workers = SwooleG.memory_pool->alloc(SwooleG.memory_pool, worker_num * sizeof(swWorker));
    if (!pool->workers)
    {
        swSysError("malloc[1] failed.");
        return SW_ERR;
    }

    int effective_mode = ipc_mode;

    if (ipc_mode == SW_IPC_MSGQUEUE)
    {
        pool->msgqueue_key = msgqueue_key;
        pool->use_msgqueue = 1;

        pool->queue = sw_malloc(sizeof(swMsgQueue));
        if (!pool->queue)
        {
            swSysError("malloc[2] failed.");
            return SW_ERR;
        }
        if (swMsgQueue_create(pool->queue, 1, pool->msgqueue_key, 0) < 0)
        {
            return SW_ERR;
        }
    }
    else if (ipc_mode == SW_IPC_SOCKET)
    {
        pool->use_socket = 1;

        pool->stream = sw_malloc(sizeof(swStreamInfo));
        if (!pool->stream)
        {
            swWarn("malloc[2] failed.");
            return SW_ERR;
        }
        bzero(pool->stream, sizeof(swStreamInfo));
    }
    else if (ipc_mode == SW_IPC_UNIXSOCK)
    {
        pool->pipes = sw_calloc(worker_num, sizeof(swPipe));
        if (!pool->pipes)
        {
            swWarn("malloc[2] failed.");
            return SW_ERR;
        }

        int slot = 0;
        while (slot < worker_num)
        {
            swPipe *channel = &pool->pipes[slot];
            if (swPipeUnsock_create(channel, 1, SOCK_DGRAM) < 0)
            {
                return SW_ERR;
            }

            //give the worker both ends plus the pipe object itself
            swWorker *owner = &pool->workers[slot];
            owner->pipe_master = channel->getFd(channel, SW_PIPE_MASTER);
            owner->pipe_worker = channel->getFd(channel, SW_PIPE_WORKER);
            owner->pipe_object = channel;
            slot++;
        }
    }
    else
    {
        effective_mode = SW_IPC_NONE;
    }

    pool->map = swHashMap_new(SW_HASHMAP_INIT_BUCKET_N, NULL);
    if (!pool->map)
    {
        swProcessPool_free(pool);
        return SW_ERR;
    }

    pool->ipc_mode = effective_mode;
    if (effective_mode > SW_IPC_NONE)
    {
        pool->main_loop = swProcessPool_worker_loop;
    }

    return SW_OK;
}
swProcessPool_create_unix_socket 函數(shù)

當調(diào)度模式是 stream 的時候,還有創(chuàng)建相應的本地 UNIX 域套接字 socket,綁定到 /tmp/swoole.task.%d.sock 本地 sock 文件上。

/**
 * Create the listening UNIX stream socket for a pool running in
 * SW_IPC_SOCKET mode.
 *
 * A copy of socket_file is stored in pool->stream->socket_file and the
 * listening descriptor in pool->stream->socket.
 *
 * @return SW_OK on success; SW_ERR if the pool is in another IPC mode, the
 *         path cannot be duplicated, or the server socket cannot be created
 */
int swProcessPool_create_unix_socket(swProcessPool *pool, char *socket_file, int blacklog)
{
    if (pool->ipc_mode != SW_IPC_SOCKET)
    {
        swWarn("ipc_mode is not SW_IPC_SOCKET.");
        return SW_ERR;
    }

    pool->stream->socket_file = sw_strdup(socket_file);
    if (!pool->stream->socket_file)
    {
        return SW_ERR;
    }

    pool->stream->socket = swSocket_create_server(SW_SOCK_UNIX_STREAM, pool->stream->socket_file, 0, blacklog);

    return (pool->stream->socket < 0) ? SW_ERR : SW_OK;
}

/**
 * Create a socket of the given type, bind it and put it into listening state.
 *
 * @param type     socket type constant handed to swSocket_create/swSocket_bind
 * @param address  address (or UNIX socket path) to bind to
 * @param port     port to bind; passed by pointer to swSocket_bind
 * @param backlog  backlog handed to listen()
 * @return the listening descriptor on success, SW_ERR on failure. On every
 *         failure path the descriptor created here is closed, and errno of
 *         the failing call is preserved across that close().
 */
int swSocket_create_server(int type, char *address, int port, int backlog)
{
    int fd = swSocket_create(type);
    if (fd < 0)
    {
        swoole_error_log(SW_LOG_ERROR, SW_ERROR_SYSTEM_CALL_FAIL, "socket() failed. Error: %s[%d]", strerror(errno), errno);
        return SW_ERR;
    }

    if (swSocket_bind(fd, type, address, &port) < 0)
    {
        //do not leak the descriptor; keep the bind error visible to the caller
        int bind_errno = errno;
        close(fd);
        errno = bind_errno;
        return SW_ERR;
    }

    if (listen(fd, backlog) < 0)
    {
        int listen_errno = errno;
        swoole_error_log(SW_LOG_ERROR, SW_ERROR_SYSTEM_CALL_FAIL, "listen(%s:%d, %d) failed. Error: %s[%d]", address, port, backlog, strerror(listen_errno), listen_errno);
        close(fd);
        errno = listen_errno;
        return SW_ERR;
    }

    return fd;
}
swTaskWorker_init 函數(shù)
/**
 * Configure a process pool as the server's task-worker pool: attach the
 * server, install the task-worker callbacks and derive the id range from the
 * server configuration. Preemptive task IPC switches the pool to queue
 * dispatch.
 */
void swTaskWorker_init(swProcessPool *pool)
{
    swServer *server = SwooleG.serv;

    //pool identity: task worker ids start right after the event workers
    pool->type = SW_PROCESS_TASKWORKER;
    pool->ptr = server;
    pool->start_id = server->worker_num;
    pool->run_worker_num = server->task_worker_num;

    //lifecycle callbacks
    pool->onWorkerStart = swTaskWorker_onStart;
    pool->onWorkerStop = swTaskWorker_onStop;
    pool->onTask = swTaskWorker_onTask;

    if (server->task_ipc_mode != SW_TASK_IPC_PREEMPTIVE)
    {
        return;
    }
    pool->dispatch_mode = SW_DISPATCH_QUEUE;
}
swProcessPool_start 進程啟動

本函數(shù)利用 swProcessPool_spawn 啟動所有的 task_worker 進程

fork 子進程后,將 task 進程的進程 id 存放到 pool->map

task 進程中,調(diào)用 onWorkerStart 回調(diào)函數(shù)、onWorkerStop 回調(diào)函數(shù),進行事件循環(huán)

/**
 * Start every worker of the pool.
 *
 * Each worker slot gets its back-pointer, id (start_id + index) and type
 * assigned, then a process is spawned for it.
 *
 * @return SW_OK when all workers were spawned; SW_ERR if the pool is in
 *         socket mode without a listening socket, or a spawn failed
 */
int swProcessPool_start(swProcessPool *pool)
{
    if (pool->ipc_mode == SW_IPC_SOCKET)
    {
        //socket mode needs the listening socket to exist before forking
        if (pool->stream == NULL || pool->stream->socket == 0)
        {
            swWarn("must first listen to an tcp port.");
            return SW_ERR;
        }
    }

    pool->started = 1;
    pool->run_worker_num = pool->worker_num;

    int idx = 0;
    while (idx < pool->worker_num)
    {
        swWorker *slot = &pool->workers[idx];

        slot->pool = pool;
        slot->id = pool->start_id + idx;
        slot->type = pool->type;

        if (swProcessPool_spawn(pool, slot) < 0)
        {
            return SW_ERR;
        }
        idx++;
    }

    return SW_OK;
}

/**
 * Fork one worker process for the pool.
 *
 * Child: runs onWorkerStart, then main_loop, then onWorkerStop (each only if
 * set) and exits with main_loop's return code; it never returns from here.
 * Parent: replaces the worker's old pid entry in pool->map with the new pid.
 *
 * @return the value returned by fork() (child pid in the parent, -1 on error)
 */
pid_t swProcessPool_spawn(swProcessPool *pool, swWorker *worker)
{
    pid_t child = fork();
    int exit_status = 0;

    if (child == 0)
    {
        //child: start hook -> main loop -> stop hook
        if (pool->onWorkerStart != NULL)
        {
            pool->onWorkerStart(pool, worker->id);
        }
        if (pool->main_loop)
        {
            exit_status = pool->main_loop(pool, worker);
        }
        if (pool->onWorkerStop != NULL)
        {
            pool->onWorkerStop(pool, worker->id);
        }
        exit(exit_status);
    }
    else if (child == -1)
    {
        swWarn("fork() failed. Error: %s [%d]", strerror(errno), errno);
    }
    else
    {
        //parent: drop the entry of the process this slot used to hold
        if (worker->pid)
        {
            swHashMap_del_int(pool->map, worker->pid);
        }
        worker->pid = child;
        swHashMap_add_int(pool->map, child, worker);
    }

    return child;
}
onWorkerStart 函數(shù)

onWorkerStart 函數(shù)是進程啟動的回調(diào)函數(shù),作用是設置信號處理函數(shù),調(diào)用設置的 serv->onWorkerStart 函數(shù)。

/**
 * Start hook of a task worker process (installed as pool->onWorkerStart by
 * swTaskWorker_init and invoked in the forked child by swProcessPool_spawn).
 *
 * Records the worker identity in the process-wide globals, installs the
 * task-worker signal handlers, runs the generic swWorker_onStart hook and
 * resets the bookkeeping fields of this worker's slot.
 *
 * @param pool       the task-worker pool; pool->ptr is the swServer
 * @param worker_id  id of the worker this process represents
 */
void swTaskWorker_onStart(swProcessPool *pool, int worker_id)
{
    swServer *serv = pool->ptr;
    //record who this process is
    SwooleWG.id = worker_id;
    SwooleG.pid = getpid();

    //clear both timer-backend flags for this process
    SwooleG.use_timer_pipe = 0;
    SwooleG.use_timerfd = 0;

    //NOTE(review): presumably closes the listening ports inherited from the parent - helper not visible here
    swServer_close_port(serv, SW_TRUE);

    swTaskWorker_signal_init();
    swWorker_onStart(serv);

    //cleared only after swWorker_onStart has run
    SwooleG.main_reactor = NULL;
    //reset the per-worker bookkeeping and mark the worker idle
    swWorker *worker = swProcessPool_get_worker(pool, worker_id);
    worker->start_time = serv->gs->now;
    worker->request_count = 0;
    worker->traced = 0;
    SwooleWG.worker = worker;
    SwooleWG.worker->status = SW_WORKER_IDLE;
}

/**
 * Register the signal dispositions used by a task worker process.
 *
 * SIGHUP, SIGPIPE and SIGUSR2 are registered with a NULL handler
 * (NOTE(review): presumably "ignore" - swSignal_set is not visible here);
 * SIGUSR1, SIGTERM and, where defined, SIGRTMIN go to
 * swWorker_signal_handler; SIGALRM goes to swSystemTimer_signal_handler.
 */
static void swTaskWorker_signal_init(void)
{
    swSignal_set(SIGHUP, NULL, 1, 0);
    swSignal_set(SIGPIPE, NULL, 1, 0);
    swSignal_set(SIGUSR1, swWorker_signal_handler, 1, 0);
    swSignal_set(SIGUSR2, NULL, 1, 0);
    swSignal_set(SIGTERM, swWorker_signal_handler, 1, 0);
    //timer ticks arrive as SIGALRM
    swSignal_set(SIGALRM, swSystemTimer_signal_handler, 1, 0);
    //real-time signals are not available on every platform
#ifdef SIGRTMIN
    swSignal_set(SIGRTMIN, swWorker_signal_handler, 1, 0);
#endif
}
onWorkerStop 函數(shù)
/**
 * Stop hook of a task worker process: forwards to the generic
 * swWorker_onStop with the server stored in pool->ptr. worker_id is unused.
 */
void swTaskWorker_onStop(swProcessPool *pool, int worker_id)
{
    swServer *server = pool->ptr;

    swWorker_onStop(server);
}
swProcessPool_worker_loop 事件循環(huán)

在事件循環時,如果使用的是消息隊列,那么就不斷的調用 swMsgQueue_pop 從消息隊列中取出數據。值得注意的是,SW_DISPATCH_QUEUE 代表采用了消息隊列通信,并設置為爭搶模式,因此 out.mtype 被設置為 0,而不是某個 worker 對應的具體值。

如果使用的是 UNIX 域套接字,那么就不斷的 accept 接受新連接,并且讀取新連接發來的數據

如果是 pipefd,那么就從管道中讀取新數(shù)據(jù)。

獲取后的數(shù)據(jù)調(diào)用 onTask 回調(diào)函數(shù)

消費消息之后,向 stream 中發(fā)送空數(shù)據(jù),告知 worker 進程已消費,并且關(guān)閉新連接。

/**
 * Main loop of a pool worker: repeatedly fetch one task from the pool's IPC
 * channel and hand it to pool->onTask.
 *
 * Task budget: with max_request < 1 the worker never counts tasks down and
 * keeps serving while SwooleG.running > 0. Otherwise it serves max_request
 * tasks, plus (when max_request > 10) an extra amount obtained from
 * swoole_system_random(1, max_request / 2).
 *
 * Fetch source, in order of precedence: message queue (use_msgqueue),
 * accepted stream connection (use_socket), otherwise the worker's pipe.
 *
 * @param pool    pool the worker belongs to
 * @param worker  this worker's slot
 * @return always SW_OK (also after a fatal msgrcv/accept error broke the loop)
 */
static int swProcessPool_worker_loop(swProcessPool *pool, swWorker *worker)
{
    //layout expected by the message queue: type tag followed by the payload
    struct
    {
        long mtype;
        swEventData buf;
    } out;

    int n = 0, ret;
    int task_n, worker_task_always = 0;

    if (pool->max_request < 1)
    {
        //no limit: task_n stays at 1 and is never decremented
        task_n = 1;
        worker_task_always = 1;
    }
    else
    {
        task_n = pool->max_request;
        if (pool->max_request > 10)
        {
            //NOTE(review): presumably spreads worker restarts so they do not all exit at once - confirm
            n = swoole_system_random(1, pool->max_request / 2);
            if (n > 0)
            {
                task_n += n;
            }
        }
    }

    /**
     * Use from_fd save the task_worker->id
     */
    out.buf.info.from_fd = worker->id;

    //queue dispatch: type 0 instead of a per-worker type, so any message may be taken
    if (pool->dispatch_mode == SW_DISPATCH_QUEUE)
    {
        out.mtype = 0;
    }
    else
    {
        //matches the mtype (id + 1) used by swWorker_send2worker
        out.mtype = worker->id + 1;
    }

    while (SwooleG.running > 0 && task_n > 0)
    {
        /**
         * fetch task
         */
        if (pool->use_msgqueue)
        {
            n = swMsgQueue_pop(pool->queue, (swQueue_data *) &out, sizeof(out.buf));
            //EINTR is handled by the timer branch below; anything else ends the loop
            if (n < 0 && errno != EINTR)
            {
                swSysError("[Worker#%d] msgrcv() failed.", worker->id);
                break;
            }
        }
        else if (pool->use_socket)
        {
            int fd = accept(pool->stream->socket, NULL, NULL);
            if (fd < 0)
            {
                if (errno == EAGAIN || errno == EINTR)
                {
                    continue;
                }
                else
                {
                    swSysError("accept(%d) failed.", pool->stream->socket);
                    break;
                }
            }

            n = swStream_recv_blocking(fd, (void*) &out.buf, sizeof(out.buf));
            if (n == SW_CLOSE)
            {
                close(fd);
                continue;
            }
            //remembered so the completion marker can be sent after onTask
            //NOTE(review): if n is negative but not SW_CLOSE, the n < 0 branch below
            //continues without closing this fd - confirm whether that can happen
            pool->stream->last_connection = fd;
        }
        else
        {
            n = read(worker->pipe_worker, &out.buf, sizeof(out.buf));
            //unlike the other two sources, a read error is only logged here
            if (n < 0 && errno != EINTR)
            {
                swSysError("[Worker#%d] read(%d) failed.", worker->id, worker->pipe_worker);
            }
        }

        /**
         * timer
         */
        if (n < 0)
        {
            if (errno == EINTR && SwooleG.signal_alarm)
            {
                //also entered via goto after a task has been processed
                alarm_handler: SwooleG.signal_alarm = 0;
                swTimer_select(&SwooleG.timer);
            }
            continue;
        }

        /**
         * do task
         */
        worker->status = SW_WORKER_BUSY;
        worker->request_time = time(NULL);
        ret = pool->onTask(pool, &out.buf);
        worker->status = SW_WORKER_IDLE;
        worker->request_time = 0;
        worker->traced = 0;

        //stream mode: send an int 0 as completion marker, then close the connection
        if (pool->use_socket && pool->stream->last_connection > 0)
        {
            int _end = 0;
            swSocket_write_blocking(pool->stream->last_connection, (void *) &_end, sizeof(_end));
            close(pool->stream->last_connection);
            pool->stream->last_connection = 0;
        }

        /**
         * timer
         */
        if (SwooleG.signal_alarm)
        {
            //NOTE(review): the jump target ends in `continue`, so the task_n decrement
            //below is skipped for this task - confirm this is intended
            goto alarm_handler;
        }

        if (ret >= 0 && !worker_task_always)
        {
            task_n--;
        }
    }
    return SW_OK;
}

sendMessage 函數(shù)

sendMessage 函數(shù)用于 worker 進程向其他 task 進程發(fā)送消息

函數首先從參數中獲取 message 和 worker_id

調用 php_swoole_task_pack 將 message 的數據存儲到 buf 對象中。

調(diào)用 swWorker_send2worker 發(fā)送數(shù)據(jù)給其他 worker 進程

PHP_METHOD(swoole_server, sendMessage)
{
    swEventData buf;

    zval *message;
    long worker_id = -1;

    swServer *serv = swoole_get_object(getThis());

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "zl", &message, &worker_id) == FAILURE)
    {
        return;
    }

    if (php_swoole_task_pack(&buf, message TSRMLS_CC) < 0)
    {
        RETURN_FALSE;
    }

    buf.info.type = SW_EVENT_PIPE_MESSAGE;
    buf.info.from_id = SwooleWG.id;

    swWorker *to_worker = swServer_get_worker(serv, worker_id);
    SW_CHECK_RETURN(swWorker_send2worker(to_worker, &buf, sizeof(buf.info) + buf.info.len, SW_PIPE_MASTER | SW_PIPE_NONBLOCK));
}

php_swoole_task_pack 函數(shù)

如果發(fā)送的消息是字符串,那么字符串賦值給 task_data_str

如果發(fā)送的消息不是字符串,那么需要進行序列化。如果開啟快速序列化,調(diào)用 php_swoole_serialize 方法進行序列化;否則,調(diào)用 sw_php_var_serialize 進行序列化。

如果數(shù)據(jù)過大,那么調(diào)用 swTaskWorker_large_pack 將消息寫入臨時文件;否則賦值給 task->data

#define swTask_type(task)                  ((task)->info.from_fd)

/**
 * Pack a PHP value into an swEventData task.
 *
 * Strings are used as-is; any other value is serialized (fast serializer on
 * PHP >= 7 when enabled, var_serialize otherwise) and the task is flagged
 * SW_TASK_SERIALIZE. Payloads that do not fit into the IPC buffer are written
 * to a temp file via swTaskWorker_large_pack.
 *
 * @param task  output task; info.type, info.fd (new task id), info.from_id,
 *              the type flags, info.len and data are filled in
 * @param data  value to pack
 * @return the task id stored in task->info.fd (SW_ERR if the large pack
 *         failed), or -1 if serialization produced no data
 */
int php_swoole_task_pack(swEventData *task, zval *data TSRMLS_DC)
{
    smart_str serialized_data = { 0 };
    php_serialize_data_t var_hash;
#if PHP_MAJOR_VERSION >= 7
    zend_string *serialized_string = NULL;
#endif

    task->info.type = SW_EVENT_TASK;
    task->info.fd = php_swoole_task_id++;

    task->info.from_id = SwooleWG.id;
    swTask_type(task) = 0;

    char *task_data_str;
    int task_data_len = 0;

    if (SW_Z_TYPE_P(data) != IS_STRING)
    {
        //serialize
        swTask_type(task) |= SW_TASK_SERIALIZE;

#if PHP_MAJOR_VERSION >= 7
        if (SWOOLE_G(fast_serialize))
        {
            serialized_string = php_swoole_serialize(data);
            task_data_str = serialized_string->val;
            task_data_len = serialized_string->len;
        }
        else
#endif
        {
            PHP_VAR_SERIALIZE_INIT(var_hash);
            sw_php_var_serialize(&serialized_data, data, &var_hash TSRMLS_CC);
            PHP_VAR_SERIALIZE_DESTROY(var_hash);

            if (!serialized_data.s)
            {
                return -1;
            }
            task_data_str = serialized_data.s->val;
            task_data_len = serialized_data.s->len;
        }
    }
    else
    {
        task_data_str = Z_STRVAL_P(data);
        task_data_len = Z_STRLEN_P(data);
    }

    //too big for the in-band buffer: spill to a temp file
    if (task_data_len >= SW_IPC_MAX_SIZE - sizeof(task->info))
    {
        if (swTaskWorker_large_pack(task, task_data_str, task_data_len) < 0)
        {
            swoole_php_fatal_error(E_WARNING, "large task pack failed.");
            //reported to the caller through the return value below
            task->info.fd = SW_ERR;
            task->info.len = 0;
        }
    }
    else
    {
        memcpy(task->data, task_data_str, task_data_len);
        task->info.len = task_data_len;
    }

    //release whichever serialization buffer was used
#if PHP_MAJOR_VERSION >= 7
    if (SWOOLE_G(fast_serialize) && serialized_string)
    {
        zend_string_release(serialized_string);
    }
    else
#endif
    {
        smart_str_free(&serialized_data);
    }
    return task->info.fd;
}

/**
 * Store an oversized task payload in a temp file and put a swPackage_task
 * descriptor (temp file name + payload length) into the task instead.
 *
 * @param task      task to fill; flagged SW_TASK_TMPFILE on success
 * @param data      payload bytes
 * @param data_len  payload length
 * @return SW_OK on success, SW_ERR if the temp file cannot be created or
 *         written. The temp file descriptor is closed on every path.
 */
int swTaskWorker_large_pack(swEventData *task, void *data, int data_len)
{
    swPackage_task pkg;
    bzero(&pkg, sizeof(pkg));

    //the temp file name starts from the configured task temp dir template
    memcpy(pkg.tmpfile, SwooleG.task_tmpdir, SwooleG.task_tmpdir_len);

    //create temp file
    int tmp_fd = swoole_tmpfile(pkg.tmpfile);
    if (tmp_fd < 0)
    {
        return SW_ERR;
    }

    //write to file
    if (swoole_sync_writefile(tmp_fd, data, data_len) <= 0)
    {
        swWarn("write to tmpfile failed.");
        //do not leak the descriptor on the error path
        close(tmp_fd);
        return SW_ERR;
    }

    task->info.len = sizeof(swPackage_task);
    //use tmp file
    swTask_type(task) |= SW_TASK_TMPFILE;

    pkg.length = data_len;
    memcpy(task->data, &pkg, sizeof(swPackage_task));
    close(tmp_fd);
    return SW_OK;
}
swWorker_send2worker 函數(shù)

swWorker_send2worker 函數(shù)負責向 task 進程發(fā)送消息。可以看到 sendMessage 函數(shù)并不支持 stream 模式。

/**
 * Send n bytes from buf to another worker.
 *
 * If the destination's pool uses a message queue the data is pushed there
 * with mtype = destination id + 1. Otherwise it is written to the worker's
 * pipe (master or worker end, chosen by SW_PIPE_MASTER in flag): through the
 * main reactor when SW_PIPE_NONBLOCK is set and a reactor exists, else with a
 * blocking write.
 *
 * @return the result of the underlying push/write call
 */
int swWorker_send2worker(swWorker *dst_worker, void *buf, int n, int flag)
{
    int target_fd = (flag & SW_PIPE_MASTER) ? dst_worker->pipe_master : dst_worker->pipe_worker;

    //message-queue
    if (dst_worker->pool->use_msgqueue)
    {
        struct
        {
            long mtype;
            swEventData buf;
        } envelope;

        envelope.mtype = dst_worker->id + 1;
        memcpy(&envelope.buf, buf, n);

        return swMsgQueue_push(dst_worker->pool->queue, (swQueue_data *) &envelope, n);
    }

    //pipe, asynchronous through the reactor when possible
    if ((flag & SW_PIPE_NONBLOCK) && SwooleG.main_reactor)
    {
        return SwooleG.main_reactor->write(SwooleG.main_reactor, target_fd, buf, n);
    }

    return swSocket_write_blocking(target_fd, buf, n);
}

swoole_server->task 函數(shù)

除了使用 sendMessage/onPipeMessage 發送消息之外,還可以使用 task/finish 向 task 進程發送異步任務。

類似于 sendMessage,函數(shù)首先將 data 利用 php_swoole_task_pack 進行序列化

以 buf.info.fd 為鍵,將 onFinish 異步回調函數保存到 task_callbacks 中

使用 swProcessPool_dispatch 將消息傳遞給 task 進程

/**
 * swoole_server::task(mixed $data, int $dst_worker_id = -1, callable $callback = null)
 *
 * Packs $data as a non-blocking task and dispatches it to the task-worker
 * pool. When a callback is given it is stored in task_callbacks under the
 * task id and the task is flagged SW_TASK_CALLBACK.
 *
 * Returns the task id on success, false if packing or dispatching fails.
 */
PHP_METHOD(swoole_server, task)
{
    swEventData buf;
    zval *data;
    zval *callback = NULL;

    zend_long dst_worker_id = -1;

    swServer *serv = swoole_get_object(getThis());

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z|lz", &data, &dst_worker_id, &callback) == FAILURE)
    {
        return;
    }

    if (php_swoole_task_pack(&buf, data TSRMLS_CC) < 0)
    {
        RETURN_FALSE;
    }

    if (callback && !ZVAL_IS_NULL(callback))
    {
#ifdef PHP_SWOOLE_CHECK_CALLBACK
        char *func_name = NULL;
        if (!sw_zend_is_callable(callback, 0, &func_name TSRMLS_CC))
        {
            swoole_php_fatal_error(E_WARNING, "function \"%s\" is not callable", func_name);
            efree(func_name);
            return;
        }
        efree(func_name);
#endif
        swTask_type(&buf) |= SW_TASK_CALLBACK;
        sw_zval_add_ref(&callback);
        //keyed by task id
        swHashMap_add_int(task_callbacks, buf.info.fd, sw_zval_dup(callback));
    }

    swTask_type(&buf) |= SW_TASK_NONBLOCK;

    //the dispatcher works on an int and may rewrite the id it is given
    int _dst_worker_id = (int) dst_worker_id;
    if (swProcessPool_dispatch(&serv->gs->task_workers, &buf, &_dst_worker_id) >= 0)
    {
        sw_atomic_fetch_add(&serv->stats->tasking_num, 1);
        RETURN_LONG(buf.info.fd);
    }
    else
    {
        RETURN_FALSE;
    }
}
swProcessPool_dispatch 函數(shù)

向 task 進程發送數據時,如果使用的是 stream 模式,那么直接向 UNIX 域套接字發送數據即可。

如果 dst_worker_id 為 -1,那么就調(diào)用 swProcessPool_schedule 選取空閑的 task 進程

調(diào)用 swWorker_send2worker 發(fā)送數(shù)據(jù)給 worker 進程。

/**
 * Dispatch a task to the pool without blocking.
 *
 * Socket mode: the task is sent over a new stream to the pool's socket file
 * and no worker is selected. Other modes: if *dst_worker_id is negative a
 * worker is scheduled; the id is then offset by pool->start_id (and written
 * back) and the task is sent to that worker with SW_PIPE_NONBLOCK.
 *
 * @return >= 0 on success (SW_OK in socket mode), negative on failure
 */
int swProcessPool_dispatch(swProcessPool *pool, swEventData *data, int *dst_worker_id)
{
    if (pool->use_socket)
    {
        swStream *conn = swStream_new(pool->stream->socket_file, 0, SW_SOCK_UNIX_STREAM);
        if (!conn)
        {
            return SW_ERR;
        }

        conn->response = NULL;
        conn->session_id = 0;

        if (swStream_send(conn, (char*) data, sizeof(data->info) + data->info.len) < 0)
        {
            conn->cancel = 1;
            return SW_ERR;
        }
        return SW_OK;
    }

    //no explicit target: let the pool pick one
    if (*dst_worker_id < 0)
    {
        *dst_worker_id = swProcessPool_schedule(pool);
    }
    *dst_worker_id += pool->start_id;

    swWorker *target = swProcessPool_get_worker(pool, *dst_worker_id);
    int payload_len = sizeof(data->info) + data->info.len;

    int rc = swWorker_send2worker(target, data, payload_len, SW_PIPE_MASTER | SW_PIPE_NONBLOCK);
    if (rc < 0)
    {
        swWarn("send %d bytes to worker#%d failed.", payload_len, *dst_worker_id);
        return rc;
    }

    sw_atomic_fetch_add(&target->tasking_num, 1);
    return rc;
}

/**
 * Pick the pool-relative index of the worker that should get the next task.
 *
 * Queue dispatch always yields 0. Otherwise workers are probed round-robin
 * (advancing pool->round_id) for at most run_worker_num + 1 attempts; the
 * first idle one wins, and if none is idle the last probed index is returned.
 */
static sw_inline int swProcessPool_schedule(swProcessPool *pool)
{
    if (pool->dispatch_mode == SW_DISPATCH_QUEUE)
    {
        return 0;
    }

    int live_workers = pool->run_worker_num;
    int candidate = 0;
    int attempt = 0;

    while (attempt < live_workers + 1)
    {
        candidate = sw_atomic_fetch_add(&pool->round_id, 1) % live_workers;
        if (pool->workers[candidate].status == SW_WORKER_IDLE)
        {
            break;
        }
        attempt++;
    }

    return candidate;
}
taskWait 函數(shù)

taskWait 函數是同步投遞任務的函數,該函數利用 swProcessPool_dispatch_blocking 投遞任務之后,會不斷讀取 serv->task_notify,直到獲取返回的數據。

/**
 * swoole_server::taskwait(mixed $data, float $timeout = SW_TASKWAIT_TIMEOUT, int $dst_worker_id = -1)
 *
 * Dispatches a task with a blocking send and waits on this worker's
 * task_notify pipe until the result for that task id arrives in
 * serv->task_result[SwooleWG.id].
 *
 * Returns the unpacked result on success. Returns false if packing or
 * dispatching fails, if the notify read fails or times out, or if the
 * result cannot be unpacked.
 */
PHP_METHOD(swoole_server, taskwait)
{
    swEventData buf;
    zval *data;

    double timeout = SW_TASKWAIT_TIMEOUT;
    long dst_worker_id = -1;

    swServer *serv = swoole_get_object(getThis());

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z|dl", &data, &timeout, &dst_worker_id) == FAILURE)
    {
        return;
    }

    if (php_swoole_task_pack(&buf, data TSRMLS_CC) < 0)
    {
        RETURN_FALSE;
    }

    int task_id = buf.info.fd;

    uint64_t notify;
    swEventData *task_result = &(serv->task_result[SwooleWG.id]);
    bzero(task_result, sizeof(swEventData));
    swPipe *task_notify_pipe = &serv->task_notify[SwooleWG.id];
    int efd = task_notify_pipe->getFd(task_notify_pipe, 0);

    //clear history task
    while (read(efd, &notify, sizeof(notify)) > 0);

    int _dst_worker_id = (int) dst_worker_id;
    if (swProcessPool_dispatch_blocking(&serv->gs->task_workers, &buf, &_dst_worker_id) < 0)
    {
        RETURN_FALSE;
    }

    sw_atomic_fetch_add(&serv->stats->tasking_num, 1);
    task_notify_pipe->timeout = timeout;

    while (1)
    {
        //a failed or timed-out read ends the wait instead of spinning forever
        if (task_notify_pipe->read(task_notify_pipe, &notify, sizeof(notify)) <= 0)
        {
            break;
        }
        //notification for an older task: keep waiting for ours
        if (task_result->info.fd != task_id)
        {
            continue;
        }

        zval *task_notify_data = php_swoole_task_unpack(task_result TSRMLS_CC);
        if (task_notify_data == NULL)
        {
            break;
        }

        RETVAL_ZVAL(task_notify_data, 0, 0);
        //the value now lives in return_value; release only the container,
        //as taskWaitMulti does with efree(zdata) after add_index_zval
        efree(task_notify_data);
        //return here so the result is not overwritten by the failure path below
        return;
    }

    RETURN_FALSE;
}
swProcessPool_dispatch_blocking 函數(shù)

swProcessPool_dispatch_blocking 函數(shù)與 swProcessPool_dispatch 函數(shù)唯一的不同在于調(diào)用 swWorker_send2worker 的時候并沒有使用 SW_PIPE_NONBLOCK 選項。

/**
 * Dispatch a task to the pool with a blocking send.
 *
 * Socket mode: a synchronous client connects to the pool's socket file and
 * sends the task; the client is closed on success and on every failure after
 * it was created. Other modes: if *dst_worker_id is negative a worker is
 * scheduled; the id is then offset by pool->start_id (and written back) and
 * the task is sent to that worker without SW_PIPE_NONBLOCK.
 *
 * @return >= 0 on success (SW_OK in socket mode), negative on failure
 */
int swProcessPool_dispatch_blocking(swProcessPool *pool, swEventData *data, int *dst_worker_id)
{
    int ret = 0;
    int sendn = sizeof(data->info) + data->info.len;

    if (pool->use_socket)
    {
        swClient _socket;
        if (swClient_create(&_socket, SW_SOCK_UNIX_STREAM, SW_SOCK_SYNC) < 0)
        {
            return SW_ERR;
        }
        if (_socket.connect(&_socket, pool->stream->socket_file, 0, -1, 0) < 0)
        {
            //release the client created above
            _socket.close(&_socket);
            return SW_ERR;
        }
        if (_socket.send(&_socket, (void*) data, sendn, 0) < 0)
        {
            _socket.close(&_socket);
            return SW_ERR;
        }
        _socket.close(&_socket);
        return SW_OK;
    }

    //no explicit target: let the pool pick one
    if (*dst_worker_id < 0)
    {
        *dst_worker_id = swProcessPool_schedule(pool);
    }

    *dst_worker_id += pool->start_id;
    swWorker *worker = swProcessPool_get_worker(pool, *dst_worker_id);

    ret = swWorker_send2worker(worker, data, sendn, SW_PIPE_MASTER);
    if (ret < 0)
    {
        swWarn("send %d bytes to worker#%d failed.", sendn, *dst_worker_id);
    }
    else
    {
        sw_atomic_fetch_add(&worker->tasking_num, 1);
    }

    return ret;
}
php_swoole_task_unpack 函數(shù)
/**
 * Turn a task/result swEventData back into a PHP value.
 *
 * The payload is taken either from the temp file referenced by the task
 * (SW_TASK_TMPFILE) or from task_result->data. If the task is flagged
 * SW_TASK_SERIALIZE the payload is unserialized; when unserializing fails,
 * or the flag is absent, the payload is returned as a string.
 *
 * @return a newly allocated zval, or NULL if the large payload could not be
 *         loaded
 */
zval* php_swoole_task_unpack(swEventData *task_result TSRMLS_DC)
{
    zval *result_data, *result_unserialized_data;
    char *result_data_str;
    int result_data_len = 0;
    php_unserialize_data_t var_hash;
    swString *large_packet;

    /**
     * Large result package
     */
    if (swTask_type(task_result) & SW_TASK_TMPFILE)
    {
        large_packet = swTaskWorker_large_unpack(task_result);
        /**
         * unpack failed
         */
        if (large_packet == NULL)
        {
            return NULL;
        }
        result_data_str = large_packet->str;
        result_data_len = large_packet->length;
    }
    else
    {
        //small payload travels inline in the event itself
        result_data_str = task_result->data;
        result_data_len = task_result->info.len;
    }

    if (swTask_type(task_result) & SW_TASK_SERIALIZE)
    {
        SW_ALLOC_INIT_ZVAL(result_unserialized_data);

        //on PHP >= 7 the fast serializer is tried when enabled; the block after
        //#endif is its else branch there and the only path on older PHP
#if PHP_MAJOR_VERSION >= 7
        if (SWOOLE_G(fast_serialize))
        {
            if (php_swoole_unserialize(result_data_str, result_data_len, result_unserialized_data, NULL, 0))
            {
                result_data = result_unserialized_data;
            }
            else
            {
                //fall back to handing out the raw bytes as a string
                SW_ALLOC_INIT_ZVAL(result_data);
                SW_ZVAL_STRINGL(result_data, result_data_str, result_data_len, 1);
            }
        }
        else
#endif
        {
            PHP_VAR_UNSERIALIZE_INIT(var_hash);
            //unserialize success
            //NOTE(review): result_data_str is passed by address, so the call may advance
            //it before the fallback copy below - confirm
            if (sw_php_var_unserialize(&result_unserialized_data, (const unsigned char ** ) &result_data_str,
                    (const unsigned char * ) (result_data_str + result_data_len), &var_hash TSRMLS_CC))
            {
                result_data = result_unserialized_data;
            }
            //failed
            else
            {
                SW_ALLOC_INIT_ZVAL(result_data);
                SW_ZVAL_STRINGL(result_data, result_data_str, result_data_len, 1);
            }
            PHP_VAR_UNSERIALIZE_DESTROY(var_hash);
        }
    }
    else
    {
        //not serialized: the payload is a plain string
        SW_ALLOC_INIT_ZVAL(result_data);
        SW_ZVAL_STRINGL(result_data, result_data_str, result_data_len, 1);
    }
    return result_data;
}

/**
 * Load an oversized task payload from the temp file named in the task's
 * swPackage_task descriptor into SwooleTG.buffer_stack.
 *
 * The temp file is removed after a successful read unless the task is
 * flagged SW_TASK_PEEK.
 *
 * @return SwooleTG.buffer_stack holding the payload, or NULL if the file
 *         cannot be opened, the buffer cannot be grown, or the read fails
 */
static sw_inline swString* swTaskWorker_large_unpack(swEventData *task_result)
{
    swPackage_task descriptor;
    memcpy(&descriptor, task_result->data, sizeof(descriptor));

    int fd = open(descriptor.tmpfile, O_RDONLY);
    if (fd < 0)
    {
        swSysError("open(%s) failed.", descriptor.tmpfile);
        return NULL;
    }

    //grow the buffer if needed, then read; either failure aborts
    int failed = 0;
    if (SwooleTG.buffer_stack->size < descriptor.length && swString_extend_align(SwooleTG.buffer_stack, descriptor.length) < 0)
    {
        failed = 1;
    }
    else if (swoole_sync_readfile(fd, SwooleTG.buffer_stack->str, descriptor.length) < 0)
    {
        failed = 1;
    }

    close(fd);
    if (failed)
    {
        return NULL;
    }

    //a peeked task leaves its file in place
    if (!(swTask_type(task_result) & SW_TASK_PEEK))
    {
        unlink(descriptor.tmpfile);
    }

    SwooleTG.buffer_stack->length = descriptor.length;
    return SwooleTG.buffer_stack;
}
taskWaitMulti 函數(shù)

taskWaitMulti 函數(shù)用于同時投遞多個任務

函數(shù)首先創(chuàng)建臨時文件,循環(huán) tasks 并調(diào)用 swProcessPool_dispatch_blocking 發(fā)送同步任務。

不斷讀取 task_notify_pipe 直到收到全部消息或者超時

讀取臨時文件內(nèi)容,并解析文件中各個任務的返回值

#define SW_TASK_TMP_FILE                 "/tmp/swoole.task.XXXXXX"

PHP_METHOD(swoole_server, taskWaitMulti)
{
    swEventData buf;
    zval *tasks;
    zval *task;
    double timeout = SW_TASKWAIT_TIMEOUT;

    swServer *serv = swoole_get_object(getThis());

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z|d", &tasks, &timeout) == FAILURE)
    {
        return;
    }

    array_init(return_value);

    int dst_worker_id;
    int task_id;
    int i = 0;
    int n_task = Z_ARRVAL_P(tasks)->nNumOfElements;

    int list_of_id[SW_MAX_CONCURRENT_TASK];

    uint64_t notify;
    swEventData *task_result = &(serv->task_result[SwooleWG.id]);
    bzero(task_result, sizeof(swEventData));
    swPipe *task_notify_pipe = &serv->task_notify[SwooleWG.id];
    swWorker *worker = swServer_get_worker(serv, SwooleWG.id);

    char _tmpfile[sizeof(SW_TASK_TMP_FILE)] = SW_TASK_TMP_FILE;
    int _tmpfile_fd = swoole_tmpfile(_tmpfile);
    if (_tmpfile_fd < 0)
    {
        RETURN_FALSE;
    }
    close(_tmpfile_fd);
    int *finish_count = (int *) task_result->data;

    worker->lock.lock(&worker->lock);
    *finish_count = 0;
    memcpy(task_result->data + 4, _tmpfile, sizeof(_tmpfile));
    worker->lock.unlock(&worker->lock);

    //clear history task
    int efd = task_notify_pipe->getFd(task_notify_pipe, 0);
    while (read(efd, ¬ify, sizeof(notify)) > 0);

    SW_HASHTABLE_FOREACH_START(Z_ARRVAL_P(tasks), task)
        task_id = php_swoole_task_pack(&buf, task TSRMLS_CC);

        swTask_type(&buf) |= SW_TASK_WAITALL;
        dst_worker_id = -1;
        if (swProcessPool_dispatch_blocking(&serv->gs->task_workers, &buf, &dst_worker_id) < 0)
        {
            swoole_php_fatal_error(E_WARNING, "taskwait failed. Error: %s[%d]", strerror(errno), errno);
            task_id = -1;
            fail:
            add_index_bool(return_value, i, 0);
            n_task --;
        }
        sw_atomic_fetch_add(&serv->stats->tasking_num, 1);
        list_of_id[i] = task_id;
        i++;
    SW_HASHTABLE_FOREACH_END();

    if (n_task == 0)
    {
        SwooleG.error = SW_ERROR_TASK_DISPATCH_FAIL;
        RETURN_FALSE;
    }

    double _now = swoole_microtime();
    while (n_task > 0)
    {
        task_notify_pipe->timeout = timeout;
        int ret = task_notify_pipe->read(task_notify_pipe, ¬ify, sizeof(notify));
        if (ret > 0 && *finish_count < n_task)
        {
            if (swoole_microtime() - _now < timeout)
            {
                continue;
            }
        }
        break;
    }

    worker->lock.lock(&worker->lock);
    swString *content = swoole_file_get_contents(_tmpfile);
    worker->lock.unlock(&worker->lock);

    if (content == NULL)
    {
        RETURN_FALSE;
    }

    swEventData *result;
    zval *zdata;
    int j;

    do
    {
        result = (swEventData *) (content->str + content->offset);
        task_id = result->info.fd;
        zdata = php_swoole_task_unpack(result TSRMLS_CC);
        if (zdata == NULL)
        {
            goto next;
        }
        for (j = 0; j < Z_ARRVAL_P(tasks)->nNumOfElements; j++)
        {
            if (list_of_id[j] == task_id)
            {
                break;
            }
        }
        add_index_zval(return_value, j, zdata);
        efree(zdata);
        next: content->offset += sizeof(swDataHead) + result->info.len;
    }
    while(content->offset < content->length);
    //free memory
    swString_free(content);
    //delete tmp file
    unlink(_tmpfile);
}

pool->onTask 函數(shù)

task 進程接受到消息之后,要判斷消息來源于 sendMessage 還是 SW_TASK_CALLBACK

/**
 * Entry point invoked for every event a task worker receives.
 *
 * The event is remembered in current_task and then routed by its type:
 * pipe messages go to serv->onPipeMessage, everything else to serv->onTask.
 *
 * @param pool  task worker pool; pool->ptr is the owning swServer
 * @param task  event that was received
 * @return SW_OK for pipe messages, otherwise whatever serv->onTask returns
 */
int swTaskWorker_onTask(swProcessPool *pool, swEventData *task)
{
    swServer *server = pool->ptr;

    current_task = task;

    if (task->info.type != SW_EVENT_PIPE_MESSAGE)
    {
        return server->onTask(server, task);
    }

    /* the pipe-message handler has no return value, so report success */
    server->onPipeMessage(server, task);
    return SW_OK;
}
php_swoole_onPipeMessage 函數

php_swoole_onPipeMessage 函數就是 serv->onPipeMessage(serv, task) 函數,該函數主要功能就是調用回調函數 onPipeMessage

/**
 * Deliver a pipe message to the user's onPipeMessage callback.
 *
 * The callback receives three arguments: the server object (serv->ptr2),
 * the id of the sending worker (req->info.from_id) and the payload decoded
 * by php_swoole_task_unpack().
 *
 * @param serv  server instance
 * @param req   received pipe-message event
 */
static void php_swoole_onPipeMessage(swServer *serv, swEventData *req)
{
    SWOOLE_GET_TSRMLS;

    zval *zserv = (zval *) serv->ptr2;
    zval *zworker_id;
    zval *retval = NULL;

    SW_MAKE_STD_ZVAL(zworker_id);
    ZVAL_LONG(zworker_id, (long) req->info.from_id);

    zval *zdata = php_swoole_task_unpack(req TSRMLS_CC);
    if (zdata == NULL)
    {
        /* payload could not be decoded: there is nothing to pass to the callback */
        sw_zval_ptr_dtor(&zworker_id);
        return;
    }

    {
        zval **args[3];
        args[0] = &zserv;
        args[1] = &zworker_id;
        args[2] = &zdata;

        if (sw_call_user_function_fast(php_sw_server_callbacks[SW_SERVER_CB_onPipeMessage], php_sw_server_caches[SW_SERVER_CB_onPipeMessage], &retval, 3, args TSRMLS_CC) == FAILURE)
        {
            swoole_php_fatal_error(E_WARNING, "onPipeMessage handler error.");
        }
    }

    /* report an exception left pending by the callback, as the other server callbacks do */
    if (EG(exception))
    {
        zend_exception_error(EG(exception), E_ERROR TSRMLS_CC);
    }

    /* release everything created for this call */
    sw_zval_ptr_dtor(&zworker_id);
    sw_zval_free(zdata);

    if (retval != NULL)
    {
        sw_zval_ptr_dtor(&retval);
    }
}
php_swoole_onTask 函數

本函數就是 serv->onTask(serv, task) 所調用的函數,該函數最重要的功能是調用 onTask 回調函數,回調函數結束之後調用 php_swoole_task_finish 函數向 worker 進程發送已結束信息。

/**
 * Run the user's onTask callback for a received task.
 *
 * The callback is called with four arguments: the server object
 * (serv->ptr2), the task id (req->info.fd), the id of the dispatching worker
 * (req->info.from_id) and the payload decoded by php_swoole_task_unpack().
 * If the callback returns a non-NULL value, that value is forwarded to
 * php_swoole_task_finish() as the task result.
 *
 * @param serv  server instance
 * @param req   received task event
 * @return SW_ERR when the payload cannot be decoded, SW_OK otherwise
 */
static int php_swoole_onTask(swServer *serv, swEventData *req)
{
    zval *zserv = (zval *) serv->ptr2;
    zval **args[4];

    zval *zfd;
    zval *zfrom_id;

    /* the task has been picked up, so it no longer counts as pending */
    sw_atomic_fetch_sub(&serv->stats->tasking_num, 1);

    zval *retval = NULL;

    SWOOLE_GET_TSRMLS;

    SW_MAKE_STD_ZVAL(zfd);
    ZVAL_LONG(zfd, (long) req->info.fd);

    SW_MAKE_STD_ZVAL(zfrom_id);
    ZVAL_LONG(zfrom_id, (long) req->info.from_id);

    zval *zdata = php_swoole_task_unpack(req TSRMLS_CC);
    if (zdata == NULL)
    {
        /* release the argument zvals created above before bailing out */
        sw_zval_ptr_dtor(&zfd);
        sw_zval_ptr_dtor(&zfrom_id);
        return SW_ERR;
    }

    args[0] = &zserv;
    args[1] = &zfd;
    args[2] = &zfrom_id;
    args[3] = &zdata;

    zend_fcall_info_cache *fci_cache = php_sw_server_caches[SW_SERVER_CB_onTask];
    if (sw_call_user_function_fast(php_sw_server_callbacks[SW_SERVER_CB_onTask], fci_cache, &retval, 4, args TSRMLS_CC) == FAILURE)
    {
        swoole_php_fatal_error(E_WARNING, "onTask handler error.");
    }

    if (EG(exception))
    {
        zend_exception_error(EG(exception), E_ERROR TSRMLS_CC);
    }

    sw_zval_ptr_dtor(&zfd);
    sw_zval_ptr_dtor(&zfrom_id);
    sw_zval_free(zdata);

    if (retval)
    {
        /* a non-NULL return value is treated as the task result */
        if (SW_Z_TYPE_P(retval) != IS_NULL)
        {
            php_swoole_task_finish(serv, retval TSRMLS_CC);
        }
        sw_zval_ptr_dtor(&retval);
    }

    return SW_OK;
}
php_swoole_task_finish 函數

php_swoole_task_finish 函數主要用於告知 worker 進程投遞的任務已完成。首先需要序列化參數,然後調用 swTaskWorker_finish 函數發送消息。

/**
 * Turn a PHP value into a byte buffer and hand it to swTaskWorker_finish().
 *
 * String values are sent as-is. Any other value is serialized first and the
 * SW_TASK_SERIALIZE flag is set; on PHP 7+ the swoole serializer is used when
 * SWOOLE_G(fast_serialize) is enabled, otherwise the PHP var serializer.
 *
 * @param serv  server instance
 * @param data  task result to send
 * @return the value returned by swTaskWorker_finish()
 */
static int php_swoole_task_finish(swServer *serv, zval *data TSRMLS_DC)
{
    smart_str sbuf = {0};
    php_serialize_data_t var_hash;
    char *payload;
    int payload_len = 0;
    int task_flags = 0;
    int rc;

#if PHP_MAJOR_VERSION >= 7
    zend_string *packed = NULL;
#endif

    if (SW_Z_TYPE_P(data) == IS_STRING)
    {
        /* strings need no encoding */
        payload = Z_STRVAL_P(data);
        payload_len = Z_STRLEN_P(data);
    }
    else
    {
        /* everything else is serialized and flagged as such */
        task_flags |= SW_TASK_SERIALIZE;
#if PHP_MAJOR_VERSION >= 7
        if (SWOOLE_G(fast_serialize))
        {
            packed = php_swoole_serialize(data);
            payload = packed->val;
            payload_len = packed->len;
        }
        else
#endif
        {
            PHP_VAR_SERIALIZE_INIT(var_hash);
            sw_php_var_serialize(&sbuf, data, &var_hash TSRMLS_CC);
            PHP_VAR_SERIALIZE_DESTROY(var_hash);
#if PHP_MAJOR_VERSION<7
            payload = sbuf.c;
            payload_len = sbuf.len;
#else
            payload = sbuf.s->val;
            payload_len = sbuf.s->len;
#endif
        }
    }

    rc = swTaskWorker_finish(serv, payload, payload_len, task_flags);

    /* free whichever serialization buffer was used */
#if PHP_MAJOR_VERSION >= 7
    if (SWOOLE_G(fast_serialize) && packed)
    {
        zend_string_release(packed);
    }
    else
#endif
    {
        smart_str_free(&sbuf);
    }
    return rc;
}
swTaskWorker_finish 函數

如果是異步投遞任務的話,本函數會調用 swWorker_send2worker 函數發送消息。如果使用 stream 模式,會向 worker->pool->stream->last_connection 這個套接字寫入;如果數據量過大,會採用臨時文件;

如果是使用 taskWaitMulti 同步投遞任務的話,將消息寫入 serv->task_result 中的臨時文件中。值得注意的是,消息有可能存放在了 SwooleG.task_tmpdir 臨時文件中,這時候存入 serv->task_result 中的臨時文件中的僅僅是文件名而不是具體內容。

如果使用的是 taskWait 同步投遞任務的話,將數據放入 serv->task_result 中,或者放入 SwooleG.task_tmpdir 指定的臨時文件中。向 serv->task_notify 發送消息,告知 worker 進程 task 已消費完畢。

/**
 * Send the result of the task currently being processed (current_task) back
 * to the worker that dispatched it.
 *
 * Three delivery paths exist, selected by the type flags of current_task:
 *  - SW_TASK_NONBLOCK: the result is packed into an SW_EVENT_FINISH event and
 *    sent with swWorker_send2worker(), or written length-prefixed to
 *    worker->pool->stream->last_connection when the pool uses a socket.
 *  - SW_TASK_WAITALL: the result event is appended to the tmpfile whose path
 *    is stored in serv->task_result[source worker], the finish counter kept
 *    in front of that path is incremented, and the worker is notified
 *    through serv->task_notify.
 *  - otherwise: the result is stored in serv->task_result[source worker] and
 *    the worker is notified through serv->task_notify.
 * Payloads that do not fit into one event are handed to
 * swTaskWorker_large_pack().
 *
 * @param serv      server instance
 * @param data      result bytes
 * @param data_len  number of bytes in data
 * @param flags     task flags stored in the result event
 * @return SW_ERR on a precondition or packing failure, otherwise the return
 *         value of the last send/write call
 */
int swTaskWorker_finish(swServer *serv, char *data, int data_len, int flags)
{
    swEventData buf;
    if (!current_task)
    {
        swWarn("cannot use finish in worker");
        return SW_ERR;
    }
    if (serv->task_worker_num < 1)
    {
        swWarn("cannot use task/finish, because no set serv->task_worker_num.");
        return SW_ERR;
    }
    if (current_task->info.type == SW_EVENT_PIPE_MESSAGE)
    {
        swWarn("task/finish is not supported in onPipeMessage callback.");
        return SW_ERR;
    }

    uint16_t source_worker_id = current_task->info.from_id;
    swWorker *worker = swServer_get_worker(serv, source_worker_id);

    if (worker == NULL)
    {
        swWarn("invalid worker_id[%d].", source_worker_id);
        return SW_ERR;
    }

    int ret;
    //for swoole_server_task
    if (swTask_type(current_task) & SW_TASK_NONBLOCK)
    {
        buf.info.type = SW_EVENT_FINISH;
        buf.info.fd = current_task->info.fd;
        //carry the callback/coroutine marker of the request over to the result
        if (swTask_type(current_task) & SW_TASK_CALLBACK)
        {
            flags |= SW_TASK_CALLBACK;
        }
        else if (swTask_type(current_task) & SW_TASK_COROUTINE)
        {
            flags |= SW_TASK_COROUTINE;
        }
        swTask_type(&buf) = flags;

        //payload does not fit into one event: let swTaskWorker_large_pack handle it
        if (data_len >= SW_IPC_MAX_SIZE - sizeof(buf.info))
        {
            if (swTaskWorker_large_pack(&buf, data, data_len) < 0 )
            {
                swWarn("large task pack failed()");
                return SW_ERR;
            }
        }
        else
        {
            memcpy(buf.data, data, data_len);
            buf.info.len = data_len;
        }

        if (worker->pool->use_socket && worker->pool->stream->last_connection > 0)
        {
            //stream mode: 4-byte length in network byte order, then the raw data
            int32_t _len = htonl(data_len);
            ret = swSocket_write_blocking(worker->pool->stream->last_connection, (void *) &_len, sizeof(_len));
            if (ret > 0)
            {
                ret = swSocket_write_blocking(worker->pool->stream->last_connection, data, data_len);
            }
        }
        else
        {
            ret = swWorker_send2worker(worker, &buf, sizeof(buf.info) + buf.info.len, SW_PIPE_MASTER);
        }
    }
    else
    {
        uint64_t flag = 1;

        /**
         * Use worker shm store the result
         */
        swEventData *result = &(serv->task_result[source_worker_id]);
        swPipe *task_notify_pipe = &(serv->task_notify[source_worker_id]);

        //lock worker
        worker->lock.lock(&worker->lock);

        if (swTask_type(current_task) & SW_TASK_WAITALL)
        {
            //result->data layout: finish counter first, tmpfile path 4 bytes in
            sw_atomic_t *finish_count = (sw_atomic_t*) result->data;
            char *_tmpfile = result->data + 4;
            int fd = open(_tmpfile, O_APPEND | O_WRONLY);
            if (fd >= 0)
            {
                buf.info.type = SW_EVENT_FINISH;
                buf.info.fd = current_task->info.fd;
                swTask_type(&buf) = flags;
                //result pack
                if (data_len >= SW_IPC_MAX_SIZE - sizeof(buf.info))
                {
                    if (swTaskWorker_large_pack(&buf, data, data_len) < 0)
                    {
                        swWarn("large task pack failed()");
                        //still append an empty record so the counter stays in step
                        buf.info.len = 0;
                    }
                }
                else
                {
                    buf.info.len = data_len;
                    memcpy(buf.data, data, data_len);
                }
                //write to tmpfile
                if (swoole_sync_writefile(fd, &buf, sizeof(buf.info) + buf.info.len) < 0)
                {
                    //log the path itself: result->data begins with the counter bytes
                    swSysError("write(%s, %ld) failed.", _tmpfile, (long) (sizeof(buf.info) + buf.info.len));
                }
                sw_atomic_fetch_add(finish_count, 1);
                close(fd);
            }
            else
            {
                //the result is lost in this case; make the failure visible
                swSysError("open(%s) failed.", _tmpfile);
            }
        }
        else
        {
            result->info.type = SW_EVENT_FINISH;
            result->info.fd = current_task->info.fd;
            swTask_type(result) = flags;

            if (data_len >= SW_IPC_MAX_SIZE - sizeof(buf.info))
            {
                if (swTaskWorker_large_pack(result, data, data_len) < 0)
                {
                    //unlock worker
                    worker->lock.unlock(&worker->lock);
                    swWarn("large task pack failed()");
                    return SW_ERR;
                }
            }
            else
            {
                memcpy(result->data, data, data_len);
                result->info.len = data_len;
            }
        }

        //unlock worker
        worker->lock.unlock(&worker->lock);

        //notify the waiting worker; retry after the pipe becomes writable again
        while (1)
        {
            ret = task_notify_pipe->write(task_notify_pipe, &flag, sizeof(flag));
#ifdef HAVE_KQUEUE
            if (ret < 0 && (errno == EAGAIN || errno == ENOBUFS))
#else
            if (ret < 0 && errno == EAGAIN)
#endif
            {
                if (swSocket_wait(task_notify_pipe->getFd(task_notify_pipe, 1), -1, SW_EVENT_WRITE) == 0)
                {
                    continue;
                }
            }
            break;
        }
    }
    if (ret < 0)
    {
        swWarn("TaskWorker: send result to worker failed. Error: %s[%d]", strerror(errno), errno);
    }
    return ret;
}
php_swoole_onFinish 函數

異步投遞任務結束後,task 進程會調用 swWorker_send2worker 向 worker 進程發送消息,worker 進程進而調用 swWorker_onTask。

我們可以看到,worker 函數會調用 serv->onFinish 函數,也就是 php_swoole_onFinish 函數。

php_swoole_onFinish 函數主要用於調用 onFinish 回調函數。onFinish 回調函數有些是 swoole_server->task 函數指定,存儲在 task_callbacks 中;有些是 swoole_server->onFinish 指定,存儲在 php_sw_server_callbacks[SW_SERVER_CB_onFinish] 中。

/*
 * Excerpt of the worker-side event handler; "..." marks code that the
 * article left out. The part shown dispatches on task->info.type.
 */
int swWorker_onTask(swFactory *factory, swEventData *task)
{
    ...
    switch (task->info.type)
    {
        /* a finished-task event: hand it to the server's onFinish handler */
        case SW_EVENT_FINISH:
            serv->onFinish(serv, task);
            break;

        /* a pipe message: hand it to the server's onPipeMessage handler */
        case SW_EVENT_PIPE_MESSAGE:
            serv->onPipeMessage(serv, task);
            break;
    }
    ...
}

/**
 * Run the finish callback for a completed task.
 *
 * The callback is called with three arguments: the server object
 * (serv->ptr2), the task id (req->info.fd) and the result decoded by
 * php_swoole_task_unpack(). When the event carries SW_TASK_CALLBACK, the
 * callback registered for that task id in task_callbacks is used and removed
 * afterwards; if none is found the flag is cleared and the server-wide
 * onFinish callback is used instead.
 *
 * @param serv  server instance
 * @param req   received finish event
 * @return SW_ERR when the result cannot be decoded, SW_OK otherwise
 */
static int php_swoole_onFinish(swServer *serv, swEventData *req)
{
    zval *zserv = (zval *) serv->ptr2;
    zval **args[3];

    zval *ztask_id;
    zval *zdata;
    zval *retval = NULL;

    SWOOLE_GET_TSRMLS;

    SW_MAKE_STD_ZVAL(ztask_id);
    ZVAL_LONG(ztask_id, (long) req->info.fd);

    zdata = php_swoole_task_unpack(req TSRMLS_CC);
    if (zdata == NULL)
    {
        /* nothing to deliver; do not call the handler or free a NULL zval */
        sw_zval_ptr_dtor(&ztask_id);
        return SW_ERR;
    }

    args[0] = &zserv;
    args[1] = &ztask_id;
    args[2] = &zdata;

    zval *callback = NULL;
    if (swTask_type(req) & SW_TASK_CALLBACK)
    {
        callback = swHashMap_find_int(task_callbacks, req->info.fd);
        if (callback == NULL)
        {
            /* no per-task callback stored: clear the flag so the cleanup below is skipped */
            swTask_type(req) = swTask_type(req) & (~SW_TASK_CALLBACK);
        }
    }
    if (callback == NULL)
    {
        callback = php_sw_server_callbacks[SW_SERVER_CB_onFinish];
    }
    if (sw_call_user_function_ex(EG(function_table), NULL, callback, &retval, 3, args, 0, NULL TSRMLS_CC) == FAILURE)
    {
        swoole_php_fatal_error(E_WARNING, "onFinish handler error.");
    }
    if (EG(exception))
    {
        zend_exception_error(EG(exception), E_ERROR TSRMLS_CC);
    }
    sw_zval_ptr_dtor(&ztask_id);
    sw_zval_free(zdata);
    if (retval != NULL)
    {
        sw_zval_ptr_dtor(&retval);
    }
    if (swTask_type(req) & SW_TASK_CALLBACK)
    {
        /* the per-task callback has been used: remove and free it */
        swHashMap_del_int(task_callbacks, req->info.fd);
        sw_zval_free(callback);
    }
    return SW_OK;
}

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯繫管理員刪除。

轉載請註明本文地址:http://systransis.cn/yun/29286.html

相關(guān)文章

  • swoole入門4-初識swoole

    摘要:當某種網(wǎng)絡事件發(fā)生時,會回調(diào)用戶設置的指定回調(diào)函數(shù)。承擔了底層網(wǎng)絡事件的監(jiān)聽及各種底層事件處理,當收到請求時,會觸發(fā)事件提醒,然后將控制權(quán)轉(zhuǎn)交預先注冊的事件回調(diào)函數(shù),來進行后續(xù)的處理。請求到來時創(chuàng)建,請求結(jié)束后銷毀。 運行流程圖 showImg(https://segmentfault.com/img/remote/1460000017207791);showImg(https://s...

    forsigner 評論0 收藏0
  • Swoft 源碼剖析 - 代碼自動更新機制

    摘要:作者鏈接來源簡書著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對原文進行了重新的排版。文件重載管理進程注冊了一個名為的該進程會在系統(tǒng)引導的最后一個階段,即啟動前啟動。 作者:bromine鏈接:https://www.jianshu.com/p/e63...來源:簡書著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對原文進行了重新的排版。Swoft Github: https://githu...

    iflove 評論0 收藏0
  • Swoft 源碼剖析 - Swoole和Swoft的那些事 (Http/Rpc服務篇)

    摘要:和服務關(guān)系最密切的進程是中的進程組,絕大部分業(yè)務處理都在該進程中進行。隨后觸發(fā)一個事件各組件通過該事件進行配置文件加載路由注冊。事件每個請求到來時僅僅會觸發(fā)事件。服務器生命周期和服務基本一致,詳情參考源碼剖析功能實現(xiàn) 作者:bromine鏈接:https://www.jianshu.com/p/4c0...來源:簡書著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對原文進行了重新的排版。S...

    張漢慶 評論0 收藏0
  • swoole進程結(jié)構(gòu)

    摘要:管理進程會監(jiān)視所有子進程的退出事件,當進程發(fā)生致命錯誤或者運行生命周期結(jié)束時,管理進程會回收此進程,并創(chuàng)建新的進程。換句話也就是說,對于進程的創(chuàng)建回收等操作全權(quán)有保姆進程進行管理。跟的交互請求到達實際上是與進程中的某個線程發(fā)生了連接。 showImg(https://segmentfault.com/img/bVbrhb2?w=600&h=360); 一、進程的基本知識 什么是進程,所...

    546669204 評論0 收藏0
  • Swoft 源碼剖析 - 連接池

    摘要:基于擴展實現(xiàn)真正的數(shù)據(jù)庫連接池這種方案中,項目占用的連接數(shù)僅僅為。一種是連接暫時不再使用,其占用狀態(tài)解除,可以從使用者手中交回到空閑隊列中這種我們稱為連接的歸隊。源碼剖析系列目錄 作者:bromine鏈接:https://www.jianshu.com/p/1a7...來源:簡書著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對原文進行了重新的排版。Swoft Github: https:...

    rozbo 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<