成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Node Stream運(yùn)行機(jī)制

objc94 / 2936人閱讀

摘要:兩種模式可以互相轉(zhuǎn)換流的初始狀態(tài)是,通過監(jiān)聽事件,或者方法,調(diào)用方法,將流轉(zhuǎn)為狀態(tài)。下面詳細(xì)介紹下兩種模式下,流的運(yùn)行機(jī)制。下面描述中的表示的是流內(nèi)部的緩沖池的大小。調(diào)用用戶實(shí)現(xiàn)的方法,來數(shù)據(jù)到緩沖池,然后發(fā)送事件,通知用戶端消費(fèi)。

如果你正在學(xué)習(xí)Node,那么流一定是一個(gè)你需要掌握的概念。如果你想成為一個(gè)Node高手,那么流一定是武功秘籍中不可缺少的一個(gè)部分。

引用自Stream-Handbook。由此可見,流對于深入學(xué)習(xí)Node的重要性。

流是什么?

你可以把流理解成一種傳輸?shù)哪芰ΑMㄟ^流,可以以平緩的方式,無副作用的將數(shù)據(jù)傳輸?shù)侥康牡?。在Node中,Node Stream創(chuàng)建的流都是專用于String和Buffer上的,一般情況下使用Buffer。Stream表示的是一種傳輸能力,Buffer是傳輸內(nèi)容的載體 (可以這樣理解,Stream:外賣小哥哥, Buffer:你的外賣)。創(chuàng)建流的時(shí)候?qū)bjectMode設(shè)置true ,Stream同樣可以傳輸任意類型的JS對象(除了null,null在流中有特殊用途)。

為什么要使用流?

現(xiàn)在有個(gè)需求,我們要向客戶端傳輸一個(gè)大文件。如果采用下面的方式

const fs = require("fs");
const server = require("http").createServer();

server.on("request", (req, res) => {
  fs.readFile("./big.file", (err, data) => {
    if (err) throw err;
    res.end(data);
  });
});

server.listen(8000);

每次接收一個(gè)請求,就要把這個(gè)大文件讀入內(nèi)存,然后再傳輸給客戶端。通過這種方式可能會(huì)產(chǎn)生以下三種后果:

內(nèi)存耗盡

拖慢其他進(jìn)程

增加垃圾回收器的負(fù)載

所以這種方式在傳輸大文件的情況下,不是一個(gè)好的方案。并發(fā)量一大,幾百個(gè)請求過來很容易就將內(nèi)存耗盡。

如果采用流呢?

const fs = require("fs");
const server = require("http").createServer();

server.on("request", (req, res) => {
  const src = fs.createReadStream("./big.file");
  src.pipe(res);
});

server.listen(8000);

采用這種方式,不會(huì)占用太多內(nèi)存,讀取一點(diǎn)就傳輸一點(diǎn),整個(gè)過程平緩進(jìn)行,非常優(yōu)雅。如果想在傳輸?shù)倪^程中,想對文件進(jìn)行處理,比如壓縮、加密等等,也很好擴(kuò)展(后面會(huì)具體介紹)。

流在Node中無處不在。從下圖中可以看出:

Stream分類

Stream分為四大類:

Readable(可讀流)

Writable (可寫流)

Duplex (雙工流)

Transform (轉(zhuǎn)換流)

Readable

可讀流中的數(shù)據(jù),在以下兩種模式下都能產(chǎn)生數(shù)據(jù)。

Flowing Mode

Non-Flowing Mode

兩種模式下,觸發(fā)的方式以及消耗的方式不一樣。

Flowing Mode:數(shù)據(jù)會(huì)源源不斷地生產(chǎn)出來,形成“流動(dòng)”現(xiàn)象。監(jiān)聽流的data事件便可進(jìn)入該模式。

Non-Flowing Mode下:需要顯示地調(diào)用read()方法,才能獲取數(shù)據(jù)。

兩種模式可以互相轉(zhuǎn)換

流的初始狀態(tài)是Null,通過監(jiān)聽data事件,或者pipe方法,調(diào)用resume方法,將流轉(zhuǎn)為Flowing Mode狀態(tài)。Flowing Mode狀態(tài)下調(diào)用pause方法,將流置為Non-Flowing Mode狀態(tài)。Non-Flowing Mode狀態(tài)下調(diào)用resume方法,同樣可以將流置為Flowing Mode狀態(tài)。

下面詳細(xì)介紹下兩種模式下,Readable流的運(yùn)行機(jī)制。

Flowing Mode

在Flowing Mode狀態(tài)下,創(chuàng)建的myReadable讀流,直接監(jiān)聽data事件,數(shù)據(jù)就源源不斷的流出來進(jìn)行消費(fèi)了。

myReadable.on("data",function(chunk){
      consume(chunk);//消費(fèi)流
})

一旦監(jiān)聽data事件之后,Readable內(nèi)部的流程如下圖所示

核心的方法是流內(nèi)部的read方法,它在參數(shù)n為不同值時(shí),分別觸發(fā)不同的操作。下面描述中的hightwatermark表示的是流內(nèi)部的緩沖池的大小。

n=undefined(消費(fèi)數(shù)據(jù),并觸發(fā)一次可讀流)

n=0(觸發(fā)一次可讀流,但是不會(huì)消費(fèi))

n>hightwatermark(修改hightwatermark的值)

n

n>buffer (可以返回null,也可以返回buffer所有的數(shù)據(jù)(當(dāng)時(shí)最后一次讀?。?

圖中黃色標(biāo)識的_read(),是用戶實(shí)現(xiàn)流所需要自己實(shí)現(xiàn)的方法,這個(gè)方法就是實(shí)際讀取流的方式(可以這樣理解,外賣平臺(tái)給你提供外賣的能力,那_read()方法就相當(dāng)于你下單點(diǎn)外賣)。后面會(huì)詳細(xì)介紹如何實(shí)現(xiàn)_read方法。

以上的流程可以描述為:監(jiān)聽data方法,Readable內(nèi)部就會(huì)調(diào)用read方法,來進(jìn)行觸發(fā)讀流操作,通過判斷是同步還是異步讀取,來決定讀取的數(shù)據(jù)是否放入緩沖區(qū)。如果為異步的,那么就要調(diào)用flow方法,來繼續(xù)觸發(fā)read方法,來讀取流,同時(shí)根據(jù)size參數(shù)判定是否emit("data")來消費(fèi)流,循環(huán)讀取。如果是同步的,那就emit("data")來消費(fèi)流,同時(shí)繼續(xù)觸發(fā)read方法,來讀取流。一旦push方法傳入的是null,整個(gè)流就結(jié)束了。

從使用者的角度來看,在這種模式下,你可以通過下面的方式來使用流

const fs = require("./fs");
const readFile = fs.createReadStream("./big.file");
const writeFile = fs.createWriteStream("./writeFile.js");
readFile.on("data",function(chunk){
      writeFile1.write(chunk);
})
Non-Flowing Mode

相對于Flowing mode,Non-Flowing Mode要相對簡單很多。

消費(fèi)該模式下的流,需要使用下面的方式

myReadable.on(‘readable’,function(){
     const chunk = myReadable.read()
     consume(chunk);//消費(fèi)流
})

在Non-Flowing Mode下,Readable內(nèi)部的流程如下圖:

從這個(gè)圖上看出,你要實(shí)現(xiàn)該模式的讀流,同樣要實(shí)現(xiàn)一個(gè)_read方法。

整個(gè)流程如下:監(jiān)聽readable方法,Readable內(nèi)部就會(huì)調(diào)用read方法。調(diào)用用戶實(shí)現(xiàn)的_read方法,來push數(shù)據(jù)到緩沖池,然后發(fā)送emit readable事件,通知用戶端消費(fèi)。

從使用者的角度來看,你可以通過下面的方式來使用該模式下的流

const fs = require("fs");
const readFile = fs.createReadStream("./big.file");
const writeFile = fs.createWriteStream("./writeFile.js");

readFile.on("readable",function(chunk) {
    while (null !== (chunk = myReadable.read())) {
        writeFile.write(chunk);
    }
});
Writable

相對于讀流,寫流的機(jī)制就更容易理解了。

寫流使用下面的方式進(jìn)行數(shù)據(jù)寫入

myWrite.write(chunk);

調(diào)用write后,內(nèi)部Writable的流程如下圖所示

類似于讀流,實(shí)現(xiàn)一個(gè)寫流,同樣需要用戶實(shí)現(xiàn)一個(gè)_write方法。

整個(gè)流程是這樣的:調(diào)用write之后,會(huì)首先判定是否要寫入緩沖區(qū)。如果不需要,那就調(diào)用用戶實(shí)現(xiàn)的_write方法,將流寫入到相應(yīng)的地方,_write會(huì)調(diào)用一個(gè)writeable內(nèi)部的一個(gè)回調(diào)函數(shù)。

從使用者的角度來看,使用一個(gè)寫流,采用下面的代碼所示的方式。

const fs = require("fs");
const readFile = fs.createReadStream("./big.file");
const writeFile = fs.createWriteStream("./writeFile.js");

readFile.on("data",function(chunk) {
    writeFile.write(chunk);
})

可以看到,使用寫流是非常簡單的。

我們先講解一下如何實(shí)現(xiàn)一個(gè)讀流和寫流,再來看Duplex和Transform是什么,因?yàn)榱私饬巳绾螌?shí)現(xiàn)一個(gè)讀流和寫流,再來理解Duplex和Transform就非常簡單了。

實(shí)現(xiàn)自定義的Readable

實(shí)現(xiàn)自定義的Readable,只需要實(shí)現(xiàn)一個(gè)_read方法即可,需要在_read方法中調(diào)用push方法來實(shí)現(xiàn)數(shù)據(jù)的生產(chǎn)。如下面的代碼所示:

const Readable = require("stream").Readable;

class MyReadable extends Readable {
    constructor(dataSource, options) {
        super(options);
        this.dataSource = dataSource;
    }
    _read() {
        const data = this.dataSource.makeData();
        setTimeout(()=>{
            this.push(data);
        });
    }
}

// 模擬資源池
const dataSource = {
    data: new Array(10).fill("-"),
    makeData() {
        if (!dataSource.data.length) return null;
        return dataSource.data.pop();
    }
};

const myReadable = new MyReadable(dataSource,);

myReadable.on("readable", () => {
    let chunk;
    while (null !== (chunk = myReadable.read())) {
        console.log(chunk);
    }
});
實(shí)現(xiàn)自定義的writable

實(shí)現(xiàn)自定義的writable,只需要實(shí)現(xiàn)一個(gè)_write方法即可。在_write中消費(fèi)chunk寫入到相應(yīng)地方,并且調(diào)用callback回調(diào)。如下面代碼所示:

const Writable = require("stream").Writable;
class Mywritable extends  Writable{
    constuctor(options){
        super(options);
    }
    _write(chunk,endcoding,callback){
        console.log(chunk);
        callback && callback();
    }
}
const myWritable = new Mywritable();
Duplex

雙工流:簡單理解,就是講一個(gè)Readable流和一個(gè)Writable流綁定到一起,它既可以用來做讀流,又可以用來做寫流。

實(shí)現(xiàn)一個(gè)Duplex流,你需要同時(shí)實(shí)現(xiàn)_read_write方法。

有一點(diǎn)需要注意的是:它所包含的 Readable流和Writable流是完全獨(dú)立,互不影響的兩個(gè)流,兩個(gè)流使用的不是同一個(gè)緩沖區(qū)。通過下面的代碼可以驗(yàn)證

// 模擬資源池1
const dataSource1 = {
    data: new Array(10).fill("a"),
    makeData() {
        if (!dataSource1.data.length) return null;
        return dataSource1.data.pop();
    }
};
// 模擬資源池2
const dataSource2 = {
    data: new Array(10).fill("b"),
    makeData() {
        if (!dataSource2.data.length) return null;
        return dataSource2.data.pop();
    }
};

const Readable = require("stream").Readable;
class MyReadable extends Readable {
    constructor(dataSource, options) {
        super(options);
        this.dataSource = dataSource;
    }
    _read() {
        const data = this.dataSource.makeData();
        setTimeout(()=>{
            this.push(data);
        })

    }
}

const Writable = require("stream").Writable;
class MyWritable extends Writable{
    constructor(options){
        super(options);
    }
    _write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback && callback();
    }
}

const Duplex = require("stream").Duplex;
class MyDuplex extends Duplex{
    constructor(dataSource,options) {
        super(options);
        this.dataSource = dataSource;
    }
    _read() {
        const data = this.dataSource.makeData();
        setTimeout(()=>{
            this.push(data);
        })
    }
    _write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback && callback();
    }
}

const myWritable = new MyWritable();
const myReadable = new MyReadable(dataSource1);
const myDuplex = new MyDuplex(dataSource1);
myReadable.pipe(myDuplex).pipe(myWritable);

打印的結(jié)果是

abababababababababab

從這個(gè)結(jié)果可以看出,myReadable.pipe(myDuplex),myDuplex充當(dāng)?shù)氖菍懥?,寫入的?nèi)容是a;myDuplex.pipe(myWritable),myDuplex充當(dāng)?shù)氖亲x流,往myWritable寫的卻是b;所以說它所包含的 Readable流和Writable流是完全獨(dú)立的。

Transform

理解了Duplex,就更好理解Transform了。Transform是一個(gè)轉(zhuǎn)換流,它既有讀的功能又有寫的功能,但是它和Duplex不同的是,它的讀流和寫流共用同一個(gè)緩沖區(qū);也就是說,通過它讀入什么,那它就能寫入什么。

實(shí)現(xiàn)一個(gè)Transform,你只需要實(shí)現(xiàn)一個(gè)_transform方法。比如最簡單的Transform:PassThrough,其源代碼如下所示

PassThrough就是一個(gè)Transform,但是這個(gè)轉(zhuǎn)換流,什么也沒做,相當(dāng)于一個(gè)透明的轉(zhuǎn)換流??梢钥吹?strong>_transform中什么都沒有,只是簡單的將數(shù)據(jù)進(jìn)行回調(diào)。

如果我們在這個(gè)環(huán)節(jié)做些擴(kuò)展,只需要在_transform中直接擴(kuò)展就行了。比如我們可以對流進(jìn)行壓縮,加密,混淆等等操作。

BackPress

最后介紹一個(gè)流中非常重要的一個(gè)概念:背壓。要了解這個(gè),我們首先來看下pipehighWaterMaker是什么。

pipe

首先看下下面的代碼

const fs = require("./fs");
const readFile = fs.createReadStream("./big.file");
const writeFile = fs.createWriteStream("./writeFile.js");
readFile.pipe(writeFile);

上面的代碼和下面是等價(jià)的

const fs = require("./fs");
const readFile = fs.createReadStream("./big.file");
const writeFile = fs.createWriteStream("./writeFile.js");
readFile.on("data",function(data){
    var flag = ws.write(data);
    if(!flag){ // 當(dāng)前寫流緩沖區(qū)已滿,暫停讀數(shù)據(jù)
        readFile.pause();
    }
})
writeFile.on("drain",function()){
    readFile.resume();// 當(dāng)前寫流緩沖區(qū)已清空,重新開始讀流
}
readFile.on("end",function(data){
    writeFile.end();//將寫流緩沖區(qū)的數(shù)據(jù)全部寫入,并且關(guān)閉寫入的文件
})

pipe所做的操作就是相當(dāng)于為寫流和讀流自動(dòng)做了速度的匹配。

讀寫流速度不匹配的情況下,一般情況下不會(huì)造成什么問題,但是會(huì)造成內(nèi)存增加。內(nèi)存消耗增加,就有可能會(huì)帶來一系列的問題。所以在使用的流的時(shí)候,強(qiáng)烈推薦使用pipe

highWaterMaker

highWaterMaker說白了,就是定義緩沖區(qū)的大小。

默認(rèn)16Kb(Readable最大8M)

可以自定義

背壓的概念可以理解為:為了防止讀寫流速度不匹配而產(chǎn)生的一種調(diào)整機(jī)制;背壓該調(diào)整機(jī)制的觸發(fā)時(shí)機(jī),受限于highWaterMaker設(shè)置的大小。

如上面的代碼 var flag = ws.write(data);,一旦寫流的緩沖區(qū)滿了,那flag就會(huì)置為false,反向促進(jìn)讀流的速度調(diào)整。

Stream的應(yīng)用場景

主要有以下場景

1.文件操作(復(fù)制,壓縮,解壓,加密等)

下面的就很容易就實(shí)現(xiàn)了文件復(fù)制的功能。

const fs = require("fs");
const readFile = fs.createReadStream("big.file");
const writeFile = fs.createWriteStream("big_copy.file");
readFile.pipe(writeFile);

那我們想在復(fù)制的過程中對文件進(jìn)行壓縮呢?

const fs = require("fs");
const readFile = fs.createReadStream("big.file");
const writeFile = fs.createWriteStream("big.gz");
const zlib = require("zlib");
readFile.pipe(zlib.createGzip()).pipe(writeFile);

實(shí)現(xiàn)解壓、加密也是類似的。

2.靜態(tài)文件服務(wù)器

比如需要返回一個(gè)html,可以使用如下代碼。

var http = require("http");
var fs = require("fs");
http.createServer(function(req,res){
    fs.createReadStream("./a.html").pipe(res);
}).listen(8000);

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/98614.html

相關(guān)文章

  • Node事件機(jī)制小記

    摘要:事件的監(jiān)聽與事件的觸發(fā)事件一事件機(jī)制的實(shí)現(xiàn)中大部分的模塊,都繼承自模塊。從另一個(gè)角度來看,事件偵聽器模式也是一種事件鉤子的機(jī)制,利用事件鉤子導(dǎo)出內(nèi)部數(shù)據(jù)或狀態(tài)給外部調(diào)用者。的核心就是事件發(fā)射與事件監(jiān)聽器功能的封裝。 nodejs事件的監(jiān)聽與事件的觸發(fā) nodejs事件(Events)showImg(https://segmentfault.com/img/bV0Sqi?w=692&h=...

    airborne007 評論0 收藏0
  • Node提供靜態(tài)文件服務(wù)

    摘要:前言對于一個(gè)應(yīng)用,提供靜態(tài)文件圖片服務(wù)常常是必須的。本文將介紹如何做一個(gè)自己的靜態(tài)文件服務(wù)器。雖然已經(jīng)品嘗到了成功的滋味,但這個(gè)靜態(tài)文件服務(wù)器還不夠完整,因?yàn)樗苋菀壮鲥e(cuò)。 前言 對于一個(gè)web應(yīng)用,提供靜態(tài)文件(CSS、JavaScript、圖片)服務(wù)常常是必須的。本文將介紹如何做一個(gè)自己的靜態(tài)文件服務(wù)器。 創(chuàng)建一個(gè)靜態(tài)文件服務(wù)器 每個(gè)靜態(tài)文件服務(wù)器都有個(gè)根目錄,也就是提供文件服務(wù)的...

    gecko23 評論0 收藏0
  • Node.js 指南(流中的背壓)

    摘要:在數(shù)據(jù)緩沖區(qū)已超過或?qū)懭腙?duì)列當(dāng)前正忙的任何情況下,將返回。當(dāng)返回值時(shí),背壓系統(tǒng)啟動(dòng),它會(huì)暫停傳入的流發(fā)送任何數(shù)據(jù),并等待消費(fèi)者再次準(zhǔn)備就緒,清空數(shù)據(jù)緩沖區(qū)后,將發(fā)出事件并恢復(fù)傳入的數(shù)據(jù)流。 流中的背壓 在數(shù)據(jù)處理過程中會(huì)出現(xiàn)一個(gè)叫做背壓的常見問題,它描述了數(shù)據(jù)傳輸過程中緩沖區(qū)后面數(shù)據(jù)的累積,當(dāng)傳輸?shù)慕邮斩司哂袕?fù)雜的操作時(shí),或者由于某種原因速度較慢時(shí),來自傳入源的數(shù)據(jù)就有累積的趨勢,就像...

    Tony 評論0 收藏0
  • Node_模塊

    摘要:當(dāng)某個(gè)執(zhí)行完畢時(shí),將以時(shí)間的形式通知執(zhí)行操作的線程,線程執(zhí)行了這個(gè)事件的回調(diào)函數(shù)。為了處理異步,線程必須有事件循環(huán),不斷的檢查是否有未處理的時(shí)間。這種處理機(jī)制,稱為事件環(huán)機(jī)制。方面使用第三方模塊。 簡介 V8引擎本身就是用于Chrome瀏覽器的JS解釋部分,但是Ryan Dahl,把V8搬到服務(wù)器,用于做服務(wù)器的軟件。Node是一個(gè)專注于實(shí)現(xiàn)高性能Web服務(wù)器優(yōu)化的專家,在遇到V8而誕...

    zero 評論0 收藏0
  • Node.js 中的緩沖區(qū)(Buffer)究竟是什么?

    摘要:在創(chuàng)建時(shí)大小已經(jīng)被確定且是無法調(diào)整的,在內(nèi)存分配這塊是由層面提供而不是具體后面會(huì)講解。在這里不知道你是否認(rèn)為這是很簡單的但是上面提到的一些關(guān)鍵詞二進(jìn)制流緩沖區(qū),這些又都是什么呢下面嘗試做一些簡單的介紹。 showImg(https://segmentfault.com/img/remote/1460000019894717?w=1280&h=850); 多數(shù)人都擁有自己不了解的能力和機(jī)...

    scwang90 評論0 收藏0

發(fā)表評論

0條評論

最新活動(dòng)
閱讀需要支付1元查看
<