流(stream)在 Node.js 中是處理流數(shù)據(jù)的抽象接口(abstract interface)。 stream 模塊提供了基礎(chǔ)的 API 。使用這些 API 可以很容易地來(lái)構(gòu)建實(shí)現(xiàn)流接口的對(duì)象。
Node.js 提供了多種流對(duì)象。 例如, HTTP 請(qǐng)求 和 process.stdout 就都是流的實(shí)例。
流可以是可讀的、可寫的,或是可讀寫的。所有的流都是 EventEmitter 的實(shí)例。
我們打算讀取一個(gè)文件,使用 fs.readFileSync 同步讀取一個(gè)文件,程序會(huì)被阻塞,所有的數(shù)據(jù)都會(huì)被讀取到內(nèi)存中。
換用 fs.readFile 讀取文件,程序不會(huì)被阻塞,但是所有的數(shù)據(jù)依舊會(huì)被一次性全部被讀取到內(nèi)存中。
流的類型Node.js 中有四種基本的流類型:
Readable - 可讀的流 (例如 fs.createReadStream()).
Writable - 可寫的流 (例如 fs.createWriteStream()).
Duplex - 可讀寫的流 (例如 net.Socket).
Transform - 在讀寫過(guò)程中可以修改和變換數(shù)據(jù)的 Duplex 流 (例如 zlib.createDeflate()).
可讀流(Readable Stream)可讀流有兩種模式:
監(jiān)聽(tīng) "data" 事件
調(diào)用 stream.resume() 方法
調(diào)用 stream.pipe() 方法將數(shù)據(jù)發(fā)送到 Writable
如果存在管道目標(biāo),調(diào)用 stream.unpipe()并取消"data"事件監(jiān)聽(tīng)
flowing模式const fs = require("fs") const path = require("path") const rs = fs.createReadStream(path.join(__dirname, "./1.txt")) rs.setEncoding("utf8") rs.on("data", (data) => { console.log(data) })paused模式
const fs = require("fs") const path = require("path") const rs = fs.createReadStream(path.join(__dirname, "./1.txt")) rs.setEncoding("utf8") rs.on("readable", () => { let d = rs.read(1) console.log(d) })實(shí)現(xiàn)原理 流動(dòng)模式原理
let EventEmitter = require("events"); let fs = require("fs"); class ReadStream extends EventEmitter { constructor(path,options) { super(); this.path = path; this.flags = options.flags || "r"; this.autoClose = options.autoClose || true; this.highWaterMark = options.highWaterMark|| 64*1024; this.start = options.start||0; this.end = options.end; this.encoding = options.encoding || null this.buffer = Buffer.alloc(this.highWaterMark);//定義緩存區(qū)大小 this.pos = this.start; // pos 讀取的位置 可變 start不變的 this.flowing = null; // null就是暫停模式 } } module.exports = ReadStream;
open(){ fs.open(this.path,this.flags,(err,fd)=>{ if(err){ this.emit("error",err); if(this.autoClose){ // 是否自動(dòng)關(guān)閉 this.destroy(); } return; } this.fd = fd; // 保存文件描述符 this.emit("open"); // 文件打開了 }); } destroy(){ // 先判斷有沒(méi)有fd 有關(guān)閉文件 觸發(fā)close事件 if(typeof this.fd ==="number"){ fs.close(this.fd,()=>{ this.emit("close"); }); return; } this.emit("close"); // 銷毀 }
constructor(path,options){ super(); this.path = path; this.flags = options.flags || "r"; this.autoClose = options.autoClose || true; this.highWaterMark = options.highWaterMark|| 64*1024; this.start = options.start||0; this.end = options.end; this.encoding = options.encoding || null this.flowing = null; this.buffer = Buffer.alloc(this.highWaterMark); this.pos = this.start; this.open();//打開文件 fd this.on("newListener",(eventName,callback)=>{ if(eventName === "data"){ // 相當(dāng)于用戶監(jiān)聽(tīng)了data事件 this.flowing = true; // 監(jiān)聽(tīng)了 就去讀 this.read(); // 去讀內(nèi)容了 } }) }
read(){ // 此時(shí)文件還沒(méi)打開呢 if(typeof this.fd !== "number"){ // 當(dāng)文件真正打開的時(shí)候 會(huì)觸發(fā)open事件,觸發(fā)事件后再執(zhí)行read,此時(shí)fd肯定有了 return this.once("open",()=>this.read()) } // 此時(shí)有fd了 // 應(yīng)該填highWaterMark? // 想讀4個(gè) 寫的是3 每次讀3個(gè) // 123 4 let howMuchToRead = this.end?Math.min(this.highWaterMark,this.end-this.pos+1):this.highWaterMark; fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytesRead)=>{ // 讀到了多少個(gè) 累加 if(bytesRead>0){ this.pos+= bytesRead; let data = this.encoding?this.buffer.slice(0,bytesRead).toString(this.encoding):this.buffer.slice(0,bytesRead); this.emit("data",data); // 當(dāng)讀取的位置 大于了末尾 就是讀取完畢了 if(this.pos > this.end){ this.emit("end"); this.destroy(); } if(this.flowing) { // 流動(dòng)模式繼續(xù)觸發(fā) this.read(); } }else{ this.emit("end"); this.destroy(); } }); }
resume() { this.flowing = true; this.read(); } pause() { this.flowing = false; }
let EventEmitter = require("events"); let fs = require("fs"); class ReadStream extends EventEmitter { constructor(path, options) { super(); this.path = path; this.flags = options.flags || "r"; this.autoClose = options.autoClose || true; this.highWaterMark = options.highWaterMark|| 64*1024; this.start = options.start||0; this.end = options.end; this.encoding = options.encoding || null this.open(); this.flowing = null; // null就是暫停模式 this.buffer = Buffer.alloc(this.highWaterMark); this.pos = this.start; this.on("newListener", (eventName,callback) => { if (eventName === "data") { this.flowing = true; this.read(); } }) } read(){ if (typeof this.fd !== "number") { return this.once("open", () => this.read()) } let howMuchToRead = this.end ? Math.min(this.highWaterMark, this.end - this.pos+1) : this.highWaterMark; fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err,bytesRead) => { if (bytesRead > 0) { this.pos += bytesRead; let data = this.encoding ? this.buffer.slice(0, bytesRead).toString(this.encoding) : this.buffer.slice(0, bytesRead); this.emit("data", data); if(this.pos > this.end){ this.emit("end"); this.destroy(); } if(this.flowing) { this.read(); } }else{ this.emit("end"); this.destroy(); } }); } resume() { this.flowing = true; this.read(); } pause() { this.flowing = false; } destroy() { if(typeof this.fd === "number") { fs.close(this.fd, () => { this.emit("close"); }); return; } this.emit("close"); }; open() { fs.open(this.path, this.flags, (err,fd) => { if (err) { this.emit("error", err); if (this.autoClose) { this.destroy(); } return; } this.fd = fd; this.emit("open"); }); } } module.exports = ReadStream;暫停模式原理
constructor(path, options) { super(); this.path = path; this.highWaterMark = options.highWaterMark || 64 * 1024; this.autoClose = options.autoClose || true; this.start = 0; this.end = options.end; this.flags = options.flags || "r"; this.buffers = []; // 緩存區(qū) this.pos = this.start; this.length = 0; // 緩存區(qū)大小 this.emittedReadable = false; this.reading = false; // 不是正在讀取的 this.open(); this.on("newListener", (eventName) => { if (eventName === "readable") { this.read(); } }) } read(n) { if (this.length == 0) { this.emittedReadable = true; } if (this.length < this.highWaterMark) { if(!this.reading) { this.reading = true; this._read(); } } } _read() { if (typeof this.fd !== "number") { return this.once("open", () => this._read()); } let buffer = Buffer.alloc(this.highWaterMark); fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => { if (bytesRead > 0) { this.buffers.push(buffer.slice(0, bytesRead)); this.pos += bytesRead; this.length += bytesRead; this.reading = false; if (this.emittedReadable) { this.emittedReadable = false; this.emit("readable"); } } else { this.emit("end"); this.destroy(); } }) }
function computeNewHighWaterMark(n) { n--; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n++; return n; }
read(n) { if(n>this.length){ // 更改緩存區(qū)大小 讀取五個(gè)就找 2的幾次放最近的 this.highWaterMark = computeNewHighWaterMark(n) this.emittedReadable = true; this._read(); } // 如果n>0 去緩存區(qū)中取吧 let buffer=null; let index = 0; // 維護(hù)buffer的索引的 let flag = true; if (n > 0 && n <= this.length) { // 讀的內(nèi)容 緩存區(qū)中有這么多 // 在緩存區(qū)中取 [[2,3],[4,5,6]] buffer = Buffer.alloc(n); // 這是要返回的buffer let buf; while (flag&&(buf = this.buffers.shift())) { for (let i = 0; i < buf.length; i++) { buffer[index++] = buf[i]; if(index === n){ // 拷貝夠了 不需要拷貝了 flag = false; this.length -= n; let bufferArr = buf.slice(i+1); // 取出留下的部分 // 如果有剩下的內(nèi)容 在放入到緩存中 if(bufferArr.length > 0){ this.buffers.unshift(bufferArr); } break; } } } } // 當(dāng)前緩存區(qū) 小于highWaterMark時(shí)在去讀取 if (this.length == 0) { this.emittedReadable = true; } if (this.length < this.highWaterMark) { if(!this.reading){ this.reading = true; this._read(); // 異步的 } } return buffer }
let fs = require("fs"); let EventEmitter = require("events"); function computeNewHighWaterMark(n) { n--; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n++; return n; } class ReadStream extends EventEmitter { constructor(path, options) { super(); this.path = path; this.highWaterMark = options.highWaterMark || 64 * 1024; this.autoClose = options.autoClose || true; this.start = 0; this.end = options.end; this.flags = options.flags || "r"; this.buffers = []; // 緩存區(qū) this.pos = this.start; this.length = 0; // 緩存區(qū)大小 this.emittedReadable = false; this.reading = false; // 不是正在讀取的 this.open(); this.on("newListener", (eventName) => { if (eventName === "readable") { this.read(); } }) } read(n) { if(n>this.length){ // 更改緩存區(qū)大小 讀取五個(gè)就找 2的幾次放最近的 this.highWaterMark = computeNewHighWaterMark(n) this.emittedReadable = true; this._read(); } // 如果n>0 去緩存區(qū)中取吧 let buffer=null; let index = 0; // 維護(hù)buffer的索引的 let flag = true; if (n > 0 && n <= this.length) { // 讀的內(nèi)容 緩存區(qū)中有這么多 // 在緩存區(qū)中取 [[2,3],[4,5,6]] buffer = Buffer.alloc(n); // 這是要返回的buffer let buf; while (flag&&(buf = this.buffers.shift())) { for (let i = 0; i < buf.length; i++) { buffer[index++] = buf[i]; if(index === n){ // 拷貝夠了 不需要拷貝了 flag = false; this.length -= n; let bufferArr = buf.slice(i+1); // 取出留下的部分 // 如果有剩下的內(nèi)容 在放入到緩存中 if(bufferArr.length > 0){ this.buffers.unshift(bufferArr); } break; } } } } // 當(dāng)前緩存區(qū) 小于highWaterMark時(shí)在去讀取 if (this.length == 0) { this.emittedReadable = true; } if (this.length < this.highWaterMark) { if(!this.reading){ this.reading = true; this._read(); // 異步的 } } return buffer } // 封裝的讀取的方法 _read() { // 當(dāng)文件打開后在去讀取 if (typeof this.fd !== "number") { return this.once("open", () => this._read()); } // 上來(lái)我要喝水 先倒三升水 [] let buffer = Buffer.alloc(this.highWaterMark); fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => { if (bytesRead > 0) { // 默認(rèn)讀取的內(nèi)容放到緩存區(qū)中 this.buffers.push(buffer.slice(0, bytesRead)); this.pos += bytesRead; // 維護(hù)讀取的索引 this.length += bytesRead;// 維護(hù)緩存區(qū)的大小 this.reading = false; // 是否需要觸發(fā)readable事件 if (this.emittedReadable) { this.emittedReadable = false; // 下次默認(rèn)不觸發(fā) this.emit("readable"); } } else { this.emit("end"); this.destroy(); } }) } destroy() { if (typeof this.fd !== "number") { return this.emit("close") } fs.close(this.fd, () => { this.emit("close") }) } open() { fs.open(this.path, this.flags, (err, fd) => { if (err) { this.emit("error", err); if (this.autoClose) { this.destroy(); } return } this.fd = fd; this.emit("open"); }); } } module.exports = ReadStream;
摘要:可寫流可寫流是對(duì)數(shù)據(jù)寫入目的地的一種抽象。對(duì)象流的特點(diǎn)就是它有一個(gè)標(biāo)志,我們可以設(shè)置它讓流可以接受任何對(duì)象。 可寫流(Writable Stream) 可寫流是對(duì)數(shù)據(jù)寫入目的地的一種抽象。 可寫流的原理其實(shí)與可讀流類似,當(dāng)數(shù)據(jù)過(guò)來(lái)的時(shí)候會(huì)寫入緩存池,當(dāng)寫入的速度很慢或者寫入暫停時(shí)候,數(shù)據(jù)流便會(huì)進(jìn)入到隊(duì)列池緩存起來(lái),當(dāng)然即使緩存池滿了,剩余的數(shù)據(jù)也是存在內(nèi)存 可寫流的簡(jiǎn)單用法如下代碼 l...
摘要:回調(diào)函數(shù)中檢測(cè)該次寫入是否被緩沖,若是,觸發(fā)事件。若目標(biāo)可寫流表示該寫入操作需要進(jìn)行緩沖,則立刻將源可讀流切換至?xí)和DJ健1O(jiān)聽(tīng)源可讀流的事件,相應(yīng)地結(jié)束目標(biāo)可寫流。 在Node.js中,流(Stream)是其眾多原生對(duì)象的基類,它對(duì)處理潛在的大文件提供了支持,也抽象了一些場(chǎng)景下的數(shù)據(jù)處理和傳遞。在它對(duì)外暴露的接口中,最為神奇的,莫過(guò)于導(dǎo)流(pipe)方法了。鑒于近期自己正在閱讀Node...
摘要:流是基于事件的用于管理和處理數(shù)據(jù)而且有不錯(cuò)的效率借助事件和非阻塞庫(kù)流模塊允許在其可用的時(shí)候動(dòng)態(tài)處理在其不需要的時(shí)候釋放掉使用流的好處舉一個(gè)讀取文件的例子使用同步讀取一個(gè)文件程序會(huì)被阻塞所有的數(shù)據(jù)都會(huì)被讀取到內(nèi)存中換用讀取文件程序不會(huì)被阻塞但 流是基于事件的API,用于管理和處理數(shù)據(jù),而且有不錯(cuò)的效率.借助事件和非阻塞I/O庫(kù),流模塊允許在其可用的時(shí)候動(dòng)態(tài)處理,在其不需要的時(shí)候釋放掉. ...
摘要:在可讀流事件里我們就必須調(diào)用方法。當(dāng)一個(gè)對(duì)象就意味著我們想發(fā)出信號(hào)這個(gè)流沒(méi)有更多數(shù)據(jù)了自定義可寫流為了實(shí)現(xiàn)可寫流,我們需要使用流模塊中的構(gòu)造函數(shù)。我們只需給構(gòu)造函數(shù)傳遞一些選項(xiàng)并創(chuàng)建一個(gè)對(duì)象。 前言 什么是流呢?看字面意思,我們可能會(huì)想起生活中的水流,電流。但是流不是水也不是電,它只是描述水和電的流動(dòng);所以說(shuō)流是抽象的。在node.js中流是一個(gè)抽象接口,它不關(guān)心文件內(nèi)容,只關(guān)注是否從...
摘要:方法也可以接收一個(gè)參數(shù)表示數(shù)據(jù)請(qǐng)求著請(qǐng)求的數(shù)據(jù)大小,但是可讀流可以根據(jù)需要忽略這個(gè)參數(shù)。讀取數(shù)據(jù)大部分情況下我們只要簡(jiǎn)單的使用方法將可讀流的數(shù)據(jù)重定向到另外形式的流,但是在某些情況下也許直接從可讀流中讀取數(shù)據(jù)更有用。 介紹本文介紹了使用 node.js streams 開發(fā)程序的基本方法。 We should have some ways of connecting programs ...
閱讀 3697·2021-09-22 15:15
閱讀 3599·2021-08-12 13:24
閱讀 1330·2019-08-30 15:53
閱讀 1842·2019-08-30 15:43
閱讀 1201·2019-08-29 17:04
閱讀 2815·2019-08-29 15:08
閱讀 1611·2019-08-29 13:13
閱讀 3107·2019-08-29 11:06