摘要:完成客戶端服務器通信,需要基于協議之上,自定義一套簡單的通信協議,其中數據交換方式需要使用自定義幀。輸入數據處理器以下為輸入數據的第一個處理器,可以保證無論幀經歷怎樣的粘包拆包,均可以準確提取每一個自定義幀的數據部分。
「博客搬家」 原地址: 簡書 原發(fā)表時間: 2017-03-26
本文采用 Netty 這一最流行的 Java NIO 框架,作為 Java 服務器通信部分的基礎框架,探索使用一個通道、一臺服務器對多個客戶端提供服務。
完成客戶端 - 服務器通信,需要基于 TCP 協議之上,自定義一套簡單的通信協議,其中數據交換方式需要使用自定義幀。為實現以上方案,本文采用 Netty 框架實現 Java 服務器的通信部分。
Netty 是由 JBoss 提供的一個 Java開源 框架。Netty 提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發(fā)高性能、高可靠性的網絡服務器和客戶端程序。也就是說,Netty 是一個基于 NIO 的客戶、服務器端編程框架,使用Netty 可以確保你快速和簡單的開發(fā)出一個網絡應用,例如實現了某種協議的客戶,服務端應用。Netty 相當簡化和流線化了網絡應用的編程開發(fā)過程,例如,TCP 和 UDP 的 socket 服務開發(fā)。
本項目的硬件設備集群使用 CAN 總線作為通信協議,硬件設備產生的數據和工作人員的控制指令均由服務器后端應用程序處理并存儲。由于服務器并未原生支持 CAN 總線,故為了方便起見,使用「CAN轉以太網」模塊作為 CAN 協議和 TCP 協議交換的中介,以謀求實現的簡單化。項目總體架構圖如下圖所示:
CAN - bus,即控制器局域網,是國際上應用最廣泛的現場總線之一。1. Netty 框架的學習作為一種技術先進、可靠性高、功能完善、成本合理的遠程網絡通信控制方式,CAN - bus 已經被廣泛應用到各個自動化控制系統(tǒng)中。從高速的網絡到低價位的多路接線都可以使用 CAN - bus。例如,在汽車電子、自動控制、智能大廈、電路系統(tǒng)、安防監(jiān)控等領域。
以下提供幾篇不錯的文章,幫助大家學習 Netty 這一頗受矚目的框架。
《Netty in Action》中文版 - 并發(fā)編程網
Essential Netty in Action -《Netty 實戰(zhàn)(精髓)》
Netty 4.x User Guide 中文翻譯《Netty 4.x 用戶指南》
2. Bootstrapping 服務器方案以下代碼是 Bootstrapping 服務器的實現方案:
public class KyServer{ private SuccessfulListener launchListener; private SuccessfulListener finishListener; private NioEventLoopGroup group; public void start() { new Thread(() -> { group = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group) .channel(NioServerSocketChannel.class) .childHandler(new ServerChannelInitializerTest()); ChannelFuture channelFuture = bootstrap.bind(new InetSocketAddress(30232)); channelFuture.addListener( (ChannelFutureListener) future -> startListenerHandle(future, launchListener)); }).start(); } private void startListenerHandle(Future future, SuccessfulListener listener) { if (!future.isSuccess()) future.cause().printStackTrace(); if (listener != null) listener.onSuccess(future.isSuccess()); } public void setLaunchSuccessfulListener( SuccessfulListener successfulListener) { this.launchListener = successfulListener; } public void setFinishSuccessfulListener( SuccessfulListener finishListener) { this.finishListener = finishListener; } public void shutdown() { if (group != null) { Future> futureShutdown = group.shutdownGracefully(); futureShutdown.addListener(future -> startListenerHandle(future, finishListener)); } } }2.1 Bootstrapping 服務器的設計要點
創(chuàng)建一個 ServerBootstrap 實例來啟動和綁定服務器
創(chuàng)建并且分配一個 NioEventLoopgroup 實例來處理 event,比如接受新的連接和讀/寫數據
指定本地 InetSocketAddress 到服務器綁定的端口
用 ChannelHandler 實例來初始化 Channel
調用 ServerBootstrap.bind() 來綁定服務器
2.2 服務器監(jiān)聽器的設計「觀察者模式」首先在該類中設置成員變量:
private SuccessfulListener launchListener; private SuccessfulListener finishListener;
而后添加該變量的 set 方法,以及監(jiān)聽器的處理方法:
private void startListenerHandle(Future future, SuccessfulListener listener) { if (!future.isSuccess()) future.cause().printStackTrace(); if (listener != null) listener.onSuccess(future.isSuccess()); } public void setLaunchSuccessfulListener(SuccessfulListener successfulListener) { this.launchListener = successfulListener; } public void setFinishSuccessfulListener(SuccessfulListener finishListener) { this.finishListener = finishListener; }
在服務器啟動監(jiān)聽時,執(zhí)行如下代碼
ChannelFuture channelFuture = bootstrap.bind(new InetSocketAddress(30232)); channelFuture.addListener(future -> startListenerHandle(future, launchListener));
在外部關閉服務器時,執(zhí)行該方法:
public void shutdown() { if (group != null) { Future> futureShutdown = group.shutdownGracefully(); futureShutdown.addListener(future -> startListenerHandle(future, finishListener)); } }
通過如上方法,外部操作者可以方便得知服務器是否啟動成功以及是否結束成功,使用觀察者模式,完美實現了對服務器啟動及關閉的監(jiān)聽。
3. 服務器業(yè)務邏輯的實現首先使用初始化方法 ServerChannelInitializer 完成所有 ChannelHandler 對 Channel 的綁定操作:
public class ServerChannelInitializer extends ChannelInitializer3.1 輸入數據處理方案 3.1.1 自定義幀方案{ @Override protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new LoggingHandler("NO1")); byte head = 0x11; ch.pipeline().addLast(new FrameIdentifierChannelInboundHandler(head)); ch.pipeline().addLast(new ShowByteBufAsFrameInBoundHandler()); } }
自定義幀包括「幀標識位」、「數據長度」、「數據體」,如下圖所示,:
幀標識位:0x11。
數據長度:兩個字節(jié),可表示數據部分大小最大為 2 ^ 16 - 1 。
數據體:實際有用的數據。
3.1.2 輸入數據處理器以下為輸入數據的第一個處理器,可以保證無論 TCP 幀經歷怎樣的粘包、拆包,均可以準確提取每一個自定義幀的數據部分。
/** * 入端自定義幀提取處理器 * 將數據流提取出完整的自定義幀并傳入下一個處理器 */ public class FrameIdentifierChannelInboundHandler extends SimpleChannelInboundHandler{ private byte[] frameHead; private int frameHeadLength; private int frameBodyLength; private FrameReceivedEnum frameStatus = FrameReceivedEnum.READY; private ByteBuf holdByteBuf = Unpooled.buffer(1024); FrameIdentifierChannelInboundHandler(byte... frameHead) { this(); this.frameHead = frameHead; frameHeadLength = frameHead.length; } @Override protected void channelRead0 (ChannelHandlerContext ctx, ByteBuf msg) { //數據讀入本地buffer holdByteBuf.writeBytes(msg); while (true) { //若讀取狀態(tài)為: 開始讀取 if (frameStatus == FrameReceivedEnum.READY) { if (!matchFrameHead(holdByteBuf)) { holdByteBuf.clear(); break; } } //若讀取狀態(tài)為: 幀長讀取 //數據體完全包含在 buffer 內,則可通過此狀態(tài) if (frameStatus == FrameReceivedEnum.READING_LENGTH) { if (holdByteBuf.readableBytes() <= 1) break; //無符號 short 需要用 int 型引用 int currentFrameLength = holdByteBuf.getUnsignedShort(holdByteBuf.readerIndex()); //可讀byte數為長度計數(2)+數據體長度; 所以當前幀長+2 <= 可讀幀長 if (currentFrameLength + 2 <= holdByteBuf.readableBytes()) { frameBodyLength = holdByteBuf.readUnsignedShort(); frameStatus = FrameReceivedEnum.READING_BODY; } else { break; } } //若讀取狀態(tài)為: 數據體讀取 //預設數據體完全包含在buffer內,否則拋出異常 if (frameStatus == FrameReceivedEnum.READING_BODY) { if (frameBodyLength == 0) { frameStatus = FrameReceivedEnum.READY; frameBodyLength = -1; holdByteBuf.discardReadBytes(); } else if (frameBodyLength > 0) { ByteBuf returnBuf = Unpooled.buffer(frameBodyLength); holdByteBuf.readBytes(returnBuf); frameStatus = FrameReceivedEnum.READY; // ctx.fireChannelRead(returnBuf); ctx.writeAndFlush(returnBuf); frameBodyLength = -1; holdByteBuf.discardReadBytes(); } else { throw new FrameLoadException("自定義幀長度計數異常"); } } else { throw new FrameLoadException("自定義幀讀取異常"); } } } private boolean matchFrameHead(ByteBuf byteBuf) { while (true) { if (byteBuf.readableBytes() < frameHeadLength) { return false; } if (frameHead[0] == byteBuf.readByte()) { frameStatus = FrameReceivedEnum.READING_LENGTH; return true; } } } }
以下為第二個輸入數據處理器,可將前一處理器的結果「優(yōu)雅」打印到控制臺上并原樣發(fā)送至客戶端:
public class ShowByteBufAsFrameInBoundHandler extends SimpleChannelInboundHandler4. 參考鏈接{ @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { System.out.println(ByteBufUtil.prettyHexDump(byteBuf)); ctx.pipeline().writeAndFlush(Unpooled.copiedBuffer(msg)); } }
CAN 轉以太網設備介紹
Netty - 百度百科
《Netty in Action》中文版 - 并發(fā)編程網
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://systransis.cn/yun/68253.html
摘要:提供異步的事件驅動的網絡應用程序框架和工具,用以快速開發(fā)高性能高可靠性的網絡服務器和客戶端程序??偨Y我們完成了服務端的簡單搭建,模擬了聊天會話場景。 之前一直在搞前端的東西,都快忘了自己是個java開發(fā)。其實還有好多java方面的東西沒搞過,突然了解到netty,覺得有必要學一學。 介紹 Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框...
摘要:是一個分布式服務框架,以及治理方案。手寫注意要點手寫注意要點基于上文中對于協議的理解,如果我們自己去實現,需要考慮哪些技術呢其實基于圖的整個流程應該有一個大概的理解。基于手寫實現基于手寫實現理解了協議后,我們基于來實現一個通信框架。閱讀這篇文章之前,建議先閱讀和這篇文章關聯的內容。[1]詳細剖析分布式微服務架構下網絡通信的底層實現原理(圖解)[2][年薪60W的技巧]工作了5年,你真的理解N...
摘要:啟動一個線程,獲取阻塞隊列的元素,當通道發(fā)生事件時,隊列會被放入事件對象啟動一個定時器,每個執(zhí)行一次,掃描,超時沒有獲取結果的會被移除掉客戶端跟服務器端差不多。而這個對象會在傳輸之前進行編碼,消息接收到進行解碼。 rocketMQ通信模塊 Rocketmq的通信層是基于通信框架netty 4.0.21.Final之上做了簡單的協議封裝,基本的類圖如下: showImg(https://...
摘要:后端好書閱讀與推薦系列文章后端好書閱讀與推薦后端好書閱讀與推薦續(xù)后端好書閱讀與推薦續(xù)二后端好書閱讀與推薦續(xù)三這里依然記錄一下每本書的亮點與自己讀書心得和體會,分享并求拍磚。然后又請求封鎖,當釋放了上的封鎖之后,系統(tǒng)又批準了的請求一直等待。 后端好書閱讀與推薦系列文章:后端好書閱讀與推薦后端好書閱讀與推薦(續(xù))后端好書閱讀與推薦(續(xù)二)后端好書閱讀與推薦(續(xù)三) 這里依然記錄一下每本書的...
閱讀 1917·2021-11-24 11:16
閱讀 3265·2021-09-10 10:51
閱讀 3218·2021-08-03 14:03
閱讀 1272·2019-08-29 17:03
閱讀 3253·2019-08-29 12:36
閱讀 2239·2019-08-26 14:06
閱讀 502·2019-08-23 16:32
閱讀 2695·2019-08-23 13:42