摘要:前言本篇主要講解中的機(jī)制和網(wǎng)絡(luò)通訊中處理高并發(fā)的分為兩塊第一塊講解多線程下的機(jī)制第二塊講解如何在機(jī)制下優(yōu)化資源的浪費(fèi)服務(wù)器單線程下的機(jī)制就不用我介紹了,不懂得可以去查閱下資料那么多線程下,如果進(jìn)行套接字的使用呢我們使用最簡(jiǎn)單的服務(wù)器來(lái)幫助大
前言
本篇主要講解Java中的IO機(jī)制和網(wǎng)絡(luò)通訊中處理高并發(fā)的NIO
分為兩塊:
第一塊講解多線程下的IO機(jī)制
第二塊講解如何在IO機(jī)制下優(yōu)化CPU資源的浪費(fèi)(New IO)
單線程下的socket機(jī)制就不用我介紹了,不懂得可以去查閱下資料
那么多線程下,如果進(jìn)行套接字的使用呢?
我們使用最簡(jiǎn)單的echo服務(wù)器來(lái)幫助大家理解
首先,來(lái)看下多線程下服務(wù)端和客戶端的工作流程圖:
可以看到,多個(gè)客戶端同時(shí)向服務(wù)端發(fā)送請(qǐng)求
服務(wù)端做出的措施是開(kāi)啟多個(gè)線程來(lái)匹配相對(duì)應(yīng)的客戶端
并且每個(gè)線程去獨(dú)自完成他們的客戶端請(qǐng)求
原理講完了我們來(lái)看下是如何實(shí)現(xiàn)的
在這里我寫(xiě)了一個(gè)簡(jiǎn)單的服務(wù)器
用到了線程池的技術(shù)來(lái)創(chuàng)建線程(具體代碼作用我已經(jīng)加了注釋):
public class MyServer { private static ExecutorService executorService = Executors.newCachedThreadPool(); //創(chuàng)建一個(gè)線程池 private static class HandleMsg implements Runnable{ //一旦有新的客戶端請(qǐng)求,創(chuàng)建這個(gè)線程進(jìn)行處理 Socket client; //創(chuàng)建一個(gè)客戶端 public HandleMsg(Socket client){ //構(gòu)造傳參綁定 this.client = client; } @Override public void run() { BufferedReader bufferedReader = null; //創(chuàng)建字符緩存輸入流 PrintWriter printWriter = null; //創(chuàng)建字符寫(xiě)入流 try { bufferedReader = new BufferedReader(new InputStreamReader(client.getInputStream())); //獲取客戶端的輸入流 printWriter = new PrintWriter(client.getOutputStream(),true); //獲取客戶端的輸出流,true是隨時(shí)刷新 String inputLine = null; long a = System.currentTimeMillis(); while ((inputLine = bufferedReader.readLine())!=null){ printWriter.println(inputLine); } long b = System.currentTimeMillis(); System.out.println("此線程花費(fèi)了:"+(b-a)+"秒!"); } catch (IOException e) { e.printStackTrace(); }finally { try { bufferedReader.close(); printWriter.close(); client.close(); } catch (IOException e) { e.printStackTrace(); } } } } public static void main(String[] args) throws IOException { //服務(wù)端的主線程是用來(lái)循環(huán)監(jiān)聽(tīng)客戶端請(qǐng)求 ServerSocket server = new ServerSocket(8686); //創(chuàng)建一個(gè)服務(wù)端且端口為8686 Socket client = null; while (true){ //循環(huán)監(jiān)聽(tīng) client = server.accept(); //服務(wù)端監(jiān)聽(tīng)到一個(gè)客戶端請(qǐng)求 System.out.println(client.getRemoteSocketAddress()+"地址的客戶端連接成功!"); executorService.submit(new HandleMsg(client)); //將該客戶端請(qǐng)求通過(guò)線程池放入HandlMsg線程中進(jìn)行處理 } } }
上述代碼中我們使用一個(gè)類編寫(xiě)了一個(gè)簡(jiǎn)單的echo服務(wù)器
在主線程中用死循環(huán)來(lái)開(kāi)啟端口監(jiān)聽(tīng)
有了服務(wù)器,我們就可以對(duì)其進(jìn)行訪問(wèn),并且發(fā)送一些字符串?dāng)?shù)據(jù)
服務(wù)器的功能是返回這些字符串,并且打印出線程占用時(shí)間
下面來(lái)寫(xiě)個(gè)簡(jiǎn)單的客戶端來(lái)響應(yīng)服務(wù)端:
public class MyClient { public static void main(String[] args) throws IOException { Socket client = null; PrintWriter printWriter = null; BufferedReader bufferedReader = null; try { client = new Socket(); client.connect(new InetSocketAddress("localhost",8686)); printWriter = new PrintWriter(client.getOutputStream(),true); printWriter.println("hello"); printWriter.flush(); bufferedReader = new BufferedReader(new InputStreamReader(client.getInputStream())); //讀取服務(wù)器返回的信息并進(jìn)行輸出 System.out.println("來(lái)自服務(wù)器的信息是:"+bufferedReader.readLine()); } catch (IOException e) { e.printStackTrace(); }finally { printWriter.close(); bufferedReader.close(); client.close(); } } }
代碼中,我們用字符流發(fā)送了一個(gè)hello字符串過(guò)去,如果代碼沒(méi)問(wèn)題
服務(wù)器會(huì)返回一個(gè)hello數(shù)據(jù),并且打印出我們?cè)O(shè)置的日志信息
我們來(lái)運(yùn)行:
1.打開(kāi)server,開(kāi)啟循環(huán)監(jiān)聽(tīng):
2.打開(kāi)一個(gè)客戶端:
可以看到客戶端打印出了返回結(jié)果
3.查看服務(wù)端日志:
很好,一個(gè)簡(jiǎn)單的多線程套接字編程就實(shí)現(xiàn)了
但是試想一下:
如果一個(gè)客戶端請(qǐng)求中,在IO寫(xiě)入到服務(wù)端過(guò)程中加入Sleep,
使每個(gè)請(qǐng)求占用服務(wù)端線程10秒
然后有大量的客戶端請(qǐng)求,每個(gè)請(qǐng)求都占用那么長(zhǎng)時(shí)間
那么服務(wù)端的并能能力就會(huì)大幅度下降
這并不是因?yàn)榉?wù)端有多少繁重的任務(wù),而僅僅是因?yàn)榉?wù)線程在等待IO(因?yàn)閍ccept,read,write都是阻塞式的)
讓高速運(yùn)行的CPU去等待及其低效的網(wǎng)絡(luò)IO是非常不合算的行為
這時(shí)候該怎么辦?
NIONew IO成功的解決了上述問(wèn)題,它是怎樣解決的呢?
IO處理客戶端請(qǐng)求的最小單位是線程
而NIO使用了比線程還小一級(jí)的單位:通道(Channel)
可以說(shuō),NIO中只需要一個(gè)線程就能完成所有接收,讀,寫(xiě)等操作
要學(xué)習(xí)NIO,首先要理解它的三大核心
Selector,選擇器
Buffer,緩沖區(qū)
Channel,通道
博主不才,畫(huà)了張丑圖給大家加深下印象 ^ . ^
再給一張TCP下的NIO工作流程圖(好難畫(huà)的線條...)
大家大致看懂就行,我們一步步來(lái)
Buffer首先要知道什么是Buffer
在NIO中數(shù)據(jù)交互不再像IO機(jī)制那樣使用流
而是使用Buffer(緩沖區(qū))
博主覺(jué)得圖才是最容易理解的
所以...
可以看出Buffer在整個(gè)工作流程中的位置
buffer實(shí)際上是一個(gè)容器,一個(gè)連續(xù)數(shù)組,它通過(guò)幾個(gè)變量來(lái)保存這個(gè)數(shù)據(jù)的當(dāng)前位置狀態(tài):
1.capacity:容量,緩沖區(qū)能容納元素的數(shù)量
2.position:當(dāng)前位置,是緩沖區(qū)中下一次發(fā)生讀取和寫(xiě)入操作的索引,當(dāng)前位置通過(guò)大多數(shù)讀寫(xiě)操作向前推進(jìn)
3.limit:界限,是緩沖區(qū)中最后一個(gè)有效位置之后下一個(gè)位置的索引
如圖:
幾個(gè)常用方法:
.flip() //將limit設(shè)置為position,然后position重置為0,返回對(duì)緩沖區(qū)的引用 .clear() //清空調(diào)用緩沖區(qū)并返回對(duì)緩沖區(qū)的引用
來(lái)點(diǎn)實(shí)際點(diǎn)的,上面圖中的具體代碼如下:
1.首先給Buffer分配空間,以字節(jié)為單位
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
創(chuàng)建一個(gè)ByteBuffer對(duì)象并且指定內(nèi)存大小
2.向Buffer中寫(xiě)入數(shù)據(jù):
1).數(shù)據(jù)從Channel到Buffer:channel.read(byteBuffer); 2).數(shù)據(jù)從Client到Buffer:byteBuffer.put(...);
3.從Buffer中讀取數(shù)據(jù):
1).數(shù)據(jù)從Buffer到Channel:channel.write(byteBuffer); 2).數(shù)據(jù)從Buffer到Server:byteBuffer.get(...);Selector
選擇器是NIO的核心,它是channel的管理者
通過(guò)執(zhí)行select()阻塞方法,監(jiān)聽(tīng)是否有channel準(zhǔn)備好
一旦有數(shù)據(jù)可讀,此方法的返回值是SelectionKey的數(shù)量
所以服務(wù)端通常會(huì)死循環(huán)執(zhí)行select()方法,直到有channl準(zhǔn)備就緒,然后開(kāi)始工作
每個(gè)channel都會(huì)和Selector綁定一個(gè)事件,然后生成一個(gè)SelectionKey的對(duì)象
需要注意的是:
channel和Selector綁定時(shí),channel必須是非阻塞模式
而FileChannel不能切換到非阻塞模式,因?yàn)樗皇翘捉幼滞ǖ溃訤ileChannel不能和Selector綁定事件
在NIO中一共有四種事件:
1.SelectionKey.OP_CONNECT:連接事件
2.SelectionKey.OP_ACCEPT:接收事件
3.SelectionKey.OP_READ:讀事件
4.SelectionKey.OP_WRITE:寫(xiě)事件
共有四種通道:
FileChannel:作用于IO文件流
DatagramChannel:作用于UDP協(xié)議
SocketChannel:作用于TCP協(xié)議
ServerSocketChannel:作用于TCP協(xié)議
本篇文章通過(guò)常用的TCP協(xié)議來(lái)講解NIO
我們以ServerSocketChannel為例:
打開(kāi)一個(gè)ServerSocketChannel通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
關(guān)閉ServerSocketChannel通道:
serverSocketChannel.close();
循環(huán)監(jiān)聽(tīng)SocketChannel:
while(true){ SocketChannel socketChannel = serverSocketChannel.accept(); clientChannel.configureBlocking(false); }
clientChannel.configureBlocking(false);語(yǔ)句是將此通道設(shè)置為非阻塞,也就是異步
自由控制阻塞或非阻塞便是NIO的特性之一
SelectionKey是通道和選擇器交互的核心組件
比如在SocketChannel上綁定一個(gè)Selector,并注冊(cè)為連接事件:
SocketChannel clientChannel = SocketChannel.open(); clientChannel.configureBlocking(false); clientChannel.connect(new InetSocketAddress(port)); clientChannel.register(selector, SelectionKey.OP_CONNECT);
核心在register()方法,它返回一個(gè)SelectionKey對(duì)象
來(lái)檢測(cè)channel事件是那種事件可以使用以下方法:
selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable();
服務(wù)端便是通過(guò)這些方法 在輪詢中執(zhí)行相對(duì)應(yīng)操作
當(dāng)然通過(guò)Channel與Selector綁定的key也可以反過(guò)來(lái)拿到他們
Channel channel = selectionKey.channel(); Selector selector = selectionKey.selector();
在Channel上注冊(cè)事件時(shí),我們也可以順帶綁定一個(gè)Buffer:
clientChannel.register(key.selector(), SelectionKey.OP_READ,ByteBuffer.allocateDirect(1024));
或者綁定一個(gè)Object:
selectionKey.attach(Object); Object anthorObj = selectionKey.attachment();NIO的TCP服務(wù)端
講了這么多,都是理論
我們來(lái)看下最簡(jiǎn)單也是最核心的代碼(加那么多注釋很不優(yōu)雅,但方便大家看懂):
package cn.blog.test.NioTest; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.nio.charset.Charset; import java.util.Iterator; import java.util.Set; public class MyNioServer { private Selector selector; //創(chuàng)建一個(gè)選擇器 private final static int port = 8686; private final static int BUF_SIZE = 10240; private void initServer() throws IOException { //創(chuàng)建通道管理器對(duì)象selector this.selector=Selector.open(); //創(chuàng)建一個(gè)通道對(duì)象channel ServerSocketChannel channel = ServerSocketChannel.open(); channel.configureBlocking(false); //將通道設(shè)置為非阻塞 channel.socket().bind(new InetSocketAddress(port)); //將通道綁定在8686端口 //將上述的通道管理器和通道綁定,并為該通道注冊(cè)O(shè)P_ACCEPT事件 //注冊(cè)事件后,當(dāng)該事件到達(dá)時(shí),selector.select()會(huì)返回(一個(gè)key),如果該事件沒(méi)到達(dá)selector.select()會(huì)一直阻塞 SelectionKey selectionKey = channel.register(selector,SelectionKey.OP_ACCEPT); while (true){ //輪詢 selector.select(); //這是一個(gè)阻塞方法,一直等待直到有數(shù)據(jù)可讀,返回值是key的數(shù)量(可以有多個(gè)) Set keys = selector.selectedKeys(); //如果channel有數(shù)據(jù)了,將生成的key訪入keys集合中 Iterator iterator = keys.iterator(); //得到這個(gè)keys集合的迭代器 while (iterator.hasNext()){ //使用迭代器遍歷集合 SelectionKey key = (SelectionKey) iterator.next(); //得到集合中的一個(gè)key實(shí)例 iterator.remove(); //拿到當(dāng)前key實(shí)例之后記得在迭代器中將這個(gè)元素刪除,非常重要,否則會(huì)出錯(cuò) if (key.isAcceptable()){ //判斷當(dāng)前key所代表的channel是否在Acceptable狀態(tài),如果是就進(jìn)行接收 doAccept(key); }else if (key.isReadable()){ doRead(key); }else if (key.isWritable() && key.isValid()){ doWrite(key); }else if (key.isConnectable()){ System.out.println("連接成功!"); } } } } public void doAccept(SelectionKey key) throws IOException { ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); System.out.println("ServerSocketChannel正在循環(huán)監(jiān)聽(tīng)"); SocketChannel clientChannel = serverChannel.accept(); clientChannel.configureBlocking(false); clientChannel.register(key.selector(),SelectionKey.OP_READ); } public void doRead(SelectionKey key) throws IOException { SocketChannel clientChannel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE); long bytesRead = clientChannel.read(byteBuffer); while (bytesRead>0){ byteBuffer.flip(); byte[] data = byteBuffer.array(); String info = new String(data).trim(); System.out.println("從客戶端發(fā)送過(guò)來(lái)的消息是:"+info); byteBuffer.clear(); bytesRead = clientChannel.read(byteBuffer); } if (bytesRead==-1){ clientChannel.close(); } } public void doWrite(SelectionKey key) throws IOException { ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE); byteBuffer.flip(); SocketChannel clientChannel = (SocketChannel) key.channel(); while (byteBuffer.hasRemaining()){ clientChannel.write(byteBuffer); } byteBuffer.compact(); } public static void main(String[] args) throws IOException { MyNioServer myNioServer = new MyNioServer(); myNioServer.initServer(); } }
我打印了監(jiān)聽(tīng)channel,告訴大家ServerSocketChannel是在什么時(shí)候開(kāi)始運(yùn)行的
如果配合NIO客戶端的debug,就能很清楚的發(fā)現(xiàn),進(jìn)入select()輪詢前
雖然已經(jīng)有了ACCEPT事件的KEY,但select()默認(rèn)并不會(huì)去調(diào)用
而是要等待有其它感興趣事件被select()捕獲之后,才會(huì)去調(diào)用ACCEPT的SelectionKey
這時(shí)候ServerSocketChannel才開(kāi)始進(jìn)行循環(huán)監(jiān)聽(tīng)
也就是說(shuō)一個(gè)Selector中,始終保持著ServerSocketChannel的運(yùn)行
而serverChannel.accept();真正做到了異步(在initServer方法中的channel.configureBlocking(false);)
如果沒(méi)有接受到connect,會(huì)返回一個(gè)null
如果成功連接了一個(gè)SocketChannel,則此SocketChannel會(huì)注冊(cè)寫(xiě)入(READ)事件
并且設(shè)置為異步
有服務(wù)端必定有客戶端
其實(shí)如果能完全理解了服務(wù)端
客戶端的代碼大同小異
package cn.blog.test.NioTest; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; public class MyNioClient { private Selector selector; //創(chuàng)建一個(gè)選擇器 private final static int port = 8686; private final static int BUF_SIZE = 10240; private static ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE); private void initClient() throws IOException { this.selector = Selector.open(); SocketChannel clientChannel = SocketChannel.open(); clientChannel.configureBlocking(false); clientChannel.connect(new InetSocketAddress(port)); clientChannel.register(selector, SelectionKey.OP_CONNECT); while (true){ selector.select(); Iterator輸出結(jié)果iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()){ SelectionKey key = iterator.next(); iterator.remove(); if (key.isConnectable()){ doConnect(key); }else if (key.isReadable()){ doRead(key); } } } } public void doConnect(SelectionKey key) throws IOException { SocketChannel clientChannel = (SocketChannel) key.channel(); if (clientChannel.isConnectionPending()){ clientChannel.finishConnect(); } clientChannel.configureBlocking(false); String info = "服務(wù)端你好!!"; byteBuffer.clear(); byteBuffer.put(info.getBytes("UTF-8")); byteBuffer.flip(); clientChannel.write(byteBuffer); //clientChannel.register(key.selector(),SelectionKey.OP_READ); clientChannel.close(); } public void doRead(SelectionKey key) throws IOException { SocketChannel clientChannel = (SocketChannel) key.channel(); clientChannel.read(byteBuffer); byte[] data = byteBuffer.array(); String msg = new String(data).trim(); System.out.println("服務(wù)端發(fā)送消息:"+msg); clientChannel.close(); key.selector().close(); } public static void main(String[] args) throws IOException { MyNioClient myNioClient = new MyNioClient(); myNioClient.initClient(); } }
這里我打開(kāi)一個(gè)服務(wù)端,兩個(gè)客戶端:
接下來(lái),你可以試下同時(shí)打開(kāi)一千個(gè)客戶端,只要你的CPU夠給力,服務(wù)端就不可能因?yàn)樽枞档托阅?/p>
以上便是Java NIO的基礎(chǔ)詳解
謝謝閱讀和關(guān)注~
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/70727.html
摘要:的選擇器允許單個(gè)線程監(jiān)視多個(gè)輸入通道。一旦執(zhí)行的線程已經(jīng)超過(guò)讀取代碼中的某個(gè)數(shù)據(jù)片段,該線程就不會(huì)在數(shù)據(jù)中向后移動(dòng)通常不會(huì)。 1、引言 很多初涉網(wǎng)絡(luò)編程的程序員,在研究Java NIO(即異步IO)和經(jīng)典IO(也就是常說(shuō)的阻塞式IO)的API時(shí),很快就會(huì)發(fā)現(xiàn)一個(gè)問(wèn)題:我什么時(shí)候應(yīng)該使用經(jīng)典IO,什么時(shí)候應(yīng)該使用NIO? 在本文中,將嘗試用簡(jiǎn)明扼要的文字,闡明Java NIO和經(jīng)典IO之...
摘要:好了,目前還不難,我們起碼知道這個(gè)抽象類上面的部分關(guān)系,因此當(dāng)然也有自己的方法,如下。又來(lái)一個(gè)的供應(yīng)商好吧,大佬們總是喜歡用一些設(shè)計(jì)模式的東西,沒(méi)錯(cuò),也是一個(gè)抽象類,這個(gè)現(xiàn)在不用太在意了。 前言 java nio,一個(gè)入門(mén)netty之前需要了解下的非阻塞I/O實(shí)現(xiàn),傳統(tǒng)的Socket通信,啟動(dòng)監(jiān)聽(tīng)后accept會(huì)一直處于阻塞狀態(tài),那么如果你想要多個(gè)(并發(fā))通信時(shí),那么我們就需要多個(gè)線性...
摘要:是一個(gè)用來(lái)替代標(biāo)準(zhǔn)的新型數(shù)據(jù)傳遞方式,像現(xiàn)在分布式架構(gòu)中會(huì)經(jīng)常存在他的身影。這個(gè)方法會(huì)一直阻塞到某個(gè)注冊(cè)的通道有事件就緒。保持不變,仍然表示能從中讀取多少個(gè)元素等與通過(guò)調(diào)用方法,可以標(biāo)記中的一個(gè)特定。 Java NIO是一個(gè)用來(lái)替代標(biāo)準(zhǔn)Java IO API的新型數(shù)據(jù)傳遞方式,像現(xiàn)在分布式架構(gòu)中會(huì)經(jīng)常存在他的身影。其比傳統(tǒng)的IO更加高效,非阻塞,異步,雙向 NIO主體結(jié)構(gòu) showIm...
摘要:當(dāng)我們需要與進(jìn)行交互時(shí)我們就需要使用到即數(shù)據(jù)從讀取到中并且從中寫(xiě)入到中實(shí)際上一個(gè)其實(shí)就是一塊內(nèi)存區(qū)域我們可以在這個(gè)內(nèi)存區(qū)域中進(jìn)行數(shù)據(jù)的讀寫(xiě)其實(shí)是這樣的內(nèi)存塊的一個(gè)封裝并提供了一些操作方法讓我們能夠方便地進(jìn)行數(shù)據(jù)的讀寫(xiě)類型有這些覆蓋了能從中傳 Java NIO Buffer 當(dāng)我們需要與 NIO Channel 進(jìn)行交互時(shí), 我們就需要使用到 NIO Buffer, 即數(shù)據(jù)從 Buffe...
摘要:允許一個(gè)單一的線程來(lái)操作多個(gè)如果我們的應(yīng)用程序中使用了多個(gè)那么使用很方便的實(shí)現(xiàn)這樣的目的但是因?yàn)樵谝粋€(gè)線程中使用了多個(gè)因此也會(huì)造成了每個(gè)傳輸效率的降低使用的圖解如下為了使用我們首先需要將注冊(cè)到中隨后調(diào)用的方法這個(gè)方法會(huì)阻塞直到注冊(cè)在中的發(fā)送 Selector Selector 允許一個(gè)單一的線程來(lái)操作多個(gè) Channel. 如果我們的應(yīng)用程序中使用了多個(gè) Channel, 那么使用 S...
閱讀 3663·2021-09-22 15:15
閱讀 3567·2021-08-12 13:24
閱讀 1313·2019-08-30 15:53
閱讀 1825·2019-08-30 15:43
閱讀 1188·2019-08-29 17:04
閱讀 2798·2019-08-29 15:08
閱讀 1586·2019-08-29 13:13
閱讀 3090·2019-08-29 11:06