摘要:上游水源通過里中的方法流入水電站中。當(dāng)在接收數(shù)據(jù)中出現(xiàn)錯誤時發(fā)出。暫停可讀流,不再發(fā)出事件恢復(fù)可讀流,繼續(xù)發(fā)出事件把這個可讀流的輸出傳遞給指定的流,兩個流組成一個管道。
題外話
該文章整合了多篇網(wǎng)絡(luò)文章(整合之處已設(shè)置超鏈接,可點擊直接了解原文),目的僅僅是為了和大伙分享,更加通俗易懂的了解流的各個流程的初始。本人也是node的初學(xué)菜鳥,有描述錯誤或誤人子弟的地方多請大神們多多指出。
readable 我們先來安利一些思路,方便理清楚邏輯:)。事件 查看原文讀緩沖區(qū)(readable buffer):這里的讀是個形容詞,是指可讀流臨時存放data(只能是字符串或者Buffer,不能是數(shù)字)的緩沖區(qū)。(讀緩沖區(qū)就像一個水電站一樣,感覺這樣描述比較好理解flowing、paused模式)
flowing模式:即流動模式,就像打開水電站的水閘一樣,上游的水和下游完完全全連通直到上游來源的數(shù)據(jù)耗盡。
paused模式:即暫停模式,就像水電站的水閘在你指定的時候(使用stream.read())才會打開。不過,當(dāng)你使用read()打開水閘的時候是一個超自然現(xiàn)象---水電站里的水瞬間被抽干,上游的水還沒來得及填充水電站。然后自動關(guān)閉水閘,等待你的下一次“惠顧“read()。
_read:上游水源通過_read里中的push、unshift方法流入水電站中。
函數(shù) 查看原文readable:在數(shù)據(jù)塊可以從流中讀取的時候發(fā)出。它對應(yīng)的處理器沒有參數(shù),可以在處理器里調(diào)用read([size])方法讀取數(shù)據(jù)。
data:有數(shù)據(jù)可讀時發(fā)出。它對應(yīng)的處理器有一個參數(shù),代表數(shù)據(jù)。如果你只想快快地讀取一個流的數(shù)據(jù),給data關(guān)聯(lián)一個處理器是最方便的辦法。處理器的參數(shù)是Buffer對象,如果你調(diào)用了Readable的setEncoding(encoding)方法,處理器的參數(shù)就是String對象。
end:當(dāng)數(shù)據(jù)被讀完時發(fā)出。對應(yīng)的處理器沒有參數(shù)。
close:當(dāng)?shù)讓拥馁Y源,如文件,已關(guān)閉時發(fā)出。不是所有的Readable流都會發(fā)出這個事件。對應(yīng)的處理器沒有參數(shù)。
error:當(dāng)在接收數(shù)據(jù)中出現(xiàn)錯誤時發(fā)出。對應(yīng)的處理器參數(shù)是Error的實例,它的message屬性描述了錯誤原因,stack屬性保存了發(fā)生錯誤時的堆棧信息。
流動模式和暫停模式切換 查看原文read([size]):如果你給read方法傳遞了一個大小作為參數(shù),那它會返回指定數(shù)量的數(shù)據(jù),如果數(shù)據(jù)不足,就會返回null。如果你不給read方法傳參,它會返回內(nèi)部緩沖區(qū)里的所有數(shù)據(jù),如果沒有數(shù)據(jù),會返回null,此時有可能說明遇到了文件末尾。read返回的數(shù)據(jù)可能是Buffer對象,也可能是String對象。
setEncoding(encoding):給流設(shè)置一個編碼格式,用于解碼讀到的數(shù)據(jù)。調(diào)用此方法后,read([size])方法返回String對象。
pause():暫??勺x流,不再發(fā)出data事件
resume():恢復(fù)可讀流,繼續(xù)發(fā)出data事件
pipe(destination,[options]):把這個可讀流的輸出傳遞給destination指定的Writable流,兩個流組成一個管道。options是一個JS對象,這個對象有一個布爾類型的end屬性,默認(rèn)值為true,當(dāng)end為true時,Readable結(jié)束時自動結(jié)束Writable。注意,我們可以把一個Readable與若干Writable連在一起,組成多個管道,每一個Writable都能得到同樣的數(shù)據(jù)。這個方法返回destination,如果destination本身又是Readable流,就可以級聯(lián)調(diào)用pipe(比如我們在使用gzip壓縮、解壓縮時就會這樣,馬上會講到)。
unpipe([destination]):端口與指定destination的管道。不傳遞destination時,斷開與這個可讀流連在一起的所有管道。
通過添加 data 事件監(jiān)聽器來啟動數(shù)據(jù)監(jiān)聽
調(diào)用 resume() 方法啟動數(shù)據(jù)流
調(diào)用 pipe() 方法將數(shù)據(jù)轉(zhuǎn)接到另一個 可寫流
觸發(fā)準(zhǔn)備數(shù)據(jù)(_read)的方法在流沒有 pipe() 時,調(diào)用 pause() 方法可以將流暫停
pipe() 時,需要移除所有 data 事件的監(jiān)聽,再調(diào)用 unpipe() 方法
工作流程 查看原文data listener
readable listener
read()——如果當(dāng)前緩沖區(qū)為空,或者緩沖區(qū)并未超出我們設(shè)定的最大值,那么就可以繼續(xù)準(zhǔn)備數(shù)據(jù);如果此時正在準(zhǔn)備數(shù)據(jù)(_read())或者已經(jīng)結(jié)束讀?。╬ush(null)),那么就放棄準(zhǔn)備數(shù)據(jù)。
1.在paused模式下則讀取全部緩沖區(qū)的長度;若讀取的字節(jié)數(shù)(n)大于設(shè)置的緩沖區(qū)最大值,則適當(dāng)擴(kuò)大緩沖區(qū)的大?。J(rèn)為16k,最大為8m);若讀取的長度大于當(dāng)前緩沖區(qū)的大小,設(shè)置needReadable屬性并準(zhǔn)備數(shù)據(jù)等待下一次讀取。
2.如果當(dāng)前緩沖區(qū)為空,或者緩沖區(qū)并未超出我們設(shè)定的最大值,那么就可以繼續(xù)準(zhǔn)備數(shù)據(jù);如果此時正在準(zhǔn)備數(shù)據(jù)(_read())或者已經(jīng)結(jié)束讀?。╬ush(null)),那么就放棄準(zhǔn)備數(shù)據(jù)。
3.針對這個私有方法_read,文檔上有特殊說明,自定義的Readable實現(xiàn)類需要實現(xiàn)這個方法,在該方法中手動添加數(shù)據(jù)到Readable對象的讀緩沖區(qū),然后進(jìn)行Readable的讀取??梢岳斫鉃開read函數(shù)為讀取數(shù)據(jù)前的準(zhǔn)備工作(準(zhǔn)備數(shù)據(jù)),針對的是流的實現(xiàn)者而言。
1.對于處在flowing模式下的讀取,每次只讀緩沖區(qū)中第一個buffer的長度
2.針對這個私有方法_read,文檔上有特殊說明,自定義的Readable實現(xiàn)類需要實現(xiàn)這個方法,在該方法中手動添加數(shù)據(jù)到Readable對象的讀緩沖區(qū),然后進(jìn)行Readable的讀取??梢岳斫鉃開read函數(shù)為讀取數(shù)據(jù)前的準(zhǔn)備工作(準(zhǔn)備數(shù)據(jù)),針對的是流的實現(xiàn)者而言。
實例//這是一個將存放多條json字符串的txt文件讀取成json的例子 const stream = require("stream"); const fs = require("fs"); const util = require("util"); function JSONLineReader(source) { stream.Readable.call(this); this._source = source; this._foundLineEnd = false; this._buffer = ""; source.on("readable", function() {//監(jiān)聽source什么時候準(zhǔn)備好,那么我們就可以用read()或則readable listener去觸發(fā)JSONLineReader的_read方法 this.read(); // this.on("readable", function(data) { // console.log("readable"); // }); }.bind(this)) } util.inherits(JSONLineReader, stream.Readable); JSONLineReader.prototype._read = function(size) { var chunk; var line; var lineIndex; var result; if (this._buffer.length === 0) { chunk = this._source.read(); this._buffer += chunk; //一次就拿完 只是看什么時候push null } lineIndex = this._buffer.indexOf(" "); if (lineIndex !== -1) { line = this._buffer.slice(0, lineIndex); if (line) { result = JSON.parse(line); this._buffer = this._buffer.slice(lineIndex + 1); this.emit("object", result);util.inspect(result)) this.push(util.inspect(result)); } else { this._buffer = this._buffer.slice(1); } } } let input = fs.createReadStream(__dirname + "/json-lines.txt", { encoding: "utf8" }); var jsonLineReader = new JSONLineReader(input); jsonLineReader.on("object", function(obj) { console.log("pos:", obj); }) /*json-lines.txt {"success":false,"code":501} {"success":true,"code":202} {"success":false,"code":503} {"success":true,"code":204} {"success":false,"code":505} {"success":true,"code":206} {"success":false,"code":507} {"success":true,"code":208} {"success":false,"code":509} */
let stream = require("stream"); let util = require("util"); util.inherits(flowingReadableDemo, stream.Readable); function flowingReadableDemo(opt) { stream.Readable.call(this, opt); this.quotes = ["yessdasdsa", "noasdasdas", "maybe"]; this._index = 0; } flowingReadableDemo.prototype._read = function() { if (this._index >= this.quotes.length) { this.push(null); } else { this.push(this.quotes[this._index]); this._index += 1; } }; let r = new flowingReadableDemo(); r.on("data", function(data) { console.log("Callback read: " + data.toString()); // flowing狀態(tài)下,我們無需執(zhí)行read,僅需要設(shè)置data事件處理函數(shù)或者設(shè)定導(dǎo)流目標(biāo)pipe }); r.on("end", function(data) { console.log("No more answers."); });
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/87390.html
摘要:內(nèi)部架構(gòu)上圖表示一個實例的組成部分部分緩沖數(shù)組內(nèi)部函數(shù)部分緩沖鏈表內(nèi)部函數(shù)實例必須實現(xiàn)的內(nèi)部函數(shù)以及系統(tǒng)提供的回調(diào)函數(shù)。有三個參數(shù),第一個為待處理的數(shù)據(jù),第二個為編碼,第三個為回調(diào)函數(shù)。 Transform流特性 在開發(fā)中直接接觸Transform流的情況不是很多,往往是使用相對成熟的模塊或者封裝的API來完成流的處理,最為特殊的莫過于through2模塊和gulp流操作。那么,Tra...
摘要:回調(diào)函數(shù)中檢測該次寫入是否被緩沖,若是,觸發(fā)事件。若目標(biāo)可寫流表示該寫入操作需要進(jìn)行緩沖,則立刻將源可讀流切換至?xí)和DJ?。監(jiān)聽源可讀流的事件,相應(yīng)地結(jié)束目標(biāo)可寫流。 在Node.js中,流(Stream)是其眾多原生對象的基類,它對處理潛在的大文件提供了支持,也抽象了一些場景下的數(shù)據(jù)處理和傳遞。在它對外暴露的接口中,最為神奇的,莫過于導(dǎo)流(pipe)方法了。鑒于近期自己正在閱讀Node...
摘要:是消費數(shù)據(jù)的,從中獲取數(shù)據(jù),然后對得到的塊數(shù)據(jù)進(jìn)行處理,至于如何處理,就依賴于具體實現(xiàn)也就是的實現(xiàn)。也可以說是建立在的基礎(chǔ)上。 1. 認(rèn)識Stream Stream的概念最早來源于Unix系統(tǒng),其可以將一個大型系統(tǒng)拆分成一些小的組件,然后將這些小的組件可以很好地運(yùn)行 TCP/IP協(xié)議中的TCP協(xié)議也用到了Stream的思想,進(jìn)而可以進(jìn)行流量控制、差錯控制 在unix中通過 |來表示流...
摘要:方法也可以接收一個參數(shù)表示數(shù)據(jù)請求著請求的數(shù)據(jù)大小,但是可讀流可以根據(jù)需要忽略這個參數(shù)。讀取數(shù)據(jù)大部分情況下我們只要簡單的使用方法將可讀流的數(shù)據(jù)重定向到另外形式的流,但是在某些情況下也許直接從可讀流中讀取數(shù)據(jù)更有用。 介紹本文介紹了使用 node.js streams 開發(fā)程序的基本方法。 We should have some ways of connecting programs ...
摘要:事件的觸發(fā)頻次同樣是由實現(xiàn)者決定,譬如在進(jìn)行文件讀取時,可能每行都會觸發(fā)一次而在請求處理時,可能數(shù)的數(shù)據(jù)才會觸發(fā)一次。如果有參數(shù)傳入,它會讓可讀流停止流向某個特定的目的地,否則,它會移除所有目的地。 showImg(https://segmentfault.com/img/remote/1460000016328758?w=1967&h=821); 本文節(jié)選自 Node.js Chea...
閱讀 4642·2021-10-25 09:48
閱讀 3220·2021-09-07 09:59
閱讀 2204·2021-09-06 15:01
閱讀 2704·2021-09-02 15:21
閱讀 2741·2019-08-30 14:14
閱讀 2194·2019-08-29 13:59
閱讀 2526·2019-08-29 11:02
閱讀 2544·2019-08-26 13:33