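swPort_onRead_check_eof: EOF-based automatic splitting
As mentioned earlier, swPort_onRead_raw is the simplest way to deliver packets to a worker process. swoole forwards every chunk it receives from the client to the worker immediately, and the user has to reassemble the packets.
When EOF-based automatic splitting is enabled, swoole looks for the EOF marker and only sends data to the worker once a complete packet has been assembled.
swProtocol_recv_check_eof does the EOF detection. Data in which no EOF has been found yet is kept in the connection's receive buffer, which swServer_get_buffer allocates on first use.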
```c
//lazily allocate the per-connection receive buffer
static sw_inline swString *swServer_get_buffer(swServer *serv, int fd)
{
    swString *buffer = serv->connection_list[fd].recv_buffer;
    if (buffer == NULL)
    {
        buffer = swString_new(SW_BUFFER_SIZE_STD);
        //alloc memory failed.
        if (!buffer)
        {
            return NULL;
        }
        serv->connection_list[fd].recv_buffer = buffer;
    }
    return buffer;
}

static int swPort_onRead_check_eof(swReactor *reactor, swListenPort *port, swEvent *event)
{
    swConnection *conn = event->socket;
    swProtocol *protocol = &port->protocol;
    swServer *serv = reactor->ptr;

    swString *buffer = swServer_get_buffer(serv, event->fd);
    if (!buffer)
    {
        return SW_ERR;
    }

    if (swProtocol_recv_check_eof(protocol, conn, buffer) < 0)
    {
        swReactorThread_onClose(reactor, event);
    }

    return SW_OK;
}
```

swProtocol_recv_check_eof: detecting the EOF
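swConnection_recv is called first to receive the client's data. If it fails with an ordinary error, SW_OK is returned and the function waits for the socket to become readable again. If the error is classified as SW_CLOSE, or if recv returns 0 because the peer closed the connection, SW_ERR is returned, and swPort_onRead_check_eof then calls swReactorThread_onClose.
EOF splitting comes in two modes, open_eof_check and open_eof_split. open_eof_check only checks whether the received data ends with the EOF, so it is the fastest and costs almost nothing, but it cannot handle several packets that arrive together: if two EOF-terminated messages are sent back to back, the kernel may return both in a single read. open_eof_split compares the data byte by byte from left to right, looking for each EOF and splitting there; it is slower, but every callback receives exactly one packet.
With open_eof_check, a single memcmp against the tail of the buffer is enough. If it matches, protocol->onPackage (that is, swReactorThread_dispatch) is called with the whole buffer.
With open_eof_split the work is more involved: swProtocol_split_package_by_eof has to find the EOFs one by one.
If the buffered data reaches protocol->package_max_length without the packet being completed, the packet is treated as too large: an error is returned and the connection is closed.
If the buffer is full, it is grown (to twice its size, rounded up to the page size, but never beyond protocol->package_max_length) and reading continues.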
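The difference between the two modes is easiest to see when two "\r\n"-terminated messages arrive in a single read. The following is a minimal, hypothetical C sketch, not swoole code (ends_with_eof and count_eof_packets are invented names): the first helper is the single tail memcmp that open_eof_check relies on, the second is a left-to-right scan in the spirit of open_eof_split.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* open_eof_check style: does the buffer END with the EOF marker? */
static int ends_with_eof(const char *buf, size_t len, const char *eof, size_t eof_len)
{
    return len >= eof_len && memcmp(buf + len - eof_len, eof, eof_len) == 0;
}

/* open_eof_split style: scan left to right and count EOF-terminated
 * packets, restarting the search right after each match. */
static size_t count_eof_packets(const char *buf, size_t len, const char *eof, size_t eof_len)
{
    assert(eof_len > 0);
    size_t count = 0;
    size_t i = 0;
    while (i + eof_len <= len)
    {
        if (memcmp(buf + i, eof, eof_len) == 0)
        {
            count++;
            i += eof_len;
        }
        else
        {
            i++;
        }
    }
    return count;
}
```

For the input "a\r\nb\r\n", ends_with_eof succeeds, so check mode would hand all six bytes to onPackage as one packet, while the scan finds two packet boundaries.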
```c
int swProtocol_recv_check_eof(swProtocol *protocol, swConnection *conn, swString *buffer)
{
    int recv_again = SW_FALSE;
    int buf_size;

    recv_data:
    buf_size = buffer->size - buffer->length;
    char *buf_ptr = buffer->str + buffer->length;

    if (buf_size > SW_BUFFER_SIZE_STD)
    {
        buf_size = SW_BUFFER_SIZE_STD;
    }

    int n = swConnection_recv(conn, buf_ptr, buf_size, 0);
    if (n < 0)
    {
        switch (swConnection_error(errno))
        {
        case SW_ERROR:
            swSysError("recv from socket#%d failed.", conn->fd);
            return SW_OK;
        case SW_CLOSE:
            conn->close_errno = errno;
            return SW_ERR;
        default:
            return SW_OK;
        }
    }
    else if (n == 0)
    {
        return SW_ERR;
    }
    else
    {
        buffer->length += n;

        if (buffer->length < protocol->package_eof_len)
        {
            return SW_OK;
        }

        if (protocol->split_by_eof)
        {
            if (swProtocol_split_package_by_eof(protocol, conn, buffer) == 0)
            {
                return SW_OK;
            }
            else
            {
                recv_again = SW_TRUE;
            }
        }
        else if (memcmp(buffer->str + buffer->length - protocol->package_eof_len, protocol->package_eof, protocol->package_eof_len) == 0)
        {
            if (protocol->onPackage(conn, buffer->str, buffer->length) < 0)
            {
                return SW_ERR;
            }
            if (conn->removed)
            {
                return SW_OK;
            }
            swString_clear(buffer);
            return SW_OK;
        }

        //over max length, will discard
        if (buffer->length == protocol->package_max_length)
        {
            swWarn("Package is too big. package_length=%d", (int )buffer->length);
            return SW_ERR;
        }

        //buffer is full, may have not read data
        if (buffer->length == buffer->size)
        {
            recv_again = SW_TRUE;
            if (buffer->size < protocol->package_max_length)
            {
                uint32_t extend_size = swoole_size_align(buffer->size * 2, SwooleG.pagesize);
                if (extend_size > protocol->package_max_length)
                {
                    extend_size = protocol->package_max_length;
                }
                if (swString_extend(buffer, extend_size) < 0)
                {
                    return SW_ERR;
                }
            }
        }
        //no eof
        if (recv_again)
        {
            goto recv_data;
        }
    }
    return SW_OK;
}
```

swProtocol_split_package_by_eof: finding the EOF
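If the unscanned part of the buffer is shorter than package_eof_len (the length of the EOF marker), there is nothing to search yet, so the function returns and more data is received.
Otherwise swoole_strnpos searches for the first occurrence of package_eof, starting at buffer->offset. If no EOF is found, buffer->offset is set to buffer->length - package_eof_len, so the next search skips bytes already scanned while still catching an EOF split across two reads, and the function returns to receive more data.
Once an EOF is found, protocol->onPackage is called to send everything up to and including the EOF to the worker process.
The remaining data is then searched for further EOFs in a loop, calling protocol->onPackage for each packet found; a trailing incomplete packet is moved to the front of the buffer with memmove to wait for more data.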
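The search-from-offset idea can be sketched in isolation. Below is a hypothetical, pull-style C splitter (all names are invented for illustration, and the fixed 256-byte capacity stands in for package_max_length). Like swProtocol_split_package_by_eof, it resumes the search at length - eof_len after a miss, so an EOF split across two reads is still found without rescanning the whole buffer, and it memmoves leftover bytes to the front. Unlike swoole, which dispatches every packet through a callback and moves the remainder once, this sketch hands out one packet per call and compacts the buffer before each search.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define SPLIT_CAP 256  /* hypothetical stand-in for package_max_length */

struct eof_splitter
{
    char buf[SPLIT_CAP];
    size_t length;    /* bytes currently buffered */
    size_t offset;    /* where the next EOF search starts */
    size_t pending;   /* size of the packet returned by the last next() */
    const char *eof;
    size_t eof_len;
};

static void splitter_init(struct eof_splitter *s, const char *eof, size_t eof_len)
{
    assert(eof_len > 0);
    memset(s, 0, sizeof(*s));
    s->eof = eof;
    s->eof_len = eof_len;
}

/* Append freshly received bytes; -1 if they do not fit (packet too big). */
static int splitter_feed(struct eof_splitter *s, const char *data, size_t n)
{
    if (n > SPLIT_CAP - s->length)
    {
        return -1;
    }
    memcpy(s->buf + s->length, data, n);
    s->length += n;
    return 0;
}

/* Return 1 and the next complete packet (valid until the next call), or 0
 * if no complete packet is buffered yet. */
static int splitter_next(struct eof_splitter *s, const char **pkt, size_t *pkt_len)
{
    /* Drop the packet handed out last time; move the remainder to the front. */
    if (s->pending > 0)
    {
        memmove(s->buf, s->buf + s->pending, s->length - s->pending);
        s->length -= s->pending;
        s->pending = 0;
        s->offset = 0;
    }
    /* Resume at offset instead of 0: no EOF starts before offset. */
    for (size_t i = s->offset; i + s->eof_len <= s->length; i++)
    {
        if (memcmp(s->buf + i, s->eof, s->eof_len) == 0)
        {
            s->pending = i + s->eof_len;  /* packet includes the EOF */
            *pkt = s->buf;
            *pkt_len = s->pending;
            return 1;
        }
    }
    /* No EOF yet. Back off eof_len bytes, as swoole does, so an EOF that
     * straddles the next read is still found. */
    s->offset = s->length >= s->eof_len ? s->length - s->eof_len : 0;
    return 0;
}
```

Feeding "ab\r" and then "\ncd\r\nef" yields "ab\r\n" and "cd\r\n", leaving "ef" buffered until more data arrives.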
```c
static sw_inline int swProtocol_split_package_by_eof(swProtocol *protocol, swConnection *conn, swString *buffer)
{
#if SW_LOG_TRACE_OPEN > 0
    static int count;
    count++;
#endif

    int eof_pos;
    if (buffer->length - buffer->offset < protocol->package_eof_len)
    {
        eof_pos = -1;
    }
    else
    {
        eof_pos = swoole_strnpos(buffer->str + buffer->offset, buffer->length - buffer->offset, protocol->package_eof, protocol->package_eof_len);
    }

    swTraceLog(SW_TRACE_EOF_PROTOCOL, "#[0] count=%d, length=%ld, size=%ld, offset=%ld.", count, buffer->length, buffer->size, (long)buffer->offset);

    //waiting for more data
    if (eof_pos < 0)
    {
        buffer->offset = buffer->length - protocol->package_eof_len;
        return buffer->length;
    }

    uint32_t length = buffer->offset + eof_pos + protocol->package_eof_len;
    swTraceLog(SW_TRACE_EOF_PROTOCOL, "#[4] count=%d, length=%d", count, length);
    if (protocol->onPackage(conn, buffer->str, length) < 0)
    {
        return SW_ERR;
    }
    if (conn->removed)
    {
        return SW_OK;
    }

    //there are remaining data
    if (length < buffer->length)
    {
        uint32_t remaining_length = buffer->length - length;
        char *remaining_data = buffer->str + length;
        swTraceLog(SW_TRACE_EOF_PROTOCOL, "#[5] count=%d, remaining_length=%d", count, remaining_length);

        while (1)
        {
            if (remaining_length < protocol->package_eof_len)
            {
                goto wait_more_data;
            }
            eof_pos = swoole_strnpos(remaining_data, remaining_length, protocol->package_eof, protocol->package_eof_len);
            if (eof_pos < 0)
            {
                wait_more_data:
                swTraceLog(SW_TRACE_EOF_PROTOCOL, "#[1] count=%d, remaining_length=%d, length=%d", count, remaining_length, length);
                //keep the incomplete tail at the front of the buffer
                memmove(buffer->str, remaining_data, remaining_length);
                buffer->length = remaining_length;
                buffer->offset = 0;
                return SW_OK;
            }
            else
            {
                length = eof_pos + protocol->package_eof_len;
                if (protocol->onPackage(conn, remaining_data, length) < 0)
                {
                    return SW_ERR;
                }
                if (conn->removed)
                {
                    return SW_OK;
                }
                swTraceLog(SW_TRACE_EOF_PROTOCOL, "#[2] count=%d, remaining_length=%d, length=%d", count, remaining_length, length);
                remaining_data += length;
                remaining_length -= length;
            }
        }
    }
    swTraceLog(SW_TRACE_EOF_PROTOCOL, "#[3] length=%ld, size=%ld, offset=%ld", buffer->length, buffer->size, (long)buffer->offset);
    swString_clear(buffer);
    return SW_OK;
}
```

swPort_onRead_check_length: length-based detection
Similarly, this function calls swProtocol_recv_check_length to perform the length check.
```c
static int swPort_onRead_check_length(swReactor *reactor, swListenPort *port, swEvent *event)
{
    swServer *serv = reactor->ptr;
    swConnection *conn = event->socket;
    swProtocol *protocol = &port->protocol;

    swString *buffer = swServer_get_buffer(serv, event->fd);
    if (!buffer)
    {
        return SW_ERR;
    }

    if (swProtocol_recv_check_length(protocol, conn, buffer) < 0)
    {
        swTrace("Close Event.FD=%d|From=%d", event->fd, event->from_id);
        swReactorThread_onClose(reactor, event);
    }

    return SW_OK;
}
```

The swProtocol_recv_check_length function
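With length-based detection, the header is read first; only after the packet length has been extracted from the header is the actual packet body read.
While the packet length is still unknown, buffer->offset is 0 and the length field has to be read. The field sits at protocol->package_length_offset within the header, say at byte 8, and its own size is protocol->package_length_size, e.g. 4 for an int32. So the first read asks for 8 + 4 = 12 bytes, and the last 4 of those 12 bytes hold the length value (swProtocol_get_package_length adds package_body_offset to it to get the total packet length).
Once these bytes have arrived (recv_wait is still 0), protocol->get_package_length returns the packet length, buffer->offset is set to it, the buffer is enlarged if necessary, and recv_wait becomes 1.
If buffer->length, the amount of data received so far, is already at least that length, onPackage is called to send the packet to the worker process.
If not enough data has arrived yet, recv_size becomes the number of bytes still missing (buffer->offset - buffer->length), and with recv_wait at 1 reading continues.
Once the received data reaches the packet length, onPackage is called to send it. If data beyond the packet remains, it is moved to the front of the buffer and the code jumps to do_get_length to parse the next header; if nothing remains, the buffer is cleared and the next packet is read.
If the data is still insufficient, the function returns and waits for the next readable event.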
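The receive loop described above (read the header, compute the length, dispatch once enough bytes are buffered, then parse the next header from whatever remains) can be sketched as follows. This is a hypothetical C sketch, not swoole's code: it assumes a 12-byte header whose 4-byte big-endian length field (pack type 'N') sits at offset 8 and counts only the body, i.e. package_length_offset = 8, package_length_size = 4, package_body_offset = 12, plus an assumed 1024-byte maximum. Instead of calling recv, it only counts the complete frames in an already-filled buffer.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical protocol parameters (see the lead-in). */
#define LENGTH_OFFSET 8
#define LENGTH_SIZE   4
#define BODY_OFFSET   12
#define MAX_PACKAGE   1024

/* Total packet length, 0 if the length field is not complete yet,
 * -1 if the packet would exceed MAX_PACKAGE. */
static long frame_length(const unsigned char *data, size_t size)
{
    if (size < LENGTH_OFFSET + LENGTH_SIZE)
    {
        return 0;
    }
    const unsigned char *p = data + LENGTH_OFFSET;
    uint32_t body = (uint32_t)p[0] << 24 | (uint32_t)p[1] << 16
                  | (uint32_t)p[2] << 8 | (uint32_t)p[3];
    uint64_t total = (uint64_t)BODY_OFFSET + body;
    return total > MAX_PACKAGE ? -1 : (long)total;
}

/* Alternate like do_get_length/do_dispatch: parse a header, "dispatch" if
 * the whole packet is buffered, repeat on the rest. Returns the number of
 * complete packets (-1 for an oversized one); *consumed gets their bytes. */
static long count_complete_frames(const unsigned char *buf, size_t len, size_t *consumed)
{
    size_t pos = 0;
    long count = 0;
    for (;;)
    {
        long total = frame_length(buf + pos, len - pos);
        if (total < 0)
        {
            *consumed = pos;
            return -1;                 /* "package is too big": close */
        }
        if (total == 0 || len - pos < (size_t)total)
        {
            break;                     /* need more data (recv_wait) */
        }
        pos += (size_t)total;          /* onPackage would run here */
        count++;
    }
    *consumed = pos;
    return count;
}
```

A result of 0 with consumed == 0 corresponds to swoole's "no length yet" case, a short trailing frame corresponds to recv_wait = 1, and -1 corresponds to the "package is too big" error that closes the connection.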
```c
int swProtocol_recv_check_length(swProtocol *protocol, swConnection *conn, swString *buffer)
{
    int package_length;
    uint32_t recv_size;
    char swap[SW_BUFFER_SIZE_STD];

    if (conn->skip_recv)
    {
        conn->skip_recv = 0;
        goto do_get_length;
    }

    do_recv:
    if (conn->active == 0)
    {
        return SW_OK;
    }
    if (buffer->offset > 0)
    {
        //length known: read only the missing part of the packet
        recv_size = buffer->offset - buffer->length;
    }
    else
    {
        //length unknown: read just enough bytes to reach the length field
        recv_size = protocol->package_length_offset + protocol->package_length_size;
    }

    int n = swConnection_recv(conn, buffer->str + buffer->length, recv_size, 0);
    if (n < 0)
    {
        switch (swConnection_error(errno))
        {
        case SW_ERROR:
            swSysError("recv(%d, %d) failed.", conn->fd, recv_size);
            return SW_OK;
        case SW_CLOSE:
            conn->close_errno = errno;
            return SW_ERR;
        default:
            return SW_OK;
        }
    }
    else if (n == 0)
    {
        return SW_ERR;
    }
    else
    {
        buffer->length += n;

        if (conn->recv_wait)
        {
            if (buffer->length >= buffer->offset)
            {
                do_dispatch:
                if (protocol->onPackage(conn, buffer->str, buffer->offset) < 0)
                {
                    return SW_ERR;
                }
                if (conn->removed)
                {
                    return SW_OK;
                }
                conn->recv_wait = 0;

                int remaining_length = buffer->length - buffer->offset;
                if (remaining_length > 0)
                {
                    assert(remaining_length < sizeof(swap));
                    memcpy(swap, buffer->str + buffer->offset, remaining_length);
                    memcpy(buffer->str, swap, remaining_length);
                    buffer->offset = 0;
                    buffer->length = remaining_length;
                    goto do_get_length;
                }
                else
                {
                    swString_clear(buffer);
                    goto do_recv;
                }
            }
            else
            {
                return SW_OK;
            }
        }
        else
        {
            do_get_length:
            package_length = protocol->get_package_length(protocol, conn, buffer->str, buffer->length);
            //invalid package, close connection.
            if (package_length < 0)
            {
                return SW_ERR;
            }
            //no length
            else if (package_length == 0)
            {
                return SW_OK;
            }
            else if (package_length > protocol->package_max_length)
            {
                swWarn("package is too big, remote_addr=%s:%d, length=%d.", swConnection_get_ip(conn), swConnection_get_port(conn), package_length);
                return SW_ERR;
            }
            //get length success
            else
            {
                if (buffer->size < package_length)
                {
                    if (swString_extend(buffer, package_length) < 0)
                    {
                        return SW_ERR;
                    }
                }
                conn->recv_wait = 1;
                buffer->offset = package_length;

                if (buffer->length >= package_length)
                {
                    goto do_dispatch;
                }
                else
                {
                    goto do_recv;
                }
            }
        }
    }
    return SW_OK;
}
```

swProtocol_get_package_length: getting the packet length
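This function is simple. If the data is too short to contain the length field, the length is not available yet, so it returns 0 and more data is received. Once the length field is present, swoole_unpack converts it according to package_length_type to obtain the body length, and package_body_offset is added to give the total packet length.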
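Because swoole_unpack casts the data pointer to a wider integer type, it relies on the platform tolerating a possibly unaligned load and, for the machine-order codes, on the host's byte order. A portable alternative, sketched hypothetically below for a subset of the same PHP pack() codes ('c', 'C', 'n', 'N', 'v', 'V'), assembles the value byte by byte. The function name and the out-parameter convention are invented for illustration.

```c
#include <assert.h>
#include <stdint.h>

/* Decode a length field of the given pack() type from raw bytes into *out.
 * Returns 0 on success, -1 for a type this sketch does not handle. */
static int unpack_length(char type, const unsigned char *p, int64_t *out)
{
    switch (type)
    {
    case 'c':  /* signed 8-bit */
        *out = (int8_t)p[0];
        break;
    case 'C':  /* unsigned 8-bit */
        *out = p[0];
        break;
    case 'n':  /* unsigned 16-bit, big endian */
        *out = (uint16_t)(p[0] << 8 | p[1]);
        break;
    case 'v':  /* unsigned 16-bit, little endian */
        *out = (uint16_t)(p[0] | p[1] << 8);
        break;
    case 'N':  /* unsigned 32-bit, big endian */
        *out = (uint32_t)p[0] << 24 | (uint32_t)p[1] << 16
             | (uint32_t)p[2] << 8 | (uint32_t)p[3];
        break;
    case 'V':  /* unsigned 32-bit, little endian */
        *out = (uint32_t)p[0] | (uint32_t)p[1] << 8
             | (uint32_t)p[2] << 16 | (uint32_t)p[3] << 24;
        break;
    default:
        return -1;
    }
    return 0;
}
```

The total packet length is then package_body_offset plus the decoded value, exactly as in swProtocol_get_package_length.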
```c
//type codes follow the PHP pack() format characters
static sw_inline int32_t swoole_unpack(char type, void *data)
{
    switch(type)
    {
    /*-------------------------8bit-----------------------------*/
    case 'c':
        return *((int8_t *) data);
    case 'C':
        return *((uint8_t *) data);
    /*-------------------------16bit-----------------------------*/
    /**
     * signed short (always 16 bit, machine byte order)
     */
    case 's':
        return *((int16_t *) data);
    /**
     * unsigned short (always 16 bit, machine byte order)
     */
    case 'S':
        return *((uint16_t *) data);
    /**
     * unsigned short (always 16 bit, big endian byte order)
     */
    case 'n':
        return ntohs(*((uint16_t *) data));
    /**
     * unsigned short (always 16 bit, little endian byte order)
     */
    case 'v':
        return swoole_swap_endian16(ntohs(*((uint16_t *) data)));
    /*-------------------------32bit-----------------------------*/
    /**
     * unsigned long (always 32 bit, machine byte order)
     */
    case 'L':
        return *((uint32_t *) data);
    /**
     * signed long (always 32 bit, machine byte order)
     */
    case 'l':
        return *((int *) data);
    /**
     * unsigned long (always 32 bit, big endian byte order)
     */
    case 'N':
        return ntohl(*((uint32_t *) data));
    /**
     * unsigned long (always 32 bit, little endian byte order)
     */
    case 'V':
        return swoole_swap_endian32(ntohl(*((uint32_t *) data)));
    default:
        return *((uint32_t *) data);
    }
}

int swProtocol_get_package_length(swProtocol *protocol, swConnection *conn, char *data, uint32_t size)
{
    uint16_t length_offset = protocol->package_length_offset;
    int32_t body_length;
    /**
     * no have length field, wait more data
     */
    if (size < length_offset + protocol->package_length_size)
    {
        return 0;
    }
    body_length = swoole_unpack(protocol->package_length_type, data + length_offset);
    //Length error
    //Protocol length is not legitimate, out of bounds or exceed the allocated length
    if (body_length < 0)
    {
        swWarn("invalid package, remote_addr=%s:%d, length=%d, size=%d.", swConnection_get_ip(conn), swConnection_get_port(conn), body_length, size);
        return SW_ERR;
    }
    //total package length
    return protocol->package_body_offset + body_length;
}
```

swReactorThread_onPipeWrite: the write-event callback
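When the reactor thread detects that the pipe_master of the corresponding worker process is writable, it calls swReactorThread_onPipeWrite.
While in_buffer is not empty, it takes chunks off the singly linked list one by one. For stream events it calls swServer_connection_verify to check that the session_id is still valid (chunks belonging to closed sessions are discarded), then calls write to send the data.
If write fails with EAGAIN (or ENOBUFS under kqueue), the pipe buffer is full; the function simply returns and waits for the next writable event.
Note that there is no need to check how many bytes write actually wrote: on Linux, a pipe guarantees that a write of no more than PIPE_BUF bytes is atomic. It either writes everything or fails; a partial write cannot happen.
Once all data has been sent, write-readiness monitoring is removed (the fd is switched back to read-only monitoring, or removed from a reactor that does not own it) to avoid wasteful repeated callbacks.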
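A stripped-down version of this drain loop, written as a hypothetical C sketch (struct trunk and drain_queue are invented names, and there is no locking or session check), shows the two exits: the queue becomes empty, or write fails with EAGAIN and the remaining chunks wait for the next writable event. It keeps every chunk at or below PIPE_BUF, which is the condition under which POSIX guarantees an all-or-nothing pipe write.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <stddef.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical queued message, standing in for swBuffer_trunk. */
struct trunk
{
    const char *data;
    size_t length;
};

/* Write queued trunks to a non-blocking pipe starting at *head.
 * Returns 0 when the queue is drained or the pipe is full (EAGAIN),
 * -1 on any other error; *head is advanced past each trunk written. */
static int drain_queue(int fd, const struct trunk *q, size_t count, size_t *head)
{
    while (*head < count)
    {
        /* At most PIPE_BUF bytes, so the write is all-or-nothing. */
        assert(q[*head].length <= PIPE_BUF);
        ssize_t n = write(fd, q[*head].data, q[*head].length);
        if (n < 0)
        {
            /* Pipe full: leave the rest queued for the next writable event. */
            return (errno == EAGAIN || errno == EWOULDBLOCK) ? 0 : -1;
        }
        (*head)++;  /* the whole trunk went out, pop it */
    }
    return 0;
}
```

A caller in an epoll loop would drop write interest once head == count, which is what swReactorThread_onPipeWrite does with reactor->set or reactor->del.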
```c
static int swReactorThread_onPipeWrite(swReactor *reactor, swEvent *ev)
{
    int ret;

    swBuffer_trunk *trunk = NULL;
    swEventData *send_data;
    swConnection *conn;
    swServer *serv = reactor->ptr;
    swBuffer *buffer = serv->connection_list[ev->fd].in_buffer;
    swLock *lock = serv->connection_list[ev->fd].object;

    //lock thread
    lock->lock(lock);

    while (!swBuffer_empty(buffer))
    {
        trunk = swBuffer_get_trunk(buffer);
        send_data = trunk->store.ptr;

        //server active close, discard data.
        if (swEventData_is_stream(send_data->info.type))
        {
            //send_data->info.fd is session_id
            conn = swServer_connection_verify(serv, send_data->info.fd);
            if (conn == NULL || conn->closed)
            {
#ifdef SW_USE_RINGBUFFER
                swReactorThread *thread = swServer_get_thread(SwooleG.serv, SwooleTG.id);
                swPackage package;
                memcpy(&package, send_data->data, sizeof(package));
                thread->buffer_input->free(thread->buffer_input, package.data);
#endif
                if (conn && conn->closed)
                {
                    swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SESSION_CLOSED_BY_SERVER, "Session#%d is closed by server.", send_data->info.fd);
                }
                swBuffer_pop_trunk(buffer, trunk);
                continue;
            }
        }

        ret = write(ev->fd, trunk->store.ptr, trunk->length);
        if (ret < 0)
        {
            //release lock
            lock->unlock(lock);
#ifdef HAVE_KQUEUE
            return (errno == EAGAIN || errno == ENOBUFS) ? SW_OK : SW_ERR;
#else
            return errno == EAGAIN ? SW_OK : SW_ERR;
#endif
        }
        else
        {
            swBuffer_pop_trunk(buffer, trunk);
        }
    }

    //remove EPOLLOUT event
    if (swBuffer_empty(buffer))
    {
        if (SwooleG.serv->connection_list[ev->fd].from_id == SwooleTG.id)
        {
            ret = reactor->set(reactor, ev->fd, SW_FD_PIPE | SW_EVENT_READ);
        }
        else
        {
            ret = reactor->del(reactor, ev->fd);
        }
        if (ret < 0)
        {
            swSysError("reactor->set(%d) failed.", ev->fd);
        }
    }

    //release lock
    lock->unlock(lock);

    return SW_OK;
}
```

swReactorThread_onPipeReceive: read readiness
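Data returned from a worker process comes in three forms: SW_RESPONSE_SMALL (a small payload carried in the message itself), SW_RESPONSE_SHM (a large payload stored in shared memory), and SW_RESPONSE_TMPFILE (a payload stored in a temporary file). A fourth value, SW_RESPONSE_EXIT, tells the reactor thread to stop its event loop.
The swEventData object received from the worker has to be converted into a swSendData.
For large packets, the worker does not pass the data through the socket. Instead it sends its worker_id and the data length, and the data itself sits in worker->send_shm; once the data has been handed to swReactorThread_send, the reactor thread releases worker->lock.
For a temporary file, the worker sends the file name, and swTaskWorker_large_unpack is called to read the file contents into SwooleTG.buffer_stack.
swReactorThread_send sends the data to the client.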
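The temporary-file path amounts to a simple handoff: the producer writes the payload to a file and sends only its name through the pipe; the consumer reads the file back and deletes it. Below is a hypothetical C sketch of that round trip (large_pack and large_unpack are invented names, and the /tmp name template is an assumption). Like swTaskWorker_large_unpack, the reader unlinks the file afterwards; swoole skips the unlink when the SW_TASK_PEEK flag is set, which the sketch does not model.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Producer side (stand-in for the worker): store a large payload in a
 * temporary file; only the file name would travel through the pipe.
 * Returns 0 on success, -1 on failure. */
static int large_pack(const char *data, size_t len, char *path, size_t path_size)
{
    int need = snprintf(path, path_size, "/tmp/large_demo.XXXXXX");
    if (need < 0 || (size_t)need >= path_size)
    {
        return -1;
    }
    int fd = mkstemp(path);  /* replaces XXXXXX with a unique suffix */
    if (fd < 0)
    {
        return -1;
    }
    ssize_t n = write(fd, data, len);
    close(fd);
    return n == (ssize_t)len ? 0 : -1;
}

/* Consumer side (stand-in for the reactor thread): read len bytes back,
 * then delete the file. Returns len on success, -1 on failure. */
static ssize_t large_unpack(const char *path, char *out, size_t len)
{
    int fd = open(path, O_RDONLY);
    if (fd < 0)
    {
        return -1;
    }
    size_t done = 0;
    while (done < len)
    {
        ssize_t n = read(fd, out + done, len - done);
        if (n <= 0)
        {
            break;  /* read error or file shorter than expected */
        }
        done += (size_t)n;
    }
    close(fd);
    unlink(path);
    return done == len ? (ssize_t)done : -1;
}
```

The design keeps the pipe message small and fixed-size regardless of payload size, at the cost of a file write and read per response.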
```c
typedef struct _swSendData
{
    swDataHead info;
    /**
     * for big package
     */
    uint32_t length;
    char *data;
} swSendData;

typedef struct
{
    int length;
    int worker_id;
} swPackage_response;

static sw_inline swString* swTaskWorker_large_unpack(swEventData *task_result)
{
    swPackage_task _pkg;
    memcpy(&_pkg, task_result->data, sizeof(_pkg));

    int tmp_file_fd = open(_pkg.tmpfile, O_RDONLY);
    if (tmp_file_fd < 0)
    {
        swSysError("open(%s) failed.", _pkg.tmpfile);
        return NULL;
    }
    if (SwooleTG.buffer_stack->size < _pkg.length && swString_extend_align(SwooleTG.buffer_stack, _pkg.length) < 0)
    {
        close(tmp_file_fd);
        return NULL;
    }
    if (swoole_sync_readfile(tmp_file_fd, SwooleTG.buffer_stack->str, _pkg.length) < 0)
    {
        close(tmp_file_fd);
        return NULL;
    }
    close(tmp_file_fd);
    if (!(swTask_type(task_result) & SW_TASK_PEEK))
    {
        unlink(_pkg.tmpfile);
    }
    SwooleTG.buffer_stack->length = _pkg.length;
    return SwooleTG.buffer_stack;
}

static int swReactorThread_onPipeReceive(swReactor *reactor, swEvent *ev)
{
    int n;
    swEventData resp;
    swSendData _send;

    swPackage_response pkg_resp;
    swWorker *worker;

#ifdef SW_REACTOR_RECV_AGAIN
    while (1)
#endif
    {
        n = read(ev->fd, &resp, sizeof(resp));
        if (n > 0)
        {
            memcpy(&_send.info, &resp.info, sizeof(resp.info));
            //pipe data
            if (_send.info.from_fd == SW_RESPONSE_SMALL)
            {
                _send.data = resp.data;
                _send.length = resp.info.len;
                swReactorThread_send(&_send);
            }
            //use send shm
            else if (_send.info.from_fd == SW_RESPONSE_SHM)
            {
                memcpy(&pkg_resp, resp.data, sizeof(pkg_resp));
                worker = swServer_get_worker(SwooleG.serv, pkg_resp.worker_id);

                _send.data = worker->send_shm;
                _send.length = pkg_resp.length;

                swReactorThread_send(&_send);
                worker->lock.unlock(&worker->lock);
            }
            //use tmp file
            else if (_send.info.from_fd == SW_RESPONSE_TMPFILE)
            {
                swString *data = swTaskWorker_large_unpack(&resp);
                if (data == NULL)
                {
                    return SW_ERR;
                }
                _send.data = data->str;
                _send.length = data->length;
                swReactorThread_send(&_send);
            }
            //reactor thread exit
            else if (_send.info.from_fd == SW_RESPONSE_EXIT)
            {
                reactor->running = 0;
                return SW_OK;
            }
            //will never be here
            else
            {
                abort();
            }
        }
        else if (errno == EAGAIN)
        {
            return SW_OK;
        }
        else
        {
            swWarn("read(worker_pipe) failed. Error: %s[%d]", strerror(errno), errno);
            return SW_ERR;
        }
    }

    return SW_OK;
}
```

The swReactorThread_send function
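First, the function takes the connection's session_id, uses it to look up the swConnection object, and from that obtains the reactor responsible for the connection.
SW_EVENT_CONFIRM means the worker confirms that it accepts the connection (used when the server enables enable_delay_receive); the reactor then starts monitoring the connection for read events.
When swoole_server->pause is called, BASE mode invokes this function with SW_EVENT_PAUSE_RECV: the client's data is no longer read, because read-readiness monitoring is removed from the reactor (if write events are still monitored, only the write event is kept).
Similarly, swoole_server->resume restores the connection by adding read readiness back to the reactor's monitored events.
If conn->out_buffer is empty, the function first tries to write to the socket directly (when SW_REACTOR_SYNC_SEND is compiled in and conn->direct_send is set). Whatever is not written is appended to conn->out_buffer, and write-event monitoring is enabled for the socket.
If conn->out_buffer grows too large, the server reacts: once it reaches port->buffer_high_watermark, conn->high_watermark is set to 1 and the onBufferFull callback is triggered (through an SW_EVENT_BUFFER_FULL notification); once it reaches conn->buffer_size, an output buffer overflow is reported and conn->overflow is set.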
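The "write directly if nothing is queued, otherwise append" rule plus the high-watermark flag can be sketched as follows. In this hypothetical C sketch, struct out_buffer and send_or_buffer are invented names, and a realloc'd byte array replaces swoole's chunked swBuffer. It writes to a non-blocking fd only when the queue is empty, so bytes are never reordered, and sets the flag the first time the queue reaches the watermark, which is the point where swoole would send SW_EVENT_BUFFER_FULL.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical per-connection output queue (a flat array, not swBuffer). */
struct out_buffer
{
    char *data;
    size_t length;
    size_t high_watermark;   /* plays the role of port->buffer_high_watermark */
    int high_watermark_hit;  /* plays the role of conn->high_watermark */
};

/* Write directly only when nothing is queued (otherwise bytes would be
 * reordered), then queue whatever the fd did not accept.
 * Returns the number of bytes written directly, or -1 on a hard error. */
static ssize_t send_or_buffer(int fd, struct out_buffer *ob, const char *data, size_t len)
{
    ssize_t written = 0;
    if (ob->length == 0)
    {
        do
        {
            written = write(fd, data, len);
        } while (written < 0 && errno == EINTR);
        if (written < 0)
        {
            if (errno != EAGAIN && errno != EWOULDBLOCK)
            {
                return -1;
            }
            written = 0;  /* fd is full: queue everything */
        }
    }
    size_t rest = len - (size_t)written;
    if (rest > 0)
    {
        char *p = realloc(ob->data, ob->length + rest);
        if (p == NULL)
        {
            return -1;
        }
        memcpy(p + ob->length, data + written, rest);
        ob->data = p;
        ob->length += rest;
        /* Report "buffer full" once, when the queue first reaches the mark. */
        if (!ob->high_watermark_hit && ob->length >= ob->high_watermark)
        {
            ob->high_watermark_hit = 1;
        }
        /* A real server would now enable write events (EPOLLOUT) for fd. */
    }
    return written;
}
```

With a non-blocking pipe as the peer, a 1 MiB send writes as much as the pipe accepts and queues the rest; a later one-byte send goes straight to the queue because earlier bytes are still pending.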
```c
int swReactorThread_send(swSendData *_send)
{
    swServer *serv = SwooleG.serv;
    uint32_t session_id = _send->info.fd;
    void *_send_data = _send->data;
    uint32_t _send_length = _send->length;

    swConnection *conn;
    if (_send->info.type != SW_EVENT_CLOSE)
    {
        conn = swServer_connection_verify(serv, session_id);
    }
    else
    {
        conn = swServer_connection_verify_no_ssl(serv, session_id);
    }
    //the session may no longer exist
    if (conn == NULL)
    {
        return SW_ERR;
    }

    int fd = conn->fd;
    swReactor *reactor;

    {
        reactor = &(serv->reactor_threads[conn->from_id].reactor);
        assert(fd % serv->reactor_num == reactor->id);
        assert(fd % serv->reactor_num == SwooleTG.id);
    }

    /**
     * Reset send buffer, Immediately close the connection.
     */
    if (_send->info.type == SW_EVENT_CLOSE && (conn->close_reset || conn->removed))
    {
        goto close_fd;
    }
    else if (_send->info.type == SW_EVENT_CONFIRM)
    {
        reactor->add(reactor, conn->fd, conn->fdtype | SW_EVENT_READ);
        conn->listen_wait = 0;
        return SW_OK;
    }
    /**
     * pause recv data
     */
    else if (_send->info.type == SW_EVENT_PAUSE_RECV)
    {
        if (conn->events & SW_EVENT_WRITE)
        {
            return reactor->set(reactor, conn->fd, conn->fdtype | SW_EVENT_WRITE);
        }
        else
        {
            return reactor->del(reactor, conn->fd);
        }
    }
    /**
     * resume recv data
     */
    else if (_send->info.type == SW_EVENT_RESUME_RECV)
    {
        if (conn->events & SW_EVENT_WRITE)
        {
            return reactor->set(reactor, conn->fd, conn->fdtype | SW_EVENT_READ | SW_EVENT_WRITE);
        }
        else
        {
            return reactor->add(reactor, conn->fd, conn->fdtype | SW_EVENT_READ);
        }
    }

    if (swBuffer_empty(conn->out_buffer))
    {
        /**
         * close connection.
         */
        if (_send->info.type == SW_EVENT_CLOSE)
        {
            close_fd:
            reactor->close(reactor, fd);
            return SW_OK;
        }
#ifdef SW_REACTOR_SYNC_SEND
        //Direct send
        if (_send->info.type != SW_EVENT_SENDFILE)
        {
            if (!conn->direct_send)
            {
                goto buffer_send;
            }

            int n;

            direct_send:
            n = swConnection_send(conn, _send_data, _send_length, 0);
            if (n == _send_length)
            {
                return SW_OK;
            }
            else if (n > 0)
            {
                _send_data += n;
                _send_length -= n;
                goto buffer_send;
            }
            else if (errno == EINTR)
            {
                goto direct_send;
            }
            else
            {
                goto buffer_send;
            }
        }
#endif
        //buffer send
        else
        {
#ifdef SW_REACTOR_SYNC_SEND
            buffer_send:
#endif
            if (!conn->out_buffer)
            {
                conn->out_buffer = swBuffer_new(SW_BUFFER_SIZE);
                if (conn->out_buffer == NULL)
                {
                    return SW_ERR;
                }
            }
        }
    }

    swBuffer_trunk *trunk;
    //close connection
    if (_send->info.type == SW_EVENT_CLOSE)
    {
        trunk = swBuffer_new_trunk(conn->out_buffer, SW_CHUNK_CLOSE, 0);
        trunk->store.data.val1 = _send->info.type;
    }
    //sendfile to client
    else if (_send->info.type == SW_EVENT_SENDFILE)
    {
        swSendFile_request *req = (swSendFile_request *) _send_data;
        swConnection_sendfile(conn, req->filename, req->offset, req->length);
    }
    //send data
    else
    {
        //connection is closed
        if (conn->removed)
        {
            swWarn("connection#%d is closed by client.", fd);
            return SW_ERR;
        }
        //connection output buffer overflow
        if (conn->out_buffer->length >= conn->buffer_size)
        {
            if (serv->send_yield)
            {
                SwooleG.error = SW_ERROR_OUTPUT_BUFFER_OVERFLOW;
            }
            else
            {
                swoole_error_log(SW_LOG_WARNING, SW_ERROR_OUTPUT_BUFFER_OVERFLOW, "connection#%d output buffer overflow.", fd);
            }
            conn->overflow = 1;
            if (serv->onBufferEmpty && serv->onBufferFull == NULL)
            {
                conn->high_watermark = 1;
            }
        }

        int _length = _send_length;
        void* _pos = _send_data;
        int _n;

        //buffer enQueue
        while (_length > 0)
        {
            _n = _length >= SW_BUFFER_SIZE_BIG ? SW_BUFFER_SIZE_BIG : _length;
            swBuffer_append(conn->out_buffer, _pos, _n);
            _pos += _n;
            _length -= _n;
        }

        swListenPort *port = swServer_get_port(serv, fd);
        if (serv->onBufferFull && conn->high_watermark == 0 && conn->out_buffer->length >= port->buffer_high_watermark)
        {
            swServer_tcp_notify(serv, conn, SW_EVENT_BUFFER_FULL);
            conn->high_watermark = 1;
        }
    }

    //listen EPOLLOUT event
    if (reactor->set(reactor, fd, SW_EVENT_TCP | SW_EVENT_WRITE | SW_EVENT_READ) < 0
            && (errno == EBADF || errno == ENOENT))
    {
        goto close_fd;
    }

    return SW_OK;
}
```

swConnection_sendfile: sending a file
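To send a file, swoole stores the file information in a swTask_sendfile object and queues it in conn->out_buffer as an SW_CHUNK_SENDFILE chunk; the actual transfer happens later, in swConnection_onSendfile. Note that task->length records the position in the file where sending stops (offset + length, or the file size when length is 0), not a byte count.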
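The validation and end-position logic can be restated as a small pure function. This is a hypothetical C sketch with an invented name, using signed long long in place of off_t and size_t so that the negative-offset check is meaningful.

```c
#include <assert.h>

/* Hypothetical restatement of the checks in swConnection_sendfile.
 * Returns the END position stored in task->length, or -1 when the
 * request is invalid. length == 0 means "send until the end of file". */
static long long sendfile_end(long long file_size, long long offset, long long length)
{
    if (offset < 0 || length < 0 || offset + length > file_size)
    {
        return -1;
    }
    return length == 0 ? file_size : offset + length;
}
```

So a request for 200 bytes starting at offset 100 stores 300, and swConnection_onSendfile keeps sending until task->offset reaches that value.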
```c
typedef struct
{
    char *filename;
    uint16_t name_len;
    int fd;
    size_t length;
    off_t offset;
} swTask_sendfile;

int swConnection_sendfile(swConnection *conn, char *filename, off_t offset, size_t length)
{
    if (conn->out_buffer == NULL)
    {
        conn->out_buffer = swBuffer_new(SW_BUFFER_SIZE);
        if (conn->out_buffer == NULL)
        {
            return SW_ERR;
        }
    }

    swBuffer_trunk error_chunk;
    swTask_sendfile *task = sw_malloc(sizeof(swTask_sendfile));
    if (task == NULL)
    {
        swWarn("malloc for swTask_sendfile failed.");
        return SW_ERR;
    }
    bzero(task, sizeof(swTask_sendfile));

    task->filename = sw_strdup(filename);
    int file_fd = open(filename, O_RDONLY);
    if (file_fd < 0)
    {
        sw_free(task->filename);
        sw_free(task);
        swSysError("open(%s) failed.", filename);
        return SW_OK;
    }
    task->fd = file_fd;
    task->offset = offset;

    struct stat file_stat;
    if (fstat(file_fd, &file_stat) < 0)
    {
        swSysError("fstat(%s) failed.", filename);
        error_chunk.store.ptr = task;
        swConnection_sendfile_destructor(&error_chunk);
        return SW_ERR;
    }
    if (offset < 0 || (length + offset > file_stat.st_size))
    {
        swoole_error_log(SW_LOG_WARNING, SW_ERROR_INVALID_PARAMS, "length or offset is invalid.");
        error_chunk.store.ptr = task;
        swConnection_sendfile_destructor(&error_chunk);
        return SW_OK;
    }
    //task->length is the end position in the file, not a byte count
    if (length == 0)
    {
        task->length = file_stat.st_size;
    }
    else
    {
        task->length = length + offset;
    }

    swBuffer_trunk *chunk = swBuffer_new_trunk(conn->out_buffer, SW_CHUNK_SENDFILE, 0);
    if (chunk == NULL)
    {
        swWarn("get out_buffer trunk failed.");
        error_chunk.store.ptr = task;
        swConnection_sendfile_destructor(&error_chunk);
        return SW_ERR;
    }

    chunk->store.ptr = (void *) task;
    chunk->destroy = swConnection_sendfile_destructor;

    return SW_OK;
}
```

swConnection_onSendfile: sending the file to the client
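When HAVE_TCP_NOPUSH is defined, swoole corks the socket before the first chunk of a file is sent (task->offset == 0): it temporarily disables TCP_NODELAY and enables TCP_CORK, so the kernel holds back partial frames and the file goes out in full-sized segments rather than many small ones. TCP_NODELAY turns off the Nagle algorithm, while TCP_CORK (Linux) withholds partial frames until the cork is removed (for details, see the article on the Nagle algorithm and the TCP_CORK socket option).
Next, sendn, the amount to send in this round (at most SW_SENDFILE_CHUNK_SIZE), is computed, and swoole_sendfile reads that part of the file and sends it.
When the whole file has been sent, TCP_CORK is set back to 0, which flushes any remaining partial frame, and TCP_NODELAY is re-enabled if the connection had it on.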
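The per-event work of the pread/write path can be sketched in plain POSIX C: send at most one chunk from the current offset, and advance the offset only by what write() accepted, so a partial write is simply retried from the right place on the next writable event. In this hypothetical sketch, send_chunk, set_cork and the 4096-byte CHUNK are invented stand-ins for swoole_sendfile, swSocket_tcp_nopush and SW_SENDFILE_CHUNK_SIZE. The TCP_CORK toggle is guarded because TCP_CORK is Linux-specific; the usage test only exercises the chunked copy over a pipe, since corking needs a TCP socket.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#define CHUNK 4096  /* hypothetical stand-in for SW_SENDFILE_CHUNK_SIZE */

/* Cork or uncork a TCP socket where TCP_CORK exists (Linux).
 * Returns -1 on failure, including for a non-socket fd. */
static int set_cork(int sock, int on)
{
#ifdef TCP_CORK
    return setsockopt(sock, IPPROTO_TCP, TCP_CORK, &on, sizeof(on));
#else
    (void)sock;
    (void)on;
    errno = ENOTSUP;
    return -1;
#endif
}

/* One writable event: send at most CHUNK bytes of in_fd starting at *offset,
 * stopping at end. Returns bytes written, 0 if nothing is left, -1 on error. */
static ssize_t send_chunk(int out_fd, int in_fd, off_t *offset, off_t end)
{
    char buf[CHUNK];
    off_t left = end - *offset;
    if (left <= 0)
    {
        return 0;
    }
    size_t want = left > CHUNK ? CHUNK : (size_t)left;
    ssize_t n = pread(in_fd, buf, want, *offset);
    if (n <= 0)
    {
        return -1;  /* read error or file shorter than expected */
    }
    ssize_t w = write(out_fd, buf, (size_t)n);
    if (w > 0)
    {
        *offset += w;  /* advance only by what the peer accepted */
    }
    return w;
}
```

A 10000-byte file goes out in three calls (4096, 4096 and 1808 bytes).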
```c
static sw_inline int swSocket_tcp_nopush(int sock, int nopush)
{
    return setsockopt(sock, IPPROTO_TCP, TCP_CORK, (const void *) &nopush, sizeof(int));
}

//fallback shown by the author: read one chunk with pread(), then write() it
int swoole_sendfile(int out_fd, int in_fd, off_t *offset, size_t size)
{
    char buf[SW_BUFFER_SIZE_BIG];
    int readn = size > sizeof(buf) ? sizeof(buf) : size;

    int ret;
    int n = pread(in_fd, buf, readn, *offset);

    if (n > 0)
    {
        ret = write(out_fd, buf, n);
        if (ret < 0)
        {
            swSysError("write() failed.");
        }
        else
        {
            *offset += ret;
        }
        return ret;
    }
    else
    {
        swSysError("pread() failed.");
        return SW_ERR;
    }
}

int swConnection_onSendfile(swConnection *conn, swBuffer_trunk *chunk)
{
    int ret;
    swTask_sendfile *task = chunk->store.ptr;

#ifdef HAVE_TCP_NOPUSH
    if (task->offset == 0 && conn->tcp_nopush == 0)
    {
        /**
         * disable tcp_nodelay
         */
        if (conn->tcp_nodelay)
        {
            int tcp_nodelay = 0;
            if (setsockopt(conn->fd, IPPROTO_TCP, TCP_NODELAY, (const void *) &tcp_nodelay, sizeof(int)) == -1)
            {
                swWarn("setsockopt(TCP_NODELAY) failed. Error: %s[%d]", strerror(errno), errno);
            }
        }
        /**
         * enable tcp_nopush
         */
        if (swSocket_tcp_nopush(conn->fd, 1) == -1)
        {
            swWarn("swSocket_tcp_nopush() failed. Error: %s[%d]", strerror(errno), errno);
        }
        conn->tcp_nopush = 1;
    }
#endif

    int sendn = (task->length - task->offset > SW_SENDFILE_CHUNK_SIZE) ? SW_SENDFILE_CHUNK_SIZE : task->length - task->offset;

    {
        ret = swoole_sendfile(conn->fd, task->fd, &task->offset, sendn);
    }

    swTrace("ret=%d|task->offset=%ld|sendn=%d|filesize=%ld", ret, (long)task->offset, sendn, task->length);

    if (ret <= 0)
    {
        switch (swConnection_error(errno))
        {
        case SW_ERROR:
            swSysError("sendfile(%s, %ld, %d) failed.", task->filename, (long)task->offset, sendn);
            swBuffer_pop_trunk(conn->out_buffer, chunk);
            return SW_OK;
        case SW_CLOSE:
            conn->close_wait = 1;
            return SW_ERR;
        case SW_WAIT:
            conn->send_wait = 1;
            return SW_ERR;
        default:
            break;
        }
    }

    //sendfile finish
    if (task->offset >= task->length)
    {
        swBuffer_pop_trunk(conn->out_buffer, chunk);

#ifdef HAVE_TCP_NOPUSH
        /**
         * disable tcp_nopush
         */
        if (swSocket_tcp_nopush(conn->fd, 0) == -1)
        {
            swWarn("swSocket_tcp_nopush() failed. Error: %s[%d]", strerror(errno), errno);
        }
        conn->tcp_nopush = 0;

        /**
         * enable tcp_nodelay
         */
        if (conn->tcp_nodelay)
        {
            int value = 1;
            if (setsockopt(conn->fd, IPPROTO_TCP, TCP_NODELAY, (const void *) &value, sizeof(int)) == -1)
            {
                swWarn("setsockopt(TCP_NODELAY) failed. Error: %s[%d]", strerror(errno), errno);
            }
        }
#endif
    }
    return SW_OK;
}
```
When reprinting, please cite the address of this article: http://systransis.cn/yun/29241.html