成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

記一次有趣的 Netty 源碼問(wèn)題

harriszh / 898人閱讀

摘要:背景起因是一個(gè)朋友問(wèn)我的一個(gè)關(guān)于啟動(dòng)的問(wèn)題相關(guān)他的問(wèn)題我復(fù)述一下的綁定流程如下在中可能會(huì)調(diào)用即并且在中也會(huì)有的調(diào)用即那么有沒(méi)有可能造成了兩次的調(diào)用我的回答是不會(huì)為什么呢對(duì)于直接想知道答案的朋友可以直接閱讀到最后面的回答與總

背景

起因是一個(gè)朋友問(wèn)我的一個(gè)關(guān)于 ServerBootstrap 啟動(dòng)的問(wèn)題.
相關(guān) issue
他的問(wèn)題我復(fù)述一下:
ServerBootstrap 的綁定流程如下:

ServerBootstrap.bind ->
    AbstractBootstrap.bind ->
        AbstractBootstrap.doBind ->
            AbstractBootstrap.initAndRegister ->
                AbstractChannel#AbstractUnsafe.register ->
                    eventLoop.execute( () -> AbstractUnsafe.register0)
            doBind0() ->
                channel.eventLoop().execute( () -> channel.bind) ->
                    AbstractUnsafe.bind

AbstractUnsafe.register0 中可能會(huì)調(diào)用 pipeline.fireChannelActive(), 即:

private void register0(ChannelPromise promise) {
    try {
        ...
        boolean firstRegistration = neverRegistered;
        doRegister();
        ...
        if (firstRegistration && isActive()) {
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        ...
    }
}

并且在 AbstractUnsafe.bind 中也會(huì)有 pipeline.fireChannelActive() 的調(diào)用, 即:

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ...
    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        ...
    }

    if (!wasActive && isActive()) {
        invokeLater(new OneTimeTask() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
    ...
}

那么有沒(méi)有可能造成了兩次的 pipeline.fireChannelActive() 調(diào)用?

我的回答是不會(huì). 為什么呢? 對(duì)于直接想知道答案的朋友可以直接閱讀到最后面的 回答總結(jié) 兩節(jié)..

下面我們就來(lái)根據(jù)代碼詳細(xì)分析一下.

分析

首先, 根據(jù)我們上面所列出的調(diào)用流程, 會(huì)有 AbstractBootstrap.doBind 的調(diào)用, 它的代碼如下:

private ChannelFuture doBind(final SocketAddress localAddress) {
        // 步驟1
        final ChannelFuture regFuture = initAndRegister();
        ...
        // 步驟2
        if (regFuture.isDone()) {
            ...
            doBind0(regFuture, channel, localAddress, promise);
            ...
        } else {
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    ...
                    doBind0(regFuture, channel, localAddress, promise);
                }
            });
        }
}

首先在 doBind 中, 執(zhí)行步驟1, 即調(diào)用 initAndRegister 方法, 這個(gè)方法會(huì)最終調(diào)用到AbstractChannel#AbstractUnsafe.register. 而在 AbstractChannel#AbstractUnsafe.register 中, 會(huì)通過(guò) eventLoop.execute 的形式將 AbstractUnsafe.register0 的調(diào)用提交到任務(wù)隊(duì)列中(即提交到 eventLoop 線程中, 而當(dāng)前代碼所在的線程是 main 線程), 即:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 當(dāng)前線程是主線程, 因此這個(gè)判斷是 false
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new OneTimeTask() {
                @Override
                public void run() {
                    // register0 在 eventLoop 線程中執(zhí)行.
                    register0(promise);
                }
            });
        } catch (Throwable t) {
           ...
        }
    }
}

接著 AbstractBootstrap.initAndRegister 返回, 回到 AbstractBootstrap.doBind 中, 于是執(zhí)行到步驟2. 注意, 因?yàn)?AbstractUnsafe.register0 是在 eventLoop 中執(zhí)行的, 因此有可能主線程執(zhí)行到步驟2 時(shí), AbstractUnsafe.register0 已經(jīng)執(zhí)行完畢了, 此時(shí)必然有 regFuture.isDone() == true; 但也有可能 AbstractUnsafe.register0 沒(méi)有來(lái)得及執(zhí)行, 因此此時(shí) regFuture.isDone() == false. 所以上面的步驟2 考慮到了這兩種情況, 因此分別針對(duì)這兩種情況做了區(qū)分, 即:

// 步驟2
if (regFuture.isDone()) {
    ...
    doBind0(regFuture, channel, localAddress, promise);
    ...
} else {
    regFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            ...
            doBind0(regFuture, channel, localAddress, promise);
        }
    });
}

一般情況下, regFuture.isDone() 為 false, 因?yàn)榻壎ú僮魇潜容^費(fèi)時(shí)的, 因此很大幾率會(huì)執(zhí)行到 else 分支, 并且 if 分支和 else 分支從結(jié)果上說(shuō)沒(méi)有不同, 而且 if 分支邏輯還更簡(jiǎn)單一些, 因此我們以 else 分支來(lái)分析吧. 在 else 分支中, 會(huì)為 regFuture 設(shè)置一個(gè)回調(diào)監(jiān)聽(tīng)器. regFuture 是一個(gè) ChannelFuture, 而 ChannelFuture 代表了一個(gè) Channel 的異步 IO 的操作結(jié)果, 因此這里 regFuture 代表了 Channel 注冊(cè)(register) 的這個(gè)異步 IO 的操作結(jié)果.
Netty 這里之所以要為 regFuture 設(shè)置一個(gè)回調(diào)監(jiān)聽(tīng)器, 是為了保證 register 和 bind 的時(shí)序上的正確性: Channel 的注冊(cè)必須要發(fā)生在 Channel 的綁定之前.
(關(guān)于時(shí)序的正確性的問(wèn)題, 我們?cè)诤竺嬗凶C明)

接下來(lái)我們來(lái)看一下 AbstractUnsafe.register0 方法:

private void register0(ChannelPromise promise) {
    try {
        ....
        // neverRegistered 一開(kāi)始是 true, 因此 firstRegistration == true
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        // firstRegistration == true, 而 isActive() == false, 
        // 因此不會(huì)執(zhí)行到 pipeline.fireChannelActive()
        if (firstRegistration && isActive()) {
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

注意, 我需要再?gòu)?qiáng)調(diào)一下, 這里 AbstractUnsafe.register0 是在 eventLoop 中執(zhí)行的.
AbstractUnsafe.register0 中會(huì)調(diào)用 doRegister() 注冊(cè) NioServerSocketChannel, 然后調(diào)用 safeSetSuccess() 設(shè)置 promise 的狀態(tài)為成功. 而這個(gè) promise 變量是什么呢? 我將 AbstractBootstrap.doBind 的調(diào)用鏈寫(xiě)詳細(xì)一些:

AbstractBootstrap.doBind ->
    AbstractBootstrap.initAndRegister ->
        MultithreadEventLoopGroup.register ->
            SingleThreadEventLoop.register -> 
                AbstractChannel#AbstractUnsafe.register ->
                    eventLoop.execute( () -> AbstractUnsafe.register0)

在 SingleThreadEventLoop.register 中會(huì)實(shí)例化一個(gè) DefaultChannelPromise, 即:

@Override
public ChannelFuture register(Channel channel) {
    return register(channel, new DefaultChannelPromise(channel, this));
}

接著調(diào)用重載的 SingleThreadEventLoop.register 方法:

@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    if (promise == null) {
        throw new NullPointerException("promise");
    }

    channel.unsafe().register(this, promise);
    return promise;
}

我們看到, 實(shí)例化的 DefaultChannelPromise 最終會(huì)以方法返回值的方式返回到調(diào)用方, 即返回到 AbstractBootstrap.doBind 中:

final ChannelFuture regFuture = initAndRegister();

因此我們這里有一個(gè)共識(shí): regFuture 是一個(gè)在 SingleThreadEventLoop.register 中實(shí)例化的 DefaultChannelPromise 對(duì)象.

再回到 SingleThreadEventLoop.register 中, 在這里會(huì)調(diào)用 channel.unsafe().register(this, promise), 將 promise 對(duì)象傳遞到 AbstractChannel#AbstractUnsafe.register 中, 因此在 AbstractUnsafe.register0 中的 promise 就是 AbstractBootstrap.doBind 中的 regFuture.
promise == regFuture 很關(guān)鍵.

既然我們已經(jīng)確定了 promise 的身份, 那么調(diào)用的 safeSetSuccess(promise); 我們也知道是干嘛的了. safeSetSuccess 方法設(shè)置一個(gè) Promise 的狀態(tài)為成功態(tài), 而 Promise 的 成功態(tài) 是最終狀態(tài), 即此時(shí) promise.isDone() == true. 那么 設(shè)置 promise 為成功態(tài)后, 會(huì)發(fā)生什么呢?
還記得不 promise == regFuture, 而我們?cè)?AbstractBootstrap.doBind 的 else 分支中設(shè)置了一個(gè)回調(diào)監(jiān)聽(tīng)器:

final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        Throwable cause = future.cause();
        if (cause != null) {
            // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
            // IllegalStateException once we try to access the EventLoop of the Channel.
            promise.setFailure(cause);
        } else {
            // Registration was successful, so set the correct executor to use.
            // See https://github.com/netty/netty/issues/2586
            promise.executor = channel.eventLoop();
        }
        doBind0(regFuture, channel, localAddress, promise);
    }
});

因此當(dāng) safeSetSuccess(promise); 調(diào)用時(shí), 根據(jù) Netty 的 Promise/Future 機(jī)制, 會(huì)觸發(fā)上面的 operationComplete 回調(diào), 在回調(diào)中調(diào)用 doBind0 方法:

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

注意到, 有一個(gè)關(guān)鍵的地方, 代碼中將 **channel.bind** 的調(diào)用放到了 eventLoop 中執(zhí)行. doBind0 返回后, 代碼繼續(xù)執(zhí)行 AbstractUnsafe.register0 方法的剩余部分代碼, 即:

private void register0(ChannelPromise promise) {
    try {
        ....
        safeSetSuccess(promise);
        // safeSetSuccess 返回后, 繼續(xù)執(zhí)行如下代碼
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        // firstRegistration == true, 而 isActive() == false, 
        // 因此不會(huì)執(zhí)行到 pipeline.fireChannelActive()
        if (firstRegistration && isActive()) {
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

當(dāng) AbstractUnsafe.register0 方法執(zhí)行完畢后, 才執(zhí)行到 channel.bind 方法.

channel.bind 方法最終會(huì)調(diào)用到 AbstractChannel#AbstractUnsafe.bind 方法, 源碼如下:

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    boolean wasActive = isActive();
    logger.info("---wasActive: {}---", wasActive);

    try {
        // 調(diào)用 NioServerSocketChannel.bind 方法, 
        // 將底層的 Java NIO SocketChannel 綁定到指定的端口.
        // 當(dāng) SocketChannel 綁定到端口后, isActive() 才為真.
        doBind(localAddress);
    } catch (Throwable t) {
        ...
    }

    boolean activeNow = isActive();
    logger.info("---activeNow: {}---", activeNow);

    // 這里 wasActive == false
    // isActive() == true
    if (!wasActive && isActive()) {
        invokeLater(new OneTimeTask() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

上面的代碼中, 調(diào)用了 doBind(localAddress) 將底層的 Java NIO SocketChannel 綁定到指定的端口. 并且當(dāng) SocketChannel 綁定到端口后, isActive() 才為真.
因此我們知道, 如果 SocketChannel 第一次綁定時(shí), 在調(diào)用 doBind 前, wasActive == false == isActive(), 而當(dāng)調(diào)用了 doBind 后, isActive() == true, 因此第一次綁定端口時(shí), if 判斷成立, 會(huì)調(diào)用 pipeline.fireChannelActive().

關(guān)于 Channel 注冊(cè)與綁定的時(shí)序問(wèn)題

我們?cè)谇暗姆治鲋? 直接認(rèn)定了 Channel 注冊(cè)Channel 的綁定 之前完成, 那么依據(jù)是什么呢?
其實(shí)所有的關(guān)鍵在于 EventLoop 的任務(wù)隊(duì)列機(jī)制.
不要閑我啰嗦哦. 我們需要繼續(xù)回到 AbstractUnsafe.register0 的調(diào)用中(再次強(qiáng)調(diào)一下, 在 eventLoop 線程中執(zhí)行AbstractUnsafe.register0), 這個(gè)方法我們已經(jīng)分析了, 它會(huì)調(diào)用 safeSetSuccess(promise), 并由 Netty 的 Promise/Future 機(jī)制, 導(dǎo)致了AbstractBootstrap.doBind 中的 regFuture 所設(shè)置的回調(diào)監(jiān)聽(tīng)器的 operationComplete 方法調(diào)用, 而 operationComplete 中調(diào)用了 AbstractBootstrap.doBind0:

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

doBind0 中, 根據(jù) EventLoop 的任務(wù)隊(duì)列機(jī)制, 會(huì)使用 eventLoop().execute 將 channel.bind 封裝為一個(gè) Task, 放到 eventLoop 的 taskQueue 中.
如下用一幅圖表示上面的過(guò)程:

原圖在此

而當(dāng) channel.bind 被調(diào)度時(shí), AbstractUnsafe.register0 早就已經(jīng)調(diào)用結(jié)束了.

因此由于 EventLoop 的任務(wù)隊(duì)列機(jī)制, 我們知道, 在執(zhí)行 AbstractUnsafe.register0 時(shí), 是在 EventLoop 線程中的, 而 channel.bind 的調(diào)用是以 task 的形式添加到 taskQueue 隊(duì)列的末尾, 因此必然是有 EventLoop 線程先執(zhí)行完 AbstractUnsafe.register0 方法后, 才有機(jī)會(huì)從 taskQueue 中取出一個(gè) task 來(lái)執(zhí)行, 因此這個(gè)機(jī)制從根本上保證了 Channel 注冊(cè)發(fā)生在綁定 之前.

回答

你的疑惑是, AbstractChannel#AbstractUnsafe.register0 中, 可能會(huì)調(diào)用 pipeline.fireChannelActive(), 即:

private void register0(ChannelPromise promise) {
    try {
        ...
        boolean firstRegistration = neverRegistered;
        doRegister();
        ...
        if (firstRegistration && isActive()) {
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        ...
    }
}

并且在 AbstractChannel#AbstractUnsafe.bind 中也可能會(huì)調(diào)用到pipeline.fireChannelActive(), 即:

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ...
    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        ...
    }

    if (!wasActive && isActive()) {
        invokeLater(new OneTimeTask() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
    ...
}

我覺(jué)得是 不會(huì). 因?yàn)楦鶕?jù)上面我們分析的結(jié)果可知, Netty 的 Promise/Future 與 EventLoop 的任務(wù)隊(duì)列機(jī)制保證了 NioServerSocketChannel 的注冊(cè)和 NioServerSocketChannel 的綁定的時(shí)序: Channel 的注冊(cè)必須要發(fā)生在 Channel 的綁定之前, 而當(dāng)一個(gè) NioServerSocketChannel 沒(méi)有綁定到具體的端口前, 它是不活躍的(Inactive), 因此在 register0 中, if (firstRegistration && isActive()) 就不成立, 進(jìn)而就不會(huì)執(zhí)行到 pipeline.fireChannelActive() 了.
而執(zhí)行完注冊(cè)操作后, 在 AbstractChannel#AbstractUnsafe.bind 才會(huì)調(diào)用pipeline.fireChannelActive(), 因此最終只有一次 fireChannelActive 調(diào)用.

總結(jié)

有兩點(diǎn)需要注意的:

isActive() == true 成立的關(guān)鍵是此 NioServerSocketChannel 已經(jīng)綁定到端口上了.

由 Promise/Future 與 EventLoop 機(jī)制, 導(dǎo)致了 Channel 的注冊(cè) 發(fā)生在 Channel 的綁定 之前, 因此在 AbstractChannel#AbstractUnsafe.register0 中的 isActive() == false, if 判斷不成立, 最終就是 register0 中的 pipeline.fireChannelActive() 不會(huì)被調(diào)用.

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/65271.html

相關(guān)文章

  • 一次XX前端面試

    摘要:面試官說(shuō)那我問(wèn)你一個(gè)哲學(xué)的問(wèn)題,為什么有數(shù)據(jù)結(jié)構(gòu)這種東西哇,這是啥,巴拉巴拉扯了一通,大致就是物以類聚,人以群分,先人積累下來(lái)的經(jīng)驗(yàn),這些讓我們更方便處理數(shù)據(jù)啥的。 前因,沒(méi)有比摸魚(yú)有趣的事了 距離自己被外派(俗稱外包)出去,已經(jīng)過(guò)了快五個(gè)月,工作的話,很閑。人啊,一定保持好的習(xí)慣,懶惰是會(huì)上癮,日常摸魚(yú),懷疑人生,我是誰(shuí),我在哪,我要干什么。 中午吃飯的時(shí)候,收到了boss直聘的一條...

    Shisui 評(píng)論0 收藏0
  • 如果連鐵將軍都不再可靠--一次排查使用分布式輪候鎖+SESSION防訂單重復(fù)仍然加鎖失效問(wèn)題經(jīng)歷

    摘要:盡可能地將數(shù)據(jù)寫(xiě)入,例如創(chuàng)建設(shè)置的都會(huì)將數(shù)據(jù)立即的寫(xiě)入再來(lái)看看文檔怎么描述的看看這可愛(ài)的默認(rèn)值我們終于知道了當(dāng)我們不做任何設(shè)置時(shí),默認(rèn)采用的是方式顯而易見(jiàn),使用方式能最大限度的減少與的交互,而在大多數(shù)場(chǎng)景下都是沒(méi)有問(wèn)題的。 0.問(wèn)題背景 此次問(wèn)題源于一次挺嚴(yán)重的生產(chǎn)事故:客戶的訂單被重復(fù)生成了,而出問(wèn)題的代碼其實(shí)很簡(jiǎn)單: // .... redisLockUtil.lock(membe...

    econi 評(píng)論0 收藏0
  • 一次開(kāi)源學(xué)習(xí)--D2Admin 人人企業(yè)版

    摘要:前言上個(gè)月月底開(kāi)源組開(kāi)源了使用適配人人企業(yè)版專業(yè)版的前端工程具體詳情見(jiàn)人人企業(yè)版適配發(fā)布。當(dāng)然,也督促自己產(chǎn)出一篇相關(guān)的文章,來(lái)記錄這次有趣的學(xué)習(xí)之旅。 Created by huqi at 2019-5-5 13:01:14 Updated by huqi at 2019-5-20 15:57:37 前言 上個(gè)月月底@D2開(kāi)源組 開(kāi)源了使用 D2Admin 適配 人人企業(yè)版(專業(yè)版) 的...

    notebin 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<