成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

通過源碼解析 Node.js 中導(dǎo)流(pipe)的實現(xiàn)

defcon / 2878人閱讀

摘要:回調(diào)函數(shù)中檢測該次寫入是否被緩沖,若是,觸發(fā)事件。若目標(biāo)可寫流表示該寫入操作需要進行緩沖,則立刻將源可讀流切換至?xí)和DJ健1O(jiān)聽源可讀流的事件,相應(yīng)地結(jié)束目標(biāo)可寫流。

Node.js中,流(Stream)是其眾多原生對象的基類,它對處理潛在的大文件提供了支持,也抽象了一些場景下的數(shù)據(jù)處理和傳遞。在它對外暴露的接口中,最為神奇的,莫過于導(dǎo)流(pipe)方法了。鑒于近期自己正在閱讀Node.js中的部分源碼,也來從源碼層面分享下導(dǎo)流的具體實現(xiàn)。

正題

以下是一個關(guān)于導(dǎo)流的簡單例子:

"use strict"
import {createReadStream, createWriteStream} from "fs"

createReadStream("/path/to/a/big/file").pipe(createWriteStream("/path/to/the/dest"))

再結(jié)合官方文檔,我們可以把pipe方法的主要功能分解為:

不斷從來源可讀流中獲得一個指定長度的數(shù)據(jù)。

將獲取到的數(shù)據(jù)寫入目標(biāo)可寫流。

平衡讀取和寫入速度,防止讀取速度大大超過寫入速度時,出現(xiàn)大量滯留數(shù)據(jù)。

好,讓我們跟隨Node.js項目里lib/_stream_readable.jslib/_stream_writable.js中的代碼,逐個解析這三個主要功能的實現(xiàn)。

讀取數(shù)據(jù)

剛創(chuàng)建出的可讀流只是一個記錄了一些初始狀態(tài)的空殼,里面沒有任何數(shù)據(jù),并且其狀態(tài)不屬于官方文檔中的流動模式(flowing mode)和暫停模式(paused mode)中的任何一種,算是一種偽暫停模式,因為此時實例的狀態(tài)中記錄它是否為暫停模式的變量還不是標(biāo)準(zhǔn)的布爾值,而是null,但又可通過將暫停模式轉(zhuǎn)化為流動模式的行為(調(diào)用實例的resume()方法),將可讀流切換至流動模式。在外部代碼中,我們可以手動監(jiān)聽可讀流的data事件,讓其進入流動模式:

// lib/_stream_readable.js
// ...

Readable.prototype.on = function(ev, fn) {
  var res = Stream.prototype.on.call(this, ev, fn);

  if (ev === "data" && false !== this._readableState.flowing) {
    this.resume();
  }

  // ...

  return res;
};

可見,可讀流類通過二次封裝父類(EventEmitter)的on()方法,替我們在監(jiān)聽data事件時,將流切換至了流動模式。而開始讀取數(shù)據(jù)的動作,則存在于resume()方法調(diào)用的內(nèi)部方法resume_()中,讓我們一窺究竟:

// lib/_stream_readable.js
// ...

function resume_(stream, state) {
  if (!state.reading) {
    debug("resume read 0");
    stream.read(0);
  }

  // ...
}

通過向可讀流讀取一次空數(shù)據(jù)(大小為0),將會觸發(fā)實例層面實現(xiàn)的_read()方法,開始讀取數(shù)據(jù),然后利用讀到的數(shù)據(jù)觸發(fā)data事件:

// lib/_stream_readable.js
// ...

Readable.prototype.read = function(n) {
  // ...
  // 此次判斷的意圖為,如果可讀流的緩沖中已滿,則只空觸發(fā)readable事件。
  if (n === 0 &&
      state.needReadable &&
      (state.length >= state.highWaterMark || state.ended)) {
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }
  
  // 若可讀流已經(jīng)被傳入了終止符(null),且緩沖中沒有遺留數(shù)據(jù),則結(jié)束這個可讀流
  if (n === 0 && state.ended) {
    if (state.length === 0)
      endReadable(this);
    return null;
  }

  // 若目前緩沖中的數(shù)據(jù)大小為空,或未超過設(shè)置的警戒線,則進行一次數(shù)據(jù)讀取。
  if (state.length === 0 || state.length - n < state.highWaterMark) {
    doRead = true;
  }

  if (doRead) {
    // ...
    this._read(state.highWaterMark);
  }

  // ...

  if (ret !== null)
    this.emit("data", ret);

  return ret;
};

可見,在可讀流的read()方法內(nèi)部,通過調(diào)用在實例層面實現(xiàn)的_read(size)方法,取得了一段(設(shè)置的警戒線)大小的數(shù)據(jù),但是,你可能會疑惑,這只是讀取了一次數(shù)據(jù)啊,理想情況下,應(yīng)該是循環(huán)調(diào)用_read(size)直至取完所有數(shù)據(jù)才對?。??其實,這部分的邏輯存在于我們實現(xiàn)_read(size)方法時,在其內(nèi)部調(diào)用的this.push(data)方法中,在最后其會調(diào)用私有方法maybeReadMore_(),再次觸發(fā)read(0),接著在read(0)函數(shù)的代碼中再次判斷可讀流是否能夠結(jié)束,否則再進行一次_read(size)讀取:

// lib/_stream_readable.js
// ...

Readable.prototype.push = function(chunk, encoding) {
  var state = this._readableState;
  // ...
  return readableAddChunk(this, state, chunk, encoding, false);
};

function readableAddChunk(stream, state, chunk, encoding, addToFront) {
  // ...
  if (er) {
    stream.emit("error", er);
  } else if (chunk === null) {
    state.reading = false;
    onEofChunk(stream, state); // 當(dāng)傳入終止符時,將可讀流的結(jié)束標(biāo)識(state.ended)設(shè)為true
  }
  // ...
      maybeReadMore(stream, state);
    }
  } 

  // ...
}

function maybeReadMore(stream, state) {
  if (!state.readingMore) {
    // ...
    process.nextTick(maybeReadMore_, stream, state);
  }
}

function maybeReadMore_(stream, state) {
    // ...
    stream.read(0);
}

function onEofChunk(stream, state) {
  if (state.ended) return;
  // ...
  state.ended = true;
  // ...
}

好的,此時從可讀流中讀取數(shù)據(jù)的整個核心流程已經(jīng)實現(xiàn)了,讓我們歸納一下:

剛創(chuàng)建出的可讀流只是一個空殼,保存著一些初始狀態(tài)。

監(jiān)聽它的data事件,將會自動調(diào)用該可讀流的resume()方法,使流切換至流動模式。

resume()方法的內(nèi)部函數(shù)_resume()中,對可讀流進行了一次read(0)調(diào)用。

read(0)調(diào)用的內(nèi)部,首先檢查流是否符合了結(jié)束條件,若符合,則結(jié)束之。否則調(diào)用實例實現(xiàn)的_read(size)方法讀取一段預(yù)設(shè)的警戒線(highWaterMark)大小的數(shù)據(jù)。

在實例實現(xiàn)_read(size)函數(shù)時內(nèi)部調(diào)用的this.push(data)方法里,會先判斷的讀到的數(shù)據(jù)是否為結(jié)束符,若是,則將流的狀態(tài)設(shè)為結(jié)束,然后再一次對可讀流調(diào)用read(0)。

寫入數(shù)據(jù)

和可讀流一樣,剛創(chuàng)建出的可寫流也只是一個記錄了相關(guān)狀態(tài)(包括預(yù)設(shè)的寫入緩沖大?。┑目諝ぁV苯诱{(diào)用它的write方法,該方法會在其內(nèi)部調(diào)用writeOrBuffer函數(shù)來對數(shù)據(jù)是否可以直接一次性全部寫入進行判斷:

// lib/_stream_writable.js
// ...

function writeOrBuffer(stream, state, chunk, encoding, cb) {
  // ...
  var ret = state.length < state.highWaterMark;

  // 記錄可寫流是否需要出發(fā)drain事件
  if (!ret)
    state.needDrain = true;

  if (state.writing || state.corked) {
    // 若可寫流正在被寫入或被人工阻塞,則先將寫入操作排隊
    // ...
  } else {
    doWrite(stream, state, false, len, chunk, encoding, cb);
  }

  return ret;
}

function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  // ...
  if (writev)
    stream._writev(chunk, state.onwrite);
  else
    stream._write(chunk, encoding, state.onwrite);
  // ...
}

從代碼中可知,在writeOrBuffer函數(shù)記錄下了數(shù)據(jù)是否可以被一次性寫入后,調(diào)用了實例層實現(xiàn)的_write()_writev()方法進行了實際的寫入操作。那么,如果不能一次性寫入完畢,那么在真正寫入完畢時,又是如何進行通知的呢?嗯,答案就在設(shè)置的state.onwrite回調(diào)中:

// lib/_stream_writable.js
// ...

function onwrite(stream, er) {
  // ...

  if (er)
    onwriteError(stream, state, sync, er, cb);
  else {
    // ...
    if (sync) {
      process.nextTick(afterWrite, stream, state, finished, cb);
    } else {
      afterWrite(stream, state, finished, cb);
    }
  }
}

function afterWrite(stream, state, finished, cb) {
  if (!finished)
    onwriteDrain(stream, state);
  // ...
}

function onwriteDrain(stream, state) {
  if (state.length === 0 && state.needDrain) {
    state.needDrain = false;
    stream.emit("drain");
  }
}

可見,在回調(diào)函數(shù)的執(zhí)行中,會對該可寫流該次被寫入的數(shù)據(jù)是否超過了警戒線的狀態(tài)進行判斷,如果是,則觸發(fā)drain事件,進行通知。

我們也可以調(diào)用end()方法來表明要結(jié)束這個寫入流,并進行最后一次寫入,end()方法的內(nèi)部最終會調(diào)用endWritable()函數(shù)來講可寫流的狀態(tài)切換為已結(jié)束:

// lib/_stream_writable.js
// ...

function endWritable(stream, state, cb) {
  // ...
  state.ended = true;
  stream.writable = false;
}

此時,向可寫流中寫入數(shù)據(jù)的整個核心流程已經(jīng)實現(xiàn)了,這個流程和可寫流的循環(huán)讀取流程不同,它是直線的,歸納一下:

剛創(chuàng)建出的可寫流只是一個空殼,保存著一些初始狀態(tài)。

調(diào)用write()方法,其內(nèi)部的writeOrBuffer()檢測該次寫入的數(shù)據(jù)是否需要被暫存在緩沖區(qū)中。

writeOrBuffer()函數(shù)調(diào)用實例實現(xiàn)的_write()_writev()方法,進行實際的寫入,完成后調(diào)用回調(diào)函數(shù)state.onwrite。

回調(diào)函數(shù)中檢測該次寫入是否被緩沖,若是,觸發(fā)drain事件。

重復(fù)以上過程,直至調(diào)用end()方法結(jié)束該可寫流。

導(dǎo)流

在摸清了從可讀流中讀數(shù)據(jù),和向可寫流中寫數(shù)據(jù)實現(xiàn)的核心流程后,Node.js中實現(xiàn)導(dǎo)流的核心流程其實已經(jīng)呼之欲出了。首先,為了開始從源可讀流讀取數(shù)據(jù),在pipe()方法的內(nèi)部,它主動為源可讀流添加了data事件的監(jiān)聽函數(shù):

// lib/_stream_readable.js
// ...

Readable.prototype.pipe = function(dest, pipeOpts) {
  // ...

  src.on("data", ondata);
  function ondata(chunk) {
      // ...
      src.pause();
    }
  }

  // ...
  return dest;
};

從代碼中可見,若向目標(biāo)可寫流寫入一次數(shù)據(jù)時,目標(biāo)可寫流表示該次寫入它需要進行緩沖,則主動將源可讀流切換至?xí)和DJ?。那么,源可讀流通過什么手段得知可以再次讀取數(shù)據(jù)并寫入呢?嗯,通過監(jiān)聽目標(biāo)可寫流的drain事件:

// lib/_stream_readable.js
// ...

Readable.prototype.pipe = function(dest, pipeOpts) {
  // ...
  var ondrain = pipeOnDrain(src);
  dest.on("drain", ondrain);

  // ...
  return dest;
};

function pipeOnDrain(src) {
  return function() {
    var state = src._readableState;
    
    // 目標(biāo)可寫流可能會存在多次寫入需要進行緩沖的情況,需確保所有需要緩沖的寫入都
    // 完成后,再次將可讀流切換至流動模式。
    if (state.awaitDrain)
      state.awaitDrain--;
    if (state.awaitDrain === 0 && EE.listenerCount(src, "data")) {
      state.flowing = true;
      flow(src);
    }
  };
}

最后,監(jiān)聽源可讀流的結(jié)束事件,對應(yīng)著結(jié)束目標(biāo)可寫流:

// lib/_stream_readable.js
// ...

Readable.prototype.pipe = function(dest, pipeOpts) {
  // ...
  var endFn = doEnd ? onend : cleanup;
  if (state.endEmitted)
    process.nextTick(endFn);
  else
    src.once("end", endFn);

  function onend() {
    debug("onend");
    dest.end();
  }

  // ...
  return dest;
};

由于前面的鋪墊,實際導(dǎo)流操作的核心流程其實實現(xiàn)得非常輕松,歸納一下:

主動監(jiān)聽源可讀流的data事件,在該事件的監(jiān)聽函數(shù)中,向目標(biāo)可寫流寫入數(shù)據(jù)。

若目標(biāo)可寫流表示該寫入操作需要進行緩沖,則立刻將源可讀流切換至?xí)和DJ健?/p>

監(jiān)聽目標(biāo)可寫流的drain事件,當(dāng)目標(biāo)可寫流里所有需要緩沖的寫入操作都完畢后,將流重新切換回流動模式。

監(jiān)聽源可讀流的end事件,相應(yīng)地結(jié)束目標(biāo)可寫流。

最后

Node.js中流的實際實現(xiàn)其實非常龐大,復(fù)雜,精妙。每一個流的內(nèi)部,都管理著大量狀態(tài)。本文僅僅只是在龐大的流的實現(xiàn)中,選擇了一條主線,進行了闡述。大家如果有閑,非常推薦完整地閱讀一遍其實現(xiàn)。

參考:

https://github.com/nodejs/node/blob/master/lib/_stream_readable.js

https://github.com/nodejs/node/blob/master/lib/_stream_writable.js

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/78652.html

相關(guān)文章

  • 初識 Node Stream

    摘要:是在完成處理數(shù)據(jù)塊后需要調(diào)用的函數(shù)。這是寫數(shù)據(jù)成功與否的標(biāo)志。若要發(fā)出故障信號,請用錯誤對象調(diào)用回調(diào)函數(shù)。雙工流的可讀性和可寫性操作完全獨立于彼此。這僅僅是將兩個特性組合成一個對象。 showImg(https://segmentfault.com/img/remote/1460000013228112?w=533&h=300); Streams 是一個數(shù)據(jù)集——和數(shù)組、字符串一樣。不...

    fobnn 評論0 收藏0
  • [譯]關(guān)于Node.js streams你需要知道一切

    摘要:當(dāng)一個客戶端的響應(yīng)對象是一個可讀流,那么在服務(wù)器端這就是一個可寫流。的模塊給我們提供了一個可以操作任何文件的可讀流通過方法創(chuàng)建。創(chuàng)建一個可讀流創(chuàng)建可讀流,我們需要類創(chuàng)建一個可讀流非常簡單??梢酝ㄟ^修改可讀流配置里面的方法實現(xiàn)。 Node.js的stream模塊是有名的應(yīng)用困難,更別說理解了。那現(xiàn)在可以告訴你,這些都不是問題了。 多年來,開發(fā)人員在那里創(chuàng)建了大量的軟件包,其唯一目的就是使...

    bang590 評論0 收藏0
  • JavaScript 編程精解 中文第三版 二十、Node.js

    摘要:在這樣的程序中,異步編程通常是有幫助的。最初是為了使異步編程簡單方便而設(shè)計的。在年設(shè)計時,人們已經(jīng)在瀏覽器中進行基于回調(diào)的編程,所以該語言的社區(qū)用于異步編程風(fēng)格。 來源:ApacheCN『JavaScript 編程精解 中文第三版』翻譯項目原文:Node.js 譯者:飛龍 協(xié)議:CC BY-NC-SA 4.0 自豪地采用谷歌翻譯 部分參考了《JavaScript 編程精解(第 2 版)...

    qqlcbb 評論0 收藏0
  • [轉(zhuǎn)]nodejs Stream使用手冊

    摘要:方法也可以接收一個參數(shù)表示數(shù)據(jù)請求著請求的數(shù)據(jù)大小,但是可讀流可以根據(jù)需要忽略這個參數(shù)。讀取數(shù)據(jù)大部分情況下我們只要簡單的使用方法將可讀流的數(shù)據(jù)重定向到另外形式的流,但是在某些情況下也許直接從可讀流中讀取數(shù)據(jù)更有用。 介紹本文介紹了使用 node.js streams 開發(fā)程序的基本方法。 We should have some ways of connecting programs ...

    luffyZh 評論0 收藏0
  • 淺談node.jsstream()

    摘要:在可讀流事件里我們就必須調(diào)用方法。當(dāng)一個對象就意味著我們想發(fā)出信號這個流沒有更多數(shù)據(jù)了自定義可寫流為了實現(xiàn)可寫流,我們需要使用流模塊中的構(gòu)造函數(shù)。我們只需給構(gòu)造函數(shù)傳遞一些選項并創(chuàng)建一個對象。 前言 什么是流呢?看字面意思,我們可能會想起生活中的水流,電流。但是流不是水也不是電,它只是描述水和電的流動;所以說流是抽象的。在node.js中流是一個抽象接口,它不關(guān)心文件內(nèi)容,只關(guān)注是否從...

    elliott_hu 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<