摘要:使用實(shí)現(xiàn)心跳機(jī)制代碼環(huán)境和具體思路如下使用提供的來檢測(cè)讀寫操作的空閑時(shí)間使用序列化客戶端空閑后向服務(wù)端發(fā)送一個(gè)心跳包服務(wù)端空閑后心跳丟失計(jì)數(shù)器丟失的心跳包數(shù)量當(dāng)丟失的心跳包數(shù)量超過個(gè)時(shí),主動(dòng)斷開該客戶端的斷開連接后,客戶端之后重新連接代碼已
使用Netty實(shí)現(xiàn)心跳機(jī)制 代碼環(huán)境:JDK1.8和Netty4.x 具體思路如下:
使用Netty提供的IdleStateHandler來檢測(cè)讀寫操作的空閑時(shí)間
使用Protocol Buffer序列化
客戶端write空閑5s后向服務(wù)端發(fā)送一個(gè)心跳包
服務(wù)端read空閑6s后心跳丟失計(jì)數(shù)器+1(丟失的心跳包數(shù)量)
當(dāng)丟失的心跳包數(shù)量超過3個(gè)時(shí),主動(dòng)斷開該客戶端的channel
斷開連接后,客戶端10s之后重新連接
代碼已上傳至GitHub:完整代碼地址 代碼實(shí)現(xiàn): 數(shù)據(jù)包結(jié)構(gòu)(proto文件)option java_outer_classname = "PacketProto"; message Packet { // 包的類型 enum PacketType { // 心跳包 HEARTBEAT = 1; // 非心跳包 DATA = 2; } // 包類型 required PacketType packetType = 1; // 數(shù)據(jù)部分(可選,心跳包不包含數(shù)據(jù)部分) optional string data = 2; }ClientHeartbeatHandler類
public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("--- Server is active ---"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("--- Server is inactive ---"); // 10s 之后嘗試重新連接服務(wù)器 System.out.println("10s 之后嘗試重新連接服務(wù)器..."); Thread.sleep(10 * 1000); Client.doConnect(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { // 不管是讀事件空閑還是寫事件空閑都向服務(wù)器發(fā)送心跳包 sendHeartbeatPacket(ctx); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("連接出現(xiàn)異常"); } /** * 發(fā)送心跳包 * * @param ctx */ private void sendHeartbeatPacket(ChannelHandlerContext ctx) { Packet.Builder builder = newBuilder(); builder.setPacketType(Packet.PacketType.HEARTBEAT); Packet packet = builder.build(); ctx.writeAndFlush(packet); } }Client類
public class Client { private static Channel ch; private static Bootstrap bootstrap; public static void main(String[] args) { NioEventLoopGroup workGroup = new NioEventLoopGroup(); try { bootstrap = new Bootstrap(); bootstrap .group(workGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializerServerHeartbeatHandler類() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast(new ProtobufEncoder()); pipeline.addLast(new IdleStateHandler(0, 5, 0)); pipeline.addLast(new ClientHeartbeatHandler()); } }); // 連接服務(wù)器 doConnect(); // 模擬不定時(shí)發(fā)送向服務(wù)器發(fā)送數(shù)據(jù)的過程 Random random = new Random(); while (true) { int num = random.nextInt(21); Thread.sleep(num * 1000); PacketProto.Packet.Builder builder = newBuilder(); builder.setPacketType(PacketProto.Packet.PacketType.DATA); builder.setData("我是數(shù)據(jù)包(非心跳包) " + num); PacketProto.Packet packet = builder.build(); ch.writeAndFlush(packet); } } catch (InterruptedException e) { e.printStackTrace(); } finally { workGroup.shutdownGracefully(); } } /** * 抽取出該方法 (斷線重連時(shí)使用) * * @throws InterruptedException */ public static void doConnect() throws InterruptedException { ch = bootstrap.connect("127.0.0.1", 20000).sync().channel(); } }
public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter { // 心跳丟失計(jì)數(shù)器 private int counter; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("--- Client is active ---"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("--- Client is inactive ---"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 判斷接收到的包類型 if (msg instanceof Packet) { Packet packet = (Packet) msg; switch (packet.getPacketType()) { case HEARTBEAT: handleHeartbreat(ctx, packet); break; case DATA: handleData(ctx, packet); break; default: break; } } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { // 空閑6s之后觸發(fā) (心跳包丟失) if (counter >= 3) { // 連續(xù)丟失3個(gè)心跳包 (斷開連接) ctx.channel().close().sync(); System.out.println("已與Client斷開連接"); } else { counter++; System.out.println("丟失了第 " + counter + " 個(gè)心跳包"); } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("連接出現(xiàn)異常"); } /** * 處理心跳包 * * @param ctx * @param packet */ private void handleHeartbreat(ChannelHandlerContext ctx, Packet packet) { // 將心跳丟失計(jì)數(shù)器置為0 counter = 0; System.out.println("收到心跳包"); ReferenceCountUtil.release(packet); } /** * 處理數(shù)據(jù)包 * * @param ctx * @param packet */ private void handleData(ChannelHandlerContext ctx, Packet packet) { // 將心跳丟失計(jì)數(shù)器置為0 counter = 0; String data = packet.getData(); System.out.println(data); ReferenceCountUtil.release(packet); } }Server類
public class Server { public static void main(String[] args) { NioEventLoopGroup acceptorGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap .group(acceptorGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ProtobufVarint32FrameDecoder()); pipeline.addLast(new ProtobufDecoder(PacketProto.Packet.getDefaultInstance())); pipeline.addLast(new IdleStateHandler(6, 0, 0)); pipeline.addLast(new ServerHeartbeatHandler()); } }); Channel ch = bootstrap.bind(20000).sync().channel(); System.out.println("Server has started..."); ch.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { acceptorGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/65289.html
摘要:基礎(chǔ)何為心跳顧名思義所謂心跳即在長(zhǎng)連接中客戶端和服務(wù)器之間定期發(fā)送的一種特殊的數(shù)據(jù)包通知對(duì)方自己還在線以確保連接的有效性為什么需要心跳因?yàn)榫W(wǎng)絡(luò)的不可靠性有可能在保持長(zhǎng)連接的過程中由于某些突發(fā)情況例如網(wǎng)線被拔出突然掉電等會(huì)造成服務(wù)器和客戶端的 基礎(chǔ) 何為心跳 顧名思義, 所謂 心跳, 即在 TCP 長(zhǎng)連接中, 客戶端和服務(wù)器之間定期發(fā)送的一種特殊的數(shù)據(jù)包, 通知對(duì)方自己還在線, 以確保 ...
摘要:比如面向連接的功能包發(fā)送接收數(shù)量包發(fā)送接收速率錯(cuò)誤計(jì)數(shù)連接重連次數(shù)調(diào)用延遲連接狀態(tài)等。你要處理的,就是心跳超時(shí)的邏輯,比如延遲重連。發(fā)生異常后,可以根據(jù)不同的類型選擇斷線重連比如一些二進(jìn)制協(xié)議的編解碼紊亂問題,或者調(diào)度到其他節(jié)點(diǎn)。 在java界,netty無疑是開發(fā)網(wǎng)絡(luò)應(yīng)用的拿手菜。你不需要太多關(guān)注復(fù)雜的nio模型和底層網(wǎng)絡(luò)的細(xì)節(jié),使用其豐富的接口,可以很容易的實(shí)現(xiàn)復(fù)雜的通訊功能。 和...
摘要:比如面向連接的功能包發(fā)送接收數(shù)量包發(fā)送接收速率錯(cuò)誤計(jì)數(shù)連接重連次數(shù)調(diào)用延遲連接狀態(tài)等。你要處理的,就是心跳超時(shí)的邏輯,比如延遲重連。發(fā)生異常后,可以根據(jù)不同的類型選擇斷線重連比如一些二進(jìn)制協(xié)議的編解碼紊亂問題,或者調(diào)度到其他節(jié)點(diǎn)。 在java界,netty無疑是開發(fā)網(wǎng)絡(luò)應(yīng)用的拿手菜。你不需要太多關(guān)注復(fù)雜的nio模型和底層網(wǎng)絡(luò)的細(xì)節(jié),使用其豐富的接口,可以很容易的實(shí)現(xiàn)復(fù)雜的通訊功能。 和...
摘要:超過后則認(rèn)為服務(wù)端出現(xiàn)故障,需要重連。同時(shí)在每次心跳時(shí)候都用當(dāng)前時(shí)間和之前服務(wù)端響應(yīng)綁定到上的時(shí)間相減判斷是否需要重連即可??蛻舳藱z測(cè)到某個(gè)服務(wù)端遲遲沒有響應(yīng)心跳也能重連獲取一個(gè)新的連接。 showImg(https://segmentfault.com/img/remote/1460000017987884?w=800&h=536); 前言 說道心跳這個(gè)詞大家都不陌生,當(dāng)然不是指男女...
閱讀 3924·2021-11-24 09:38
閱讀 3106·2021-11-17 09:33
閱讀 3878·2021-11-10 11:48
閱讀 1244·2021-10-14 09:48
閱讀 3137·2019-08-30 13:14
閱讀 2554·2019-08-29 18:37
閱讀 3400·2019-08-29 12:38
閱讀 1423·2019-08-29 12:30