成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

ChannelPipeline 和 ChannelHandler

William_Sang / 3074人閱讀

摘要:概念與概念一致用以連接設備文件等的紐帶例如將網絡的讀寫客戶端發(fā)起連接主動關閉連接鏈路關閉獲取通信雙方的網絡地址等的類型主要有兩種非阻塞以及阻塞數據傳輸類型有兩種按事件消息傳遞以及按字節(jié)傳遞適用方類型也有兩種服務器以及客戶端還有一些根據傳輸協

ChannelHandler Channel

Channel 概念與 java.nio.channel 概念一致, 用以連接IO設備 (socket, 文件等) 的紐帶. 例如將網絡的讀、寫, 客戶端發(fā)起連接, 主動關閉連接, 鏈路關閉, 獲取通信雙方的網絡地址等.

Channel 的 IO 類型主要有兩種: 非阻塞IO (NIO) 以及阻塞IO(OIO).

數據傳輸類型有兩種: 按事件消息傳遞 (Message) 以及按字節(jié)傳遞 (Byte).

適用方類型也有兩種: 服務器(ServerSocket) 以及客戶端(Socket). 還有一些根據傳輸協議而制定的的Channel, 如: UDT、SCTP等.

Netty 按照類型逐層設計相應的類. 最底層的為抽象類 AbstractChannel, 再以此根據IO類型、數據傳輸類型、適用方類型實現. 類圖可以一目了然, 如下圖所示:

Channel 狀態(tài)

channelRegistered 狀態(tài)

/**
 * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
 */
void channelRegistered(ChannelHandlerContext ctx) throws Exception;

從注釋里面可以看到是在 Channel 綁定到 Eventloop 上面的時候調用的.

不管是 Server 還是 Client, 綁定到 Eventloop 的時候, 最終都是調用 Abstract.initAndRegister() 這個方法上(Server是在 AbstractBootstrap.doBind() 的時候調用的, Client 是在 Bootstrap.doConnect() 的時候調用的).

initAndRegister() 方法定義如下:

final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    // 把channel綁定到Eventloop對象上面去
    ChannelFuture regFuture = group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

繼續(xù)跟蹤下去會定位到 AbstractChannel.AbstractUnsafe.register0() 方法上.

        private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                // 做實際的綁定動作。把Channel感興趣的事件注冊到Eventloop.selector上面.具體實現在Abstract.doRegister()方法內
                doRegister();
                neverRegistered = false;
                registered = true;

                // 通過pipeline的傳播機制,觸發(fā)handlerAdded事件
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                
                // 通過pipeline的傳播機制,觸發(fā)channelRegistered事件
                pipeline.fireChannelRegistered();

                // 還沒有綁定,所以這里的 isActive() 返回false.
                if (isActive()) {
                    if (firstRegistration) {
                        // 如果當前鏈路已經激活,則調用channelActive()方法
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

從上面的代碼也可以看出, 在調用完 pipeline.fireChannelRegistered() 之后, 緊接著會調用 isActive() 判斷當前鏈路是否激活, 如果激活了則會調用 pipeline.fireChannelActive() 方法.

這個時候, 對于 Client 和 Server 都還沒有激活, 所以, 這個時候不管是 Server 還是 Client 都不會調用 pipeline.fireChanenlActive() 方法.

channelActive 狀態(tài)

從啟動器的 bind() 接口開始, 往下調用 doBind() 方法:

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 初始化及注冊
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        // 調用 doBind0
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        ....
    }
}

doBind 方法又會調用 doBind0() 方法, 在 doBind0() 方法中會通過 EventLoop 去執(zhí)行 channelbind()任務.

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                // 調用channel.bind接口
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

doBind0() 方法往下會調用到 pipeline.bind(localAddress, promise); 方法, 通過 pipeline 的傳播機制, 最終會調用到 AbstractChannel.AbstractUnsafe.bind() 方法, 這個方法主要做兩件事情:

調用 doBind(): 調用底層JDK API進行 Channel 的端口綁定.

調用 pipeline.fireChannelActive().

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    
    ....
    
    // wasActive 在綁定成功前為 false
    boolean wasActive = isActive();
    try {
        // 調用doBind()調用JDK底層API進行端口綁定
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    // 完成綁定之后,isActive() 返回true
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                // 觸發(fā)channelActive事件
                pipeline.fireChannelActive();
            }
        });
    }
    safeSetSuccess(promise);
}

也就是說當有新客戶端連接的時候, 會變成活動狀態(tài).

channelInactive 狀態(tài)

fireChannelnactive() 方法在兩個地方會被調用: Channel.close()Channel.disconnect().

在調用前會先確認狀態(tài)是從 Active--->Inactive.

channelUnregistered 狀態(tài)

fireChannelUnregistered() 方法是在 ChannelEventloop 中解除注冊的時候被調用的. Channel.close() 的時候被觸發(fā)執(zhí)行.

ChannelHandler 的生命周期

handlerAdded(): 添加到 ChannelPipeline 時調用.
handlerRemoved(): 從 ChannelPipeline 中移除時調用.
exceptionCaught(): 處理過程中在 ChannelPipeline 中有錯誤產生時調用.

處理 I/O 事件或截獲 I/O 操作, 并將其轉發(fā)到 ChannelPipeline 中的下一個處理程序. ChannelHandler 本身不提供許多方法, 但通常必須實現其子類型之一:

ChannelInboundHandler: 處理入站數據以及各種狀態(tài)變化.

ChannelOutboundHandler: 處理出站數據并且允許攔截所有的操作.

ChannelInboundHandler 接口

channelRegistered(): 當 Channel 已經注冊到它的 EventLoop 并且能夠處理 I/O 時被調用.
channelUnregistered(): 當 Channel 從他的 EventLoop 注銷并且無法處理任何 I/O 時被調用.
channelActive(): 當 Channel 處于活動狀態(tài)時被調用.
channelInactive(): 當 Channel 離開活動狀態(tài)并且不再連接遠程節(jié)點時被調用.
channelRead(): 當從 Channel 讀取數據時被調用.
channelReadComplete(): 當 Channel 上的一個讀操作完成時被調用. 當所有可讀字節(jié)都從 Channel 中讀取之后, 將會調用該回調方法.

ChannelOutboundHandler 接口

出站操作和數據將由 ChannelOutboundHandler 處理. 它的方法將被 Channel ChannelPipeline 以及 ChannelHandlerContext 調用.

ChannelOutboundHandler 的一個強大的功能是可以按需推遲操作或事件, 這使得可以通過一些復雜的方法來處理請求. 例如, 如果到遠程節(jié)點的寫入被暫停, 那么你可以推遲刷新操作并在稍后繼續(xù).

connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise): 當請求將 Channel 連接到遠程節(jié)點時被調用.
disconnect(ChannelHandlerContext ctx, ChannelPromise promise): 當請求將 Channel 從遠程節(jié)點斷開時被調用.
deregister(ChannelHandlerContext ctx, ChannelPromise promise): 當請求將 Channel 從它的 EventLoop 注銷時被調用.
read(ChannelHandlerContext ctx): 當請求從 Channel 讀取更多的數據時被調用.
write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise): 當請求通過 Channel 將數據寫到遠程節(jié)點時被調用.
flush(ChannelHandlerContext ctx): 當請求從 Channel 將入隊數據沖刷到遠程節(jié)點時被調用.

ChannelPromise 和 ChannelFuture

ChannelFuture 表示 Channel 中異步I/O操作的結果, 在 netty 中所有的 I/O 操作都是異步的, I/O 的調用會直接返回, 可以通過 ChannelFuture 來獲取 I/O 操作的結果或者狀態(tài)信息.

當 I/O 操作開始時, 將創(chuàng)建一個新對象. 新的對象是未完成的-它既沒有成功, 也沒有失敗, 也沒有被取消, 因為 I/O 操作還沒有完成.

如果 I/O 操作已成功完成(失敗或取消), 則對象將標記為已完成, 其中包含更具體的信息, 例如故障原因.

請注意, 即使失敗和取消屬于已完成狀態(tài).

ChannelPromiseChannelFuture 的一個子接口, 其定義了一些可寫的方法, 如 setSuccess()setFailure(), 從而使 ChannelFuture 不可變.

優(yōu)先使用addListener(GenericFutureListener),而非await()

當做了一個 I/O 操作并有任何后續(xù)任務的時候, 推薦優(yōu)先使用 addListener(GenericFutureListener) 的方式來獲得通知, 而非 await()

addListener(GenericFutureListener) 是非阻塞的. 它會把特定的 ChannelFutureListener 添加到 ChannelFuture 中, 然后 I/O 線程會在 I/O 操作相關的 future 完成的時候通知監(jiān)聽器.

ChannelFutureListener 會利于最佳的性能和資源的利用, 因為它一點阻塞都沒有. 而且不會造成死鎖.

ChannelHandler 適配器

ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter 這兩個適配器類分別提供了
ChannelInboundHandlerChannelOutboundHandler 的基本實現, 它們繼承了共同的父接口
ChannelHandler 的方法, 擴展了抽象類 ChannelHandlerAdapter.

ChannelHandlerAdapter 還提供了實用方法 isSharable().

如果其對應的實現被標注為 Sharable, 那么這個方法將返回 true, 表示它可以被添加到多個 ChannelPipeline 中.

ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter 中所提供的方法體調用了其相關聯的 ChannelHandlerContext 上的等效方法, 從而將事件轉發(fā)到了 ChannelPipeline 中的 ChannelHandler 中.

ChannelPipeline 接口

ChannelPipeline 將多個 ChannelHandler 鏈接在一起來讓事件在其中傳播處理 (通過擴展
ChannelInitializer). 一個 ChannelPipeline 中可能不僅有入站處理器, 還有出站處理器, 入站處理器只會處理入站的事件, 而出站處理器只會處理出站的數據.

每一個新創(chuàng)建的 Channel 都將會分配一個新的 ChannelPipeline, 不能附加另一個 ChannelPipeline, 也不能分離當前的.

通過調用 ChannelHandlerContext 實現, 它將被轉發(fā)給同一個超類型的下一個 ChannelHandler.

從事件途徑 ChannelPilpeline 的角度來看, ChannelPipeline 的頭部和尾端取決于該事件是入站的還是出站的.

而 Netty 總是將 ChannelPilpeline 的入站口 (左側) 作為頭部, 將出站口 (右側) 作為尾端.

當通過調用 ChannelPilpeline.add*() 方法將入站處理器和出站處理器混合添加到 ChannelPilpeline 之后, 每一個 ChannelHandler 從頭部到尾端的順序就是我們添加的順序.

ChannelPilpeline 傳播事件時, 它會測試 ChannelPilpeline 中的下一個 ChannelHandler 的類型是否和事件的運動方向相匹配. 如果不匹配, ChannelPilpeline 將跳過該 ChannelHandler 并前進到下一個, 直到它找到和該事件期望的方向相匹配的為止.

修改 ChannelPipeline

這里指修改 ChannelPipeline 中的 ChannelHandler 的編排.

通過調用 ChannelPipeline 上的相關方法, ChannelHandler 可以添加, 刪除或者替換其他的 ChannelHandler, 從而實時地修改 ChannelPipeline 的布局.

addFirst  // 將 ChannelHandler 插入第一個位置
addBefore // 在某個 ChannelHandler 之前添加一個
addAfter  // 在某個 ChannelHandler 之后添加一個
addLast   // 將 ChannelHandler 插入最后一個位置
remove    // 移除某個 ChannelHandler
replace   // 將某個 ChannelHandler 替換成指定 ChannelHandler
ChannelHandlerContext 接口

ChannelHandlerContext 代表了 ChanelHandlerChannelPipeline 之間的關聯, 每當有 ChanelHandler 添加到 ChannelPipeline 中, 都會創(chuàng)建 ChannelHandlerContext.

ChannelHandlerContext 的主要功能是管理它所關聯的 ChannelPipeline 和同一個 ChannelPipeline 中的其他 ChanelHandler 之間的交互.

ChannelHandlerContext 有很多的方法, 其中一些方法也存在于 ChannelChannelPipeline 上, 但是有一點重要的不同.

如果調用 ChannelChannelPipeline 上的這些方法將沿著 ChannelPipeline 進行傳播(從頭或尾開始).
而調用位于 ChannelHandlerContext 上的相同方法, 則將從當前所關聯的 ChannelHandler 開始, 并且只會傳播給位于該 ChannelPipeline 中的下一個能夠處理該事件的 ChannelHandler.

這樣做可以減少 ChannelHandler 的調用開銷.

使用 ChannelHandlerContext

上圖為 Channel ChannelPipeline ChannelHandler 以及 ChannelHandlerContext 之間的關系.

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://systransis.cn/yun/74384.html

相關文章

  • Netty-ChannelHandler-ChannelPipeline

    摘要:只有在詳盡的測試之后才應設置為這值使用的默認采樣率檢測并報告任何發(fā)現的泄漏。這是默認級別,適合絕大部分情況使用默認的采樣率,報告所發(fā)現的任何的泄漏以及對應的消息被訪問的位置類似于但是其將會對每次對消息的訪問都進行采樣。 ChannelHandler Channel生命周期 狀態(tài) 描述 ChannelUnregistered Channel已經被創(chuàng)建,但未注冊到EventLoo...

    warkiz 評論0 收藏0
  • Netty組件入門學習

    摘要:可以用來接收入站事件和數據,隨后使用應用程序的業(yè)務邏輯進行處理。因為用戶并不是關心所有的事件,因此提供了抽象類和。抽象類最常見的一個情況,你的應用程序會利用一個來接受解碼消息,并對該數據應用業(yè)務邏輯。 Channel、EventLoop和ChannelFuture Channel——Socket; EventLoop——控制流、多線程處理、并發(fā) ChannelFuture異步通知 ...

    qpal 評論0 收藏0
  • Netty 框架總結「ChannelHandler 及 EventLoop」

    摘要:隨著狀態(tài)發(fā)生變化,相應的產生。這些被轉發(fā)到中的來采取相應的操作。當收到數據或相關的狀態(tài)改變時,這些方法被調用,這些方法和的生命周期密切相關。主要由一系列組成的。采用的線程模型,在同一個線程的中處理所有發(fā)生的事。 「博客搬家」 原地址: 簡書 原發(fā)表時間: 2017-05-05 學習了一段時間的 Netty,將重點與學習心得總結如下,本文主要總結ChannelHandler 及 E...

    VioletJack 評論0 收藏0
  • Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (一)

    摘要:目錄源碼之下無秘密做最好的源碼分析教程源碼分析之番外篇的前生今世的前生今世之一簡介的前生今世之二小結的前生今世之三詳解的前生今世之四詳解源碼分析之零磨刀不誤砍柴工源碼分析環(huán)境搭建源碼分析之一揭開神秘的紅蓋頭源碼分析之一揭開神秘的紅蓋頭客戶端 目錄 源碼之下無秘密 ── 做最好的 Netty 源碼分析教程 Netty 源碼分析之 番外篇 Java NIO 的前生今世 Java NI...

    tunny 評論0 收藏0
  • Netty學習筆記(二)

    摘要:支持很多協議,并且提供用于數據處理的容器。我們已經知道由特定事件觸發(fā)??蓪S糜趲缀跛械膭幼?,包括將一個對象轉為字節(jié)或相反,執(zhí)行過程中拋出的異常處理。提供了一個容器給鏈并提供了一個用于管理沿著鏈入站和出站事件的流動。子類通過進行注冊。 前兩天寫了一點netty相關的知識,并寫了一個demo,但是對其原理還是沒有深入,今天我們來做一次研究吧 首先讓我們來認識一下netty的幾個核心人物吧...

    0x584a 評論0 收藏0

發(fā)表評論

0條評論

William_Sang

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<