摘要:新建可以看到,自動(dòng)采用包長(zhǎng)檢測(cè)的方法該函數(shù)主要功能是設(shè)置各種回調(diào)函數(shù)值得注意的是第三個(gè)參數(shù)代表是否異步。發(fā)送數(shù)據(jù)函數(shù)并不是直接發(fā)送數(shù)據(jù),而是將數(shù)據(jù)存儲(chǔ)在,等著寫事件就緒之后調(diào)用發(fā)送數(shù)據(jù)。
swReactorThread_dispatch 發(fā)送數(shù)據(jù)
reactor 線程會(huì)通過 swReactorThread_dispatch 發(fā)送數(shù)據(jù),當(dāng)采用 stream 發(fā)送數(shù)據(jù)的時(shí)候,會(huì)調(diào)用 swStream_new 新建 stream,利用 swStream_send 發(fā)送數(shù)據(jù)。
int swReactorThread_dispatch(swConnection *conn, char *data, uint32_t length) { ... if (serv->dispatch_mode == SW_DISPATCH_STREAM) { swStream *stream = swStream_new(serv->stream_socket, 0, SW_SOCK_UNIX_STREAM); if (stream == NULL) { return SW_ERR; } stream->response = swReactorThread_onStreamResponse; stream->session_id = conn->session_id; swListenPort *port = swServer_get_port(serv, conn->fd); swStream_set_max_length(stream, port->protocol.package_max_length); task.data.info.fd = conn->session_id; task.data.info.type = SW_EVENT_PACKAGE_END; task.data.info.len = 0; if (swStream_send(stream, (char*) &task.data.info, sizeof(task.data.info)) < 0) { return SW_ERR; } if (swStream_send(stream, data, length) < 0) { stream->cancel = 1; return SW_ERR; } return SW_OK; } ... }swStream_new 新建 stream
可以看到,stream 自動(dòng)采用包長(zhǎng)檢測(cè)的方法
該函數(shù)主要功能是設(shè)置各種回調(diào)函數(shù)
值得注意的是 swClient_create 第三個(gè)參數(shù)代表是否異步。在這里設(shè)置的是 1,也就是說,無論 connect 還是 send 都是異步。
typedef struct _swStream { swString *buffer; uint32_t session_id; uint8_t cancel; void (*response)(struct _swStream *stream, char *data, uint32_t length); swClient client; } swStream; swStream* swStream_new(char *dst_host, int dst_port, int type) { swStream *stream = (swStream*) sw_malloc(sizeof(swStream)); bzero(stream, sizeof(swStream)); swClient *cli = &stream->client; if (swClient_create(cli, type, 1) < 0) { swStream_free(stream); return NULL; } cli->onConnect = swStream_onConnect; cli->onReceive = swStream_onReceive; cli->onError = swStream_onError; cli->onClose = swStream_onClose; cli->object = stream; cli->open_length_check = 1; swStream_set_protocol(&cli->protocol); if (cli->connect(cli, dst_host, dst_port, -1, 0) < 0) { swSysError("failed to connect to [%s:%d].", dst_host, dst_port); swStream_free(stream); return NULL; } else { return stream; } } void swStream_set_protocol(swProtocol *protocol) { protocol->get_package_length = swProtocol_get_package_length; protocol->package_length_size = 4; protocol->package_length_type = "N"; protocol->package_body_offset = 4; protocol->package_length_offset = 0; }swStream_onConnect 連接回調(diào)函數(shù)
swStream_onConnect 不僅是連接成功的回調(diào)函數(shù),還是每次 onWrite 寫事件的回調(diào)函數(shù),因此每次都需要調(diào)用 cli->send 函數(shù),發(fā)送存儲(chǔ)在 stream->buffer 數(shù)據(jù)。值得注意的是,每次發(fā)送數(shù)據(jù),都要將數(shù)據(jù)長(zhǎng)度存放在 buffer 的頭部,否則包長(zhǎng)檢測(cè)會(huì)失敗。
static void swStream_onConnect(swClient *cli) { swStream *stream = (swStream*) cli->object; if (stream->cancel) { cli->close(cli); } *((uint32_t *) stream->buffer->str) = ntohl(stream->buffer->length - 4); if (cli->send(cli, stream->buffer->str, stream->buffer->length, 0) < 0) { cli->close(cli); } else { swString_free(stream->buffer); stream->buffer = NULL; } }swStream_send 發(fā)送數(shù)據(jù)
swStream_send 函數(shù)并不是直接發(fā)送數(shù)據(jù),而是將數(shù)據(jù)存儲(chǔ)在 stream->buffer,等著寫事件就緒之后調(diào)用 swStream_onConnect 發(fā)送數(shù)據(jù)。值得注意的是,每次新建 buffer 的時(shí)候,要預(yù)留 4 個(gè)字節(jié)來存儲(chǔ) buffer 的數(shù)據(jù)長(zhǎng)度
int swStream_send(swStream *stream, char *data, size_t length) { if (stream->buffer == NULL) { stream->buffer = swString_new(swoole_size_align(length + 4, SwooleG.pagesize)); if (stream->buffer == NULL) { return SW_ERR; } stream->buffer->length = 4; } if (swString_append_ptr(stream->buffer, data, length) < 0) { return SW_ERR; } return SW_OK; }swStream_onReceive 函數(shù)
swStream_onReceive 函數(shù)是 stream 讀事件就緒的回調(diào)函數(shù),worker 進(jìn)程發(fā)送給客戶端的數(shù)據(jù)將會(huì)發(fā)送到本函數(shù)。如果 length 為 4,說明 worker 只發(fā)送了一個(gè) length 的空數(shù)據(jù)包,代表著 worker 進(jìn)程已消費(fèi)完畢,這時(shí)我們可以關(guān)閉 stream。
static void swStream_onReceive(swClient *cli, char *data, uint32_t length) { swStream *stream = (swStream*) cli->object; if (length == 4) { cli->socket->close_wait = 1; } else { stream->response(stream, data + 4, length - 4); } } static void swReactorThread_onStreamResponse(swStream *stream, char *data, uint32_t length) { swSendData response; swConnection *conn = swServer_connection_verify(SwooleG.serv, stream->session_id); if (!conn) { swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SESSION_NOT_EXIST, "connection[fd=%d] does not exists.", stream->session_id); return; } response.info.fd = conn->session_id; response.info.type = SW_EVENT_TCP; response.info.len = 0; response.length = length; response.data = data; swReactorThread_send(&response); }swWorker_onStreamAccept 接受連接請(qǐng)求
接受請(qǐng)求和主進(jìn)程的 reactor 接受連接大致一致,略有不同的是 conn->socket_type 設(shè)置為了 SW_SOCK_UNIX_STREAM
static int swWorker_onStreamAccept(swReactor *reactor, swEvent *event) { int fd = 0; swSocketAddress client_addr; socklen_t client_addrlen = sizeof(client_addr); #ifdef HAVE_ACCEPT4 fd = accept4(event->fd, (struct sockaddr *) &client_addr, &client_addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC); #else fd = accept(event->fd, (struct sockaddr *) &client_addr, &client_addrlen); #endif if (fd < 0) { switch (errno) { case EINTR: case EAGAIN: return SW_OK; default: swoole_error_log(SW_LOG_ERROR, SW_ERROR_SYSTEM_CALL_FAIL, "accept() failed. Error: %s[%d]", strerror(errno), errno); return SW_OK; } } #ifndef HAVE_ACCEPT4 else { swoole_fcntl_set_option(fd, 1, 1); } #endif swConnection *conn = swReactor_get(reactor, fd); bzero(conn, sizeof(swConnection)); conn->fd = fd; conn->active = 1; conn->socket_type = SW_SOCK_UNIX_STREAM; memcpy(&conn->info.addr, &client_addr, sizeof(client_addr)); if (reactor->add(reactor, fd, SW_FD_STREAM | SW_EVENT_READ) < 0) { return SW_ERR; } return SW_OK; }swWorker_onStreamRead 讀取數(shù)據(jù)
swWorker_onStreamRead 讀取數(shù)據(jù)核心是調(diào)用 swProtocol_recv_check_length 函數(shù)收取數(shù)據(jù)放入 serv->buffer_pool 單鏈表中,swProtocol_recv_check_length 函數(shù)我們?cè)?reactor 線程的事件循環(huán)中已經(jīng)了解了,我們這里不再重復(fù),我們知道,該函數(shù)獲取數(shù)據(jù)之后,會(huì)調(diào)用 onPackage 函數(shù),也就是 swWorker_onStreamPackage 函數(shù)
void swStream_set_protocol(swProtocol *protocol) { protocol->get_package_length = swProtocol_get_package_length; protocol->package_length_size = 4; protocol->package_length_type = "N"; protocol->package_body_offset = 4; protocol->package_length_offset = 0; } static int swWorker_onStreamRead(swReactor *reactor, swEvent *event) { swConnection *conn = event->socket; swServer *serv = SwooleG.serv; swProtocol *protocol = &serv->stream_protocol; swString *buffer; if (!event->socket->recv_buffer) { buffer = swLinkedList_shift(serv->buffer_pool); if (buffer == NULL) { buffer = swString_new(8192); if (!buffer) { return SW_ERR; } } event->socket->recv_buffer = buffer; } else { buffer = event->socket->recv_buffer; } if (swProtocol_recv_check_length(protocol, conn, buffer) < 0) { swWorker_onStreamClose(reactor, event); } return SW_OK; }swWorker_onStreamPackage 函數(shù)
swWorker_onStreamPackage 函數(shù)用于將數(shù)據(jù)包投送到 swWorker_onTask 函數(shù)進(jìn)行消費(fèi)。消費(fèi)完畢會(huì)發(fā)送一個(gè)只含長(zhǎng)度 0 的數(shù)據(jù)包,告知 reactor worker 已經(jīng)結(jié)束。
static int swWorker_onStreamPackage(swConnection *conn, char *data, uint32_t length) { swServer *serv = SwooleG.serv; swEventData *task = (swEventData *) (data + 4); serv->last_stream_fd = conn->fd; swString *package = swWorker_get_buffer(serv, task->info.from_id); uint32_t data_length = length - sizeof(task->info) - 4; //merge data to package buffer swString_append_ptr(package, data + sizeof(task->info) + 4, data_length); swWorker_onTask(&serv->factory, task); int _end = htonl(0); SwooleG.main_reactor->write(SwooleG.main_reactor, conn->fd, (void *) &_end, sizeof(_end)); return SW_OK; }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/29277.html
摘要:當(dāng)此時(shí)的套接字不可寫的時(shí)候,會(huì)自動(dòng)放入緩沖區(qū)中。當(dāng)大于高水線時(shí),會(huì)自動(dòng)調(diào)用回調(diào)函數(shù)。寫就緒狀態(tài)當(dāng)監(jiān)控到套接字進(jìn)入了寫就緒狀態(tài)時(shí),就會(huì)調(diào)用函數(shù)。如果為,說明此時(shí)異步客戶端雖然建立了連接,但是還沒有調(diào)用回調(diào)函數(shù),因此這時(shí)要調(diào)用函數(shù)。 前言 上一章我們說了客戶端的連接 connect,對(duì)于同步客戶端來說,連接已經(jīng)建立成功;但是對(duì)于異步客戶端來說,此時(shí)可能還在進(jìn)行 DNS 的解析,on...
摘要:函數(shù)事件循環(huán)在事件循環(huán)時(shí),如果使用的是消息隊(duì)列,那么就不斷的調(diào)用從消息隊(duì)列中取出數(shù)據(jù)。獲取后的數(shù)據(jù)調(diào)用回調(diào)函數(shù)消費(fèi)消息之后,向中發(fā)送空數(shù)據(jù),告知進(jìn)程已消費(fèi),并且關(guān)閉新連接。 swManager_start 創(chuàng)建進(jìn)程流程 task_worker 進(jìn)程的創(chuàng)建可以分為三個(gè)步驟:swServer_create_task_worker 申請(qǐng)所需的內(nèi)存、swTaskWorker_init 初始化...
摘要:如果在調(diào)用之前我們?cè)O(shè)置了,但是不在第二個(gè)進(jìn)程啟動(dòng)前這個(gè)套接字,那么第二個(gè)進(jìn)程仍然會(huì)在調(diào)用函數(shù)的時(shí)候出錯(cuò)。 前言 本節(jié)主要介紹 server 模塊進(jìn)行初始化的代碼,關(guān)于初始化過程中,各個(gè)屬性的意義,可以參考官方文檔: SERVER 配置選項(xiàng) 關(guān)于初始化過程中,用于監(jiān)聽的 socket 綁定問題,可以參考: UNP 學(xué)習(xí)筆記——基本 TCP 套接字編程 UNP 學(xué)習(xí)筆記——套接字選項(xiàng) 構(gòu)造...
前言 作為一個(gè)網(wǎng)絡(luò)框架,最為核心的就是消息的接受與發(fā)送。高效的 reactor 模式一直是眾多網(wǎng)絡(luò)框架的首要選擇,本節(jié)主要講解 swoole 中的 reactor 模塊。 UNP 學(xué)習(xí)筆記——IO 復(fù)用 Reactor 的數(shù)據(jù)結(jié)構(gòu) Reactor 的數(shù)據(jù)結(jié)構(gòu)比較復(fù)雜,首先 object 是具體 Reactor 對(duì)象的首地址,ptr 是擁有 Reactor 對(duì)象的類的指針, event_nu...
摘要:兩個(gè)函數(shù)是可選回調(diào)函數(shù)。附帶了一組可信任證書。應(yīng)該注意的是,驗(yàn)證失敗并不意味著連接不能使用。在對(duì)證書進(jìn)行驗(yàn)證時(shí),有一些安全性檢查并沒有執(zhí)行,包括證書的失效檢查和對(duì)證書中通用名的有效性驗(yàn)證。 前言 swoole_client 提供了 tcp/udp socket 的客戶端的封裝代碼,使用時(shí)僅需 new swoole_client 即可。 swoole 的 socket client 對(duì)比...
閱讀 2855·2021-10-26 09:48
閱讀 1738·2021-09-22 15:22
閱讀 4134·2021-09-22 15:05
閱讀 665·2021-09-06 15:02
閱讀 2633·2019-08-30 15:52
閱讀 2133·2019-08-29 18:38
閱讀 2784·2019-08-28 18:05
閱讀 2353·2019-08-26 13:55