摘要:背景最近發(fā)現(xiàn)的回調(diào)方法,在連接創(chuàng)建成功和讀取數(shù)據(jù)后都會(huì)被回調(diào)。那我也嘗試著從源碼找到答案吧?;卣{(diào)流程分析的回調(diào)流程和流程沒有什么區(qū)別,可參考上文分析。但是在的方法中會(huì)調(diào)用這個(gè)是讀數(shù)據(jù)的關(guān)鍵讀數(shù)據(jù)分析讀數(shù)據(jù)分析
背景
最近發(fā)現(xiàn)ChannelOutboundHandlerAdapter的read()回調(diào)方法,在連接創(chuàng)建成功和讀取數(shù)據(jù)后都會(huì)被回調(diào)。因此就產(chǎn)生了疑問“為什么建立連接和讀取數(shù)據(jù)后read()方法會(huì)被調(diào)用呢?” 從網(wǎng)上搜索到一片文章https://my.oschina.net/lifany... 可以看出一些端倪,但是具體流程和一些疑問還是沒有解開。
那我也嘗試著從源碼找到答案吧。
我們先寫個(gè)小Demo,其中Test1OutboundHandlerAdapter是一個(gè)ChannelOutboundHandlerAdapter,里面的read()添加一行打印。 Test1HandlerAdapter 是一個(gè)ChannelInboundHandlerAdapter 里面的channelActive(xxx)、
channelRead(xxx)、channelReadComplete(xxx)添加打印。由于很簡單,下面只貼部分代碼
Test1OutboundHandlerAdapter.java
@Override public void read(ChannelHandlerContext ctx) throws Exception { super.read(ctx); System.out.println("Test1OutboundHandlerAdapter------------->read"); }
Test1HandlerAdapter.java
@Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { super.channelRegistered(ctx); System.out.println("Test1HandlerAdapter-------------->channelRegistered"); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); System.out.println("Test1HandlerAdapter-------------->channelActive"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Test1HandlerAdapter-------------->channelRead"); ctx.writeAndFlush(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { super.channelReadComplete(ctx); }
然后我們建立連接,隨便發(fā)一下數(shù)據(jù),服務(wù)器收到數(shù)據(jù),打印如下:
Test1HandlerAdapter-------------->handlerAdded
Test1HandlerAdapter-------------->channelRegistered
Test1HandlerAdapter-------------->channelActive
Test1OutboundHandlerAdapter------------->read
Test1HandlerAdapter-------------->channelRead
Test1HandlerAdapter-------------->channelReadComplete
Test1OutboundHandlerAdapter------------->read
如果把Test1OutboundHandlerAdapter的read(xxx)回調(diào)方法注釋掉,會(huì)發(fā)現(xiàn)服務(wù)器無法接收數(shù)據(jù)了。
源碼分析 1.channelRegistered回調(diào)流程分析可以定位到在AbstractChannelHandlerContext invokeChannelRegistered()方法調(diào)用了channelRegistered(xxx)方法,然后再查找會(huì)發(fā)現(xiàn)是
AbstractChannelHandlerContext的fireChannelRegistered()----->
invokeChannelRegistered(final AbstractChannelHandlerContext next)----->invokeChannelRegistered()
AbstractChannelHandlerContext
@Override public ChannelHandlerContext fireChannelRegistered() { invokeChannelRegistered(findContextInbound()); return this; } static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRegistered(); } }); } } private void invokeChannelRegistered() { if (invokeHandler()) { try { /**ChannelInboundHandler的register(xxx)在這里被調(diào)用*/ ((ChannelInboundHandler) handler()).channelRegistered(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRegistered(); } }
順藤fireChannelRegistered()摸瓜,最終定位到AbstractChannel內(nèi)部類AbstractUnsafe的
register(EventLoop eventLoop, final ChannelPromise promise)----->register0(ChannelPromise promise)
AbstractUnsafe
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
在繼續(xù)找的會(huì)找到在EventLooop層面的調(diào)用了,我們可以先不用管了。在register0的方法中又調(diào)用了pipeline.fireChannelRegistered()和pipeline.fireChannelActive();,這正是我們要找的,也符合打印順序先channelRegistered后channelActive。為了驗(yàn)證我們可以加上斷點(diǎn)調(diào)試,就是這兒了。
至此我們可以總結(jié)一下:
channelRegistered流程:
說明
DefaultChannelPipeline 的fireChannelRegistered()
@Override public final ChannelPipeline fireChannelUnregistered() { AbstractChannelHandlerContext.invokeChannelUnregistered(head); return this; }
AbstractChannelHandlerContext.invokeChannelUnregistered(head);傳遞的參數(shù)是DefaultChannelPipeline的head,這樣保證了register事件沿著pipeline從頭流向尾,其對(duì)應(yīng)DefaultChannelPipeline內(nèi)部類HeadContext。 HeadContext多重身份即是ChannelHandlerContext又是ChannelInboundHandler和ChannelOutboundhandler
DefaultChannelPipeline
final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; ...省略... protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
DefaultChannelPipeline的內(nèi)部類HeadContext
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); } @Override public ChannelHandler handler() { return this; } 省略后邊的代碼
2.上圖黃色的部分都是調(diào)用的HeadContext中的方法
static void invokeChannelRegistered(final AbstractChannelHandlerContext next)接收的參數(shù)是DefaultChannelPipeline傳遞的head即HeadContext,那么也就是head.invokeChannelRegistered()。
invokeChannelRegistered()方法中會(huì)調(diào)用
((ChannelInboundHandler) handler()).channelRegistered(this);
HeadContext類中該方法返回的就是自己(可查看上面的代碼),因?yàn)镠eadContext本身也是ChannelInboundHandler。 同時(shí)又將自己作為參數(shù),調(diào)用自己的channelRegistered方法
3.HeadContext的ChannelRegister方法中調(diào)用AbstractChannelHandlerContext的fireChannelRegistered();
(還是調(diào)用的自己)該方法中調(diào)用了invokeChannelRegistered(findContextInbound()); findContextInbound()所實(shí)現(xiàn)的功能就是查找到下一個(gè)ChanelInboundHandler即HeadContext(本身是ChannelInboundHandler)下一個(gè)ChanelInboundHandler
上面的步驟不斷重復(fù),自此registered事件可以沿著pipeline在不同的InboundHandler里流動(dòng)了。
channelActive的回調(diào)流程和channelRegister流程沒有什么區(qū)別,可參考上文分析。 但是在HeadContext的channelActive方法中會(huì)調(diào)用readIfIsAutoRead(); 這個(gè)是讀數(shù)據(jù)的關(guān)鍵
3.netty讀數(shù)據(jù)分析讀數(shù)據(jù)分析
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/69148.html
摘要:用這種方式很好的規(guī)避了多線程所帶來的問題,很值得我們借鑒那么這個(gè)怎么來的呢看一下的方法如果為,就返回。 Netty channelRegisteredChannelActive---源碼分析經(jīng)過下面的分析我們可以了解netty讀數(shù)據(jù)的一個(gè)過程,以及為什么channelActive方法、channelReadComplete方法會(huì)回調(diào)ChannelOutboundHandler的read...
摘要:背景在工作中雖然我經(jīng)常使用到庫但是很多時(shí)候?qū)Φ囊恍└拍钸€是處于知其然不知其所以然的狀態(tài)因此就萌生了學(xué)習(xí)源碼的想法剛開始看源碼的時(shí)候自然是比較痛苦的主要原因有兩個(gè)第一網(wǎng)上沒有找到讓我滿意的詳盡的源碼分析的教程第二我也是第一次系統(tǒng)地學(xué)習(xí)這么大代 背景 在工作中, 雖然我經(jīng)常使用到 Netty 庫, 但是很多時(shí)候?qū)?Netty 的一些概念還是處于知其然, 不知其所以然的狀態(tài), 因此就萌生了學(xué)...
摘要:目錄此文章屬于源碼之下無秘密做最好的源碼分析教程系列文章之一代碼下載首先到的倉庫中點(diǎn)擊右邊綠色的按鈕拷貝地址然后在終端中輸入如下命令克隆工程工程源碼較大加上國內(nèi)網(wǎng)絡(luò)問題下載源碼可能會(huì)比較耗時(shí)當(dāng)有如下輸出時(shí)表示克隆成功了如果有朋友實(shí)在下載太 目錄 此文章屬于 源碼之下無秘密 ── 做最好的 Netty 源碼分析教程 系列文章之一. 代碼下載 首先到 Netty 的 Github 倉庫 中...
摘要:目錄源碼分析之番外篇的前生今世的前生今世之一簡介的前生今世之二小結(jié)的前生今世之三詳解的前生今世之四詳解源碼分析之零磨刀不誤砍柴工源碼分析環(huán)境搭建源碼分析之一揭開神秘的紅蓋頭源碼分析之一揭開神秘的紅蓋頭客戶端源碼分析之一揭開神秘的紅蓋頭服務(wù)器 目錄 Netty 源碼分析之 番外篇 Java NIO 的前生今世 Java NIO 的前生今世 之一 簡介 Java NIO 的前生今世 ...
摘要:目錄源碼之下無秘密做最好的源碼分析教程源碼分析之番外篇的前生今世的前生今世之一簡介的前生今世之二小結(jié)的前生今世之三詳解的前生今世之四詳解源碼分析之零磨刀不誤砍柴工源碼分析環(huán)境搭建源碼分析之一揭開神秘的紅蓋頭源碼分析之一揭開神秘的紅蓋頭客戶端 目錄 源碼之下無秘密 ── 做最好的 Netty 源碼分析教程 Netty 源碼分析之 番外篇 Java NIO 的前生今世 Java NI...
閱讀 1918·2021-11-25 09:43
閱讀 1423·2021-11-22 14:56
閱讀 3288·2021-11-22 09:34
閱讀 2027·2021-11-15 11:37
閱讀 2282·2021-09-01 10:46
閱讀 1409·2019-08-30 15:44
閱讀 2305·2019-08-30 13:15
閱讀 2404·2019-08-29 13:07