摘要:緩沖區(qū)的限制不能為負(fù),并且不能大于其容量。如果指向的位置超過限制,則拋出異常。使用臨時(shí)緩沖區(qū)執(zhí)行低層次操作。臨時(shí)緩沖區(qū)對(duì)象離開作用域,并最終成為被回收的無用數(shù)據(jù)。
前天剛好看了點(diǎn)《UNIX網(wǎng)絡(luò)編程》,比較頭大?,F(xiàn)在我來整理一下所學(xué)所得,并用于個(gè)人備忘。如果有不對(duì),請(qǐng)批評(píng)。
想要解鎖更多新姿勢?請(qǐng)?jiān)L問https://blog.tengshe789.tech/
IO模型介紹IO模型是什么?很多書籍或者百度百度百科,都沒有給出明確的解釋,我也不敢亂下定義。以我愚見,IO模型,是通過根據(jù)前人主觀意識(shí)的思考而構(gòu)成客觀闡述IO復(fù)雜操作邏輯的物件。
要知道,應(yīng)用程序使用系統(tǒng)資源的一個(gè)過程,進(jìn)程無法直接操作IO設(shè)備的,因?yàn)橛脩暨M(jìn)程不能直接訪問磁盤,所以要通過內(nèi)核的系統(tǒng)調(diào)用讀取,這個(gè)內(nèi)核讀取的過程就是用戶進(jìn)程等待的過程,等待內(nèi)核讀取后將數(shù)據(jù)從內(nèi)核內(nèi)存復(fù)制到進(jìn)程內(nèi)存。因此操作系統(tǒng)設(shè)立一個(gè)IO模型進(jìn)行規(guī)范,就非常有必要了。
為了更好地了解IO模型,我們需要事先回顧下:同步、異步、阻塞、非阻塞
同步與異步:描述的是用戶線程與內(nèi)核的交互方式,同步指用戶線程發(fā)起IO請(qǐng)求后需要等待或者輪詢內(nèi)核IO操作完成后才能繼續(xù)執(zhí)行;而異步是指用戶線程發(fā)起IO請(qǐng)求后仍然繼續(xù)執(zhí)行,當(dāng)內(nèi)核IO操作完成后會(huì)通知用戶線程,或者調(diào)用用戶線程注冊的回調(diào)函數(shù)。
阻塞與非阻塞:描述是用戶線程調(diào)用內(nèi)核IO操作的方式,阻塞是指IO操作需要徹底完成后才返回到用戶空間;而非阻塞是指IO操作被調(diào)用后立即返回給用戶一個(gè)狀態(tài)值,無需等到IO操作徹底完成。
IO模型一共有5類:
blocking-IO BIO(阻塞IO)
non-blocking IO NIO(非阻塞IO)
IO multiplexing IO多路復(fù)用
signal driven IO 信號(hào)驅(qū)動(dòng)IO
asynchronous IO AIO(異步IO)
由于signal driven IO(信號(hào)驅(qū)動(dòng)IO)在實(shí)際中并不常用,所以主要介紹其余四種IO Model。
BIO(blocking io)先來看看讀操作流程
從圖中可以看出,用戶進(jìn)程調(diào)用了recvfrom這個(gè)系統(tǒng)調(diào)用,kernel就開始了IO的第一個(gè)階段:準(zhǔn)備數(shù)據(jù)。
對(duì)于network io來說,很多時(shí)候數(shù)據(jù)在一開始還沒有到達(dá)(比如,還沒有收到一個(gè)完整的UDP包),這個(gè)時(shí)候kernel就要等待足夠的數(shù)據(jù)到來。
而在用戶進(jìn)程這邊,整個(gè)進(jìn)程會(huì)被阻塞。當(dāng)kernel一直等到數(shù)據(jù)準(zhǔn)備好了,它就會(huì)將數(shù)據(jù)從kernel中拷貝到用戶內(nèi)存,然后kernel返回結(jié)果,用戶進(jìn)程才解除block的狀態(tài),重新運(yùn)行起來。
也就是說,blocking IO的特點(diǎn)就是在IO執(zhí)行的兩個(gè)階段(等待數(shù)據(jù)和拷貝數(shù)據(jù)兩個(gè)階段)都被block了。
JAVA 阻塞 demo下面的例子主要使用Socket通道進(jìn)行編程。服務(wù)端如下:
/** * @program: socketTest * @description: one thread demo for bio version * @author: tEngSHe789 * @create: 2018-08-26 21:17 **/ public class Server { public static void main(String[] args) { try { ServerSocket serverSocket=new ServerSocket(8888); System.out.println("服務(wù)端Start...."); //等待客戶端就緒 -> 堵塞 while (true){ Socket socket = serverSocket.accept(); System.out.println("發(fā)現(xiàn)客戶端連接"); InputStream is=socket.getInputStream(); byte[] b =new byte[1024]; //等待客戶端發(fā)送請(qǐng)求 -> 堵塞 while (true) { int data = is.read(b); String info=null; if (data!=-1){ info=new String(b,0,data,"GBK"); } System.out.println(info); } } } catch (IOException e) { } } }
客戶端
/** * @program: socketTest * @description: one thread demo for bio version * @author: tEngSHe789 **/ public class Client { public static void main(String[] args) { try { Socket socket=new Socket("127.0.0.1",8888); OutputStream os = socket.getOutputStream(); System.out.println("正在發(fā)送數(shù)據(jù)"); os.write("這是來自客戶端的信息".getBytes()); os.flush(); } catch (IOException e) { e.printStackTrace(); } } }PY 阻塞 demo
服務(wù)端
import socket s = socket.socket() s.bind(("127.0.0.1",8888)) print("服務(wù)端啟動(dòng)....") # 等待客戶端就緒 -> 堵塞 s.listen() # 等待客戶端發(fā)送請(qǐng)求 -> 堵塞 conn,addr = s.accept() msg = conn.recv(1024).decode("utf-8") print(msg) conn.close() s.close()
客戶端
import socket s = socket.socket() s.connect(("127.0.0.1",8888)) print("客戶端已啟動(dòng)....") s.send("正在發(fā)送數(shù)據(jù)".encode("utf-8")) s.close()NIO(non blocking io)
NIO就不一樣了,recvform系統(tǒng)調(diào)用調(diào)用之后,進(jìn)程并沒有被阻塞,內(nèi)核馬上返回給進(jìn)程,如果數(shù)據(jù)還沒準(zhǔn)備好,此時(shí)會(huì)返回一個(gè)error。進(jìn)程在返回之后,可以干點(diǎn)別的事情,然后再發(fā)起recvform系統(tǒng)調(diào)用。重復(fù)上面的過程,循環(huán)往復(fù)的進(jìn)行recvform系統(tǒng)調(diào)用。這個(gè)過程通常被稱之為輪詢。
輪詢檢查內(nèi)核數(shù)據(jù),直到數(shù)據(jù)準(zhǔn)備好,再拷貝數(shù)據(jù)到進(jìn)程,進(jìn)行數(shù)據(jù)處理。需要注意,拷貝數(shù)據(jù)整個(gè)過程,進(jìn)程仍然是屬于阻塞的狀態(tài)。
JAVA 與NIOJava NIO(New IO)是一個(gè)可以替代標(biāo)準(zhǔn)Java IO API的IO API(從Java 1.4開始),Java NIO提供了與標(biāo)準(zhǔn)IO不同的IO工作方式。
在java中,標(biāo)準(zhǔn)的IO基于字節(jié)流和字符流進(jìn)行操作的,而NIO是基于通道(Channel)和緩沖區(qū)(Buffer)進(jìn)行操作,數(shù)據(jù)總是從通道讀取到緩沖區(qū)中,或者從緩沖區(qū)寫入到通道中。
我們先看看Buffer類
Buffer類Java NIO中的Buffer主要用于與NIO通道進(jìn)行交互,數(shù)據(jù)是從通道讀入到緩沖區(qū),從緩沖區(qū)寫入通道中的。概念上,緩沖區(qū)可以看成包在一個(gè)對(duì)象內(nèi)的數(shù)組,下面看一個(gè)圖
這是一個(gè)新創(chuàng)建的容量為10的ByteBuffer邏輯圖,他有四個(gè)屬性來提供關(guān)于其包含的數(shù)據(jù)元素信息,分別是:
1)容量(capacity):表示Buffer最大數(shù)據(jù)容量,緩沖區(qū)容量不能為負(fù),并且建立后不能修改。
2)限制(limit):也叫上界。第一個(gè)不應(yīng)該讀取或者寫入的數(shù)據(jù)的索引,即位于limit后的數(shù)據(jù)不可以讀寫。緩沖區(qū)的限制不能為負(fù),并且不能大于其容量(capacity)。
3)位置(position):下一個(gè)要讀取或?qū)懭氲臄?shù)據(jù)的索引。緩沖區(qū)的位置不能為負(fù),并且不能大于其限制(limit)。
4)標(biāo)記(mark)與重置(reset):標(biāo)記是一個(gè)索引,通過Buffer中的mark()方法指定Buffer中一個(gè)特定的position,之后可以通過調(diào)用reset()方法恢復(fù)到這個(gè)position。
從這幅圖可以看到,他的容量(capacity)和限制(limit)設(shè)置為10,位置設(shè)置為0,每個(gè)緩沖區(qū)容量是固定的,標(biāo)記是未定義的,其他三個(gè)屬性可以通過使用緩沖區(qū)解決。
緩沖區(qū)存儲(chǔ)數(shù)據(jù)支持的數(shù)據(jù)類型支持七種數(shù)據(jù)類型,他們是:
1.byteBuffer
2.charBuffer
3.shortBuffer
4.IntBuffer
5.LongBuffer
6.FloatBuffer
7.DubooBuffer
使用Buffer讀寫數(shù)據(jù)一般遵循以下四個(gè)步驟:
(1) 寫入數(shù)據(jù)到Buffer,一般有可以從Channel讀取到緩沖區(qū)中,也可以調(diào)用put方法寫入。
(2) 調(diào)用flip()方法,切換數(shù)據(jù)模式。
(3) 從Buffer中讀取數(shù)據(jù),一般從緩沖區(qū)讀取數(shù)據(jù)寫入到通道中,也可以調(diào)用get方法讀取。
(4) 調(diào)用clear()方法或者compact()方法。
緩沖區(qū)API首先,用allocate 指定緩沖區(qū)大小1024
ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
我們可以用put 存入數(shù)據(jù)到緩沖區(qū)
byteBuffer.put("tengshe789".getBytes());
當(dāng)調(diào)用put時(shí),會(huì)指出下一個(gè)元素應(yīng)當(dāng)被插入的位置,位置(position)指向的是下一個(gè)元素。如果指向的位置超過限制(limit),則拋出BufferOverFlowException異常。
Flip將一個(gè)能夠繼續(xù)添加數(shù)據(jù)元素的填充狀態(tài)的緩沖區(qū)翻轉(zhuǎn)成一個(gè)準(zhǔn)備讀出元素的釋放狀態(tài)
byteBuffer.flip();
具體有什么用呢?
對(duì)于已經(jīng)寫滿了緩沖區(qū),如果將緩沖區(qū)內(nèi)容傳遞給一個(gè)通道,以使內(nèi)容能被全部寫出。
但如果通道現(xiàn)在在緩沖區(qū)上執(zhí)行g(shù)et,那么它將從我們剛剛插入的有用數(shù)據(jù)之外取出未定義數(shù)據(jù)。通過翻轉(zhuǎn)將位置值重新設(shè)為 0,通道就會(huì)從正確位置開始獲取。
例如我們定義了一個(gè)容量是10的buffer,并填入hello,如下圖所示
翻轉(zhuǎn)后如下圖所示
Rewind與 flip相似,但不影響上界屬性。它只是將位置值設(shè)回 0??梢允褂?rewind()后退,重讀已經(jīng)被翻轉(zhuǎn)的緩沖區(qū)中的數(shù)據(jù)。
byteBuffer.rewind();
翻轉(zhuǎn)完了,就可以用get獲取緩沖區(qū)數(shù)據(jù)了
byte[] b= new byte[byteBuffer.limit()]; byteBuffer.get(b);
當(dāng)調(diào)用get時(shí),會(huì)指出下一個(gè)元素應(yīng)當(dāng)被索引的位置,位置(position)返回時(shí)會(huì)+1s。如果指向的位置超過限制(limit),則拋出BufferUnderFlowException異常。如果提供的索引超過范圍,也會(huì)拋出IndexOutOfBoundsException異常
remaining可以告訴你從當(dāng)前位置(position)到限制(limit)還剩的元素?cái)?shù)目
int count = byteBuffer.remaining();
clear將緩沖區(qū)重置為空狀態(tài)
byteBuffer.clear();
如果我們只想從緩沖區(qū)中釋放一部分?jǐn)?shù)據(jù),而不是全部,然后重新填充。為了實(shí)現(xiàn)這一點(diǎn),未讀的數(shù)據(jù)元素需要下移以使第一個(gè)元素索引為 0。盡管重復(fù)這樣做會(huì)效率低下,但這有時(shí)非常必要,而 API 對(duì)此為您提供了一個(gè) compact()函數(shù)。
byteBuffer.compact();
標(biāo)記是一個(gè)索引,通過Buffer中的mark()方法指定Buffer中一個(gè)特定的position,之后可以通過調(diào)用reset()方法恢復(fù)到這個(gè)position。要知道緩沖區(qū)的標(biāo)記在mark()函數(shù)被調(diào)用前時(shí)未定義的,如果標(biāo)記未定義,調(diào)用reset()會(huì)導(dǎo)致InvalidMarkException異常
byteBuffer.position(2).mark().position(4).reset();
要注意,java.nio中的類特意被設(shè)計(jì)為支持級(jí)聯(lián)調(diào)用,優(yōu)雅的使用級(jí)聯(lián)調(diào)用,可以產(chǎn)生優(yōu)美易讀的代碼。
直接緩沖區(qū)與非直接緩沖區(qū)上面我們說了ByteBuffer,也就是緩沖區(qū)的用法,譬如用allocate() 方法指定緩沖區(qū)大小,然后進(jìn)行填充或翻轉(zhuǎn)操作等等等。我們所創(chuàng)建的緩沖區(qū),都屬于直接緩沖區(qū)。他們都是在JVM中內(nèi)存中創(chuàng)建,在每次調(diào)用基礎(chǔ)操作系統(tǒng)的一個(gè)本機(jī)IO之前或者之后,虛擬機(jī)都會(huì)將緩沖區(qū)的內(nèi)容復(fù)制到中間緩沖區(qū)(或者從中間緩沖區(qū)復(fù)制內(nèi)容),緩沖區(qū)的內(nèi)容駐留在JVM內(nèi),因此銷毀容易,但是占用JVM內(nèi)存開銷,處理過程中有復(fù)制操作。
非直接緩沖區(qū)寫入步驟:
1.創(chuàng)建一個(gè)臨時(shí)的直接ByteBuffer對(duì)象。
2.將非直接緩沖區(qū)的內(nèi)容復(fù)制到臨時(shí)緩沖中。
3.使用臨時(shí)緩沖區(qū)執(zhí)行低層次I/O操作。
4.臨時(shí)緩沖區(qū)對(duì)象離開作用域,并最終成為被回收的無用數(shù)據(jù)。
/** * @program: UndirectBuffer * @description: 利用通道完成文件的復(fù)制(非直接緩沖區(qū)) * @author: tEngSHe789 **/ public class UndirectBuffer { public static void main(String[] args) throws IOException { // 創(chuàng)建流 FileInputStream fis = new FileInputStream("d://blog.md"); FileOutputStream fos = new FileOutputStream("d://blog.md"); //獲取管道 FileChannel in = fis.getChannel(); FileChannel out = fos.getChannel(); // 分配指定大小的緩沖區(qū) ByteBuffer buffer = ByteBuffer.allocate(1024); while (in.read(buffer) !=-1){ buffer.flip();// 準(zhǔn)備讀數(shù)據(jù)了 out.write(buffer); buffer.clear(); } out.close(); in.close(); fis.close(); fos.close(); } }
直接緩沖區(qū),是通過 allocateDirect() 方法在JVM內(nèi)存外開辟內(nèi)存,在每次調(diào)用基礎(chǔ)操作系統(tǒng)的一個(gè)本機(jī)IO之前或者之后,虛擬機(jī)都會(huì)避免將緩沖區(qū)的內(nèi)容復(fù)制到中間緩沖區(qū)(或者從中間緩沖區(qū)復(fù)制內(nèi)容),緩沖區(qū)的內(nèi)容駐留在物理內(nèi)存內(nèi),會(huì)少一次復(fù)制過程,如果需要循環(huán)使用緩沖區(qū),用直接緩沖區(qū)可以很大地提高性能。
雖然直接緩沖區(qū)使JVM可以進(jìn)行高效的I/O操作,但它使用的內(nèi)存是操作系統(tǒng)分配的,繞過了JVM堆棧,建立和銷毀比堆棧上的緩沖區(qū)要更大的開銷。
/** * @program: DirectBuffer * @description: 使用直接緩沖區(qū)完成文件的復(fù)制(內(nèi)存映射文件) * @author: tEngSHe789 **/ public class DirectBuffer { public static void main(String[] args) throws IOException { //創(chuàng)建管道 FileChannel in=FileChannel.open(Paths.get("d://blog.md"),StandardOpenOption.READ); FileChannel out=FileChannel.open(Paths.get("d://blog.md"),StandardOpenOption.WRITE ,StandardOpenOption.READ,StandardOpenOption.CREATE); // 拿到將管道內(nèi)容映射到內(nèi)存的直接緩沖區(qū)映射文件(一個(gè)位置在硬盤的基于內(nèi)存的緩沖區(qū)) MappedByteBuffer inMappedByteBuffer = in.map(FileChannel.MapMode.READ_ONLY, 0, in.size()); MappedByteBuffer outMappedByteBuffer = out.map(FileChannel.MapMode.READ_WRITE, 0, in.size()); // 對(duì)直接緩沖區(qū)進(jìn)行數(shù)據(jù)讀寫操作 byte[] bytes=new byte[inMappedByteBuffer.limit()]; inMappedByteBuffer.get(bytes); outMappedByteBuffer.put(bytes); in.close(); out.close(); } }
字節(jié)緩沖區(qū)要么是直接的,要么是非直接的。如果為直接字節(jié)緩沖區(qū),則 Java 虛擬機(jī)會(huì)盡最大努力直接在此緩沖區(qū)上執(zhí)行本機(jī) I/O 操作。也就是說,在每次調(diào)用基礎(chǔ)操作系統(tǒng)的一個(gè)本機(jī) I/O 操作之前(或之后),虛擬機(jī)都會(huì)盡量避免將緩沖區(qū)的內(nèi)容復(fù)制到中間緩沖區(qū)中(或從中間緩沖區(qū)中復(fù)制內(nèi)容)。
直接字節(jié)緩沖區(qū)可以通過調(diào)用此類的 allocateDirect() 工廠方法來創(chuàng)建。此方法返回的緩沖區(qū)進(jìn)行分配和取消分配所需成本通常高于非直接緩沖區(qū)。直接緩沖區(qū)的內(nèi)容可以駐留在常規(guī)的垃圾回收堆之外,因此,它們對(duì)應(yīng)用程序的內(nèi)存需求量造成的影響可能并不明顯。所以,建議將直接緩沖區(qū)主要分配給那些易受基礎(chǔ)系統(tǒng)的本機(jī) I/O 操作影響的大型、持久的緩沖區(qū)。一般情況下,最好僅在直接緩沖區(qū)能在程序性能方面帶來明顯好處時(shí)分配它們。
直接字節(jié)緩沖區(qū)還可以通過 FileChannel 的 map() 方法 將文件區(qū)域直接映射到內(nèi)存中來創(chuàng)建。該方法返回MappedByteBuffer 。 Java 平臺(tái)的實(shí)現(xiàn)有助于通過 JNI 從本機(jī)代碼創(chuàng)建直接字節(jié)緩沖區(qū)。如果以上這些緩沖區(qū)中的某個(gè)緩沖區(qū)實(shí)例指的是不可訪問的內(nèi)存區(qū)域,則試圖訪問該區(qū)域不會(huì)更改該緩沖區(qū)的內(nèi)容,并且將會(huì)在訪問期間或稍后的某個(gè)時(shí)間導(dǎo)致拋出不確定的異常。
字節(jié)緩沖區(qū)是直接緩沖區(qū)還是非直接緩沖區(qū)可通過調(diào)用其 isDirect() 方法來確定。提供此方法是為了能夠在性能關(guān)鍵型代碼中執(zhí)行顯式緩沖區(qū)管理。
Channel通道是java.nio的第二個(gè)創(chuàng)新,表示提供 IO 設(shè)備(例如:文件、套接字)的直接連接。
若需要使用 NIO 系統(tǒng),需要獲取用于連接 IO 設(shè)備的通道以及用于容納數(shù)據(jù)的緩沖區(qū)。然后操作緩沖區(qū),對(duì)數(shù)據(jù)進(jìn)行處理。這其中,Channel負(fù)責(zé)傳輸, Buffer 負(fù)責(zé)存儲(chǔ)。
通道是由java.nio.channels 包定義的,Channel 表示 IO 源與目標(biāo)打開的連接。Channel 類似于傳統(tǒng)的“流”。只不過 Channel本身不能直接訪問數(shù)據(jù), Channel 只能與Buffer 進(jìn)行交互。
接口java.nio.channels.Channel 接口:
FileChannel
SocketChannel
ServerSocketChannel
DatagramChannel
與緩沖區(qū)不同,通道API主要由接口指定,不同操作系統(tǒng)上通道的實(shí)現(xiàn)會(huì)不一樣
實(shí)現(xiàn)直接緩沖區(qū)與非直接緩沖區(qū)的栗子
分散讀取與聚集寫入通道可以有選擇地實(shí)現(xiàn)兩個(gè)新的接口: ScatteringByteChannel 和 GatheringByteChannel。
ScatteringByteChannel 有2個(gè)read方法,我們都叫她分散讀取(scattering Reads),分散讀取中,通道依次填充每個(gè)緩沖區(qū)。填滿一個(gè)緩沖區(qū)后,它就開始填充下一個(gè)。在某種意義上,緩沖區(qū)數(shù)組就像一個(gè)大緩沖區(qū)。
GatheringByteChannel中有2個(gè)wirte方法,我們都叫她聚集寫入(gathering Writes),他可以將多個(gè)緩沖區(qū)的數(shù)據(jù)聚集到通道中
分散讀取/聚集寫入對(duì)于將數(shù)據(jù)劃分為幾個(gè)部分很有用。例如,您可能在編寫一個(gè)使用消息對(duì)象的網(wǎng)絡(luò)應(yīng)用程序,每一個(gè)消息被劃分為固定長度的頭部和固定長度的正文。您可以創(chuàng)建一個(gè)剛好可以容納頭部的緩沖區(qū)和另一個(gè)剛好可以容難正文的緩沖區(qū)。當(dāng)您將它們放入一個(gè)數(shù)組中并使用分散讀取來向它們讀入消息時(shí),頭部和正文將整齊地劃分到這兩個(gè)緩沖區(qū)中。
我們從緩沖區(qū)所得到的方便性對(duì)于緩沖區(qū)數(shù)組同樣有效。因?yàn)槊恳粋€(gè)緩沖區(qū)都跟蹤自己還可以接受多少數(shù)據(jù),所以分散讀取會(huì)自動(dòng)找到有空間接受數(shù)據(jù)的第一個(gè)緩沖區(qū)。在這個(gè)緩沖區(qū)填滿后,它就會(huì)移動(dòng)到下一個(gè)緩沖區(qū)。
Python與NIO服務(wù)端(具體見注釋)
from socket import * import time s=socket(AF_INET,SOCK_STREAM) s.bind(("127.0.0.1",8888)) s.listen(5) s.setblocking(False) #設(shè)置socket的接口為非阻塞 conn_l=[] # 存儲(chǔ)和server的連接 的 連接 del_l=[] # 存儲(chǔ)和和server的斷開 的 連接 while True: try: # 這個(gè)過程是不阻塞的 conn,addr=s.accept() # 當(dāng)沒人連接的時(shí)候會(huì)報(bào)錯(cuò),走exception(<- py中是except) conn_l.append(conn) except BlockingIOError: print(conn_l) for conn in conn_l: try: data=conn.recv(1024) if not data: del_l.append(conn) # 這個(gè)過程是不阻塞的 data=conn.recv(1024) # 不阻塞 if not data: # 如果拿不到data del_l.append(conn) # 在廢棄列表中添加conn continue conn.send(data.upper()) except BlockingIOError: pass except ConnectionResetError: del_l.append(conn) for conn in del_l: conn_l.remove(conn) conn.close() del_l=[]
客戶端
from socket import * c=socket(AF_INET,SOCK_STREAM) c.connect(("127.0.0.1",8888)) while True: msg=input(">>: ") if not msg:continue c.send(msg.encode("utf-8")) data=c.recv(1024) print(data.decode("utf-8"))IO復(fù)用(IO multiplexing)
I/O多路復(fù)用實(shí)際上就是用select, poll, epoll監(jiān)聽多個(gè)io對(duì)象,當(dāng)io對(duì)象有變化(有數(shù)據(jù))的時(shí)候就通知用戶進(jìn)程。有些地方也稱這種IO方式為事件驅(qū)動(dòng)IO(event driven IO)。與多進(jìn)程和多線程技術(shù)相比,I/O多路復(fù)用技術(shù)的最大優(yōu)勢是系統(tǒng)開銷小,系統(tǒng)不必創(chuàng)建進(jìn)程/線程,也不必維護(hù)這些進(jìn)程/線程,從而大大減小了系統(tǒng)的開銷。當(dāng)然具體的可以看看這篇博客,現(xiàn)在先來看下I/O多路復(fù)用的流程:
(1)當(dāng)用戶進(jìn)程調(diào)用了select,那么整個(gè)進(jìn)程會(huì)被block;
(2)而同時(shí),kernel會(huì)“監(jiān)視”所有select負(fù)責(zé)的socket;
(3)當(dāng)任何一個(gè)socket中的數(shù)據(jù)準(zhǔn)備好了,select就會(huì)返回;
(4)這個(gè)時(shí)候用戶進(jìn)程再調(diào)用read操作,將數(shù)據(jù)從kernel拷貝到用戶進(jìn)程。
這個(gè)圖和BIO的圖其實(shí)并沒有太大的不同,事實(shí)上還更差一些。因?yàn)檫@里需要使用兩個(gè)系統(tǒng)調(diào)用(select和recvfrom),而BIO只調(diào)用了一個(gè)系統(tǒng)調(diào)用(recvfrom)。但是,用select的優(yōu)勢在于它可以同時(shí)處理多個(gè)connection。
JAVA實(shí)現(xiàn)IO復(fù)用這里我們使用的是java.nio下模塊來完成I/O多路復(fù)用的例子。我用到的Selector(選擇器),是Java NIO中能夠檢測一到多個(gè)NIO通道,并能夠知曉通道是否為諸如讀寫事件做好準(zhǔn)備的組件。這樣,一個(gè)多帶帶的線程可以管理多個(gè)channel,從而管理多個(gè)網(wǎng)絡(luò)連接。
Selector的使用 Selector的創(chuàng)建Selector selector = Selector.open();向Selector注冊通道
為了將Channel和Selector配合使用,必須將channel注冊到selector上。通過SelectableChannel.register()方法來實(shí)現(xiàn),如下:
channel.configureBlocking(false); SelectionKey key = channel.register(selector,Selectionkey.OP_READ);
register()方法的第二個(gè)參數(shù)是一個(gè)“interest集合”,意思是在通過Selector監(jiān)聽Channel時(shí)對(duì)什么事件感興趣??梢员O(jiān)聽四種不同類型的事件:Connect、Accept、Read、Write
通道觸發(fā)了一個(gè)事件意思是該事件已經(jīng)就緒。所以,某個(gè)channel成功連接到另一個(gè)服務(wù)器稱為“連接就緒”。一個(gè)server socket channel準(zhǔn)備好接收新進(jìn)入的連接稱為“接收就緒”。一個(gè)有數(shù)據(jù)可讀的通道可以說是“讀就緒”。等待寫數(shù)據(jù)的通道可以說是“寫就緒”。
這四種事件用SelectionKey的四個(gè)常量來表示:
SelectionKey.OP_CONNECT可連接
SelectionKey.OP_ACCEPT可接受連接
SelectionKey.OP_READ可讀
SelectionKey.OP_WRITE可寫
SelectionKey當(dāng)向Selector注冊Channel時(shí),register()方法會(huì)返回一個(gè)SelectionKey對(duì)象。它包含了:
interest集合
ready集合
Channel
Selector
附加的對(duì)象(可選)
interest集合是你所選擇的感興趣的事件集合??梢酝ㄟ^SelectionKey讀寫interest集合,像這樣:
int interestSet = selectionKey.interestOps(); boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT; boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
可以看到,用“位與”操作interest 集合和給定的SelectionKey常量,可以確定某個(gè)確定的事件是否在interest 集合中。
ready 集合是通道已經(jīng)準(zhǔn)備就緒的操作的集合。在一次選擇(Selection)之后,你會(huì)首先訪問這個(gè)ready set。Selection將在下一小節(jié)進(jìn)行解釋??梢赃@樣訪問ready集合:
int readySet = selectionKey.readyOps();
可以用像檢測interest集合那樣的方法,來檢測channel中什么事件或操作已經(jīng)就緒。但是,也可以使用以下四個(gè)方法,它們都會(huì)返回一個(gè)布爾類型:
selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable();從SelectionKey訪問Channel和Selector
Channel channel = selectionKey.channel(); Selector selector = selectionKey.selector();java代碼
/** * @program: NIOServer * @description: 服務(wù)端 * @author: tEngSHe789 **/ public class NIOServer { public static void main(String[] args) throws IOException { System.out.println("服務(wù)端Start...."); // 創(chuàng)建通道 ServerSocketChannel serverSocketChannel=ServerSocketChannel.open(); // 設(shè)置非阻塞 serverSocketChannel.configureBlocking(false); // 綁定連接 serverSocketChannel.bind(new InetSocketAddress(8888)); // 獲取選擇器 Selector selector=Selector.open(); // 將通道注冊到選擇器 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 輪調(diào)式獲取選擇“已經(jīng)準(zhǔn)備就緒”的事件 while (selector.select() > 0){ // 獲取當(dāng)前選擇器的左右已經(jīng)準(zhǔn)備就緒的監(jiān)聽事件(選擇key) Iterator客戶端:iterator=selector.selectedKeys().iterator(); while (iterator.hasNext()){ // 獲取準(zhǔn)備就緒事件 SelectionKey selectionKey=iterator.next(); // 判斷具體是什么事件 if (selectionKey.isAcceptable()){//如果是“接受就緒” SocketChannel socketChannel=serverSocketChannel.accept();// 獲取連接 socketChannel.configureBlocking(false); // 設(shè)置非阻塞 //將該通道注冊到服務(wù)器上 socketChannel.register(selector, SelectionKey.OP_READ); }else if (selectionKey.isReadable()){//如是“已經(jīng)就緒” SocketChannel socketChannel= (SocketChannel) selectionKey.channel();//獲取連接 //讀數(shù)據(jù) ByteBuffer buffer=ByteBuffer.allocate(1024); int len = 0; //分散讀取 len=socketChannel.read(buffer); while (len > 0){ buffer.flip(); System.out.println(new String(buffer.array(),0,len)); buffer.clear(); } } iterator.remove(); } } } }
/** * @program: NIOClient * @description: 客戶端 * @author: tEngSHe789 **/ public class NIOClient { public static void main(String[] args) throws IOException { System.out.println("客戶端Start...."); // 創(chuàng)建通道 SocketChannel socketChannel=SocketChannel.open(new InetSocketAddress("127.0.0.1",8888)); // 設(shè)置SocketChannel接口為非阻塞 socketChannel.configureBlocking(false); //指定緩沖區(qū)大小 ByteBuffer buffer=ByteBuffer.allocate(1024); Scanner scanner=new Scanner(System.in); while (scanner.hasNext()){ String msg = scanner.next(); // 存儲(chǔ) buffer.put((new Date().toString()+" "+msg).getBytes()); // 翻轉(zhuǎn) buffer.flip(); // 聚集寫入 socketChannel.write(buffer); // 釋放 buffer.clear(); } socketChannel.close(); } }python實(shí)現(xiàn)IO復(fù)用
對(duì)比java用的是Selector,可以幫我們在默認(rèn)操作系統(tǒng)下選擇最合適的select, poll, epoll這三種多路復(fù)合模型,python是通過一種機(jī)制一個(gè)進(jìn)程能同時(shí)等待多個(gè)文件描述符,而這些文件描述符(套接字描述符)其中的任意一個(gè)進(jìn)入讀就緒狀態(tài),select()函數(shù)就可以返回。
服務(wù)端from socket import * import select s=socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind(("127.0.0.1",8888)) s.listen(5) s.setblocking(False) #設(shè)置socket的接口為非阻塞 read_l=[s,] # 數(shù)據(jù)可讀通道的列表 while True: # 監(jiān)聽的read_l中的socket對(duì)象內(nèi)部如果有變化,那么這個(gè)對(duì)象就會(huì)在r_l # 第二個(gè)參數(shù)里有什么對(duì)象,w_l中就有什么對(duì)象 # 第三個(gè)參數(shù) 如果這里的對(duì)象內(nèi)部出錯(cuò),那會(huì)把這些對(duì)象加到x_l中 # 1 是超時(shí)時(shí)間 r_l,w_l,x_l=select.select(read_l,[],[],1) print(r_l) for ready_obj in r_l: if ready_obj == s: conn,addr=ready_obj.accept() #此時(shí)的ready_obj等于s read_l.append(conn) else: try: data=ready_obj.recv(1024) #此時(shí)的ready_obj等于conn if not data: ready_obj.close() read_l.remove(ready_obj) raise Exception("連接斷開") ready_obj.send(data.upper()) except ConnectionResetError: ready_obj.close() read_l.remove(ready_obj)客戶端
from socket import * c=socket(AF_INET,SOCK_STREAM) c.connect(("127.0.0.1",8888)) while True: msg=input(">>>: ") if not msg:continue c.send(msg.encode("utf-8")) data=c.recv(1024) print(data.decode("utf-8"))AIO(asynchronous io)
真正的異步I/O很牛逼,流程大概如下:
(1)用戶進(jìn)程發(fā)起read操作之后,立刻就可以開始去做其它的事。
(2)而另一方面,從kernel的角度,當(dāng)它受到一個(gè)asynchronous read之后,首先它會(huì)立刻返回,所以不會(huì)對(duì)用戶進(jìn)程產(chǎn)生任何block。
(3)然后,kernel會(huì)等待數(shù)據(jù)準(zhǔn)備完成,然后將數(shù)據(jù)拷貝到用戶內(nèi)存,當(dāng)這一切都完成之后,kernel會(huì)給用戶進(jìn)程發(fā)送一個(gè)signal,告訴它read操作完成了。
JavaJava中使用AIO需要用到j(luò)ava.nio.channels.AsynchronousChannelGroup和java.nio.channels.AsynchronousServerSocketChannel的包,由于實(shí)際項(xiàng)目鮮有人用,就不演示了
總結(jié)回顧一下各個(gè)IO Model的比較,如圖所示:
blocking io :阻塞型io,再熟悉不過,處理accept、read、write都會(huì)阻塞用戶進(jìn)程
non blocking io:當(dāng)通過系統(tǒng)調(diào)用的時(shí)候,如果沒有連接或者數(shù)據(jù)到達(dá)就直接返回一個(gè)錯(cuò)誤,用戶進(jìn)程不阻塞但是不斷的輪詢。注意這個(gè)不是java nio框架中對(duì)應(yīng)的網(wǎng)絡(luò)模型
io multiplexing:io多路復(fù)用才是nio對(duì)應(yīng)的網(wǎng)絡(luò)io模型。該模型對(duì)于用戶進(jìn)程也是阻塞的,優(yōu)點(diǎn)是可以同時(shí)支持多個(gè)connetciotn。前三種都屬于同步模式,既然都是同步的,如果要做到看似非阻塞,那么就需要輪詢機(jī)制。相對(duì)于上一種模型,這種只是將輪詢從用戶進(jìn)程轉(zhuǎn)移到了操作系統(tǒng)內(nèi)核,通過調(diào)用select函數(shù),不斷輪詢多個(gè)connection是否ready,如果有一種ready好的,就通過事件通知用戶進(jìn)程,用戶進(jìn)程再通過事件來處理。所以在java的nio中會(huì)看到一大堆事件處理。這種模型的阻塞不是在socket層面的阻塞,而是在調(diào)動(dòng)select函數(shù)的阻塞。而且相對(duì)于blocking io,還多了一次select的系統(tǒng)調(diào)用,其實(shí)性能會(huì)更低,所以在低吞吐量下,這種io不見得比bio+線程池的模型優(yōu)越。
sign driven:極少使用,不知道
async io :java7時(shí)候開始升級(jí),也成為nio2。實(shí)現(xiàn)了異步的io。前三種都是通過用戶進(jìn)程在主動(dòng)獲取(bio的阻塞,nbio的輪詢和iomult的按事件獲取),而aio交互很簡單,用戶進(jìn)程調(diào)用后立即返回,用戶進(jìn)程不阻塞,內(nèi)核當(dāng)完成網(wǎng)絡(luò)io和數(shù)據(jù)復(fù)制后,主動(dòng)通知用戶進(jìn)程。前面說到的系統(tǒng)內(nèi)核做的操作,除了等待網(wǎng)絡(luò)io就緒數(shù)據(jù)到達(dá)內(nèi)核,還有從系統(tǒng)內(nèi)核復(fù)制用戶空間去的過程,異步io這兩者對(duì)于用戶進(jìn)程而言都是非阻塞的,而前三種,在數(shù)據(jù)從內(nèi)核復(fù)制到用戶空間這個(gè)過程,都是阻塞的。
參考資料前言說的那本書
Ron Hitchens于2002年 著的《java nio》
findumars
冬瓜蔡
彼岸船夫
NIO的/分散讀取和聚集寫入
并發(fā)編程網(wǎng)
感謝
續(xù)1s時(shí)間全片結(jié)束,覺得我寫的不錯(cuò)?想要了解更多精彩新姿勢?趕快打開我的
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/42337.html
摘要:緩沖區(qū)的限制不能為負(fù),并且不能大于其容量。如果指向的位置超過限制,則拋出異常。使用臨時(shí)緩沖區(qū)執(zhí)行低層次操作。臨時(shí)緩沖區(qū)對(duì)象離開作用域,并最終成為被回收的無用數(shù)據(jù)。 前天剛好看了點(diǎn)《UNIX網(wǎng)絡(luò)編程》,比較頭大?,F(xiàn)在我來整理一下所學(xué)所得,并用于個(gè)人備忘。如果有不對(duì),請(qǐng)批評(píng)。 想要解鎖更多新姿勢?請(qǐng)?jiān)L問https://blog.tengshe789.tech/ IO模型介紹 IO模型是什么...
摘要:參考鏈接面向?qū)ο缶幊棠P同F(xiàn)在的很多編程語言基本都具有面向?qū)ο蟮乃枷耄热绲鹊?,而面向?qū)ο蟮闹饕枷雽?duì)象,類,繼承,封裝,多態(tài)比較容易理解,這里就不多多描述了。 前言 在我們的日常日發(fā)和學(xué)習(xí)生活中會(huì)常常遇到一些名詞,比如 命令式編程模型,聲明式編程模型,xxx語言是面向?qū)ο蟮牡鹊?,這個(gè)編程模型到處可見,但是始終搞不清是什么?什么語言又是什么編程模型,當(dāng)你新接觸一門語言的時(shí)候,有些問題是需...
摘要:編程基礎(chǔ)要學(xué)習(xí)如何用進(jìn)行數(shù)據(jù)分析,數(shù)據(jù)分析師建議第一步是要了解一些的編程基礎(chǔ),知道的數(shù)據(jù)結(jié)構(gòu),什么是向量列表數(shù)組字典等等了解的各種函數(shù)及模塊。數(shù)據(jù)分析師認(rèn)為數(shù)據(jù)分析有的工作都在處理數(shù)據(jù)。 showImg(https://segmentfault.com/img/bVbnbZo?w=1024&h=653); 本文為CDA數(shù)據(jù)分析研究院原創(chuàng)作品,轉(zhuǎn)載需授權(quán) 1.為什么選擇Python進(jìn)行數(shù)...
摘要:四種模型把同步阻塞同步非阻塞異步阻塞異步非阻塞的模型講得很清楚。有人對(duì)于模型有一些批判,認(rèn)為多線程模型同步阻塞模型不比事件模型差,講了提到的多線程模型的性能瓶頸在如今的內(nèi)核里已經(jīng)不存在了,而多線程模型開發(fā)起來更簡單。 四種IO模型 Boost application performance using asynchronous I/O把同步阻塞、同步非阻塞、異步阻塞、異步非阻塞的模型講...
閱讀 1877·2023-04-26 02:46
閱讀 2008·2021-11-25 09:43
閱讀 1148·2021-09-29 09:35
閱讀 2107·2019-08-30 15:56
閱讀 3429·2019-08-30 15:54
閱讀 2639·2019-08-29 16:35
閱讀 3126·2019-08-29 15:25
閱讀 3298·2019-08-29 14:01