摘要:服務(wù)器構(gòu)成至少一個該組件實現(xiàn)了服務(wù)器對從客戶端接受的數(shù)據(jù)的處理,即它的業(yè)務(wù)邏輯引導(dǎo)配置服務(wù)器的啟動代碼。至少,它會將服務(wù)器綁定到它要監(jiān)聽連接請求的端口上。需要注意的是,由服務(wù)器發(fā)送的消息可能會被分塊接受。
Netty服務(wù)器構(gòu)成
至少一個ChannelHandler——該組件實現(xiàn)了服務(wù)器對從客戶端接受的數(shù)據(jù)的處理,即它的業(yè)務(wù)邏輯
引導(dǎo)——配置服務(wù)器的啟動代碼。至少,它會將服務(wù)器綁定到它要監(jiān)聽連接請求的端口上。
ChannelHandler和業(yè)務(wù)邏輯?ChannelHandler是一個接口族的父接口,它的實現(xiàn)負(fù)責(zé)接受并響應(yīng)事件通知,在Netty應(yīng)用程序中,所有的數(shù)據(jù)處理邏輯都包含在這些核心抽象的實現(xiàn)中。
?Echo服務(wù)器會響應(yīng)傳入的消息,因此需要實現(xiàn)ChannelInboundHandler接口,用來定義響應(yīng)入站事件的方法。由于Echo服務(wù)器的應(yīng)用程序只需要用到少量的方法,所以只需要繼承ChannelInboundHandlerAdapter類,它提供了ChannelInboundHandler的默認(rèn)實現(xiàn)。
?在ChannelInboundHandler中,我們感興趣的方法有:
channelRead()——對于每個傳入的消息都要調(diào)用
channelReadComplete()——通知ChannelInboundHandler最后一次對channelRead()的調(diào)用是當(dāng)前批量讀取中的最后一條消息
execeptionCaught()——在讀取操作期間,有異常拋出時會調(diào)用。
EchoServerHandler實現(xiàn)package cn.sh.demo.echo; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; /** * @author sh * @ChannelHandler.Sharable 標(biāo)示一個ChannelHandler可以被多個Channel安全地共享 */ @ChannelHandler.Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; //將接受到的消息輸出到客戶端 System.out.println("Server received:" + in.toString(CharsetUtil.UTF_8)); //將接收到的消息寫給發(fā)送者,而不沖刷出站消息 ctx.write(in); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { //將消息沖刷到客戶端,并且關(guān)閉該Channel ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { //打印異常堆棧跟蹤 cause.printStackTrace(); //關(guān)閉該Channel ctx.close(); } }
備注
ChannelInbounHandlerAdapter每個方法都可以被重寫然后掛鉤到事件生命周期的恰當(dāng)點(diǎn)上。
重寫exceptionCaught()方法允許你對Throwable的任何子類型作出反應(yīng)。
每個Channel都擁有一個與之關(guān)聯(lián)的ChannelPipeline,ChannelPipeline持有一個ChannelHandler的實例鏈。在默認(rèn)情況下,ChannelHandler會把對方法的調(diào)用轉(zhuǎn)發(fā)給鏈中的下一個ChannelHandler。因此,如果exceptionCaught()方法沒有被該鏈中的某處實現(xiàn),那么異常將會被傳遞到ChannelPipeline的末端進(jìn)行記錄
引導(dǎo)服務(wù)器主要涉及的內(nèi)容
綁定監(jiān)聽并接受傳入連接請求的端口
配置Channel,將有關(guān)的入站消息通知給EchoServerHandler實例
Echo服務(wù)引導(dǎo)示例代碼
package cn.sh.demo.echo; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public void startServer() throws InterruptedException { EchoServerHandler serverHandler = new EchoServerHandler(); //創(chuàng)建EventLoopGroup EventLoopGroup group = new NioEventLoopGroup(); //創(chuàng)建ServerBootstrap ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group) //指定所使用的NIO傳輸Channel .channel(NioServerSocketChannel.class) //使用指定的端口套接字 .localAddress(new InetSocketAddress(port)) //添加一個EchoServerHandler到子Channel的ChannelPipeline .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel channel) throws Exception { //此處由于EchoServerHandler被注解標(biāo)注為@Shareble,所以我們總是使用相同的實例 channel.pipeline().addLast(serverHandler); } }); try { //異步的綁定服務(wù)器,調(diào)用sync()方法阻塞等待直到綁定完成 ChannelFuture channelFuture = bootstrap.bind().sync(); //獲取Channel的CloseFuture,并且阻塞當(dāng)前線程直到它完成 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //關(guān)閉EventLoopGroup,釋放所有的資源 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws InterruptedException { if (args.length != 1) { System.err.println("參數(shù)類型或者個數(shù)不正確"); return; } //設(shè)置端口值 int port = Integer.parseInt(args[0]); //啟動Echo服務(wù)器 new EchoServer(port).startServer(); } }
備注
此處使用了一個特殊的類——ChannelInitializer。當(dāng)一個新的連接被接受時,一個新的子Channel會被創(chuàng)建,此時ChannelInitializer就會把一個EchoServerHandler的示例添加到該Channel的ChannelPipeline中,這個ChannelHandler將會收到有關(guān)入站消息的通知。
回顧引導(dǎo)服務(wù)創(chuàng)建一個ServerBootStrap實例來引導(dǎo)和綁定服務(wù)器
創(chuàng)建并分配一個NioEventLoopGroup實例進(jìn)行事件的處理,如接受新連接以及讀/寫數(shù)據(jù)
指定服務(wù)器綁定的本地InetSocketAddress
使用EchoServerHandler的實例初始化每一個新的Channel
調(diào)用ServerBootstrap.bind()方法來綁定服務(wù)器
Echo客戶端客戶端主要包括的操作:
連接到服務(wù)器
發(fā)送一個或多個消息
對于每個消息,等待并接受從服務(wù)器發(fā)回相同的消息
關(guān)閉連接
編寫客戶端主要包括業(yè)務(wù)邏輯和引導(dǎo)
ChannelHandler實現(xiàn)客戶端邏輯在該示例中,我們使用SimpleChannelInboundHandler類來處理所有的事件,主要的方法有:
channelActive()——在和服務(wù)器的連接已經(jīng)建立之后被調(diào)用
channelRead0()——當(dāng)從服務(wù)器接收到一條消息時被調(diào)用
exceptionCaught()——在處理過程中引發(fā)異常時被調(diào)用
示例代碼如下:
package cn.sh.demo.echo; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; /** * @author sh * @ChannelHandler.Sharable 標(biāo)記該類的示例可以被多個Channel共享 */ @ChannelHandler.Sharable public class EchoClientHandler extends SimpleChannelInboundHandler{ @Override public void channelActive(ChannelHandlerContext ctx) { //當(dāng)一個連接被服務(wù)器接受并建立后,發(fā)送一條消息 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Netty", CharsetUtil.UTF_8)); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) { //記錄客戶端接收到服務(wù)器的消息 System.out.println("Client received:" + byteBuf.toString(CharsetUtil.UTF_8)); } /** * 在發(fā)生異常時,記錄錯誤并關(guān)閉Channel * @param ctx * @param cause */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
備注
?每次在接受數(shù)據(jù)時,都會調(diào)用channelRead0()方法。需要注意的是,由服務(wù)器發(fā)送的消息可能會被分塊接受。也就是說,如果服務(wù)器發(fā)送了5字節(jié),那么不能保證這5字節(jié)會被一次性接受。即使是對于這么少量的數(shù)據(jù),channelRead0()方法也可能會被調(diào)用兩次,第一次使用一個持有3字節(jié)的ByteBuf(Netty的字節(jié)容器),第二次使用一個持有2字節(jié)的ByteBuf。作為一個面向流的協(xié)議,TCP保證了字節(jié)數(shù)組會按照服務(wù)器發(fā)送它們的順序被接受。
主要和業(yè)務(wù)邏輯如何處理消息以及Netty如何管理資源有關(guān)
客戶端中,當(dāng)channelRead0()方法完成時,已經(jīng)接受了消息并且處理完畢,當(dāng)該方法返回時,SimpleChannelInboundHandler負(fù)責(zé)釋放指向保存該消息的ByteBuf的內(nèi)存引用。
但是在服務(wù)器端,你需要將消息返回給客戶端,write()操作是異步的,直到channelRead()方法返回后有可能仍然沒有完成,ChannelInboundHandlerAdapter在這個時間點(diǎn)上不會釋放消息。
服務(wù)端的消息是在channelComplete()方法中,通過writeAndFlush()方法調(diào)用時被釋放。
客戶端使用主機(jī)和端口參數(shù)來連接遠(yuǎn)程地址,也就是Echo服務(wù)器的地址,而不是綁定到一個一直被監(jiān)聽的端口。
示例代碼如下:
package cn.sh.demo.echo; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); //創(chuàng)建客戶端引導(dǎo)器 Bootstrap bootstrap = new Bootstrap(); //指定使用NioEventLoopGroup來處理客戶端事件 bootstrap.group(group) //指定使用NIO傳輸?shù)腃hannel類型 .channel(NioSocketChannel.class) //設(shè)置服務(wù)器的InetSocketAddress .remoteAddress(new InetSocketAddress(host, port)) //在創(chuàng)建Channel時,向ChannelPipeline中添加一個EchoHandler實例 .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new EchoClientHandler()); } }); try { //連接到遠(yuǎn)程節(jié)點(diǎn),阻塞等待直到連接完成 ChannelFuture future = bootstrap.connect().sync(); //阻塞直到Channel關(guān)閉 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //關(guān)閉線程池并且釋放所有的資源 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws InterruptedException { if (args.length != 2) { System.err.println("參數(shù)個數(shù)不正確"); return; } int port = Integer.parseInt(args[1]); new EchoClient(args[0], port).start(); } }
備注
服務(wù)器和客戶端均使用了NIO傳輸,但是,客戶端和服務(wù)端可以使用不同的傳輸,例如,在服務(wù)器使用NIO傳輸,客戶端可以使用OIO傳輸
回顧引導(dǎo)服務(wù)器創(chuàng)建一個Bootstrap實例,引導(dǎo)并創(chuàng)建客戶端
創(chuàng)建一個NioEventLoopGroup實例來進(jìn)行事件處理,其中事件處理包括創(chuàng)建新的連接以及處理入站和出站數(shù)據(jù)
為服務(wù)器連接創(chuàng)建了一個InetSocketAddress實例
當(dāng)連接建立時,一個EchoClientHandler實例會被添加到(該Channel)的ChannelPipeline中
設(shè)置完成后,調(diào)用Bootstrap.connetc()連接到遠(yuǎn)程節(jié)點(diǎn)
運(yùn)行程序啟動服務(wù)端
再啟動客戶端
服務(wù)端的輸出如下:
客戶端的輸出如下:
代碼地址該文章的示例代碼位于cn.sh.demo.echo包下。
示例代碼,點(diǎn)擊查看
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/69234.html
摘要:前言本文以自帶的示例工程為例,簡要介紹線程模型示例工程的代碼位于很簡單,僅包含一個方法用于初始化以及,我們來看看其中和線程模型相關(guān)的一些代碼在的初始化代碼中實例化了兩個對象和,它們有著公共基類,這個是線程模型的核心類名讓人聯(lián)想到組合模式, 前言 本文以 netty 4.1 自帶的示例工程 netty-example 為例,簡要介紹 netty 線程模型 EchoServer echo ...
摘要:目前為止,我們已經(jīng)完成了一半的工作,剩下的就是在方法中啟動服務(wù)器。第一個通常被稱為,負(fù)責(zé)接收已到達(dá)的。這兩個指針恰好標(biāo)記著數(shù)據(jù)的起始終止位置。 前言 本篇翻譯自netty官方Get Start教程,一方面能把好的文章分享給各位,另一方面能鞏固所學(xué)的知識。若有錯誤和遺漏,歡迎各位指出。 https://netty.io/wiki/user-gu... 面臨的問題 我們一般使用專用軟件或者...
摘要:目錄源碼分析之番外篇的前生今世的前生今世之一簡介的前生今世之二小結(jié)的前生今世之三詳解的前生今世之四詳解源碼分析之零磨刀不誤砍柴工源碼分析環(huán)境搭建源碼分析之一揭開神秘的紅蓋頭源碼分析之一揭開神秘的紅蓋頭客戶端源碼分析之一揭開神秘的紅蓋頭服務(wù)器 目錄 Netty 源碼分析之 番外篇 Java NIO 的前生今世 Java NIO 的前生今世 之一 簡介 Java NIO 的前生今世 ...
摘要:結(jié)構(gòu)作為服務(wù)端作為序列化數(shù)據(jù)的協(xié)議前端通訊演示地址服務(wù)端實現(xiàn)啟動類長連接示例主線程組從線程組請求的解碼和編碼把多個消息轉(zhuǎn)換為一個單一的或是,原因是解碼器會在每個消息中生成多個消息對象主要用于處理大數(shù)據(jù)流,比如一個大小的文件如果你直接傳輸肯定 結(jié)構(gòu) netty 作為服務(wù)端 protobuf 作為序列化數(shù)據(jù)的協(xié)議 websocket 前端通訊 演示 GitHub 地址 showImg(...
摘要:結(jié)構(gòu)作為服務(wù)端作為序列化數(shù)據(jù)的協(xié)議前端通訊演示地址服務(wù)端實現(xiàn)啟動類長連接示例主線程組從線程組請求的解碼和編碼把多個消息轉(zhuǎn)換為一個單一的或是,原因是解碼器會在每個消息中生成多個消息對象主要用于處理大數(shù)據(jù)流,比如一個大小的文件如果你直接傳輸肯定 結(jié)構(gòu) netty 作為服務(wù)端 protobuf 作為序列化數(shù)據(jù)的協(xié)議 websocket 前端通訊 演示 GitHub 地址 showImg(...
閱讀 797·2021-08-23 09:46
閱讀 946·2019-08-30 15:44
閱讀 2604·2019-08-30 13:53
閱讀 3050·2019-08-29 12:48
閱讀 3873·2019-08-26 13:46
閱讀 1807·2019-08-26 13:36
閱讀 3520·2019-08-26 11:46
閱讀 1419·2019-08-26 10:48