成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Netty實(shí)現(xiàn)心跳檢測(cè)與斷線重連

RobinQu / 2571人閱讀

摘要:使用實(shí)現(xiàn)心跳機(jī)制代碼環(huán)境和具體思路如下使用提供的來檢測(cè)讀寫操作的空閑時(shí)間使用序列化客戶端空閑后向服務(wù)端發(fā)送一個(gè)心跳包服務(wù)端空閑后心跳丟失計(jì)數(shù)器丟失的心跳包數(shù)量當(dāng)丟失的心跳包數(shù)量超過個(gè)時(shí),主動(dòng)斷開該客戶端的斷開連接后,客戶端之后重新連接代碼已

使用Netty實(shí)現(xiàn)心跳機(jī)制 代碼環(huán)境:JDK1.8和Netty4.x 具體思路如下:

使用Netty提供的IdleStateHandler來檢測(cè)讀寫操作的空閑時(shí)間

使用Protocol Buffer序列化

客戶端write空閑5s后向服務(wù)端發(fā)送一個(gè)心跳包

服務(wù)端read空閑6s后心跳丟失計(jì)數(shù)器+1(丟失的心跳包數(shù)量)

當(dāng)丟失的心跳包數(shù)量超過3個(gè)時(shí),主動(dòng)斷開該客戶端的channel

斷開連接后,客戶端10s之后重新連接

代碼已上傳至GitHub:完整代碼地址 代碼實(shí)現(xiàn): 數(shù)據(jù)包結(jié)構(gòu)(proto文件)
option java_outer_classname = "PacketProto";

message Packet {

    // 包的類型
    enum PacketType {
        // 心跳包
        HEARTBEAT = 1;
        // 非心跳包
        DATA = 2;
    }

    // 包類型
    required PacketType packetType = 1;
    
    // 數(shù)據(jù)部分(可選,心跳包不包含數(shù)據(jù)部分)
    optional string data = 2;
}
ClientHeartbeatHandler類
public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("--- Server is active ---");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("--- Server is inactive ---");

        // 10s 之后嘗試重新連接服務(wù)器
        System.out.println("10s 之后嘗試重新連接服務(wù)器...");
        Thread.sleep(10 * 1000);
        Client.doConnect();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            // 不管是讀事件空閑還是寫事件空閑都向服務(wù)器發(fā)送心跳包
            sendHeartbeatPacket(ctx);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("連接出現(xiàn)異常");
    }

    /**
     * 發(fā)送心跳包
     *
     * @param ctx
     */
    private void sendHeartbeatPacket(ChannelHandlerContext ctx) {
        Packet.Builder builder = newBuilder();
        builder.setPacketType(Packet.PacketType.HEARTBEAT);
        Packet packet = builder.build();
        ctx.writeAndFlush(packet);
    }
}
Client類
public class Client {

    private static Channel ch;
    private static Bootstrap bootstrap;

    public static void main(String[] args) {
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            bootstrap = new Bootstrap();
            bootstrap
                    .group(workGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
                            pipeline.addLast(new ProtobufEncoder());
                            pipeline.addLast(new IdleStateHandler(0, 5, 0));
                            pipeline.addLast(new ClientHeartbeatHandler());
                        }
                    });

            // 連接服務(wù)器
            doConnect();

            // 模擬不定時(shí)發(fā)送向服務(wù)器發(fā)送數(shù)據(jù)的過程
            Random random = new Random();
            while (true) {
                int num = random.nextInt(21);
                Thread.sleep(num * 1000);
                PacketProto.Packet.Builder builder = newBuilder();
                builder.setPacketType(PacketProto.Packet.PacketType.DATA);
                builder.setData("我是數(shù)據(jù)包(非心跳包) " + num);
                PacketProto.Packet packet = builder.build();
                ch.writeAndFlush(packet);
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workGroup.shutdownGracefully();
        }
    }

    /**
     * 抽取出該方法 (斷線重連時(shí)使用)
     *
     * @throws InterruptedException
     */
    public static void doConnect() throws InterruptedException {
        ch = bootstrap.connect("127.0.0.1", 20000).sync().channel();
    }
}
ServerHeartbeatHandler類
public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter {

    // 心跳丟失計(jì)數(shù)器
    private int counter;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("--- Client is active ---");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("--- Client is inactive ---");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        // 判斷接收到的包類型
        if (msg instanceof Packet) {
            Packet packet = (Packet) msg;

            switch (packet.getPacketType()) {
                case HEARTBEAT:
                    handleHeartbreat(ctx, packet);
                    break;

                case DATA:
                    handleData(ctx, packet);
                    break;

                default:
                    break;
            }
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            // 空閑6s之后觸發(fā) (心跳包丟失)
            if (counter >= 3) {
                // 連續(xù)丟失3個(gè)心跳包 (斷開連接)
                ctx.channel().close().sync();
                System.out.println("已與Client斷開連接");
            } else {
                counter++;
                System.out.println("丟失了第 " + counter + " 個(gè)心跳包");
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("連接出現(xiàn)異常");
    }

    /**
     * 處理心跳包
     *
     * @param ctx
     * @param packet
     */
    private void handleHeartbreat(ChannelHandlerContext ctx, Packet packet) {
        // 將心跳丟失計(jì)數(shù)器置為0
        counter = 0;
        System.out.println("收到心跳包");
        ReferenceCountUtil.release(packet);
    }

    /**
     * 處理數(shù)據(jù)包
     *
     * @param ctx
     * @param packet
     */
    private void handleData(ChannelHandlerContext ctx, Packet packet) {
        // 將心跳丟失計(jì)數(shù)器置為0
        counter = 0;
        String data = packet.getData();
        System.out.println(data);
        ReferenceCountUtil.release(packet);
    }
}
Server類
public class Server {
    public static void main(String[] args) {
        NioEventLoopGroup acceptorGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap
                    .group(acceptorGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new ProtobufVarint32FrameDecoder());
                            pipeline.addLast(new ProtobufDecoder(PacketProto.Packet.getDefaultInstance()));
                            pipeline.addLast(new IdleStateHandler(6, 0, 0));
                            pipeline.addLast(new ServerHeartbeatHandler());
                        }
                    });
            Channel ch = bootstrap.bind(20000).sync().channel();
            System.out.println("Server has started...");
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            acceptorGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/65289.html

相關(guān)文章

  • 淺析 Netty 實(shí)現(xiàn)心跳機(jī)制斷線重連

    摘要:基礎(chǔ)何為心跳顧名思義所謂心跳即在長(zhǎng)連接中客戶端和服務(wù)器之間定期發(fā)送的一種特殊的數(shù)據(jù)包通知對(duì)方自己還在線以確保連接的有效性為什么需要心跳因?yàn)榫W(wǎng)絡(luò)的不可靠性有可能在保持長(zhǎng)連接的過程中由于某些突發(fā)情況例如網(wǎng)線被拔出突然掉電等會(huì)造成服務(wù)器和客戶端的 基礎(chǔ) 何為心跳 顧名思義, 所謂 心跳, 即在 TCP 長(zhǎng)連接中, 客戶端和服務(wù)器之間定期發(fā)送的一種特殊的數(shù)據(jù)包, 通知對(duì)方自己還在線, 以確保 ...

    waterc 評(píng)論0 收藏0
  • 使用Netty,我們到底在開發(fā)些什么?

    摘要:比如面向連接的功能包發(fā)送接收數(shù)量包發(fā)送接收速率錯(cuò)誤計(jì)數(shù)連接重連次數(shù)調(diào)用延遲連接狀態(tài)等。你要處理的,就是心跳超時(shí)的邏輯,比如延遲重連。發(fā)生異常后,可以根據(jù)不同的類型選擇斷線重連比如一些二進(jìn)制協(xié)議的編解碼紊亂問題,或者調(diào)度到其他節(jié)點(diǎn)。 在java界,netty無疑是開發(fā)網(wǎng)絡(luò)應(yīng)用的拿手菜。你不需要太多關(guān)注復(fù)雜的nio模型和底層網(wǎng)絡(luò)的細(xì)節(jié),使用其豐富的接口,可以很容易的實(shí)現(xiàn)復(fù)雜的通訊功能。 和...

    DesGemini 評(píng)論0 收藏0
  • 使用Netty,我們到底在開發(fā)些什么?

    摘要:比如面向連接的功能包發(fā)送接收數(shù)量包發(fā)送接收速率錯(cuò)誤計(jì)數(shù)連接重連次數(shù)調(diào)用延遲連接狀態(tài)等。你要處理的,就是心跳超時(shí)的邏輯,比如延遲重連。發(fā)生異常后,可以根據(jù)不同的類型選擇斷線重連比如一些二進(jìn)制協(xié)議的編解碼紊亂問題,或者調(diào)度到其他節(jié)點(diǎn)。 在java界,netty無疑是開發(fā)網(wǎng)絡(luò)應(yīng)用的拿手菜。你不需要太多關(guān)注復(fù)雜的nio模型和底層網(wǎng)絡(luò)的細(xì)節(jié),使用其豐富的接口,可以很容易的實(shí)現(xiàn)復(fù)雜的通訊功能。 和...

    MSchumi 評(píng)論0 收藏0
  • 長(zhǎng)連接的心跳重連設(shè)計(jì)

    摘要:超過后則認(rèn)為服務(wù)端出現(xiàn)故障,需要重連。同時(shí)在每次心跳時(shí)候都用當(dāng)前時(shí)間和之前服務(wù)端響應(yīng)綁定到上的時(shí)間相減判斷是否需要重連即可??蛻舳藱z測(cè)到某個(gè)服務(wù)端遲遲沒有響應(yīng)心跳也能重連獲取一個(gè)新的連接。 showImg(https://segmentfault.com/img/remote/1460000017987884?w=800&h=536); 前言 說道心跳這個(gè)詞大家都不陌生,當(dāng)然不是指男女...

    dreamGong 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<