摘要:搞懂了這部分后,我們將明白在世界中扮演的角色進(jìn)擊的此圖展示的已經(jīng)算是優(yōu)化后的了用到了線程池。多線程將這種處理操作分隔出來,非型操作業(yè)務(wù)操作配備以線程池,進(jìn)化成多線程模型這樣的架構(gòu),系統(tǒng)瓶頸轉(zhuǎn)移至部分。
Channel定位
注意:如無特別說明,文中的Channel都指的是Netty Channel(io.netty.channel)
一周時(shí)間的Channel家族學(xué)習(xí),一度讓我懷疑人生——研究這個(gè)方法有沒有用?學(xué)習(xí)Netty是不是有點(diǎn)兒下了高速走鄉(xiāng)間小路的意思?我為啥要讀源碼?
之所以產(chǎn)生這些疑問,除了我本身心理活動(dòng)豐富以外,主要病因在于沒搞清楚Channel在Netty體系中的定位。而沒能清晰理解Netty的定位,也默默的送出了一記助攻。
作些本質(zhì)思考:Netty是一個(gè)NIO框架,是一個(gè)嫁接在java NIO基礎(chǔ)上的框架。
宏觀上可以這么理解,見下圖:
先不急著聊Channel,回顧下IO演進(jìn)過程,重點(diǎn)關(guān)注IO框架的結(jié)構(gòu)變化。搞懂了這部分后,我們將明白Channel在IO世界中扮演的角色!
進(jìn)擊的IO BIO此圖展示的已經(jīng)算是優(yōu)化后的BIO了——用到了線程池。顯然,每一個(gè)client都需要server端付出一個(gè)Thread的代價(jià),即使你通過線程池做了優(yōu)化,由于受到線程個(gè)數(shù)的制約,激增的客戶端依舊表現(xiàn)的“欲求不滿”。
NIOAcceptor注冊(cè)Selector,監(jiān)聽accept事件
當(dāng)客戶端連接后,觸發(fā)accept事件
服務(wù)器構(gòu)建對(duì)應(yīng)的Channel,并在其上注冊(cè)Selector,監(jiān)聽讀寫事件
當(dāng)發(fā)生讀寫事件后,進(jìn)行相應(yīng)的讀寫處理
Reactor單線程與NIO模型相似,當(dāng)然也就有和NIO同樣的問題:selector/reactor單個(gè)線程處理多個(gè)channel的各種操作,如果其中一個(gè)channel的事件處理延緩了,將影響其它c(diǎn)hannel。
Reactor多線程將read/write這種io處理操作分隔出來,非io型操作(業(yè)務(wù)操作)配備以線程池,進(jìn)化成reactor多線程模型:
這樣的架構(gòu),系統(tǒng)瓶頸轉(zhuǎn)移至Reactor部分。而目前勞苦功高的Reactor作了兩件事:
1.接收客戶端鏈接請(qǐng)求
2.處理IO型讀寫操作
將接收client鏈接的功能再次拆分出來:
Netty恰恰就是主從Reactor模型的實(shí)踐者,想想服務(wù)端創(chuàng)建時(shí)的代碼:
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) ...
從nio時(shí)代的模型圖上開始出現(xiàn)channel(java channel),它的定位就是進(jìn)行諸如connect、write、read、close等底層交互。概括一下,java channel是上承selector下連socket的存在。而netty channel,則把java channel當(dāng)作了底層。
源碼分析 類結(jié)構(gòu)清楚了Channel的定位,接下來對(duì)其常用api進(jìn)行分析。
首先拍出類圖:
其實(shí)Channel內(nèi)部還有一套體系,Unsafe家族:
Unsafe是Channel的內(nèi)置類(接口),與java channel交互的重任最終會(huì)落到Unsafe身上。
write方法write只是將數(shù)據(jù)寫入到了ChannelOutboundBuffer中,并沒有真正的發(fā)送出去,到flush方法調(diào)用時(shí),才寫入到j(luò)ava channel中發(fā)送給對(duì)方。
下面列出AbstractChannel的write方法,值得關(guān)注的地方已打上中文注釋:
@Override public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of the rest // will be done in flush0() // See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); // release message now to prevent resource-leak ReferenceCountUtil.release(msg); return; } int size; try { msg = filterOutboundMessage(msg); //作消息的包裝,轉(zhuǎn)換成ByteBuf等 size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } outboundBuffer.addMessage(msg, size, promise); //msg消息寫入ChannelOutboundBuffer }
上述代碼最后一行,msg寫入了ChannelOutboundBuffer的尾節(jié)點(diǎn)tailEntry,同時(shí)將unflushedEntry賦值暫存。代碼展開如下:
public void addMessage(Object msg, int size, ChannelPromise promise) { Entry entry = Entry.newInstance(msg, size, total(msg), promise); if (tailEntry == null) { flushedEntry = null; tailEntry = entry; } else { Entry tail = tailEntry; tail.next = entry; tailEntry = entry; } if (unflushedEntry == null) { //注釋一、標(biāo)記成“未刷新”的數(shù)據(jù) unflushedEntry = entry; } incrementPendingOutboundBytes(entry.pendingSize, false); }ChannelOutboundBuffer類
這里對(duì)ChannelOutboundBuffer類進(jìn)行簡(jiǎn)單說明,按慣例先看類注釋。
/** * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending * outbound write requests. * * 省略... */
前文提到過,write方法將消息寫到ChannelOutboundBuffer,算是數(shù)據(jù)暫存;之后的flush再將消息刷到j(luò)ava channel乃至客戶端。
來張示意圖,方便理解:
圖中列出的三個(gè)屬性,在write->ChannelOutboundBuffer->flush的數(shù)據(jù)流轉(zhuǎn)過程中比較關(guān)鍵。Entry是啥?ChannelOutboundBuffer的靜態(tài)內(nèi)部類,典型的鏈表結(jié)構(gòu)數(shù)據(jù):
static final class Entry { Entry next; // 省略... }
write方法的最后部分(注釋一位置)調(diào)用outboundBuffer.addMessage(msg, size, promise),已將封裝msg的Entry賦值給tailEntry和unflushedEntry;而flush方法,通過調(diào)用outboundBuffer.addFlush()(下文,注釋二位置),將unflushedEntry間接賦值給了flushedEntry。
public void addFlush() { Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { // there is no flushedEntry yet, so start with the entry flushedEntry = entry; } do { flushed ++; if (!entry.promise.setUncancellable()) { // Was cancelled so make sure we free up memory and notify about the freed bytes int pending = entry.cancel(); decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); // All flushed so reset unflushedEntry unflushedEntry = null; } }flush方法
直接從AbstractChannel的flush方法開始(若以Channel的flush為開端會(huì)經(jīng)pipeline,將有很長(zhǎng)調(diào)用鏈,省略):
public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } outboundBuffer.addFlush(); //注釋二、標(biāo)記成“已刷新”數(shù)據(jù) flush0(); //數(shù)據(jù)處理 }
outboundBuffer.addFlush()方法已經(jīng)分析過了,跟蹤調(diào)用鏈flush0->doWrite,我們看下AbstractNioByteChannel的doWrite方法:
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = config().getWriteSpinCount(); //自旋計(jì)數(shù),限制循環(huán)次數(shù),默認(rèn)16 do { Object msg = in.current(); //flushedEntry的msg if (msg == null) { // Wrote all messages. clearOpWrite(); // Directly return here so incompleteWrite(...) is not called. return; } writeSpinCount -= doWriteInternal(in, msg); } while (writeSpinCount > 0); incompleteWrite(writeSpinCount < 0); }
writeSpinCount是個(gè)自旋計(jì)數(shù),類似于自旋鎖的設(shè)定,防止當(dāng)前IO線程由于網(wǎng)絡(luò)等原因無盡執(zhí)行寫操作,而使得線程假死,造成資源浪費(fèi)。
觀察doWriteInternal方法,關(guān)鍵處依舊中文注釋伺候:
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; if (!buf.isReadable()) { //writerIndex - readerIndex >0 ? true: flase in.remove(); return 0; } final int localFlushedAmount = doWriteBytes(buf); //返回實(shí)際寫入到j(luò)ava channel的字節(jié)數(shù) if (localFlushedAmount > 0) { //寫入成功 in.progress(localFlushedAmount); /** * 1.已經(jīng)全部寫完,執(zhí)行in.remove() * 2.“寫半包”場(chǎng)景,直接返回1。 * 外層方法的自旋變量writeSpinCount遞減成15,輪詢?cè)俅螆?zhí)行本方法 */ if (!buf.isReadable()) { in.remove(); } return 1; } } else if (msg instanceof FileRegion) { //“文件型”消息處理邏輯省略.. } else { // Should not reach here. throw new Error(); } return WRITE_STATUS_SNDBUF_FULL; //發(fā)送緩沖區(qū)滿,值=Integer.MAX_VALUE }
回到doWrite方法,最后執(zhí)行了incompleteWrite(writeSpinCount < 0):
protected final void incompleteWrite(boolean setOpWrite) { // Did not write completely. if (setOpWrite) { setOpWrite(); } else { // Schedule flush again later so other tasks can be picked up in the meantime Runnable flushTask = this.flushTask; if (flushTask == null) { flushTask = this.flushTask = new Runnable() { @Override public void run() { flush(); } }; } eventLoop().execute(flushTask); } }
這里的設(shè)定挺有意思:
如果 setOpWrite = writeSpinCount < 0 = true,即 doWriteInternal方法返回值 = WRITE_STATUS_SNDBUF_FULL(發(fā)送緩沖區(qū)滿)時(shí),設(shè)置寫操作位:
protected final void setOpWrite() { final SelectionKey key = selectionKey(); // Check first if the key is still valid as it may be canceled as part of the deregistration // from the EventLoop // See https://github.com/netty/netty/issues/2104 if (!key.isValid()) { return; } final int interestOps = key.interestOps(); if ((interestOps & SelectionKey.OP_WRITE) == 0) { key.interestOps(interestOps | SelectionKey.OP_WRITE); } }
其實(shí)就是設(shè)置SelectionKey的OP_WRITE操作位,在selector/reactor下次輪詢的時(shí)候,將再次執(zhí)行寫操作
如果 setOpWrite = writeSpinCount < 0 = false,即 doWriteInternal方法返回值 = 1,16次寫半包仍舊沒將消息發(fā)送出去,則通過定時(shí)器再次執(zhí)行flush:
public Channel flush() { pipeline.flush(); return this; }
結(jié)論:前者由于發(fā)送緩沖區(qū)滿,已無法寫入數(shù)據(jù),于是繼希望于selector的下次輪詢;后者則可能只是因?yàn)樽孕螖?shù)少,引起的數(shù)據(jù)發(fā)送不完全,直接將任務(wù)再次放入pipeline,而無需等待selector。
這無疑是種優(yōu)化,細(xì)節(jié)之處,功力盡顯!
高性能Server---Reactor模型
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/68810.html
摘要:之所以稱它為卡車,只因編程思想中有段比喻我們可以把它想象成一個(gè)煤礦,通道是一個(gè)包含煤層數(shù)據(jù)的礦藏,而緩沖器則是派送到礦藏中的卡車。那么升級(jí)版卡車,自然指的就是。結(jié)構(gòu)和功能之所以再次打造了升級(jí)版的緩沖器,顯然是不滿中的某些弊端。 卡車 卡車指的是java原生類ByteBuffer,這兄弟在NIO界大名鼎鼎,與Channel、Selector的鐵三角組合構(gòu)筑了NIO的核心。之所以稱它為卡車...
摘要:非阻塞模型這種也很好理解,由阻塞的死等系統(tǒng)響應(yīng)進(jìn)化成多次調(diào)用查看數(shù)據(jù)就緒狀態(tài)。復(fù)用模型,以及它的增強(qiáng)版就屬于該種模型。此時(shí)用戶進(jìn)程阻塞在事件上,數(shù)據(jù)就緒系統(tǒng)予以通知。信號(hào)驅(qū)動(dòng)模型應(yīng)用進(jìn)程建立信號(hào)處理程序時(shí),是非阻塞的。 引言 之前的兩篇文章 FastThreadLocal怎么Fast?、ScheduledThreadPoolExecutor源碼解讀 搞的我心力交瘁,且讀源碼過程中深感功...
摘要:答曰摸索直譯為服務(wù)加載器,最終目的是獲取的實(shí)現(xiàn)類。代碼走起首先,要有一個(gè)接口形狀接口介紹然后,要有該接口的實(shí)現(xiàn)類。期具體實(shí)現(xiàn)依靠的內(nèi)部類,感性趣的朋友可以自己看一下。總結(jié)重點(diǎn)在于可跨越包獲取,這一點(diǎn)筆者通過多模塊項(xiàng)目親測(cè)延時(shí)加載特性 前戲 netty源碼注釋有云: ... If a provider class has been installed in a jar file tha...
摘要:閱讀源碼時(shí),發(fā)現(xiàn)很多,理所當(dāng)然會(huì)想翻閱資料后,該技能,姿勢(shì)如下環(huán)境中的全部屬性全部屬性注意如果將本行代碼放在自定義屬性之后,會(huì)不會(huì)打出把自定義屬性也給獲取到可以結(jié)論會(huì)獲取目前環(huán)境中全部的屬性值,無論系統(tǒng)提供還是個(gè)人定義系統(tǒng)提供屬性代碼中定義 閱讀源碼時(shí),發(fā)現(xiàn)很多System.getProperty(xxx),理所當(dāng)然會(huì)想:whats fucking this? 翻閱資料后,Get該技能...
摘要:實(shí)現(xiàn)原理淺談幫助理解的示意圖中有一屬性,類型是的靜態(tài)內(nèi)部類。剛剛說過,是一個(gè)中的靜態(tài)內(nèi)部類,則是的內(nèi)部節(jié)點(diǎn)。這個(gè)會(huì)在線程中,作為其屬性初始是一個(gè)數(shù)組的索引,達(dá)成與類似的效果。的方法被調(diào)用時(shí),會(huì)根據(jù)記錄的槽位信息進(jìn)行大掃除。 概述 FastThreadLocal的類名本身就充滿了對(duì)ThreadLocal的挑釁,快男FastThreadLocal是怎么快的?源碼中類注釋坦白如下: /** ...
閱讀 3156·2021-10-08 10:04
閱讀 1098·2021-09-30 09:48
閱讀 3466·2021-09-22 10:53
閱讀 1684·2021-09-10 11:22
閱讀 1698·2021-09-06 15:00
閱讀 2156·2019-08-30 15:56
閱讀 719·2019-08-30 15:53
閱讀 2288·2019-08-30 13:04