成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

淺析 Netty 實(shí)現(xiàn)心跳機(jī)制與斷線重連

waterc / 3458人閱讀

摘要:基礎(chǔ)何為心跳顧名思義所謂心跳即在長(zhǎng)連接中客戶端和服務(wù)器之間定期發(fā)送的一種特殊的數(shù)據(jù)包通知對(duì)方自己還在線以確保連接的有效性為什么需要心跳因?yàn)榫W(wǎng)絡(luò)的不可靠性有可能在保持長(zhǎng)連接的過(guò)程中由于某些突發(fā)情況例如網(wǎng)線被拔出突然掉電等會(huì)造成服務(wù)器和客戶端的

基礎(chǔ) 何為心跳

顧名思義, 所謂 心跳, 即在 TCP 長(zhǎng)連接中, 客戶端和服務(wù)器之間定期發(fā)送的一種特殊的數(shù)據(jù)包, 通知對(duì)方自己還在線, 以確保 TCP 連接的有效性.

為什么需要心跳

因?yàn)榫W(wǎng)絡(luò)的不可靠性, 有可能在 TCP 保持長(zhǎng)連接的過(guò)程中, 由于某些突發(fā)情況, 例如網(wǎng)線被拔出, 突然掉電等, 會(huì)造成服務(wù)器和客戶端的連接中斷. 在這些突發(fā)情況下, 如果恰好服務(wù)器和客戶端之間沒(méi)有交互的話, 那么它們是不能在短時(shí)間內(nèi)發(fā)現(xiàn)對(duì)方已經(jīng)掉線的. 為了解決這個(gè)問(wèn)題, 我們就需要引入 心跳 機(jī)制. 心跳機(jī)制的工作原理是: 在服務(wù)器和客戶端之間一定時(shí)間內(nèi)沒(méi)有數(shù)據(jù)交互時(shí), 即處于 idle 狀態(tài)時(shí), 客戶端或服務(wù)器會(huì)發(fā)送一個(gè)特殊的數(shù)據(jù)包給對(duì)方, 當(dāng)接收方收到這個(gè)數(shù)據(jù)報(bào)文后, 也立即發(fā)送一個(gè)特殊的數(shù)據(jù)報(bào)文, 回應(yīng)發(fā)送方, 此即一個(gè) PING-PONG 交互. 自然地, 當(dāng)某一端收到心跳消息后, 就知道了對(duì)方仍然在線, 這就確保 TCP 連接的有效性.

如何實(shí)現(xiàn)心跳

我們可以通過(guò)兩種方式實(shí)現(xiàn)心跳機(jī)制:

使用 TCP 協(xié)議層面的 keepalive 機(jī)制.

在應(yīng)用層上實(shí)現(xiàn)自定義的心跳機(jī)制.

雖然在 TCP 協(xié)議層面上, 提供了 keepalive ?;顧C(jī)制, 但是使用它有幾個(gè)缺點(diǎn):

它不是 TCP 的標(biāo)準(zhǔn)協(xié)議, 并且是默認(rèn)關(guān)閉的.

TCP keepalive 機(jī)制依賴于操作系統(tǒng)的實(shí)現(xiàn), 默認(rèn)的 keepalive 心跳時(shí)間是 兩個(gè)小時(shí), 并且對(duì) keepalive 的修改需要系統(tǒng)調(diào)用(或者修改系統(tǒng)配置), 靈活性不夠.

TCP keepalive 與 TCP 協(xié)議綁定, 因此如果需要更換為 UDP 協(xié)議時(shí), keepalive 機(jī)制就失效了.

雖然使用 TCP 層面的 keepalive 機(jī)制比自定義的應(yīng)用層心跳機(jī)制節(jié)省流量, 但是基于上面的幾點(diǎn)缺點(diǎn), 一般的實(shí)踐中, 人們大多數(shù)都是選擇在應(yīng)用層上實(shí)現(xiàn)自定義的心跳.
既然如此, 那么我們就來(lái)大致看看在在 Netty 中是怎么實(shí)現(xiàn)心跳的吧. 在 Netty 中, 實(shí)現(xiàn)心跳機(jī)制的關(guān)鍵是 IdleStateHandler, 它可以對(duì)一個(gè) Channel 的 讀/寫(xiě)設(shè)置定時(shí)器, 當(dāng) Channel 在一定事件間隔內(nèi)沒(méi)有數(shù)據(jù)交互時(shí)(即處于 idle 狀態(tài)), 就會(huì)觸發(fā)指定的事件.

使用 Netty 實(shí)現(xiàn)心跳

上面我們提到了, 在 Netty 中, 實(shí)現(xiàn)心跳機(jī)制的關(guān)鍵是 IdleStateHandler, 那么這個(gè) Handler 如何使用呢? 我們來(lái)看看它的構(gòu)造器:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}

實(shí)例化一個(gè) IdleStateHandler 需要提供三個(gè)參數(shù):

readerIdleTimeSeconds, 讀超時(shí). 即當(dāng)在指定的時(shí)間間隔內(nèi)沒(méi)有從 Channel 讀取到數(shù)據(jù)時(shí), 會(huì)觸發(fā)一個(gè) READER_IDLE 的 IdleStateEvent 事件.

writerIdleTimeSeconds, 寫(xiě)超時(shí). 即當(dāng)在指定的時(shí)間間隔內(nèi)沒(méi)有數(shù)據(jù)寫(xiě)入到 Channel 時(shí), 會(huì)觸發(fā)一個(gè) WRITER_IDLE 的 IdleStateEvent 事件.

allIdleTimeSeconds, 讀/寫(xiě)超時(shí). 即當(dāng)在指定的時(shí)間間隔內(nèi)沒(méi)有讀或?qū)懖僮鲿r(shí), 會(huì)觸發(fā)一個(gè) ALL_IDLE 的 IdleStateEvent 事件.

為了展示具體的 IdleStateHandler 實(shí)現(xiàn)的心跳機(jī)制, 下面我們來(lái)構(gòu)造一個(gè)具體的EchoServer 的例子, 這個(gè)例子的行為如下:

在這個(gè)例子中, 客戶端和服務(wù)器通過(guò) TCP 長(zhǎng)連接進(jìn)行通信.

TCP 通信的報(bào)文格式是:

+--------+-----+---------------+ 
| Length |Type |   Content     |
|   17   |  1  |"HELLO, WORLD" |
+--------+-----+---------------+

客戶端每隔一個(gè)隨機(jī)的時(shí)間后, 向服務(wù)器發(fā)送消息, 服務(wù)器收到消息后, 立即將收到的消息原封不動(dòng)地回復(fù)給客戶端.

若客戶端在指定的時(shí)間間隔內(nèi)沒(méi)有讀/寫(xiě)操作, 則客戶端會(huì)自動(dòng)向服務(wù)器發(fā)送一個(gè) PING 心跳, 服務(wù)器收到 PING 心跳消息時(shí), 需要回復(fù)一個(gè) PONG 消息.

下面所使用的代碼例子可以在我的 Github github.com/yongshun/some_java_code 上找到.

通用部分

根據(jù)上面定義的行為, 我們接下來(lái)實(shí)現(xiàn)心跳的通用部分 CustomHeartbeatHandler:

/**
 * @author xiongyongshun
 * @version 1.0
 * @email [email protected]
 * @created 16/9/18 13:02
 */
public abstract class CustomHeartbeatHandler extends SimpleChannelInboundHandler {
    public static final byte PING_MSG = 1;
    public static final byte PONG_MSG = 2;
    public static final byte CUSTOM_MSG = 3;
    protected String name;
    private int heartbeatCount = 0;

    public CustomHeartbeatHandler(String name) {
        this.name = name;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
        if (byteBuf.getByte(4) == PING_MSG) {
            sendPongMsg(context);
        } else if (byteBuf.getByte(4) == PONG_MSG){
            System.out.println(name + " get pong msg from " + context.channel().remoteAddress());
        } else {
            handleData(context, byteBuf);
        }
    }

    protected void sendPingMsg(ChannelHandlerContext context) {
        ByteBuf buf = context.alloc().buffer(5);
        buf.writeInt(5);
        buf.writeByte(PING_MSG);
        context.writeAndFlush(buf);
        heartbeatCount++;
        System.out.println(name + " sent ping msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
    }

    private void sendPongMsg(ChannelHandlerContext context) {
        ByteBuf buf = context.alloc().buffer(5);
        buf.writeInt(5);
        buf.writeByte(PONG_MSG);
        context.channel().writeAndFlush(buf);
        heartbeatCount++;
        System.out.println(name + " sent pong msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
    }

    protected abstract void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf);

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // IdleStateHandler 所產(chǎn)生的 IdleStateEvent 的處理邏輯.
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case READER_IDLE:
                    handleReaderIdle(ctx);
                    break;
                case WRITER_IDLE:
                    handleWriterIdle(ctx);
                    break;
                case ALL_IDLE:
                    handleAllIdle(ctx);
                    break;
                default:
                    break;
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("---" + ctx.channel().remoteAddress() + " is active---");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("---" + ctx.channel().remoteAddress() + " is inactive---");
    }

    protected void handleReaderIdle(ChannelHandlerContext ctx) {
        System.err.println("---READER_IDLE---");
    }

    protected void handleWriterIdle(ChannelHandlerContext ctx) {
        System.err.println("---WRITER_IDLE---");
    }

    protected void handleAllIdle(ChannelHandlerContext ctx) {
        System.err.println("---ALL_IDLE---");
    }
}

類 CustomHeartbeatHandler 負(fù)責(zé)心跳的發(fā)送和接收, 我們接下來(lái)詳細(xì)地分析一下它的作用. 我們?cè)谇懊嫣岬? IdleStateHandler 是實(shí)現(xiàn)心跳的關(guān)鍵, 它會(huì)根據(jù)不同的 IO idle 類型來(lái)產(chǎn)生不同的 IdleStateEvent 事件, 而這個(gè)事件的捕獲, 其實(shí)就是在 userEventTriggered 方法中實(shí)現(xiàn)的.
我們來(lái)看看 CustomHeartbeatHandler.userEventTriggered 的具體實(shí)現(xiàn):

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        IdleStateEvent e = (IdleStateEvent) evt;
        switch (e.state()) {
            case READER_IDLE:
                handleReaderIdle(ctx);
                break;
            case WRITER_IDLE:
                handleWriterIdle(ctx);
                break;
            case ALL_IDLE:
                handleAllIdle(ctx);
                break;
            default:
                break;
        }
    }
}

在 userEventTriggered 中, 根據(jù) IdleStateEvent 的 state() 的不同, 而進(jìn)行不同的處理. 例如如果是讀取數(shù)據(jù) idle, 則 e.state() == READER_IDLE, 因此就調(diào)用 handleReaderIdle 來(lái)處理它. CustomHeartbeatHandler 提供了三個(gè) idle 處理方法: handleReaderIdle, handleWriterIdle, handleAllIdle, 這三個(gè)方法目前只有默認(rèn)的實(shí)現(xiàn), 它需要在子類中進(jìn)行重寫(xiě), 現(xiàn)在我們暫時(shí)略過(guò)它們, 在具體的客戶端和服務(wù)器的實(shí)現(xiàn)部分時(shí)再來(lái)看它們.

知道了這一點(diǎn)后, 我們接下來(lái)看看數(shù)據(jù)處理部分:

@Override
protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
    if (byteBuf.getByte(4) == PING_MSG) {
        sendPongMsg(context);
    } else if (byteBuf.getByte(4) == PONG_MSG){
        System.out.println(name + " get pong msg from " + context.channel().remoteAddress());
    } else {
        handleData(context, byteBuf);
    }
}

在 CustomHeartbeatHandler.channelRead0 中, 我們首先根據(jù)報(bào)文協(xié)議:

+--------+-----+---------------+ 
| Length |Type |   Content     |
|   17   |  1  |"HELLO, WORLD" |
+--------+-----+---------------+

來(lái)判斷當(dāng)前的報(bào)文類型, 如果是 PING_MSG 則表示是服務(wù)器收到客戶端的 PING 消息, 此時(shí)服務(wù)器需要回復(fù)一個(gè) PONG 消息, 其消息類型是 PONG_MSG.
扔報(bào)文類型是 PONG_MSG, 則表示是客戶端收到服務(wù)器發(fā)送的 PONG 消息, 此時(shí)打印一個(gè) log 即可.

客戶端部分 客戶端初始化
public class Client {
    public static void main(String[] args) {
        NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
        Random random = new Random(System.currentTimeMillis());
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap
                    .group(workGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline p = socketChannel.pipeline();
                            p.addLast(new IdleStateHandler(0, 0, 5));
                            p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
                            p.addLast(new ClientHandler());
                        }
                    });

            Channel ch = bootstrap.remoteAddress("127.0.0.1", 12345).connect().sync().channel();
            for (int i = 0; i < 10; i++) {
                String content = "client msg " + i;
                ByteBuf buf = ch.alloc().buffer();
                buf.writeInt(5 + content.getBytes().length);
                buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);
                buf.writeBytes(content.getBytes());
                ch.writeAndFlush(buf);

                Thread.sleep(random.nextInt(20000));
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            workGroup.shutdownGracefully();
        }
    }
}

上面的代碼是 Netty 的客戶端端的初始化代碼, 使用過(guò) Netty 的朋友對(duì)這個(gè)代碼應(yīng)該不會(huì)陌生. 別的部分我們就不再贅述, 我們來(lái)看看 ChannelInitializer.initChannel 部分即可:

.handler(new ChannelInitializer() {
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline p = socketChannel.pipeline();
        p.addLast(new IdleStateHandler(0, 0, 5));
        p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
        p.addLast(new ClientHandler());
    }
});

我們給 pipeline 添加了三個(gè) Handler, IdleStateHandler 這個(gè) handler 是心跳機(jī)制的核心, 我們?yōu)榭蛻舳硕嗽O(shè)置了讀寫(xiě) idle 超時(shí), 時(shí)間間隔是5s, 即如果客戶端在間隔 5s 后都沒(méi)有收到服務(wù)器的消息或向服務(wù)器發(fā)送消息, 則產(chǎn)生 ALL_IDLE 事件.
接下來(lái)我們添加了 LengthFieldBasedFrameDecoder, 它是負(fù)責(zé)解析我們的 TCP 報(bào)文, 因?yàn)楹捅疚牡哪康臒o(wú)關(guān), 因此這里不詳細(xì)展開(kāi).
最后一個(gè) Handler 是 ClientHandler, 它繼承于 CustomHeartbeatHandler, 是我們處理業(yè)務(wù)邏輯部分.

客戶端 Handler
public class ClientHandler extends CustomHeartbeatHandler {
    public ClientHandler() {
        super("client");
    }

    @Override
    protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        byte[] data = new byte[byteBuf.readableBytes() - 5];
        byteBuf.skipBytes(5);
        byteBuf.readBytes(data);
        String content = new String(data);
        System.out.println(name + " get content: " + content);
    }

    @Override
    protected void handleAllIdle(ChannelHandlerContext ctx) {
        super.handleAllIdle(ctx);
        sendPingMsg(ctx);
    }
}

ClientHandler 繼承于 CustomHeartbeatHandler, 它重寫(xiě)了兩個(gè)方法, 一個(gè)是 handleData, 在這里面實(shí)現(xiàn) 僅僅打印收到的消息.
第二個(gè)重寫(xiě)的方法是 handleAllIdle. 我們?cè)谇懊嫣岬? 客戶端負(fù)責(zé)發(fā)送心跳的 PING 消息, 當(dāng)客戶端產(chǎn)生一個(gè) ALL_IDLE 事件后, 會(huì)導(dǎo)致父類的 CustomHeartbeatHandler.userEventTriggered 調(diào)用, 而 userEventTriggered 中會(huì)根據(jù) e.state() 來(lái)調(diào)用不同的方法, 因此最后調(diào)用的是 ClientHandler.handleAllIdle, 在這個(gè)方法中, 客戶端調(diào)用 sendPingMsg 向服務(wù)器發(fā)送一個(gè) PING 消息.

服務(wù)器部分 服務(wù)器初始化
public class Server {
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap
                    .group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline p = socketChannel.pipeline();
                            p.addLast(new IdleStateHandler(10, 0, 0));
                            p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
                            p.addLast(new ServerHandler());
                        }
                    });

            Channel ch = bootstrap.bind(12345).sync().channel();
            ch.closeFuture().sync();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

服務(wù)器的初始化部分也沒(méi)有什么好說(shuō)的, 它也和客戶端的初始化一樣, 為 pipeline 添加了三個(gè) Handler.

服務(wù)器 Handler
public class ServerHandler extends CustomHeartbeatHandler {
    public ServerHandler() {
        super("server");
    }

    @Override
    protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf buf) {
        byte[] data = new byte[buf.readableBytes() - 5];
        ByteBuf responseBuf = Unpooled.copiedBuffer(buf);
        buf.skipBytes(5);
        buf.readBytes(data);
        String content = new String(data);
        System.out.println(name + " get content: " + content);
        channelHandlerContext.write(responseBuf);
    }

    @Override
    protected void handleReaderIdle(ChannelHandlerContext ctx) {
        super.handleReaderIdle(ctx);
        System.err.println("---client " + ctx.channel().remoteAddress().toString() + " reader timeout, close it---");
        ctx.close();
    }
}

ServerHandler 繼承于 CustomHeartbeatHandler, 它重寫(xiě)了兩個(gè)方法, 一個(gè)是 handleData, 在這里面實(shí)現(xiàn) EchoServer 的功能: 即收到客戶端的消息后, 立即原封不動(dòng)地將消息回復(fù)給客戶端.
第二個(gè)重寫(xiě)的方法是 handleReaderIdle, 因?yàn)榉?wù)器僅僅對(duì)客戶端的讀 idle 感興趣, 因此只重新了這個(gè)方法. 若服務(wù)器在指定時(shí)間后沒(méi)有收到客戶端的消息, 則會(huì)觸發(fā) READER_IDLE 消息, 進(jìn)而會(huì)調(diào)用 handleReaderIdle 這個(gè)方法. 我們?cè)谇懊嫣岬? 客戶端負(fù)責(zé)發(fā)送心跳的 PING 消息, 并且服務(wù)器的 READER_IDLE 的超時(shí)時(shí)間是客戶端發(fā)送 PING 消息的間隔的兩倍, 因此當(dāng)服務(wù)器 READER_IDLE 觸發(fā)時(shí), 就可以確定是客戶端已經(jīng)掉線了, 因此服務(wù)器直接關(guān)閉客戶端連接即可.

總結(jié)

使用 Netty 實(shí)現(xiàn)心跳機(jī)制的關(guān)鍵就是利用 IdleStateHandler 來(lái)產(chǎn)生對(duì)應(yīng)的 idle 事件.

一般是客戶端負(fù)責(zé)發(fā)送心跳的 PING 消息, 因此客戶端注意關(guān)注 ALL_IDLE 事件, 在這個(gè)事件觸發(fā)后, 客戶端需要向服務(wù)器發(fā)送 PING 消息, 告訴服務(wù)器"我還存活著".

服務(wù)器是接收客戶端的 PING 消息的, 因此服務(wù)器關(guān)注的是 READER_IDLE 事件, 并且服務(wù)器的 READER_IDLE 間隔需要比客戶端的 ALL_IDLE 事件間隔大(例如客戶端ALL_IDLE 是5s 沒(méi)有讀寫(xiě)時(shí)觸發(fā), 因此服務(wù)器的 READER_IDLE 可以設(shè)置為10s)

當(dāng)服務(wù)器收到客戶端的 PING 消息時(shí), 會(huì)發(fā)送一個(gè) PONG 消息作為回復(fù). 一個(gè) PING-PONG 消息對(duì)就是一個(gè)心跳交互.

實(shí)現(xiàn)客戶端的斷線重連
public class Client {
    private NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
    private Channel channel;
    private Bootstrap bootstrap;

    public static void main(String[] args) throws Exception {
        Client client = new Client();
        client.start();
        client.sendData();
    }

    public void sendData() throws Exception {
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < 10000; i++) {
            if (channel != null && channel.isActive()) {
                String content = "client msg " + i;
                ByteBuf buf = channel.alloc().buffer(5 + content.getBytes().length);
                buf.writeInt(5 + content.getBytes().length);
                buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);
                buf.writeBytes(content.getBytes());
                channel.writeAndFlush(buf);
            }

            Thread.sleep(random.nextInt(20000));
        }
    }

    public void start() {
        try {
            bootstrap = new Bootstrap();
            bootstrap
                    .group(workGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline p = socketChannel.pipeline();
                            p.addLast(new IdleStateHandler(0, 0, 5));
                            p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
                            p.addLast(new ClientHandler(Client.this));
                        }
                    });
            doConnect();

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    protected void doConnect() {
        if (channel != null && channel.isActive()) {
            return;
        }

        ChannelFuture future = bootstrap.connect("127.0.0.1", 12345);

        future.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture futureListener) throws Exception {
                if (futureListener.isSuccess()) {
                    channel = futureListener.channel();
                    System.out.println("Connect to server successfully!");
                } else {
                    System.out.println("Failed to connect to server, try connect after 10s");

                    futureListener.channel().eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            doConnect();
                        }
                    }, 10, TimeUnit.SECONDS);
                }
            }
        });
    }

}

上面的代碼中, 我們抽象出 doConnect 方法, 它負(fù)責(zé)客戶端和服務(wù)器的 TCP 連接的建立, 并且當(dāng) TCP 連接失敗時(shí), doConnect 會(huì) 通過(guò) "channel().eventLoop().schedule" 來(lái)延時(shí)10s 后嘗試重新連接.

客戶端 Handler
public class ClientHandler extends CustomHeartbeatHandler {
    private Client client;
    public ClientHandler(Client client) {
        super("client");
        this.client = client;
    }

    @Override
    protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        byte[] data = new byte[byteBuf.readableBytes() - 5];
        byteBuf.skipBytes(5);
        byteBuf.readBytes(data);
        String content = new String(data);
        System.out.println(name + " get content: " + content);
    }

    @Override
    protected void handleAllIdle(ChannelHandlerContext ctx) {
        super.handleAllIdle(ctx);
        sendPingMsg(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        client.doConnect();
    }
}

斷線重連的關(guān)鍵一點(diǎn)是檢測(cè)連接是否已經(jīng)斷開(kāi). 因此我們改寫(xiě)了 ClientHandler, 重寫(xiě)了 channelInactive 方法. 當(dāng) TCP 連接斷開(kāi)時(shí), 會(huì)回調(diào) channelInactive 方法, 因此我們?cè)谶@個(gè)方法中調(diào)用 client.doConnect() 來(lái)進(jìn)行重連.

完整代碼可以在我的 Github github.com/yongshun/some_java_code 上找到.

本文由 yongshun 發(fā)表于個(gè)人博客, 采用署名-非商業(yè)性使用-相同方式共享 3.0 中國(guó)大陸許可協(xié)議.
非商業(yè)轉(zhuǎn)載請(qǐng)注明作者及出處. 商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者本人
Email: [email protected]
本文標(biāo)題為: 淺析 Netty 實(shí)現(xiàn)心跳機(jī)制與斷線重連
本文鏈接為: https://segmentfault.com/a/1190000006931568

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/65168.html

相關(guān)文章

  • Netty實(shí)現(xiàn)心跳檢測(cè)斷線重連

    摘要:使用實(shí)現(xiàn)心跳機(jī)制代碼環(huán)境和具體思路如下使用提供的來(lái)檢測(cè)讀寫(xiě)操作的空閑時(shí)間使用序列化客戶端空閑后向服務(wù)端發(fā)送一個(gè)心跳包服務(wù)端空閑后心跳丟失計(jì)數(shù)器丟失的心跳包數(shù)量當(dāng)丟失的心跳包數(shù)量超過(guò)個(gè)時(shí),主動(dòng)斷開(kāi)該客戶端的斷開(kāi)連接后,客戶端之后重新連接代碼已 使用Netty實(shí)現(xiàn)心跳機(jī)制 代碼環(huán)境:JDK1.8和Netty4.x 具體思路如下: 使用Netty提供的IdleStateHandler來(lái)檢測(cè)...

    RobinQu 評(píng)論0 收藏0
  • 使用Netty,我們到底在開(kāi)發(fā)些什么?

    摘要:比如面向連接的功能包發(fā)送接收數(shù)量包發(fā)送接收速率錯(cuò)誤計(jì)數(shù)連接重連次數(shù)調(diào)用延遲連接狀態(tài)等。你要處理的,就是心跳超時(shí)的邏輯,比如延遲重連。發(fā)生異常后,可以根據(jù)不同的類型選擇斷線重連比如一些二進(jìn)制協(xié)議的編解碼紊亂問(wèn)題,或者調(diào)度到其他節(jié)點(diǎn)。 在java界,netty無(wú)疑是開(kāi)發(fā)網(wǎng)絡(luò)應(yīng)用的拿手菜。你不需要太多關(guān)注復(fù)雜的nio模型和底層網(wǎng)絡(luò)的細(xì)節(jié),使用其豐富的接口,可以很容易的實(shí)現(xiàn)復(fù)雜的通訊功能。 和...

    DesGemini 評(píng)論0 收藏0
  • 使用Netty,我們到底在開(kāi)發(fā)些什么?

    摘要:比如面向連接的功能包發(fā)送接收數(shù)量包發(fā)送接收速率錯(cuò)誤計(jì)數(shù)連接重連次數(shù)調(diào)用延遲連接狀態(tài)等。你要處理的,就是心跳超時(shí)的邏輯,比如延遲重連。發(fā)生異常后,可以根據(jù)不同的類型選擇斷線重連比如一些二進(jìn)制協(xié)議的編解碼紊亂問(wèn)題,或者調(diào)度到其他節(jié)點(diǎn)。 在java界,netty無(wú)疑是開(kāi)發(fā)網(wǎng)絡(luò)應(yīng)用的拿手菜。你不需要太多關(guān)注復(fù)雜的nio模型和底層網(wǎng)絡(luò)的細(xì)節(jié),使用其豐富的接口,可以很容易的實(shí)現(xiàn)復(fù)雜的通訊功能。 和...

    MSchumi 評(píng)論0 收藏0
  • 長(zhǎng)連接的心跳重連設(shè)計(jì)

    摘要:超過(guò)后則認(rèn)為服務(wù)端出現(xiàn)故障,需要重連。同時(shí)在每次心跳時(shí)候都用當(dāng)前時(shí)間和之前服務(wù)端響應(yīng)綁定到上的時(shí)間相減判斷是否需要重連即可??蛻舳藱z測(cè)到某個(gè)服務(wù)端遲遲沒(méi)有響應(yīng)心跳也能重連獲取一個(gè)新的連接。 showImg(https://segmentfault.com/img/remote/1460000017987884?w=800&h=536); 前言 說(shuō)道心跳這個(gè)詞大家都不陌生,當(dāng)然不是指男女...

    dreamGong 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看
<