摘要:當(dāng)操作系統(tǒng)發(fā)生事件,并且準(zhǔn)備好數(shù)據(jù)后,在主動(dòng)通知應(yīng)用程序,觸發(fā)相應(yīng)的函數(shù)。當(dāng)失敗時(shí)觸發(fā)該方法,第一個(gè)參數(shù)代表操作失敗引發(fā)的異?;蝈e(cuò)誤。
BIO編程
回顧下Linux下阻塞IO模型:
再看看Java的BIO編程模型:
/** * 類說明:客戶端 */ public class BioClient { public static void main(String[] args) throws InterruptedException, IOException { //通過構(gòu)造函數(shù)創(chuàng)建Socket,并且連接指定地址和端口的服務(wù)端 Socket socket = new Socket(DEFAULT_SERVER_IP,DEFAULT_PORT); System.out.println("請輸入請求消息:"); //啟動(dòng)讀取服務(wù)端輸出數(shù)據(jù)的線程 new ReadMsg(socket).start(); PrintWriter pw = null; //允許客戶端在控制臺(tái)輸入數(shù)據(jù),然后送往服務(wù)器 while(true){ pw = new PrintWriter(socket.getOutputStream()); pw.println(new Scanner(System.in).next()); pw.flush(); } } //讀取服務(wù)端輸出數(shù)據(jù)的線程 private static class ReadMsg extends Thread { Socket socket; public ReadMsg(Socket socket) { this.socket = socket; } @Override public void run() { //負(fù)責(zé)socket讀寫的輸入流 try (BufferedReader br = new BufferedReader( new InputStreamReader(socket.getInputStream()))){ String line = null; //通過輸入流讀取服務(wù)端傳輸?shù)臄?shù)據(jù) //如果已經(jīng)讀到輸入流尾部,返回null,退出循環(huán) //如果得到非空值,就將結(jié)果進(jìn)行業(yè)務(wù)處理 while((line=br.readLine())!=null){ System.out.printf("%s ",line); } } catch (SocketException e) { System.out.printf("%s ", "服務(wù)器斷開了你的連接"); } catch (Exception e) { e.printStackTrace(); } finally { clear(); } } //必要的資源清理工作 private void clear() { if (socket != null) try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** * 類說明:bio的服務(wù)端主程序 */ public class BioServer { //服務(wù)器端必須 private static ServerSocket server; //線程池,處理每個(gè)客戶端的請求 private static ExecutorService executorService = Executors.newFixedThreadPool(5); private static void start() throws IOException{ try{ //通過構(gòu)造函數(shù)創(chuàng)建ServerSocket //如果端口合法且空閑,服務(wù)端就監(jiān)聽成功 server = new ServerSocket(DEFAULT_PORT); System.out.println("服務(wù)器已啟動(dòng),端口號:" + DEFAULT_PORT); while(true){ Socket socket= server.accept(); System.out.println("有新的客戶端連接----" ); //當(dāng)有新的客戶端接入時(shí),打包成一個(gè)任務(wù),投入線程池 executorService.execute(new BioServerHandler(socket)); } }finally{ if(server!=null){ server.close(); } } } public static void main(String[] args) throws IOException { start(); } } /** * 類說明: */ public class BioServerHandler implements Runnable{ private Socket socket; public BioServerHandler(Socket socket) { this.socket = socket; } public void run() { try(//負(fù)責(zé)socket讀寫的輸出、輸入流 BufferedReader in = new BufferedReader( new InputStreamReader(socket.getInputStream())); PrintWriter out = new PrintWriter(socket.getOutputStream(), true)){ String message; String result; //通過輸入流讀取客戶端傳輸?shù)臄?shù)據(jù) //如果已經(jīng)讀到輸入流尾部,返回null,退出循環(huán) //如果得到非空值,就將結(jié)果進(jìn)行業(yè)務(wù)處理 while((message = in.readLine())!=null){ System.out.println("Server accept message:"+message); result = response(message); //將業(yè)務(wù)結(jié)果通過輸出流返回給客戶端 out.println(result); } }catch(Exception e){ e.printStackTrace(); }finally{ if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null; } } } }
過程:
服務(wù)端提供IP和監(jiān)聽端口
客戶端通過連接操作想服務(wù)端監(jiān)聽的地址發(fā)起連接請求,通過三次握手連接
如果連接成功建立,雙方就可以通過套接字進(jìn)行通信
最早的時(shí)候服務(wù)器端是針對一個(gè)連接新建一個(gè)線程來處理→演變成服務(wù)端針對每個(gè)客戶端連接把請求丟進(jìn)線程池來處理任務(wù)
缺點(diǎn):若高并發(fā)場景且處理時(shí)間稍長則許多請求會(huì)阻塞一直等待,嚴(yán)重影響性能.
先回顧下Linux下AIO模型:
原生JDK網(wǎng)絡(luò)編程AIO:
異步IO采用“訂閱-通知”模式:即應(yīng)用程序向操作系統(tǒng)注冊IO監(jiān)聽,然后繼續(xù)做自己的事情。當(dāng)操作系統(tǒng)發(fā)生IO事件,并且準(zhǔn)備好數(shù)據(jù)后,在主動(dòng)通知應(yīng)用程序,觸發(fā)相應(yīng)的函數(shù)。
注意:異步IO里面客戶端和服務(wù)端均采用這種“訂閱-通知”模式.
AIO編程幾個(gè)核心類:
①:AsynchronousServerSocketChannel:類似BIO里面的ServerSocket
②:AsynchronousSocketChannel :類似BIO里面的socket用來通信,有三個(gè)方法:connect():用于連接到指定端口,指定IP地址的服務(wù)器,read()、write():完成讀寫
注意點(diǎn):
1.這三個(gè)方法會(huì)執(zhí)行就相當(dāng)于上面圖解里面的Subscrible函數(shù)向操作系統(tǒng)監(jiān)聽線程。
2.這幾個(gè)方法里面有個(gè)參數(shù),比如write(ByteBuffer src,A attachment,CompletionHandler Channel可看做JDK對IO的抽象,除了網(wǎng)絡(luò)通道,還有文件通道FileChannel。
③:CompletionHandler:源碼注釋是異步IO操作中用來處理消費(fèi)的結(jié)果,其實(shí)也就是結(jié)果回調(diào)函數(shù),連接丶讀寫都是異步操作都需要實(shí)現(xiàn)此接口。
而CompletionHandler接口中定義了兩個(gè)方法,
completed(V result , A attachment):當(dāng)IO完成時(shí)觸發(fā)該方法,該方法的第一個(gè)參數(shù)代表IO操作返回的對象,第二個(gè)參數(shù)代表發(fā)起IO操作時(shí)傳入的附加參數(shù)。
faild(Throwable exc, A attachment):當(dāng)IO失敗時(shí)觸發(fā)該方法,第一個(gè)參數(shù)代表IO操作失敗引發(fā)的異?;蝈e(cuò)誤。
先上代碼
客戶端:
/** * 類說明:aio的客戶端主程序 */ public class AioClient { //IO通信處理器 private static AioClientHandler clientHandle; public static void start(){ if(clientHandle!=null) return; clientHandle = new AioClientHandler(DEFAULT_SERVER_IP,DEFAULT_PORT); //負(fù)責(zé)網(wǎng)絡(luò)通訊的線程 new Thread(clientHandle,"Client").start(); } //向服務(wù)器發(fā)送消息 public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMessag(msg); return true; } public static void main(String[] args) throws Exception{ AioClient.start(); System.out.println("請輸入請求消息:"); Scanner scanner = new Scanner(System.in); while(AioClient.sendMsg(scanner.nextLine())); } } /** * 類說明:IO通信處理器,負(fù)責(zé)連接服務(wù)器,對外暴露對服務(wù)端發(fā)送數(shù)據(jù)的API */ public class AioClientHandler implements CompletionHandler,Runnable { private AsynchronousSocketChannel clientChannel; private String host; private int port; private CountDownLatch latch;//防止線程退出 public AioClientHandler(String host, int port) { this.host = host; this.port = port; try { //創(chuàng)建一個(gè)實(shí)際異步的客戶端通道 clientChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { //創(chuàng)建CountDownLatch,因?yàn)槭钱惒秸{(diào)用,下面的connect不會(huì)阻塞, // 那么整個(gè)run方法會(huì)迅速結(jié)束,那么負(fù)責(zé)網(wǎng)絡(luò)通訊的線程也會(huì)迅速結(jié)束 latch = new CountDownLatch(1); //發(fā)起異步連接操作,回調(diào)參數(shù)就是這個(gè)實(shí)例本身, // 如果連接成功會(huì)回調(diào)這個(gè)實(shí)例的completed方法 clientChannel.connect(new InetSocketAddress(host,port), null,this); try { latch.await(); clientChannel.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } //連接成功,這個(gè)方法會(huì)被系統(tǒng)調(diào)用 @Override public void completed(Void result, AioClientHandler attachment) { System.out.println("已經(jīng)連接到服務(wù)端。"); } //連接失敗,這個(gè)方法會(huì)被系統(tǒng)調(diào)用 @Override public void failed(Throwable exc, AioClientHandler attachment) { System.err.println("連接失敗。"); exc.printStackTrace(); latch.countDown(); try { clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } //對外暴露對服務(wù)端發(fā)送數(shù)據(jù)的API public void sendMessag(String msg){ /*為了把msg變成可以在網(wǎng)絡(luò)傳輸?shù)母袷?/ byte[] bytes = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); /*進(jìn)行異步寫,同樣的這個(gè)方法會(huì)迅速返回, 需要提供一個(gè)接口讓系統(tǒng)在一次網(wǎng)絡(luò)寫操作完成后通知我們的應(yīng)用程序。 所以我們傳入一個(gè)實(shí)現(xiàn)了CompletionHandler的AioClientWriteHandler 第1個(gè)writeBuffer,表示我們要發(fā)送給服務(wù)器的數(shù)據(jù); 第2個(gè)writeBuffer,考慮到網(wǎng)絡(luò)寫有可能無法一次性將數(shù)據(jù)寫完,需要進(jìn)行多次網(wǎng)絡(luò)寫, 所以將writeBuffer作為附件傳遞給AioClientWriteHandler。 */ clientChannel.write(writeBuffer,writeBuffer, new AioClientWriteHandler(clientChannel,latch)); } } /** * 類說明:網(wǎng)絡(luò)寫的處理器,CompletionHandler 中 * Integer:本次網(wǎng)絡(luò)寫操作完成實(shí)際寫入的字節(jié)數(shù), * ByteBuffer:寫操作的附件,存儲(chǔ)了寫操作需要寫入的數(shù)據(jù) */ public class AioClientWriteHandler implements CompletionHandler { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public AioClientWriteHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { //有可能無法一次性將數(shù)據(jù)寫完,需要檢查緩沖區(qū)中是否還有數(shù)據(jù)需要繼續(xù)進(jìn)行網(wǎng)絡(luò)寫 if(buffer.hasRemaining()){ clientChannel.write(buffer,buffer,this); }else{ //寫操作已經(jīng)完成,為讀取服務(wù)端傳回的數(shù)據(jù)建立緩沖區(qū) ByteBuffer readBuffer = ByteBuffer.allocate(1024); /*這個(gè)方法會(huì)迅速返回,需要提供一個(gè)接口讓 系統(tǒng)在讀操作完成后通知我們的應(yīng)用程序。*/ clientChannel.read(readBuffer,readBuffer, new AioClientReadHandler(clientChannel,latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("數(shù)據(jù)發(fā)送失敗..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } } /** * 類說明:網(wǎng)絡(luò)讀的處理器 * CompletionHandler 中 * Integer:本次網(wǎng)絡(luò)讀操作實(shí)際讀取的字節(jié)數(shù), * ByteBuffer:讀操作的附件,存儲(chǔ)了讀操作讀到的數(shù)據(jù) * */ public class AioClientReadHandler implements CompletionHandler { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public AioClientReadHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String msg; try { msg = new String(bytes,"UTF-8"); System.out.println("accept message:"+msg); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { System.err.println("數(shù)據(jù)讀取失敗..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }
服務(wù)端:
/** * 類說明:服務(wù)器主程序 */ public class AioServer { private static AioServerHandler serverHandle; //統(tǒng)計(jì)客戶端個(gè)數(shù) public volatile static long clientCount = 0; public static void start(){ if(serverHandle!=null) return; serverHandle = new AioServerHandler(DEFAULT_PORT); new Thread(serverHandle,"Server").start(); } public static void main(String[] args){ AioServer.start(); } } /** * 類說明:處理用戶連接的處理器 */ public class AioAcceptHandler implements CompletionHandler{ @Override public void completed(AsynchronousSocketChannel channel, AioServerHandler serverHandler) { AioServer.clientCount++; System.out.println("連接的客戶端數(shù):" + AioServer.clientCount); //重新注冊監(jiān)聽,讓別的客戶端也可以連接 serverHandler.channel.accept(serverHandler,this); ByteBuffer readBuffer = ByteBuffer.allocate(1024); //1)ByteBuffer dst:接收緩沖區(qū),用于從異步Channel中讀取數(shù)據(jù)包; //2)? A attachment:異步Channel攜帶的附件,通知回調(diào)的時(shí)候作為入?yún)⑹褂茫? //3)? CompletionHandler :系統(tǒng)回調(diào)的業(yè)務(wù)handler,進(jìn)行讀操作 channel.read(readBuffer,readBuffer, new AioReadHandler(channel)); } @Override public void failed(Throwable exc, AioServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); } } /** * 類說明:讀數(shù)據(jù)的處理器 */ public class AioReadHandler implements CompletionHandler { private AsynchronousSocketChannel channel; public AioReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } //讀取到消息后的處理 @Override public void completed(Integer result, ByteBuffer attachment) { //如果條件成立,說明客戶端主動(dòng)終止了TCP套接字,這時(shí)服務(wù)端終止就可以了 if(result == -1) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } return; } //flip操作 attachment.flip(); byte[] message = new byte[attachment.remaining()]; attachment.get(message); try { System.out.println(result); String msg = new String(message,"UTF-8"); System.out.println("server accept message:"+msg); String responseStr = response(msg); //向客戶端發(fā)送消息 doWrite(responseStr); } catch (Exception e) { e.printStackTrace(); } } //發(fā)送消息 private void doWrite(String result) { byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //異步寫數(shù)據(jù) channel.write(writeBuffer, writeBuffer, new CompletionHandler () { @Override public void completed(Integer result, ByteBuffer attachment) { if(attachment.hasRemaining()){ channel.write(attachment,attachment,this); }else{ //讀取客戶端傳回的數(shù)據(jù) ByteBuffer readBuffer = ByteBuffer.allocate(1024); //異步讀數(shù)據(jù) channel.read(readBuffer,readBuffer, new AioReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * 類說明:響應(yīng)網(wǎng)絡(luò)操作的處理器 */ public class AioServerHandler implements Runnable { public CountDownLatch latch; /*進(jìn)行異步通信的通道*/ public AsynchronousServerSocketChannel channel; public AioServerHandler(int port) { try { //創(chuàng)建服務(wù)端通道 channel = AsynchronousServerSocketChannel.open(); //綁定端口 channel.bind(new InetSocketAddress(port)); System.out.println("Server is start,port:"+port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { latch = new CountDownLatch(1); //用于接收客戶端的連接,異步操作, // 需要實(shí)現(xiàn)了CompletionHandler接口的處理器處理和客戶端的連接操作 channel.accept(this,new AioAcceptHandler()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } }
疑難點(diǎn)1:
怎么理解這里客戶端寫操作的處理器回調(diào)方法?
客戶端把ByteBuffer里面的數(shù)據(jù)寫到AsynchronousSocketChannel這個(gè)管道上,
如果ByteBuffer里面數(shù)據(jù)很大,超過了管道容量,這時(shí)會(huì)先完成寫操作,服務(wù)端收到數(shù)據(jù)回調(diào)這個(gè)completed方法
則需要ByteBuffer再寫入剩下的數(shù)據(jù)到管道里,每發(fā)完一次數(shù)據(jù)通知一次,這個(gè)管道容量取決于網(wǎng)卡的緩沖區(qū)。這個(gè)completed方法并不是說ByteBuffer的數(shù)據(jù)寫完了,而是當(dāng)前網(wǎng)卡這份數(shù)據(jù)寫完了.
疑難點(diǎn)2:
Buffer:
查看源碼可看到幾個(gè)重要屬性:
capacity:表示分配的內(nèi)存大小
position:類似指針類的索引,讀取或?qū)懭氲奈恢脴?biāo)識(shí)符,下一個(gè)可寫入的初始位置/下一個(gè)可讀取的初始位置
limit:可讀或可寫的范圍,小于等于capacity,當(dāng)小于capacity,limit到capaticy的最大容量值的這段空間不予寫入是放一些初始化值的.
ByteBuffer可以理解為放在內(nèi)存中的一個(gè)數(shù)組。
比如圖中一開始是寫入模式,寫入五個(gè)字節(jié),地址為0-4,position在5,調(diào)用flip方法后切換到讀模式,position變?yōu)?即開始序列,limit變?yōu)?,這樣就可以buffer開頭開始讀取了.
應(yīng)用場景:
可以服務(wù)端用AIO模型,客戶端使用BIO簡化編程,本文的例子即可調(diào)試,啟動(dòng)AioServer再啟動(dòng)BioClient,通信是沒問題的
AIO編程相對復(fù)雜,代碼中一些關(guān)鍵方法都有注釋,目前Linux下沒有真正意義上的AIO,實(shí)際上是用了NIO里面的epoll(true),底層原理還是用了IO復(fù)用(NIO).windows實(shí)現(xiàn)了AIO,AIO是未來的方向,需待linux內(nèi)核支持.
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/74892.html
摘要:后改良為用線程池的方式代替新增線程,被稱為偽異步。最大的問題是阻塞,同步。每次請求都由程序執(zhí)行并返回,這是同步的缺陷。這些都會(huì)被注冊在多路復(fù)用器上。多路復(fù)用器提供選擇已經(jīng)就緒狀態(tài)任務(wù)的能力。并沒有采用的多路復(fù)用器,而是使用異步通道的概念。 Netty是一個(gè)提供異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用框架,用以快速開發(fā)高性能、高可靠的網(wǎng)絡(luò)服務(wù)器和客戶端程序。Netty簡化了網(wǎng)絡(luò)程序的開發(fā),是很多框架和公司...
摘要:后改良為用線程池的方式代替新增線程,被稱為偽異步。最大的問題是阻塞,同步。每次請求都由程序執(zhí)行并返回,這是同步的缺陷。這些都會(huì)被注冊在多路復(fù)用器上。多路復(fù)用器提供選擇已經(jīng)就緒狀態(tài)任務(wù)的能力。并沒有采用的多路復(fù)用器,而是使用異步通道的概念。 Netty是一個(gè)提供異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用框架,用以快速開發(fā)高性能、高可靠的網(wǎng)絡(luò)服務(wù)器和客戶端程序。Netty簡化了網(wǎng)絡(luò)程序的開發(fā),是很多框架和公司...
摘要:即可以理解為,方法都是異步的,完成后會(huì)主動(dòng)調(diào)用回調(diào)函數(shù)。主要在包下增加了下面四個(gè)異步通道其中的方法,會(huì)返回一個(gè)帶回調(diào)函數(shù)的對象,當(dāng)執(zhí)行完讀取寫入操作后,直接調(diào)用回調(diào)函數(shù)。 本文原創(chuàng)地址,我的博客:jsbintask.cn/2019/04/16/…(食用效果最佳),轉(zhuǎn)載請注明出處! 在理解什么是BIO,NIO,AIO之前,我們首先需要了解什么是同步,異步,阻塞,非阻塞。假如我們現(xiàn)在要去銀行取...
時(shí)間:2018年04月11日星期三 說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):https://www.imooc.com 教學(xué)源碼:https://github.com/zccodere/s... 學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:課程介紹 1-1 課程介紹 什么是Netty 高性能、事件驅(qū)動(dòng)、異步非阻塞的IO Java開源框架 基于NIO的客戶...
閱讀 820·2023-04-25 20:18
閱讀 2104·2021-11-22 13:54
閱讀 2547·2021-09-26 09:55
閱讀 3912·2021-09-22 15:28
閱讀 2982·2021-09-03 10:34
閱讀 1719·2021-07-28 00:15
閱讀 1645·2019-08-30 14:25
閱讀 1289·2019-08-29 17:16