成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Swoole 源碼分析——Server模塊之Stream 模式

wums / 1508人閱讀

摘要:新建可以看到,自動(dòng)采用包長(zhǎng)檢測(cè)的方法該函數(shù)主要功能是設(shè)置各種回調(diào)函數(shù)值得注意的是第三個(gè)參數(shù)代表是否異步。發(fā)送數(shù)據(jù)函數(shù)并不是直接發(fā)送數(shù)據(jù),而是將數(shù)據(jù)存儲(chǔ)在,等著寫事件就緒之后調(diào)用發(fā)送數(shù)據(jù)。

swReactorThread_dispatch 發(fā)送數(shù)據(jù)

reactor 線程會(huì)通過 swReactorThread_dispatch 發(fā)送數(shù)據(jù),當(dāng)采用 stream 發(fā)送數(shù)據(jù)的時(shí)候,會(huì)調(diào)用 swStream_new 新建 stream,利用 swStream_send 發(fā)送數(shù)據(jù)。

int swReactorThread_dispatch(swConnection *conn, char *data, uint32_t length)
{
    ...
    if (serv->dispatch_mode == SW_DISPATCH_STREAM)
    {
        swStream *stream = swStream_new(serv->stream_socket, 0, SW_SOCK_UNIX_STREAM);
        if (stream == NULL)
        {
            return SW_ERR;
        }
        stream->response = swReactorThread_onStreamResponse;
        stream->session_id = conn->session_id;
        swListenPort *port = swServer_get_port(serv, conn->fd);
        swStream_set_max_length(stream, port->protocol.package_max_length);

        task.data.info.fd = conn->session_id;
        task.data.info.type = SW_EVENT_PACKAGE_END;
        task.data.info.len = 0;

        if (swStream_send(stream, (char*) &task.data.info, sizeof(task.data.info)) < 0)
        {
            return SW_ERR;
        }
        if (swStream_send(stream, data, length) < 0)
        {
            stream->cancel = 1;
            return SW_ERR;
        }
        return SW_OK;
    }
    ...
}
swStream_new 新建 stream

可以看到,stream 自動(dòng)采用包長(zhǎng)檢測(cè)的方法

該函數(shù)主要功能是設(shè)置各種回調(diào)函數(shù)

值得注意的是 swClient_create 第三個(gè)參數(shù)代表是否異步。在這里設(shè)置的是 1,也就是說,無論 connect 還是 send 都是異步。

typedef struct _swStream
{
    swString *buffer;
    uint32_t session_id;
    uint8_t cancel;
    void (*response)(struct _swStream *stream, char *data, uint32_t length);
    swClient client;
} swStream;

swStream* swStream_new(char *dst_host, int dst_port, int type)
{
    swStream *stream = (swStream*) sw_malloc(sizeof(swStream));
    bzero(stream, sizeof(swStream));

    swClient *cli = &stream->client;
    if (swClient_create(cli, type, 1) < 0)
    {
        swStream_free(stream);
        return NULL;
    }

    cli->onConnect = swStream_onConnect;
    cli->onReceive = swStream_onReceive;
    cli->onError = swStream_onError;
    cli->onClose = swStream_onClose;
    cli->object = stream;

    cli->open_length_check = 1;
    swStream_set_protocol(&cli->protocol);

    if (cli->connect(cli, dst_host, dst_port, -1, 0) < 0)
    {
        swSysError("failed to connect to [%s:%d].", dst_host, dst_port);
        swStream_free(stream);
        return NULL;
    }
    else
    {
        return stream;
    }
}

void swStream_set_protocol(swProtocol *protocol)
{
    protocol->get_package_length = swProtocol_get_package_length;
    protocol->package_length_size = 4;
    protocol->package_length_type = "N";
    protocol->package_body_offset = 4;
    protocol->package_length_offset = 0;
}
swStream_onConnect 連接回調(diào)函數(shù)

swStream_onConnect 不僅是連接成功的回調(diào)函數(shù),還是每次 onWrite 寫事件的回調(diào)函數(shù),因此每次都需要調(diào)用 cli->send 函數(shù),發(fā)送存儲(chǔ)在 stream->buffer 數(shù)據(jù)。值得注意的是,每次發(fā)送數(shù)據(jù),都要將數(shù)據(jù)長(zhǎng)度存放在 buffer 的頭部,否則包長(zhǎng)檢測(cè)會(huì)失敗。

static void swStream_onConnect(swClient *cli)
{
    swStream *stream = (swStream*) cli->object;
    if (stream->cancel)
    {
        cli->close(cli);
    }
    *((uint32_t *) stream->buffer->str) = ntohl(stream->buffer->length - 4);
    if (cli->send(cli, stream->buffer->str, stream->buffer->length, 0) < 0)
    {
        cli->close(cli);
    }
    else
    {
        swString_free(stream->buffer);
        stream->buffer = NULL;
    }
}
swStream_send 發(fā)送數(shù)據(jù)

swStream_send 函數(shù)并不是直接發(fā)送數(shù)據(jù),而是將數(shù)據(jù)存儲(chǔ)在 stream->buffer,等著寫事件就緒之后調(diào)用 swStream_onConnect 發(fā)送數(shù)據(jù)。值得注意的是,每次新建 buffer 的時(shí)候,要預(yù)留 4 個(gè)字節(jié)來存儲(chǔ) buffer 的數(shù)據(jù)長(zhǎng)度

int swStream_send(swStream *stream, char *data, size_t length)
{
    if (stream->buffer == NULL)
    {
        stream->buffer = swString_new(swoole_size_align(length + 4, SwooleG.pagesize));
        if (stream->buffer == NULL)
        {
            return SW_ERR;
        }
        stream->buffer->length = 4;
    }
    if (swString_append_ptr(stream->buffer, data, length) < 0)
    {
        return SW_ERR;
    }
    return SW_OK;
}
swStream_onReceive 函數(shù)

swStream_onReceive 函數(shù)是 stream 讀事件就緒的回調(diào)函數(shù),worker 進(jìn)程發(fā)送給客戶端的數(shù)據(jù)將會(huì)發(fā)送到本函數(shù)。如果 length 為 4,說明 worker 只發(fā)送了一個(gè) length 的空數(shù)據(jù)包,代表著 worker 進(jìn)程已消費(fèi)完畢,這時(shí)我們可以關(guān)閉 stream

static void swStream_onReceive(swClient *cli, char *data, uint32_t length)
{
    swStream *stream = (swStream*) cli->object;
    if (length == 4)
    {
        cli->socket->close_wait = 1;
    }
    else
    {
        stream->response(stream, data + 4, length - 4);
    }
}

static void swReactorThread_onStreamResponse(swStream *stream, char *data, uint32_t length)
{
    swSendData response;
    swConnection *conn = swServer_connection_verify(SwooleG.serv, stream->session_id);
    if (!conn)
    {
        swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SESSION_NOT_EXIST, "connection[fd=%d] does not exists.", stream->session_id);
        return;
    }
    response.info.fd = conn->session_id;
    response.info.type = SW_EVENT_TCP;
    response.info.len = 0;
    response.length = length;
    response.data = data;
    swReactorThread_send(&response);
}
swWorker_onStreamAccept 接受連接請(qǐng)求

接受請(qǐng)求和主進(jìn)程的 reactor 接受連接大致一致,略有不同的是 conn->socket_type 設(shè)置為了 SW_SOCK_UNIX_STREAM

static int swWorker_onStreamAccept(swReactor *reactor, swEvent *event)
{
    int fd = 0;
    swSocketAddress client_addr;
    socklen_t client_addrlen = sizeof(client_addr);

#ifdef HAVE_ACCEPT4
    fd = accept4(event->fd, (struct sockaddr *) &client_addr, &client_addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
#else
    fd = accept(event->fd, (struct sockaddr *) &client_addr, &client_addrlen);
#endif
    if (fd < 0)
    {
        switch (errno)
        {
        case EINTR:
        case EAGAIN:
            return SW_OK;
        default:
            swoole_error_log(SW_LOG_ERROR, SW_ERROR_SYSTEM_CALL_FAIL, "accept() failed. Error: %s[%d]", strerror(errno),
                    errno);
            return SW_OK;
        }
    }
#ifndef HAVE_ACCEPT4
    else
    {
        swoole_fcntl_set_option(fd, 1, 1);
    }
#endif

    swConnection *conn = swReactor_get(reactor, fd);
    bzero(conn, sizeof(swConnection));
    conn->fd = fd;
    conn->active = 1;
    conn->socket_type = SW_SOCK_UNIX_STREAM;
    memcpy(&conn->info.addr, &client_addr, sizeof(client_addr));

    if (reactor->add(reactor, fd, SW_FD_STREAM | SW_EVENT_READ) < 0)
    {
        return SW_ERR;
    }

    return SW_OK;
}

swWorker_onStreamRead 讀取數(shù)據(jù)

swWorker_onStreamRead 讀取數(shù)據(jù)核心是調(diào)用 swProtocol_recv_check_length 函數(shù)收取數(shù)據(jù)放入 serv->buffer_pool 單鏈表中,swProtocol_recv_check_length 函數(shù)我們?cè)?reactor 線程的事件循環(huán)中已經(jīng)了解了,我們這里不再重復(fù),我們知道,該函數(shù)獲取數(shù)據(jù)之后,會(huì)調(diào)用 onPackage 函數(shù),也就是 swWorker_onStreamPackage 函數(shù)

void swStream_set_protocol(swProtocol *protocol)
{
    protocol->get_package_length = swProtocol_get_package_length;
    protocol->package_length_size = 4;
    protocol->package_length_type = "N";
    protocol->package_body_offset = 4;
    protocol->package_length_offset = 0;
}


static int swWorker_onStreamRead(swReactor *reactor, swEvent *event)
{
    swConnection *conn = event->socket;
    swServer *serv = SwooleG.serv;
    swProtocol *protocol = &serv->stream_protocol;
    swString *buffer;

    if (!event->socket->recv_buffer)
    {
        buffer = swLinkedList_shift(serv->buffer_pool);
        if (buffer == NULL)
        {
            buffer = swString_new(8192);
            if (!buffer)
            {
                return SW_ERR;
            }

        }
        event->socket->recv_buffer = buffer;
    }
    else
    {
        buffer = event->socket->recv_buffer;
    }

    if (swProtocol_recv_check_length(protocol, conn, buffer) < 0)
    {
        swWorker_onStreamClose(reactor, event);
    }

    return SW_OK;
}
swWorker_onStreamPackage 函數(shù)

swWorker_onStreamPackage 函數(shù)用于將數(shù)據(jù)包投送到 swWorker_onTask 函數(shù)進(jìn)行消費(fèi)。消費(fèi)完畢會(huì)發(fā)送一個(gè)只含長(zhǎng)度 0 的數(shù)據(jù)包,告知 reactor worker 已經(jīng)結(jié)束。

static int swWorker_onStreamPackage(swConnection *conn, char *data, uint32_t length)
{
    swServer *serv = SwooleG.serv;
    swEventData *task = (swEventData *) (data + 4);

    serv->last_stream_fd = conn->fd;

    swString *package = swWorker_get_buffer(serv, task->info.from_id);
    uint32_t data_length = length - sizeof(task->info) - 4;
    //merge data to package buffer
    swString_append_ptr(package, data + sizeof(task->info) + 4, data_length);

    swWorker_onTask(&serv->factory, task);

    int _end = htonl(0);
    SwooleG.main_reactor->write(SwooleG.main_reactor, conn->fd, (void *) &_end, sizeof(_end));

    return SW_OK;
}

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/29277.html

相關(guān)文章

  • Swoole 源碼分析——Client模塊Send

    摘要:當(dāng)此時(shí)的套接字不可寫的時(shí)候,會(huì)自動(dòng)放入緩沖區(qū)中。當(dāng)大于高水線時(shí),會(huì)自動(dòng)調(diào)用回調(diào)函數(shù)。寫就緒狀態(tài)當(dāng)監(jiān)控到套接字進(jìn)入了寫就緒狀態(tài)時(shí),就會(huì)調(diào)用函數(shù)。如果為,說明此時(shí)異步客戶端雖然建立了連接,但是還沒有調(diào)用回調(diào)函數(shù),因此這時(shí)要調(diào)用函數(shù)。 前言 上一章我們說了客戶端的連接 connect,對(duì)于同步客戶端來說,連接已經(jīng)建立成功;但是對(duì)于異步客戶端來說,此時(shí)可能還在進(jìn)行 DNS 的解析,on...

    caozhijian 評(píng)論0 收藏0
  • Swoole 源碼分析——Server模塊TaskWorker事件循環(huán)

    摘要:函數(shù)事件循環(huán)在事件循環(huán)時(shí),如果使用的是消息隊(duì)列,那么就不斷的調(diào)用從消息隊(duì)列中取出數(shù)據(jù)。獲取后的數(shù)據(jù)調(diào)用回調(diào)函數(shù)消費(fèi)消息之后,向中發(fā)送空數(shù)據(jù),告知進(jìn)程已消費(fèi),并且關(guān)閉新連接。 swManager_start 創(chuàng)建進(jìn)程流程 task_worker 進(jìn)程的創(chuàng)建可以分為三個(gè)步驟:swServer_create_task_worker 申請(qǐng)所需的內(nèi)存、swTaskWorker_init 初始化...

    用戶83 評(píng)論0 收藏0
  • Swoole 源碼分析——Server模塊初始化

    摘要:如果在調(diào)用之前我們?cè)O(shè)置了,但是不在第二個(gè)進(jìn)程啟動(dòng)前這個(gè)套接字,那么第二個(gè)進(jìn)程仍然會(huì)在調(diào)用函數(shù)的時(shí)候出錯(cuò)。 前言 本節(jié)主要介紹 server 模塊進(jìn)行初始化的代碼,關(guān)于初始化過程中,各個(gè)屬性的意義,可以參考官方文檔: SERVER 配置選項(xiàng) 關(guān)于初始化過程中,用于監(jiān)聽的 socket 綁定問題,可以參考: UNP 學(xué)習(xí)筆記——基本 TCP 套接字編程 UNP 學(xué)習(xí)筆記——套接字選項(xiàng) 構(gòu)造...

    Half 評(píng)論0 收藏0
  • Swoole 源碼分析——Reactor模塊ReactorBase

    前言 作為一個(gè)網(wǎng)絡(luò)框架,最為核心的就是消息的接受與發(fā)送。高效的 reactor 模式一直是眾多網(wǎng)絡(luò)框架的首要選擇,本節(jié)主要講解 swoole 中的 reactor 模塊。 UNP 學(xué)習(xí)筆記——IO 復(fù)用 Reactor 的數(shù)據(jù)結(jié)構(gòu) Reactor 的數(shù)據(jù)結(jié)構(gòu)比較復(fù)雜,首先 object 是具體 Reactor 對(duì)象的首地址,ptr 是擁有 Reactor 對(duì)象的類的指針, event_nu...

    baukh789 評(píng)論0 收藏0
  • Swoole 源碼分析——Client模塊Connect

    摘要:兩個(gè)函數(shù)是可選回調(diào)函數(shù)。附帶了一組可信任證書。應(yīng)該注意的是,驗(yàn)證失敗并不意味著連接不能使用。在對(duì)證書進(jìn)行驗(yàn)證時(shí),有一些安全性檢查并沒有執(zhí)行,包括證書的失效檢查和對(duì)證書中通用名的有效性驗(yàn)證。 前言 swoole_client 提供了 tcp/udp socket 的客戶端的封裝代碼,使用時(shí)僅需 new swoole_client 即可。 swoole 的 socket client 對(duì)比...

    Charles 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<