摘要:目前為止,我們已經(jīng)完成了一半的工作,剩下的就是在方法中啟動(dòng)服務(wù)器。第一個(gè)通常被稱為,負(fù)責(zé)接收已到達(dá)的。這兩個(gè)指針恰好標(biāo)記著數(shù)據(jù)的起始終止位置。
前言
本篇翻譯自netty官方Get Start教程,一方面能把好的文章分享給各位,另一方面能鞏固所學(xué)的知識(shí)。若有錯(cuò)誤和遺漏,歡迎各位指出。
https://netty.io/wiki/user-gu...
面臨的問題我們一般使用專用軟件或者是開源庫和其他系統(tǒng)通信。舉個(gè)例子,我們通常使用 http 客戶端從 web 服務(wù)器獲取信息,或者通過 web service 執(zhí)行一個(gè) remote procedure call (遠(yuǎn)程調(diào)用)。然而,一個(gè)通用的協(xié)議時(shí)常不具備很好的擴(kuò)展性,例如我們不會(huì)使用一個(gè)通用 http 服務(wù)器去做如下類型的數(shù)據(jù)交換——大文件,電子郵件,近實(shí)時(shí)的金融數(shù)據(jù)或者是游戲數(shù)據(jù)。因此,一個(gè)高度優(yōu)化的致力于解決某些問題的通訊協(xié)議是很有必要的,例如你希望實(shí)現(xiàn)一臺(tái)優(yōu)化過的 http 服務(wù)器,致力于聊天應(yīng)用,流媒體傳輸,大文件傳輸?shù)?。你甚至可以為已有需求量身定做一個(gè)全新的通信協(xié)議。另一個(gè)不可避免的情況是,你必須處理一個(gè)古老的專用協(xié)議,使用他去跟遺留系統(tǒng)通信,問題是我們?cè)撊绾慰焖賹?shí)現(xiàn)協(xié)議,同時(shí)不犧牲應(yīng)用的穩(wěn)定性和性能。
解決方法The Netty project is an effort to provide an asynchronous event-driven network application framework and tooling for the rapid development of maintainable high-performance · high-scalability protocol servers and clients.
In other words, Netty is an NIO client server framework that enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server development.
"Quick and easy" does not mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences learned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.
Some users might already have found other network application framework that claims to have the same advantage, and you might want to ask what makes Netty so different from them. The answer is the philosophy it is built on. Netty is designed to give you the most comfortable experience both in terms of the API and the implementation from the day one. It is not something tangible but you will realize that this philosophy will make your life much easier as you read this guide and play with Netty.
Get Started本章使用簡單的例子帶你瀏覽 netty 的核心構(gòu)造,你快速上手。在本章過后,你就可以寫出一個(gè)基于 netty 的客戶端和服務(wù)器。如果你希望有更好的學(xué)習(xí)體驗(yàn),你可以先瀏覽 Chapter 2, Architectural Overview 后再回來本章學(xué)習(xí) (先看這里也是OK的)。
開始之前能跑通本章例子的最低要求:最新版本的 netty(4.x) 和 JDK 1.6 或以上的版本。
在閱讀時(shí),當(dāng)你對(duì)本章中出現(xiàn)的 class 感到疑惑,請(qǐng)查閱他們的 api 文檔。而本章幾乎所有的 class 都會(huì)鏈接到他們的 api 文檔。如果你發(fā)現(xiàn)本章中有什么錯(cuò)誤的信息、代碼語法錯(cuò)誤、或者有什么好的想法,也請(qǐng)聯(lián)系 netty 社區(qū)通知我們。
世界上最簡單的協(xié)議并不是 hello world,而是Discard。這種協(xié)議會(huì)拋棄掉所有接收到的數(shù)據(jù),不會(huì)給客戶端任何響應(yīng),所以實(shí)現(xiàn)Discard協(xié)議唯一要做的是忽略所有接收到的數(shù)據(jù)。接下來讓我們著手寫一個(gè) handler,用來處理I/O events(I/O事件)。
package io.netty.example.discard; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * Handles a server-side channel. */ public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1) @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2) // Discard the received data silently. ((ByteBuf) msg).release(); // (3) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4) // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
1.DiscardServerHandler 繼承ChannelInboundHandlerAdapter,并間接實(shí)現(xiàn)了ChannelInboundHandler。ChannelInboundHandler接口提供了多種 event handler method (事件處理方法),你可能要逐個(gè)實(shí)現(xiàn)接口中的方法,但直接繼承ChannelInboundHandlerAdapter會(huì)是更好的選擇。
2.這里我們重寫了channelRead(),當(dāng)有新數(shù)據(jù)到達(dá)時(shí)該方法就會(huì)被調(diào)用,并附帶接收到的數(shù)據(jù)作為方法參數(shù)。在本例中,接收到的數(shù)據(jù)類型是ByteBuf。
3.要實(shí)現(xiàn) Discard 協(xié)議,這里 handler 會(huì)忽略接收到的數(shù)據(jù)。ByteBuf作為 reference-counted (引用計(jì)數(shù)) 對(duì)象,通過調(diào)用方法release()釋放資源,請(qǐng)記住這個(gè) release 動(dòng)作在 handler 中完成 (原文:是handler的職責(zé))。通常,我們會(huì)像下面那樣實(shí)現(xiàn)channelRead()
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { // Do something with msg } finally { ReferenceCountUtil.release(msg); } }
4.當(dāng) netty 發(fā)生 I/O 錯(cuò)誤,或者 handler 在處理 event (事件) 拋出異常時(shí),exceptionCaught()就會(huì)被調(diào)用。大多數(shù)情況下我們應(yīng)該記錄下被捕獲的異常,并關(guān)閉與之關(guān)聯(lián)的channel(通道),但同時(shí)你也可以做一些額外的異常處理,例如在關(guān)閉連接之前,你可能會(huì)發(fā)送一條帶有錯(cuò)誤代碼的響應(yīng)消息。
目前為止,我們已經(jīng)完成了一半的工作,剩下的就是在main()方法中啟動(dòng)Discard服務(wù)器。
package io.netty.example.discard; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Discards any incoming data. */ public class DiscardServer { private int port; public DiscardServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new DiscardServer(port).run(); } }
NioEventLoopGroup是一個(gè)處理I/O操作的事件循環(huán)器 (其實(shí)是個(gè)線程池)。netty為不同類型的傳輸協(xié)議提供了多種NioEventLoopGroup的實(shí)現(xiàn)。在本例中我們要實(shí)現(xiàn)一個(gè)服務(wù)端應(yīng)用,并使用了兩個(gè)NioEventLoopGroup。第一個(gè)通常被稱為boss,負(fù)責(zé)接收已到達(dá)的 connection。第二個(gè)被稱作 worker,當(dāng) boss 接收到 connection 并把它注冊(cè)到 worker 后,worker 就可以處理 connection 上的數(shù)據(jù)通信。要?jiǎng)?chuàng)建多少個(gè)線程,這些線程如何匹配到Channel上會(huì)隨著EventLoopGroup實(shí)現(xiàn)的不同而改變,或者你可以通過構(gòu)造器去配置他們。
ServerBootstrap是用來搭建 server 的協(xié)助類。你也可以直接使用Channel搭建 server,然而這樣做步驟冗長,不是一個(gè)好的實(shí)踐,大多數(shù)情況下建議使用ServerBootstrap。
這里我們指定NioServerSocketChannel類,用來初始化一個(gè)新的Channel去接收到達(dá)的connection。
這里的 handler 會(huì)被用來處理新接收的Channel。ChannelInitializer是一個(gè)特殊的 handler,幫助開發(fā)者配置Channel,而多數(shù)情況下你會(huì)配置Channel下的ChannelPipeline,往 pipeline 添加一些 handler (例如DiscardServerHandler) 從而實(shí)現(xiàn)你的應(yīng)用邏輯。當(dāng)你的應(yīng)用變得復(fù)雜,你可能會(huì)向 pipeline 添加更多的 handler,并把這里的匿名類抽取出來作為一個(gè)多帶帶的類。
你可以給Channel配置特有的參數(shù)。這里我們寫的是 TCP/IP 服務(wù)器,所以可以配置一些 socket 選項(xiàng),例如 tcpNoDeply 和 keepAlive。請(qǐng)參考ChannelOption和ChannelConfig文檔來獲取更多可用的 Channel 配置選項(xiàng),并對(duì)此有個(gè)大概的了解。
注意到option()和childOption()了嗎?option()用來配置NioServerSocketChannel(負(fù)責(zé)接收到來的connection),而childOption()是用來配置被ServerChannel(這里是NioServerSocketChannel) 所接收的Channel
剩下的事情就是綁定端口并啟動(dòng)服務(wù)器,這里我們綁定到機(jī)器的8080端口。你可以多次調(diào)用bind()(基于不同的地址)。
剛剛,你使用 netty 完成了第一個(gè)服務(wù)端程序,可喜可賀!
處理接收到的數(shù)據(jù)既然我們完成了第一個(gè)服務(wù)端程序,接下來要就要對(duì)它進(jìn)行測(cè)試。最簡單的方法是使用命令行 telnet,例如在命令行輸入telnet localhost 8080,然后再輸入點(diǎn)別的東西。
然而我們并不知道服務(wù)端是否真的在工作,因?yàn)樗?Discard Server,我們得不到任何響應(yīng)。為了證明他真的在工作,我們讓服務(wù)端打印接收到的數(shù)據(jù)。
我們知道當(dāng)接收到數(shù)據(jù)時(shí),channelRead()會(huì)被調(diào)用。所以讓我們加點(diǎn)代碼到 DiscardServerHandler 的channelRead()中:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; try { while (in.isReadable()) { // (1) System.out.print((char) in.readByte()); System.out.flush(); } } finally { ReferenceCountUtil.release(msg); // (2) } }
這步低效的循環(huán)可以替換成System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
你可以用in.release()替換這里的代碼。
如果你再運(yùn)行 telnet 命令,服務(wù)端會(huì)打印出接收到的數(shù)據(jù)。
Discard Server 完整的源碼放在io.netty.example.discard這個(gè)包中。
目前為止,我們寫的服務(wù)程序消費(fèi)了數(shù)據(jù)但沒有給出任何響應(yīng),而作為一臺(tái)服務(wù)器理應(yīng)要對(duì)每一個(gè)請(qǐng)求作出響應(yīng)。接下來讓我們實(shí)現(xiàn) ECHO 協(xié)議,學(xué)習(xí)如何響應(yīng)消息并把接收到的數(shù)據(jù)發(fā)回客戶端。
Echo Server 跟 Discard Server 唯一不同的地方在于,他把接收到的數(shù)據(jù)返回給客戶端,而不是把他們打印到控制臺(tái)。所以這里我們只需要修改channelRead()就行了:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); // (1) ctx.flush(); // (2) }
ChannelHandlerContext能觸發(fā)多種 I/O 事件和操作,這里我們調(diào)用write()方法逐字寫回接收到的數(shù)據(jù)。請(qǐng)注意我們并沒有釋放接收到的消息Object msg,因?yàn)樵趯憯?shù)據(jù)時(shí)ctx.write(msg),netty 已經(jīng)幫你釋放它了。
ctx.write()關(guān)沒有把消息寫到網(wǎng)絡(luò)上,他在內(nèi)部被緩存起來,你需要調(diào)用ctx.flush()把他刷新到網(wǎng)絡(luò)上。ctx.writeAndFlush(msg)是個(gè)更簡潔的方法。
如果再次使用命令行 telnet,你會(huì)看到服務(wù)端返回了你輸入過的東西。完整的 Echo Server 源碼放在io.netty.example.echo包下面。
寫一個(gè) Time Server (時(shí)間服務(wù)器)我們這一小節(jié)要實(shí)現(xiàn) TIME 協(xié)議。跟前面的例子不同,Timer Server 在連接建立時(shí) (收到請(qǐng)求前) 就返回一個(gè)32位 (4字節(jié)) 整數(shù),并在發(fā)送成功后關(guān)閉連接。在本例中,將會(huì)學(xué)習(xí)到如何構(gòu)造和發(fā)送一個(gè)消息,在發(fā)送完成時(shí)關(guān)閉連接。
因?yàn)橐趧偨⑦B接時(shí)發(fā)送消息而不管后來接收到的數(shù)據(jù),這次我們不能使用channelRead(),取而代之的是channelActive方法,以下是具體實(shí)現(xiàn):
package io.netty.example.time; public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(final ChannelHandlerContext ctx) { // (1) final ByteBuf time = ctx.alloc().buffer(4); // (2) time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); final ChannelFuture f = ctx.writeAndFlush(time); // (3) f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { assert f == future; ctx.close(); } }); // (4) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
當(dāng)連接被建立后channelActive()方法會(huì)被調(diào)用,我們?cè)诜椒w中發(fā)送一個(gè)32位的代表當(dāng)前時(shí)間的整數(shù)。
要發(fā)送一個(gè)新的消息,需要分配一個(gè)新的buffer(緩沖區(qū)) 去包含這個(gè)消息。我們要寫一個(gè)32位的整數(shù),因此緩沖區(qū)ByteBuf的容量至少是4個(gè)字節(jié)。通過ChannelHandlerContext.alloc()獲取ByteBufAllocator(字節(jié)緩沖區(qū)分配器),用他來分配一個(gè)新的buffer
像往常一樣把消息寫到網(wǎng)絡(luò)上。
等一下Σ( ° △ °|||),flip()方法哪去了?還記不記得在NIO中曾經(jīng)使用過的java.nio.ByteBuffer.flip()(簡單總結(jié)就是把ByteBuffer從寫模式變成讀模式)?ByteBuf并沒有這個(gè)方法,因?yàn)樗藘蓚€(gè)指針——讀指針和寫指針 (讀寫標(biāo)記,不要理解成C里的指針)。當(dāng)你往ByteBuf寫數(shù)據(jù)時(shí),寫指針會(huì)移動(dòng)而讀指針不變。這兩個(gè)指針恰好標(biāo)記著數(shù)據(jù)的起始、終止位置。
與之相反,原生 NIO 并沒有提供一個(gè)簡潔的方式去標(biāo)記數(shù)據(jù)的起始和終止位置,你必須要調(diào)用flip方法。有 時(shí)候你很可能忘記調(diào)用flip方法,導(dǎo)致發(fā)送不出數(shù)據(jù)或發(fā)送了錯(cuò)誤數(shù)據(jù)。這樣的錯(cuò)誤并不會(huì)發(fā)生在 netty,因?yàn)?netty 有不同的指針去應(yīng)對(duì)不同的操作 (讀寫操作),這使得編程更加簡單,因?yàn)槟悴辉傩枰?flipping out (瘋狂輸出原生 NIO)
其他需要注意的是ChannelHandlerContext.write()/writeAndFlush()方法返回了ChannelFuture。ChannelFuture表示一個(gè)還沒發(fā)生的 I/O 操作。這意味著你請(qǐng)求的一些 I/O 操作可能還沒被處理,因?yàn)?netty 中所有的操作都是異步的。舉個(gè)例子,下面的代碼可能在消息發(fā)送之前就關(guān)閉了連接:
Channel ch = ...; ch.writeAndFlush(message); ch.close();
所以,你要在 (write()返回的)ChannelFuture完成之后再調(diào)用close()。當(dāng)write操作完成后,ChannelFuture會(huì)通知到他的listeners(監(jiān)聽器)。需加注意,close()方法可能不會(huì)立即關(guān)閉鏈接,同樣close()也會(huì)返回一個(gè)ChannelFuture
那么我們?nèi)绾沃缹懖僮魍瓿闪??很簡單,只要?b>ChannelFuture注冊(cè)監(jiān)聽器 (ChannelFutureListener) 就行。這一步,我們創(chuàng)建了ChannelFutureListener的匿名類,在寫操作完成時(shí)關(guān)閉鏈接。
你也可以使用已經(jīng)定義好的監(jiān)聽器,例如這樣:
f.addListener(ChannelFutureListener.CLOSE);
為了測(cè)試 Time server 是否如期工作,你可以使用 unix 的命令行:
$ rdate -o寫一個(gè) Time Client (時(shí)間客戶端)-p
跟 DISCARD 和 ECHO 服務(wù)器不同,我們要寫一個(gè)客戶端程序應(yīng)對(duì) TIME 協(xié)議,因?yàn)槟銦o法把一個(gè)32位整數(shù)翻譯成日期。本節(jié)中,我們確保服務(wù)端正常工作,并學(xué)習(xí)如何使用 netty 寫一個(gè)客戶端程序。
netty 客戶端和服務(wù)器最大的不同在于,客戶端使用了不同的Bootstrap和Channel實(shí)現(xiàn)類。請(qǐng)看下面的例子:
package io.netty.example.time; public class TimeClient { public static void main(String[] args) throws Exception { String host = args[0]; int port = Integer.parseInt(args[1]); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); // (1) b.group(workerGroup); // (2) b.channel(NioSocketChannel.class); // (3) b.option(ChannelOption.SO_KEEPALIVE, true); // (4) b.handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); // (5) // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }
Bootstrap跟ServerBootstrap相似,但他是作用在客戶端或無連接模式的 Channel (通道)。
如果你只定義了一個(gè)EventLoopGroup,他會(huì)同時(shí)作為 boss group 和 worker group,雖然客戶端并沒有 boss worker 這個(gè)概念。
這里使用NioSocketChannel而不是NioServerSocketChannel。NioSocketChannel會(huì)被用來創(chuàng)建客戶端Channel
跟ServerBootstrap不同,這里我們沒有使用childOption(),因?yàn)榭蛻舳说?b>SocketChannel沒有父Channel
我們使用connect()代替bind()方法。
正如你所見,客戶端代碼跟服務(wù)端代碼沒有很大的區(qū)別。那么接下來就是實(shí)現(xiàn)ChannelHandler,他會(huì)從服務(wù)端接收一個(gè)32位整數(shù),翻譯成可讀的日期格式并打印出來,最后關(guān)閉連接:
package io.netty.example.time; import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; // (1) try { long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } finally { m.release(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
處理TCP/IP時(shí), netty 把讀取的數(shù)據(jù)放到ByteBuf
看起來非常簡單,跟服務(wù)端沒有多大區(qū)別。然而有時(shí)候 handler 會(huì)發(fā)生錯(cuò)誤,例如拋出異常IndexOutOfBoundsException,在下一章節(jié)我們會(huì)作具體討論。
基于流的傳輸協(xié)議 關(guān)于Socket緩沖區(qū)的小警告像TCP/IP這種基于流的傳輸協(xié)議,接收的數(shù)據(jù)會(huì)存儲(chǔ)到socket緩沖區(qū)。不幸的是,這類緩沖區(qū)不是數(shù)據(jù)包隊(duì)列,而是字節(jié)流隊(duì)列。這意味著,即使你想發(fā)送兩個(gè)消息并打包成兩個(gè)數(shù)據(jù)包,操作系統(tǒng)只會(huì)把他們當(dāng)作一連串字節(jié)。因此,這不能保證你讀到的數(shù)據(jù)恰好是遠(yuǎn)程發(fā)送端寫出的數(shù)據(jù)。舉個(gè)例子,假設(shè)操作系統(tǒng)TCP/IP棧收到三個(gè)數(shù)據(jù)包:
因?yàn)檫@種流式協(xié)議的特性,應(yīng)用程序很有可能像下圖的方式那樣讀取數(shù)據(jù)碎片:
所以,作為接收端 (不管是服務(wù)端還是客戶端),應(yīng)把接收到的數(shù)據(jù) (字節(jié)流) 整理成一個(gè)或多個(gè)易于理解的數(shù)據(jù)貞。對(duì)于上述的例子,整理如下:
讓我們回到 TIME Client 這個(gè)例子。一32位整數(shù)的數(shù)據(jù)量非常小,在本例中不應(yīng)用被分割。然而,問題在于他確實(shí)有可能被分割,可能性隨著通信數(shù)據(jù)量的增大而增大。
一個(gè)簡單的方法是創(chuàng)建一個(gè)內(nèi)部的cumulative buffer(累積緩沖區(qū)),等待數(shù)據(jù)直到接收到4個(gè)字節(jié)為止。下面是修改過的TimeClientHandler:
package io.netty.example.time; import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { private ByteBuf buf; @Override public void handlerAdded(ChannelHandlerContext ctx) { buf = ctx.alloc().buffer(4); // (1) } @Override public void handlerRemoved(ChannelHandlerContext ctx) { buf.release(); // (1) buf = null; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; buf.writeBytes(m); // (2) m.release(); if (buf.readableBytes() >= 4) { // (3) long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
ChannelHandler有兩個(gè)與生命周期有關(guān)的監(jiān)聽方法:handlerAdded()和handlerRemove()。你可以在里面執(zhí)行任意的初始化或析構(gòu)任務(wù),只要他們不會(huì)阻塞程序很長時(shí)間。
首先,所有接收的數(shù)據(jù)被累積到緩沖區(qū)。
其次,handler 檢查緩沖區(qū)buf是否接收到足夠的數(shù)據(jù) (4個(gè)字節(jié)),若是,則進(jìn)行實(shí)際業(yè)務(wù)處理。否則當(dāng)有更多數(shù)據(jù)到達(dá)時(shí),netty 會(huì)再次調(diào)用channelRead(),直到緩沖區(qū)累積到4個(gè)字節(jié)。
解決方案二雖然方案一解決了問題,但修改過的 handler 看上去不是那么簡潔。想像一下協(xié)議變得更為復(fù)雜,例如包含多個(gè)可變長字段,你的ChannelInboundHandler很快會(huì)變得不可維護(hù)。
你可能會(huì)注意到,可以向ChannelPipeline添加多個(gè)ChannelHandler。所以,你可以把一個(gè)龐大復(fù)雜的ChannelHandler分割成多個(gè)小模塊,從而減小應(yīng)用的復(fù)雜性。舉個(gè)例子,你可以把TimeClientHandler分割成兩個(gè)handler:
處理數(shù)據(jù)碎片的TimeDecoder
最初始的簡單版本的TimeClientHandler
幸運(yùn)的是,netty 提供了一個(gè)可擴(kuò)展的父類,幫助你書寫TimeDecoder
package io.netty.example.time; public class TimeDecoder extends ByteToMessageDecoder { // (1) @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List
ByteToMessageDecoder實(shí)現(xiàn)了ChannelInboundHandler,使你更容易去處理數(shù)據(jù)碎片。
當(dāng)有新數(shù)據(jù)到達(dá)時(shí),ByteToMessageDecoder調(diào)用decode()方法并維護(hù)了一個(gè)內(nèi)部的cumulative buffer(累積緩沖區(qū))
累積緩沖區(qū)數(shù)據(jù)不足時(shí),decode()方法不會(huì)添加任何東西到 out 列表。當(dāng)有更多數(shù)據(jù)到達(dá)時(shí),ByteToMessageDecoder會(huì)再次調(diào)用decode()方法。
如果decode()方法向 out 列表添加了一個(gè)對(duì)象,這表示decoder(解碼器) 成功解析了一個(gè)消息。ByteToMessageDecoder會(huì)拋棄掉cumulative buffer(累積緩沖區(qū))中已讀數(shù)據(jù)。請(qǐng)記住,你不需要去解析多個(gè)消息,因?yàn)?b>ByteToMessageDecoder會(huì)持續(xù)調(diào)用decode(),直到他沒有往 out 列表添加對(duì)象。
既然希望往ChannelPipeline添加其他 handler (上面的TimeDecoder),我們要修改TimeClient中的ChannelInitializer:
b.handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler()); } });
如果你充滿冒險(xiǎn)精神,你可以嘗試使用ReplayingDecoder,他會(huì)使代碼更加簡潔:
public class TimeDecoder extends ReplayingDecoder{ @Override protected void decode( ChannelHandlerContext ctx, ByteBuf in, List
此外,netty 提供了很多開箱即用的decoder,他們已經(jīng)實(shí)現(xiàn)了大多數(shù)的網(wǎng)絡(luò)協(xié)議,避免你自己去實(shí)現(xiàn)一個(gè)龐大的難以維護(hù)的handler。請(qǐng)參考下面的包獲取更多詳細(xì)例子:
io.netty.example.factorial 二進(jìn)制協(xié)議
io.netty.example.telnet 文本協(xié)議
使用 POJO 代替 ByteBuf上面所有例子都使用了ByteBuf作為協(xié)議中基本的數(shù)據(jù)結(jié)構(gòu)。在本小節(jié),我們將要升級(jí) TIME 協(xié)議中的客戶端和服務(wù)端,使用 POJO 代替ByteBuf
使用 POJO 的優(yōu)勢(shì)是顯而易見的:你的 handler 變得易于維護(hù)和可重用,通過把 (從ByteBuf中抽取信息的) 代碼分離出來。在 TIME 協(xié)議的例子里,我們僅僅讀取一個(gè)32位的整數(shù),直接使用ByteBuf并不會(huì)有太大問題。然而,當(dāng)實(shí)現(xiàn)一個(gè)真實(shí)的網(wǎng)絡(luò)協(xié)議時(shí),你會(huì)發(fā)現(xiàn)做代碼分離很有必要。
首先,讓我們定義一個(gè)新的類型UnixTime:
package io.netty.example.time; import java.util.Date; public class UnixTime { private final long value; public UnixTime() { this(System.currentTimeMillis() / 1000L + 2208988800L); } public UnixTime(long value) { this.value = value; } public long value() { return value; } @Override public String toString() { return new Date((value() - 2208988800L) * 1000L).toString(); } }
現(xiàn)在修改TimeDecoder,讓他向out列表添加一個(gè)UnixTime而不是ByteBuf
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List
既然修改了TimeDecoder,TimeClientHandler也不能再使用ByteBuf了:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { UnixTime m = (UnixTime) msg; System.out.println(m); ctx.close(); }
是不是更加簡單、優(yōu)雅?相同的竅門同樣能使用在服務(wù)端。這次讓我們先修改TimeServerHandler:
@Override public void channelActive(ChannelHandlerContext ctx) { ChannelFuture f = ctx.writeAndFlush(new UnixTime()); f.addListener(ChannelFutureListener.CLOSE); }
現(xiàn)在只剩下encoder(編碼器),他需要實(shí)現(xiàn)ChannelOutboundHandler,把UnixTime翻譯回ByteBuf。這里比書寫decoder更加簡單,因?yàn)槲覀儾辉傩枰幚頂?shù)據(jù)包碎片并把他們組裝起來了。
package io.netty.example.time; public class TimeEncoder extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { UnixTime m = (UnixTime) msg; ByteBuf encoded = ctx.alloc().buffer(4); encoded.writeInt((int)m.value()); ctx.write(encoded, promise); // (1) } }
這一行代碼有幾個(gè)重要的點(diǎn):
首先,我們把參數(shù)中ChannelPromise傳遞到write(),以便 netty 把他標(biāo)記為成功或失敗 (當(dāng)數(shù)據(jù)真正寫到網(wǎng)絡(luò)時(shí))。
其次,我們沒有調(diào)用ctx.flush(),因?yàn)?b>ChannelOutboundHandlerAdapter中有一個(gè)多帶帶的方法void flush(ChannelHandlerContext ctx)專門用來處理flush操作。
你可以使用MessageToByteEncoder更加地簡化代碼:
public class TimeEncoder extends MessageToByteEncoder{ @Override protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) { out.writeInt((int)msg.value()); } }
最后一步就是把TimeEncoder添加到服務(wù)端ChannelPipeline,留作練習(xí)。
關(guān)閉你的應(yīng)用關(guān)閉netty應(yīng)用很簡單——通過shutdownGracefully()去關(guān)閉所有創(chuàng)建的EventLoopGroup。他返回一個(gè)Future去通知你什么時(shí)候EventLoopGroup和他從屬的 Channel 已經(jīng)完全關(guān)閉了。
總結(jié)本章,我們快速瀏覽了 netty,使用他書寫了一個(gè)可用的網(wǎng)絡(luò)應(yīng)用。接下來的章節(jié)中會(huì)介紹更多關(guān)于 netty 的詳細(xì)資料,我們也希望你去重溫io.netty.example package包中的例子。netty 社區(qū)的大門會(huì)向你敞開,你可以向社區(qū)提出問題和意見,您的的反饋會(huì)幫助netty項(xiàng)目變得更加完善。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/77054.html
摘要:的選擇器允許單個(gè)線程監(jiān)視多個(gè)輸入通道。一旦執(zhí)行的線程已經(jīng)超過讀取代碼中的某個(gè)數(shù)據(jù)片段,該線程就不會(huì)在數(shù)據(jù)中向后移動(dòng)通常不會(huì)。 1、引言 很多初涉網(wǎng)絡(luò)編程的程序員,在研究Java NIO(即異步IO)和經(jīng)典IO(也就是常說的阻塞式IO)的API時(shí),很快就會(huì)發(fā)現(xiàn)一個(gè)問題:我什么時(shí)候應(yīng)該使用經(jīng)典IO,什么時(shí)候應(yīng)該使用NIO? 在本文中,將嘗試用簡明扼要的文字,闡明Java NIO和經(jīng)典IO之...
摘要:響應(yīng)式編程是基于異步和事件驅(qū)動(dòng)的非阻塞程序,只是垂直通過在內(nèi)啟動(dòng)少量線程擴(kuò)展,而不是水平通過集群擴(kuò)展。三特性常用的生產(chǎn)的特性如下響應(yīng)式編程模型適用性內(nèi)嵌容器組件還有對(duì)日志消息測(cè)試及擴(kuò)展等支持。 摘要: 原創(chuàng)出處 https://www.bysocket.com 「公眾號(hào):泥瓦匠BYSocket 」歡迎關(guān)注和轉(zhuǎn)載,保留摘要,謝謝! 02:WebFlux 快速入門實(shí)踐 文章工程: JDK...
摘要:個(gè)高級(jí)多線程面試題及回答后端掘金在任何面試當(dāng)中多線程和并發(fā)方面的問題都是必不可少的一部分。目前在生產(chǎn)環(huán)基于的技術(shù)問答網(wǎng)站系統(tǒng)實(shí)現(xiàn)后端掘金這一篇博客將詳細(xì)介紹一個(gè)基于的問答網(wǎng)站的實(shí)現(xiàn),有詳細(xì)的代碼。 15 個(gè)高級(jí) Java 多線程面試題及回答 - 后端 - 掘金在任何Java面試當(dāng)中多線程和并發(fā)方面的問題都是必不可少的一部分。如果你想獲得任何股票投資銀行的前臺(tái)資訊職位,那么你應(yīng)該準(zhǔn)備很多...
摘要:個(gè)高級(jí)多線程面試題及回答后端掘金在任何面試當(dāng)中多線程和并發(fā)方面的問題都是必不可少的一部分。目前在生產(chǎn)環(huán)基于的技術(shù)問答網(wǎng)站系統(tǒng)實(shí)現(xiàn)后端掘金這一篇博客將詳細(xì)介紹一個(gè)基于的問答網(wǎng)站的實(shí)現(xiàn),有詳細(xì)的代碼。 15 個(gè)高級(jí) Java 多線程面試題及回答 - 后端 - 掘金在任何Java面試當(dāng)中多線程和并發(fā)方面的問題都是必不可少的一部分。如果你想獲得任何股票投資銀行的前臺(tái)資訊職位,那么你應(yīng)該準(zhǔn)備很多...
摘要:完成客戶端服務(wù)器通信,需要基于協(xié)議之上,自定義一套簡單的通信協(xié)議,其中數(shù)據(jù)交換方式需要使用自定義幀。輸入數(shù)據(jù)處理器以下為輸入數(shù)據(jù)的第一個(gè)處理器,可以保證無論幀經(jīng)歷怎樣的粘包拆包,均可以準(zhǔn)確提取每一個(gè)自定義幀的數(shù)據(jù)部分。 「博客搬家」 原地址: 簡書 原發(fā)表時(shí)間: 2017-03-26 本文采用 Netty 這一最流行的 Java NIO 框架,作為 Java 服務(wù)器通信部分的基礎(chǔ)...
閱讀 3701·2021-11-19 09:56
閱讀 1486·2021-09-22 15:11
閱讀 1142·2019-08-30 15:55
閱讀 3386·2019-08-29 14:02
閱讀 2927·2019-08-29 11:07
閱讀 447·2019-08-28 17:52
閱讀 3182·2019-08-26 13:59
閱讀 447·2019-08-26 13:53