成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

流的剖析和實(shí)現(xiàn)

LeviDing / 369人閱讀

摘要:開始讀取位置結(jié)束讀取位置包括結(jié)束位置如果為,則文件描述符不會(huì)被關(guān)閉,即使有錯(cuò)誤。需要程序負(fù)責(zé)關(guān)閉它,并且確保沒有文件描述符泄漏。

流的定義

流是抽象化的概念,形象生動(dòng)的描述了數(shù)據(jù)的流動(dòng)、變化。
具體來說,在node中流是處理數(shù)據(jù)的抽象接口,繼承了EventEmitter,通過這個(gè)接口我們能夠控制流的開關(guān),流動(dòng)的方向等等。
比較形象直觀一點(diǎn)類似我們?cè)趌inux上使用shell,通過管道,鏈接處理各個(gè)部分,下面是我寫的一個(gè)命令,篩選出version并導(dǎo)出到文件中。

流的分類

Readable(可讀流)

Writable(可寫流)

Duplex(可讀可寫的流)

Transform(在讀寫過程中可以修改和變化的Duplex流)

流按照功能大致劃分為以上四類,具體應(yīng)用的話有很多場景,如下圖所示(來源:參考鏈接2)

下面我根據(jù)流的分類,列舉一些demo應(yīng)用實(shí)例

Readable

可讀流能接受各種數(shù)據(jù)源,例如控制臺(tái)的輸入,文件,字符串等等,就如介紹中所說是抽象接口,可以面向各種形式的輸入,下面舉幾個(gè)例子。

文件流
require("fs").createReadStream("./1.txt",{
    encoding: "utf8"
}).on("data",(data) => {
    console.log(data)
})
// 輸出  hello jsdt

說明 為什么要用流來讀取,直接用fs.readFile豈不是更方便嗎,因?yàn)閞eadFile是整體操作,會(huì)將文件全部讀到內(nèi)存中在做處理,這樣的話文件如果很大,程序就會(huì)很卡,甚至報(bào)錯(cuò)。

標(biāo)準(zhǔn)輸入流
process.stdin.setEncoding("utf8");
process.stdin.on("data",(data) => {
    console.log("輸出: "+ data)
})
node 運(yùn)行code,然后輸入 hello jsdt
輸出: hello jsdt

說明 這個(gè)做acm的時(shí)候會(huì)用到,或者平時(shí)自己寫一些交互式應(yīng)用的時(shí)候

普通數(shù)據(jù)流
let {Readable} = require("stream")
let util = require("util")
class Test extends  Readable{
    constructor(){
        super()
        this.dataSource = 5
    }
    _read(){
        if(this.dataSource-->0){
            this.push(this.dataSource+"");
        }else{
            this.push(null);
        }
    }
}
let counter = new Test();
counter.on("data",function(data){
    console.log(data.toString())
});
輸出:
4
3
2
1

說明 重寫_read方法,自定義輸入的邏輯,上面示例中是自己邏輯中產(chǎn)生的一個(gè)數(shù)據(jù)源。

Writable 文件流
let dataSource = "hello jsdt",i = 0;
(function(){
    let ws = require("fs").createWriteStream("./1.txt",{
        encoding: "utf8"
    })
    let flag = true;
    while(flag && i


說明 閉包自執(zhí)行,通過流將數(shù)據(jù)寫入到文件中,上面是輸出結(jié)果。

自定義輸出
let {Writable} = require("stream")
let arr = []
let ws = Writable({
    write(chunk,encoding,cb){
        arr.push(chunk)
        cb()
    }
})
for(let i = 1; i<= 3;i++){
  ws.write(""+i,"utf8",()=>{})
}
process.nextTick(function () {
    console.log(arr.toString())
})
//  輸出 1,2,3

說明 上面重寫了流的write方法,可以自定義寫邏輯

Duplex
require("net").createServer(socket => {
    socket.on("data",data => {
        console.log("client message " + data);
        socket.write("server message " + "hello client ");
    })
}).listen(8080,() =>{})


說明 作為可寫流一面socket可以向客戶端發(fā)送信息,做為可讀流一面可以監(jiān)聽data事件,收到客戶端發(fā)送過信息

Transform
let t = require("stream").Transform({
    transform(chunk,encoding,cb){
        this.push(chunk.toString().toUpperCase());
        cb();
    }
});
process.stdin.pipe(t).pipe(process.stdout);
// 輸入abc
// 輸出ABC

說明 上面使用轉(zhuǎn)換流,實(shí)現(xiàn)了terminal上小寫輸入,對(duì)應(yīng)大寫輸出的功能

流中數(shù)據(jù)分類

二進(jìn)制模式

對(duì)象模式

在創(chuàng)建流的時(shí)候可以指定配置,objectMode默認(rèn)為false,設(shè)為true切換到對(duì)象模式。二進(jìn)制即buffer模式,可讀或可寫流都會(huì)將數(shù)據(jù)會(huì)緩存數(shù)據(jù)在buffer中。

流的剖析

通過上面的介紹我們明確了流的定義,并按照功能對(duì)流進(jìn)行了分類,下面我進(jìn)行下剖析,總的來說流的各種形態(tài)間轉(zhuǎn)化傳輸?shù)讓佣际嵌M(jìn)制,具體到使用形態(tài)上有buffer,string等等。
首先詳細(xì)說下可讀流,可讀流有兩種模式,默認(rèn)為paused模式。

flowing 按照初始化配置,自動(dòng)讀取數(shù)據(jù),并通過觀察者模式,直接將數(shù)據(jù)提供給訂閱者

paused 顯式調(diào)用流的read方法讀取數(shù)據(jù)

其中如果我們想切換到流動(dòng)模式可以通過監(jiān)聽data事件的方式、或者調(diào)用stream.resume()、stream.pipe() 這些方法。

可讀流源碼分析
// 可讀流入口,根據(jù)配置返回一個(gè)可讀流
fs.createReadStream = function(path, options) {
  return new ReadStream(path, options);
};

// 實(shí)現(xiàn)原理是ReadStream.prototype.__proto__ = Readable.prototype,可以繼承Readable上的一些方法
util.inherits(ReadStream, Readable);
fs.ReadStream = ReadStream;

function ReadStream(path, options) {
  // 非new方式調(diào)用,直接返回一個(gè)實(shí)例
  if (!(this instanceof ReadStream))
    return new ReadStream(path, options);

  options = copyObject(getOptions(options, {}));
  if (options.highWaterMark === undefined)
  // highWaterMark默認(rèn)值為64k,設(shè)置了flow模式下緩沖區(qū)的大小
    options.highWaterMark = 64 * 1024;  

  Readable.call(this, options);

  handleError((this.path = getPathFromURL(path)));
  // 文件描述符,根據(jù)這個(gè)句柄找到文件
  this.fd = options.fd === undefined ? null : options.fd;
  // flags打開文件要做的操作,默認(rèn)為"r"
  this.flags = options.flags === undefined ? "r" : options.flags;
  // 用于設(shè)置文件模式(權(quán)限和粘結(jié)位),僅限創(chuàng)建文件時(shí)。
  this.mode = options.mode === undefined ? 0o666 : options.mode;
  // 開始讀取位置
  this.start = options.start;
  // 結(jié)束讀取位置(?。?!包括結(jié)束位置)
  this.end = options.end;
  /**
   * 如果 autoClose 為 false,則文件描述符不會(huì)被關(guān)閉,即使有錯(cuò)誤。 
   * 需要程序負(fù)責(zé)關(guān)閉它,并且確保沒有文件描述符泄漏。 
   * 如果 autoClose 被設(shè)置為 true(默認(rèn)),則在 error 或 end 時(shí),文件描述符會(huì)被自動(dòng)關(guān)閉
   */
  this.autoClose = options.autoClose === undefined ? true : options.autoClose;
   this.pos = this.start;
   
  }
// 適合傳入句柄的情況,例如fd: 0,這樣就不是文件,而是控制臺(tái)輸入的數(shù)據(jù)了
  if (typeof this.fd !== "number")
    this.open();
  this.on("end", function() {
    if (this.autoClose) {
      this.destroy();
    }
  });
}

// 打開文件,并觸發(fā)open事件,只有打開了才能讀取,所以在回調(diào)中觸發(fā)open事件,看下步操作
ReadStream.prototype.open = function() {
  var self = this;
  fs.open(this.path, this.flags, this.mode, function(er, fd) {
    self.fd = fd;
    self.emit("open", fd);
    //  start the flow of data.
    self.read();
  });
};
Readable.prototype.read = function(n) {
    // 當(dāng)read(0)時(shí),如果緩存中已有數(shù)據(jù),則觸發(fā)readable事件,相當(dāng)于刷新下緩存。否則觸發(fā)end事件
if (n === 0 &&
      state.needReadable &&
      (state.length >= state.highWaterMark || state.ended)) {
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }
  //  若可讀流已經(jīng)被傳入了終止符(null),且緩沖中沒有遺留數(shù)據(jù),則結(jié)束這個(gè)可讀流
  if (n === 0 && state.ended) {
      if (state.length === 0)
        endReadable(this);
      return null;
    }
    //  若目前緩沖中的數(shù)據(jù)大小為空,或未超過設(shè)置的警戒線,則進(jìn)行一次數(shù)據(jù)讀取。
      if (state.length === 0 || state.length - n < state.highWaterMark) {
        doRead = true;
      }
        if (state.ended || state.reading) {
          doRead = false;
        } else if (doRead) {
          state.reading = true;
          state.sync = true;
          this._read(state.highWaterMark);
     }


}
ReadStream.prototype._read = function(n) {
  if (typeof this.fd !== "number") {
    // 防止重復(fù)綁定open事件,當(dāng)文件打開且emit open事件,此時(shí)才會(huì)進(jìn)行真正的讀操作
    return this.once("open", function() {
      this._read(n);
    });
  }
 // 然后讀數(shù)據(jù)的時(shí)候會(huì)計(jì)算實(shí)際讀的數(shù)量
 function howMuchToRead(n, state) {
    //  如果讀的數(shù)量超過highWaterMark,則重新計(jì)算highWaterMark
    if (n > state.highWaterMark)
      state.highWaterMark = computeNewHighWaterMark(n);
    if (n <= state.length)
      return n;
 }
  // 經(jīng)過上面一系列的準(zhǔn)備工作,下面開始真正的讀操作咯
fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
      if (bytesRead > 0) {
        this.bytesRead += bytesRead;
      }
      this.push(b);
  });
};

// 上面整個(gè)過程是paused的流程,其中flow模式又有所不同,如下所示
// 如果監(jiān)聽了data事件,則會(huì)調(diào)用this.resume(),開始流動(dòng)模式
Readable.prototype.on = function(ev, fn) {
  const res = Stream.prototype.on.call(this, ev, fn);
  if (ev === "data") {
    //  Start flowing on next tick if stream isn"t explicitly paused
    if (this._readableState.flowing !== false)
      this.resume();
  }
  }
// flow模式下 流內(nèi)部自動(dòng)觸發(fā)data事件,循環(huán)讀取數(shù)據(jù)
function flow(stream) {
  const state = stream._readableState;
  debug("flow", state.flowing);
  while (state.flowing && stream.read() !== null);
}
// 然后觸發(fā) data事件,循環(huán)發(fā)射數(shù)據(jù)
stream.emit("data", chunk);

總結(jié) 上面是可讀流的源碼分析,摘要了關(guān)鍵部分,下面在梳理一下,當(dāng)通過ReadStream創(chuàng)建一個(gè)流的時(shí)候,默認(rèn)會(huì)觸發(fā)readable事件,進(jìn)入暫停模式,此時(shí)內(nèi)部維護(hù)的有一個(gè)緩沖區(qū),在readable事件回調(diào)邏輯中進(jìn)行read操作,首先會(huì)通過howMuchToRead方法計(jì)算實(shí)際讀取的數(shù)量,如果現(xiàn)有數(shù)據(jù)小于highWaterMark,內(nèi)部會(huì)進(jìn)行this._read(state.highWaterMark)操作,其回調(diào)中會(huì)進(jìn)行push操作,push在調(diào)用readableAddChunk將數(shù)據(jù)放到內(nèi)部維護(hù)的緩存中,反之則從fromList中讀取緩存中的數(shù)據(jù),然后返回。而如果監(jiān)聽了data事件,代碼中所示會(huì)調(diào)用this.resume(),將流狀態(tài)設(shè)置為flowing模式,然后resume()->resume_()->flow()的調(diào)用順序執(zhí)行flow方法循環(huán)讀取數(shù)據(jù),觸發(fā)data事件,完成數(shù)據(jù)的自動(dòng)讀取,然后發(fā)射給調(diào)用者,會(huì)不停的循環(huán)整個(gè)過程。上面比較值的注意一點(diǎn)的就是flow模式和paused模式區(qū)別,如果是flow模式在addChunk的時(shí)候,如下所示

function addChunk(stream, state, chunk, addToFront) {
  if (state.flowing && state.length === 0 && !state.sync) {
    stream.emit("data", chunk);
    stream.read(0);
  } 
}

會(huì)自動(dòng)發(fā)射數(shù)據(jù),不會(huì)走緩存,而paused模式會(huì)走一遍內(nèi)部的緩存機(jī)制。
根據(jù)上面node源碼的分析過程,下面圖形化描述下整個(gè)流程。

自己實(shí)現(xiàn)的一個(gè)可讀流

可寫流源碼分析
// 1:首先第一步根據(jù)createWriteStream傳入?yún)?shù)進(jìn)行初始化
// 2:調(diào)用寫操作
Writable.prototype.write = function(chunk, encoding, cb) {
  if (state.ended)
   //在end繼續(xù)寫入會(huì)emit一個(gè)error事件
    writeAfterEnd(this, cb);
  else if (validChunk(this, state, chunk, cb)) {
  //在校驗(yàn)數(shù)據(jù)chunk合法的情況下才會(huì)進(jìn)行后續(xù)的寫邏輯
    state.pendingcb++;
    ret = writeOrBuffer(this, state, chunk, encoding, cb);
  }
return ret;
};

function writeOrBuffer(stream, state, chunk, encoding, cb) {
  chunk = decodeChunk(state, chunk, encoding);

  if (chunk instanceof Buffer)
    encoding = "buffer";
  var len = state.objectMode ? 1 : chunk.length;

  state.length += len;//實(shí)時(shí)更新緩沖區(qū)長度

  var ret = state.length < state.highWaterMark;//判斷緩存區(qū)是否超過水位線(highWaterMark,不傳默認(rèn)16k,源碼_stream_writeable.js--40行)設(shè)置
  if (!ret)
    state.needDrain = true;

  if (state.writing || state.corked) {
 //如果此時(shí)處于寫狀態(tài),將新添加的數(shù)據(jù)放到緩沖池鏈表尾部
    var last = state.lastBufferedRequest;
    state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
    if (last) {
      last.next = state.lastBufferedRequest;
    } else {
      state.bufferedRequest = state.lastBufferedRequest;
    }
    state.bufferedRequestCount += 1;
  } else {
    //寫入數(shù)據(jù)
    doWrite(stream, state, false, len, chunk, encoding, cb);
  }
return ret;
}
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  if (writev)
    //一次寫入多個(gè)數(shù)據(jù)塊
    stream._writev(chunk, state.onwrite);
  else
  //一次寫入一個(gè)數(shù)據(jù)塊
    stream._write(chunk, encoding, state.onwrite);
  state.sync = false;
}
function onwrite(stream, er) {
    if (!finished &&
        !state.corked &&
        !state.bufferProcessing &&
        state.bufferedRequest) {
        //清空緩沖池 ,不為空,則循環(huán)執(zhí)行 _write() 寫入單個(gè)數(shù)據(jù)塊
      clearBuffer(stream, state);
    }
  }
}
function clearBuffer(stream, state) {
    // 單個(gè)數(shù)據(jù)寫入
    while (entry) {
      var chunk = entry.chunk;
      var encoding = entry.encoding;
      var cb = entry.callback;
      var len = state.objectMode ? 1 : chunk.length;
        //開啟數(shù)據(jù)寫操作
      doWrite(stream, state, false, len, chunk, encoding, cb);
      entry = entry.next;
    }
}

總結(jié) 上面是可寫流源碼分析,摘要了關(guān)鍵流程,首先根據(jù)傳入?yún)?shù)進(jìn)行初始化配置,然后用戶調(diào)用write方法進(jìn)行寫入,寫入前會(huì)判斷一下是否超過水位線,超過觸發(fā)drain事件,返回false,注意一點(diǎn)此時(shí)仍可以進(jìn)行寫入,返回false只是告訴你,已經(jīng)滿了,后需要不要寫入還是靠用戶根據(jù)這個(gè)返回值來控制。如果沒超過,在寫之前會(huì)先判斷是否處于寫狀態(tài),是的話將數(shù)據(jù)放到緩存中,反之會(huì)進(jìn)行doWrite <-->clearBuffer這樣的循環(huán)操作,一直到數(shù)據(jù)緩存中數(shù)據(jù)消耗完為止。清理完了之后,后續(xù)調(diào)用write的返回值ret為false,從而繼續(xù)寫,一直循環(huán)前面描述的整個(gè)過程,直到數(shù)據(jù)源寫完為止。總的來說,因?yàn)榭蓪懥鲀?nèi)部只有一個(gè)狀態(tài),復(fù)雜度低于可讀流,整個(gè)過程還是比較清晰的,不在圖形化流程。

自己實(shí)現(xiàn)的一個(gè)可寫流

說明
node源碼分析版本基于v8.9.4
參考資料
http://nodejs.cn/api/
https://medium.freecodecamp.o...

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/107173.html

相關(guān)文章

  • [譯]RxJS文檔03——剖析Observable

    摘要:通過執(zhí)行和可以向訂閱者推送不同的通知。之后,執(zhí)行過程可能被處理掉。當(dāng)調(diào)用并得到觀察者時(shí),在中傳入的函數(shù)將會(huì)被執(zhí)行。每次執(zhí)行都會(huì)觸發(fā)一個(gè)單獨(dú)針對(duì)當(dāng)前的運(yùn)行邏輯。通知不發(fā)出任何值,表示流的結(jié)束。 原文:http://reactivex.io/rxjs/manu... Rx.Observalbe.create()或者創(chuàng)建操作符,可以 創(chuàng)建(created) Observable流。Obser...

    netScorpion 評(píng)論0 收藏0
  • 高薪程序員&amp;面試題精講系列22之說說Java的IO流,常用哪些IO流?

    摘要:一面試題及剖析今日面試題今天壹哥帶各位復(fù)習(xí)一塊可能會(huì)令初學(xué)者比較頭疼的內(nèi)容,起碼當(dāng)時(shí)讓我很有些頭疼的內(nèi)容,那就是流。在這里壹哥會(huì)從兩部分展開介紹流,即與流。除此之外盡量使用字節(jié)流。關(guān)閉此輸入流并釋放與流相關(guān)聯(lián)的任何系統(tǒng)資源。 一. 面試題及剖析 1. 今日面試題 今天 壹哥 帶各位復(fù)習(xí)一塊可...

    fnngj 評(píng)論0 收藏0
  • 《Java8實(shí)戰(zhàn)》-第四章讀書筆記(引入流Stream)

    摘要:內(nèi)部迭代與使用迭代器顯式迭代的集合不同,流的迭代操作是在背后進(jìn)行的。流只能遍歷一次請(qǐng)注意,和迭代器類似,流只能遍歷一次。 流(Stream) 流是什么 流是Java API的新成員,它允許你以聲明性方式處理數(shù)據(jù)集合(通過查詢語句來表達(dá),而不是臨時(shí)編寫一個(gè)實(shí)現(xiàn))。就現(xiàn)在來說,你可以把它們看成遍歷數(shù)據(jù)集的高級(jí)迭代器。此外,流還可以透明地并行處理,你無需寫任何多線程代碼了!我會(huì)在后面的筆記中...

    _ivan 評(píng)論0 收藏0
  • Node.js 指南(HTTP事務(wù)的剖析

    摘要:為了處理請(qǐng)求流上的錯(cuò)誤,我們將錯(cuò)誤記錄到并發(fā)送狀態(tài)碼以指示,但是,在實(shí)際應(yīng)用程序中,我們需要檢查錯(cuò)誤以確定正確的狀態(tài)碼和消息是什么,與通常的錯(cuò)誤一樣,你應(yīng)該查閱錯(cuò)誤文檔。通過對(duì)象發(fā)送狀態(tài)碼和數(shù)據(jù)。 HTTP事務(wù)的剖析 本指南的目的是讓你充分了解Node.js HTTP處理的過程,我們假設(shè)你在一般意義上知道HTTP請(qǐng)求的工作方式,無論語言或編程環(huán)境如何,我們還假設(shè)你對(duì)Node.js Ev...

    ASCH 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<