成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

深入nodejs中流(stream)的理解

tianyu / 3046人閱讀

摘要:等文件一旦打開,立刻執(zhí)行寫入操作發(fā)射一個(gè)緩存區(qū)清空的事件自定義可寫流為了實(shí)現(xiàn)可寫流,我們需要使用流模塊中的構(gòu)造函數(shù)。

流的基本概念及理解
流是一種數(shù)據(jù)傳輸手段,是有順序的,有起點(diǎn)和終點(diǎn),比如你要把數(shù)據(jù)從一個(gè)地方傳到另外一個(gè)地方
流非常重要,gulp,webpack,HTTP里的請(qǐng)求和響應(yīng),http里的socket都是流,包括后面壓縮,加密等

流為什么這么好用還這么重要呢?

因?yàn)橛袝r(shí)候我們不關(guān)心文件的主體內(nèi)容,只關(guān)心能不能取到數(shù)據(jù),取到數(shù)據(jù)之后怎么進(jìn)行處理

對(duì)于小型的文本文件,我們可以把文件內(nèi)容全部讀入內(nèi)存,然后再寫入文件,比如grunt-file-copy

對(duì)于體積較大的二進(jìn)制文件,比如音頻、視頻文件,動(dòng)輒幾個(gè)GB大小,如果使用這種方法,很容易使內(nèi)存“爆倉(cāng)”。

理想的方法應(yīng)該是讀一部分,寫一部分,不管文件有多大,只要時(shí)間允許,總會(huì)處理完成,這里就需要用到流的概念

流是一個(gè)抽象接口,被Node中很多對(duì)象所實(shí)現(xiàn),比如HTTP服務(wù)器request和response對(duì)象都是流

Node.js 中有四種基本的流類型:

Readable - 可讀的流 (例如 fs.createReadStream()).

Writable - 可寫的流 (例如 fs.createWriteStream()).

Duplex - 可讀寫的流 (例如 net.Socket).

Transform - 在讀寫過程中可以修改和變換數(shù)據(jù)的 Duplex 流 (例如 zlib.createDeflate()).

可以通過 require("stream") 加載 Stream 基類。其中包括了 Readable 流、Writable 流、Duplex 流和 Transform 流的基類
Readable streams可讀流
可讀流(Readable streams)是對(duì)提供數(shù)據(jù)的 源頭(source)的抽象
可讀流的例子包括:

HTTP responses, on the client :客戶端請(qǐng)求

HTTP requests, on the server :服務(wù)端請(qǐng)求

fs read streams :讀文件

zlib streams :壓縮

crypto streams :加密

TCP sockets :TCP協(xié)議

child process stdout and stderr :子進(jìn)程標(biāo)準(zhǔn)輸出和錯(cuò)誤輸出

process.stdin :標(biāo)準(zhǔn)輸入

所有的 Readable 都實(shí)現(xiàn)了 stream.Readable 類定義的接口

通過流讀取數(shù)據(jù)

用Readable創(chuàng)建對(duì)象readable后,便得到了一個(gè)可讀流

如果實(shí)現(xiàn)_read方法,就將流連接到一個(gè)底層數(shù)據(jù)源

流通過調(diào)用_read向底層請(qǐng)求數(shù)據(jù),底層再調(diào)用流的push方法將需要的數(shù)據(jù)傳遞過來

當(dāng)readable連接了數(shù)據(jù)源后,下游便可以調(diào)用readable.read(n)向流請(qǐng)求數(shù)據(jù),同時(shí)監(jiān)聽readable的data事件來接收取到的數(shù)據(jù)

下面簡(jiǎn)單舉個(gè)可讀流的例子:

監(jiān)聽可讀流的data事件,當(dāng)你一旦開始監(jiān)聽data事件的時(shí)候,流就可以讀文件的內(nèi)容并且發(fā)射data,讀一點(diǎn)發(fā)射一點(diǎn)讀一點(diǎn)發(fā)射一點(diǎn)

默認(rèn)情況下,當(dāng)你監(jiān)聽data事件之后,會(huì)不停的讀數(shù)據(jù),然后觸發(fā)data事件,觸發(fā)完data事件后再次讀數(shù)據(jù)

讀的時(shí)候不是把文件整體內(nèi)容讀出來再發(fā)射出來的,而且設(shè)置一個(gè)緩沖區(qū),大小默認(rèn)是64K,比如文件是128K,先讀64K發(fā)射出來,再讀64K在發(fā)射出來,會(huì)發(fā)射兩次

緩沖區(qū)的大小可以通過highWaterMark來設(shè)置

let fs = require("fs");
//通過創(chuàng)建一個(gè)可讀流
let rs = fs.createReadStream("./1.txt",{
    flags:"r",//我們要對(duì)文件進(jìn)行何種操作
    mode:0o666,//權(quán)限位
    encoding:"utf8",//不傳默認(rèn)為buffer,顯示為字符串
    start:3,//從索引為3的位置開始讀
    //這是我的見過唯一一個(gè)包括結(jié)束索引的
    end:8,//讀到索引為8結(jié)束
    highWaterMark:3//緩沖區(qū)大小
});
rs.on("open",function () {
    console.log("文件打開");
});
rs.setEncoding("utf8");//顯示為字符串
//希望流有一個(gè)暫停和恢復(fù)觸發(fā)的機(jī)制
rs.on("data",function (data) {
    console.log(data);
    rs.pause();//暫停讀取和發(fā)射data事件
    setTimeout(function(){
        rs.resume();//恢復(fù)讀取并觸發(fā)data事件
    },2000);
});
//如果讀取文件出錯(cuò)了,會(huì)觸發(fā)error事件
rs.on("error",function () {
    console.log("error");
});
//如果文件的內(nèi)容讀完了,會(huì)觸發(fā)end事件
rs.on("end",function () {
    console.log("讀完了");
});
rs.on("close",function () {
    console.log("文件關(guān)閉");
});

/**
文件打開
334
455
讀完了
文件關(guān)閉
**/
可讀流的簡(jiǎn)單實(shí)現(xiàn)
let fs = require("fs");
let ReadStream = require("./ReadStream");
let rs = ReadStream("./1.txt", {
    flags: "r",
    encoding: "utf8",
    start: 3,
    end: 7,
    highWaterMark: 3
});
rs.on("open", function () {
    console.log("open");
});
rs.on("data", function (data) {
    console.log(data);
});
rs.on("end", function () {
    console.log("end");
});
rs.on("close", function () {
    console.log("close");
});
/**
 open
 456
 789
 end
 close
 **/
let fs = require("fs");
let EventEmitter = require("events");

class ReadStream extends EventEmitter {
    constructor(path, options) {
        super(path, options);
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.flags = options.flags || "r";
        this.encoding = options.encoding;
        this.mode = options.mode || 0o666;
        this.start = options.start || 0;
        this.end = options.end;
        this.pos = this.start;
        this.autoClose = options.autoClose || true;
        this.bytesRead = 0;
        this.closed = false;
        this.flowing;
        this.needReadable = false;
        this.length = 0;
        this.buffers = [];
        this.on("end", function () {
            if (this.autoClose) {
                this.destroy();
            }
        });
        this.on("newListener", (type) => {
            if (type == "data") {
                this.flowing = true;
                this.read();
            }
            if (type == "readable") {
                this.read(0);
            }
        });
        this.open();
    }

    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                    return this.emit("error", err);
                }
            }
            this.fd = fd;
            this.emit("open");
        });
    }

    read(n) {
        if (typeof this.fd != "number") {
            return this.once("open", () => this.read());
        }
        n = parseInt(n, 10);
        if (n != n) {
            n = this.length;
        }
        if (this.length == 0)
            this.needReadable = true;
        let ret;
        if (0 < n < this.length) {
            ret = Buffer.alloc(n);
            let b;
            let index = 0;
            while (null != (b = this.buffers.shift())) {
                for (let i = 0; i < b.length; i++) {
                    ret[index++] = b[i];
                    if (index == ret.length) {
                        this.length -= n;
                        b = b.slice(i + 1);
                        this.buffers.unshift(b);
                        break;
                    }
                }
            }
            if (this.encoding) ret = ret.toString(this.encoding);
        }

        let _read = () => {
            let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
            fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
                if (err) {
                    return
                }
                let data;
                if (bytesRead > 0) {
                    data = this.buffer.slice(0, bytesRead);
                    this.pos += bytesRead;
                    this.length += bytesRead;
                    if (this.end && this.pos > this.end) {
                        if (this.needReadable) {
                            this.emit("readable");
                        }

                        this.emit("end");
                    } else {
                        this.buffers.push(data);
                        if (this.needReadable) {
                            this.emit("readable");
                            this.needReadable = false;
                        }

                    }
                } else {
                    if (this.needReadable) {
                        this.emit("readable");
                    }
                    return this.emit("end");
                }
            })
        }
        if (this.length == 0 || (this.length < this.highWaterMark)) {
            _read(0);
        }
        return ret;
    }

    destroy() {
        fs.close(this.fd, (err) => {
            this.emit("close");
        });
    }

    pause() {
        this.flowing = false;
    }

    resume() {
        this.flowing = true;
        this.read();
    }

    pipe(dest) {
        this.on("data", (data) => {
            let flag = dest.write(data);
            if (!flag) this.pause();
        });
        dest.on("drain", () => {
            this.resume();
        });
        this.on("end", () => {
            dest.end();
        });
    }
}
module.exports = ReadStream;
自定義可讀流
為了實(shí)現(xiàn)可讀流,引用Readable接口并用它構(gòu)造新對(duì)象

我們可以直接把供使用的數(shù)據(jù)push出去。

當(dāng)push一個(gè)null對(duì)象就意味著我們想發(fā)出信號(hào)——這個(gè)流沒有更多數(shù)據(jù)了

var stream = require("stream");
var util = require("util");
util.inherits(Counter, stream.Readable);
function Counter(options) {
    stream.Readable.call(this, options);
    this._index = 0;
}
Counter.prototype._read = function() {
    if(this._index++<3){
        this.push(this._index+"");
    }else{
        this.push(null);
    }
};
var counter = new Counter();

counter.on("data", function(data){
    console.log("讀到數(shù)據(jù): " + data.toString());//no maybe
});
counter.on("end", function(data){
    console.log("讀完了");
});
可讀流的兩種模式
Readable Stream 存在兩種模式(flowing mode 與 paused mode),這兩種模式?jīng)Q定了chunk數(shù)據(jù)流動(dòng)的方式---自動(dòng)流動(dòng)還是手工流動(dòng)。那如何觸發(fā)這兩種模式呢:

flowing mode: 注冊(cè)事件data、調(diào)用resume方法、調(diào)用pipe方法

paused mode: 調(diào)用pause方法(沒有pipe方法)、移除data事件 && unpipe所有pipe

如果 Readable 切換到 flowing 模式,且沒有消費(fèi)者處理流中的數(shù)據(jù),這些數(shù)據(jù)將會(huì)丟失。 比如, 調(diào)用了 readable.resume() 方法卻沒有監(jiān)聽 "data" 事件,或是取消了 "data" 事件監(jiān)聽,就有可能出現(xiàn)這種情況

可讀流的三種狀態(tài)

在任意時(shí)刻,任意可讀流應(yīng)確切處于下面三種狀態(tài)之一:

readable._readableState.flowing = null

readable._readableState.flowing = false

readable._readableState.flowing = true

兩種模式取決于可讀流flowing狀態(tài):

若為true : flowing mode;

若為false : paused mode

flowing mode

通過注冊(cè)data、pipe、resume可以自動(dòng)獲取所需要的數(shù)據(jù),我們來看下源碼的實(shí)現(xiàn)
// data事件觸發(fā)flowing mode
 if (ev === "data") {
    // Start flowing on next tick if stream isn"t explicitly paused
    if (this._readableState.flowing !== false)
      this.resume();
  } else if (ev === "readable") {
    const state = this._readableState;
    if (!state.endEmitted && !state.readableListening) {
      state.readableListening = state.needReadable = true;
      state.emittedReadable = false;
      if (!state.reading) {
        process.nextTick(nReadingNextTick, this);
      } else if (state.length) {
        emitReadable(this);
      }
    }
  }

// resume觸發(fā)flowing mode
Readable.prototype.resume = function() {
    var state = this._readableState;
    if (!state.flowing) {
        debug("resume");
        state.flowing = true;
    resume(this, state);
  }
  return this;
}

// pipe方法觸發(fā)flowing模式
Readable.prototype.resume = function() {
    if (!state.flowing) {
        this.resume()
    }
}
flowing mode的三種方法最后均是通過resume方法,將狀態(tài)變?yōu)閠rue:state.flowing = true

paused mode

在paused mode下,需要手動(dòng)地讀取數(shù)據(jù),并且可以直接指定讀取數(shù)據(jù)的長(zhǎng)度
可以通過監(jiān)聽事件readable,觸發(fā)時(shí)手工讀取chunk數(shù)據(jù):

當(dāng)你監(jiān)聽 readable事件的時(shí)候,會(huì)進(jìn)入暫停模式

當(dāng)監(jiān)聽readable事件的時(shí)候,可讀流會(huì)馬上去向底層讀取文件,然后把讀到文件的文件放在緩存區(qū)里const state = this._readableState;

self.read(0); 只填充緩存,但是并不會(huì)發(fā)射data事件,但是會(huì)發(fā)射stream.emit("readable");事件

this._read(state.highWaterMark); 每次調(diào)用底層的方法讀取的時(shí)候是讀取3個(gè)字節(jié)

let fs = require("fs");
let rs = fs.createReadStream("./1.txt",{
    highWaterMark:3
});
rs.on("readable",function(){
    console.log(rs._readableState.length);
    //read如果不加參數(shù)表示讀取整個(gè)緩存區(qū)數(shù)據(jù)
    //讀取一個(gè)字段,如果可讀流發(fā)現(xiàn)你要讀的字節(jié)小于等于緩存字節(jié)大小,則直接返回
    let chunk = rs.read(1);
    console.log(chunk);
    console.log(rs._readableState.length);
    //當(dāng)你讀完指定的字節(jié)后,如果可讀流發(fā)現(xiàn)剩下的字節(jié)已經(jīng)比最高水位線小了。則會(huì)立馬再次讀取填滿 最高水位線
    setTimeout(function(){
        console.log(rs._readableState.length);
    },200)
});
注意:一旦注冊(cè)了readable事件,必須手工讀取read數(shù)據(jù),否則數(shù)據(jù)就會(huì)流失,我們來看下源碼的實(shí)現(xiàn)
function emitReadable(stream) {
  var state = stream._readableState;
  state.needReadable = false;
  if (!state.emittedReadable) {
    debug("emitReadable", state.flowing);
    state.emittedReadable = true;
    process.nextTick(emitReadable_, stream);
  }
}

function emitReadable_(stream) {
  var state = stream._readableState;
  debug("emit readable");
  if (!state.destroyed && (state.length || state.ended)) {
    stream.emit("readable");
  }
  state.needReadable = !state.flowing && !state.ended;
  flow(stream);
}

function flow(stream) {
  const state = stream._readableState;
  debug("flow", state.flowing);
  while (state.flowing && stream.read() !== null);
}

function endReadable(stream) {
  var state = stream._readableState;
  debug("endReadable", state.endEmitted);
  if (!state.endEmitted) {
    state.ended = true;
    process.nextTick(endReadableNT, state, stream);
  }
}

Readable.prototype.read = function(n) {
  debug("read", n);
  n = parseInt(n, 10);
  var state = this._readableState;
  var nOrig = n;
  if (n !== 0)
    state.emittedReadable = false;
  if (n === 0 &&
      state.needReadable &&
      (state.length >= state.highWaterMark || state.ended)) {
    debug("read: emitReadable", state.length, state.ended);
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }
  n = howMuchToRead(n, state);
  if (n === 0 && state.ended) {
    if (state.length === 0)
      endReadable(this);
    return null;
  }
flow方法直接read數(shù)據(jù),將得到的數(shù)據(jù)通過事件data交付出去,然而此處沒有注冊(cè)data事件監(jiān)控,因此,得到的chunk數(shù)據(jù)并沒有交付給任何對(duì)象,這樣數(shù)據(jù)就白白流失了,所以在觸發(fā)emit("readable")時(shí),需要提前read數(shù)據(jù)
Writable streams可寫流
可寫流是對(duì)數(shù)據(jù)寫入"目的地"的一種抽象
Writable:可寫流的例子包括了:

HTTP requests, on the client 客戶端請(qǐng)求

HTTP responses, on the server 服務(wù)器響應(yīng)

fs write streams 文件

zlib streams 壓縮

crypto streams 加密

TCP sockets TCP服務(wù)器

child process stdin 子進(jìn)程標(biāo)準(zhǔn)輸入

process.stdout, process.stderr 標(biāo)準(zhǔn)輸出,錯(cuò)誤輸出

下面舉個(gè)可寫流的簡(jiǎn)單例子

當(dāng)你往可寫流里寫數(shù)據(jù)的時(shí)候,不是會(huì)立刻寫入文件的,而是會(huì)很寫入緩存區(qū),緩存區(qū)的大小就是highWaterMark,默認(rèn)值是16K。然后等緩存區(qū)滿了之后再次真正的寫入文件里

let fs = require("fs");
let ws = fs.createWriteStream("./2.txt",{
   flags:"w",
   mode:0o666,
   start:3,
   highWaterMark:3//默認(rèn)是16K
});

如果緩存區(qū)已滿 ,返回false,如果緩存區(qū)未滿,返回true

如果能接著寫,返回true,如果不能接著寫,返回false

按理說如果返回了false,就不能再往里面寫了,但是如果你真寫了,如果也不會(huì)丟失,會(huì)緩存在內(nèi)存里。等緩存區(qū)清空之后再?gòu)膬?nèi)存里讀出來

let flag = ws.write("1");
console.log(flag);//true
flag =ws.write("2");
console.log(flag);//true
flag =ws.write("3");
console.log(flag);//false
flag =ws.write("4");
console.log(flag);//false

"drain" 事件

如果調(diào)用 stream.write(chunk) 方法返回 false,流將在適當(dāng)?shù)臅r(shí)機(jī)觸發(fā) "drain" 事件,這時(shí)才可以繼續(xù)向流中寫入數(shù)據(jù)

當(dāng)一個(gè)流不處在 drain 的狀態(tài), 對(duì) write() 的調(diào)用會(huì)緩存數(shù)據(jù)塊, 并且返回 false。 一旦所有當(dāng)前所有緩存的數(shù)據(jù)塊都排空了(被操作系統(tǒng)接受來進(jìn)行輸出), 那么 "drain" 事件就會(huì)被觸發(fā)

建議, 一旦 write() 返回 false, 在 "drain" 事件觸發(fā)前, 不能寫入任何數(shù)據(jù)塊

舉個(gè)簡(jiǎn)單的例子說明一下:

let fs = require("fs");
let ws = fs.createWriteStream("2.txt",{
    flags:"w",
    mode:0o666,
    start:0,
    highWaterMark:3
});
let count = 9;
function write(){
 let flag = true;//緩存區(qū)未滿
    //寫入方法是同步的,但是寫入文件的過程是異步的。在真正寫入文件后還會(huì)執(zhí)行我們的回調(diào)函數(shù)
 while(flag && count>0){
     console.log("before",count);
     flag = ws.write((count)+"","utf8",(function (i) {
         return ()=>console.log("after",i);
     })(count));
     count--;
 }
}
write();//987
//監(jiān)聽緩存區(qū)清空事件
ws.on("drain",function () {
    console.log("drain");
    write();//654 321
});
ws.on("error",function (err) {
    console.log(err);
});
/**
before 9
before 8
before 7
after 9
after 8
after 7
**/
如果已經(jīng)不再需要寫入了,可以調(diào)用end方法關(guān)閉寫入流,一旦調(diào)用end方法之后則不能再寫入
比如在ws.end();后寫ws.write("x");,會(huì)報(bào)錯(cuò)write after end

"pipe"事件

linux精典的管道的概念,前者的輸出是后者的輸入

pipe是一種最簡(jiǎn)單直接的方法連接兩個(gè)stream,內(nèi)部實(shí)現(xiàn)了數(shù)據(jù)傳遞的整個(gè)過程,在開發(fā)的時(shí)候不需要關(guān)注內(nèi)部數(shù)據(jù)的流動(dòng)

這個(gè)方法從可讀流拉取所有數(shù)據(jù), 并將數(shù)據(jù)寫入到提供的目標(biāo)中

自動(dòng)管理流量,將數(shù)據(jù)的滯留量限制到一個(gè)可接受的水平,以使得不同速度的來源和目標(biāo)不會(huì)淹沒可用內(nèi)存

默認(rèn)情況下,當(dāng)源數(shù)據(jù)流觸發(fā) end的時(shí)候調(diào)用end(),所以寫入數(shù)據(jù)的目標(biāo)不可再寫。傳 { end:false }作為options,可以保持目標(biāo)流打開狀態(tài)

pipe方法的原理

var fs = require("fs");
var ws = fs.createWriteStream("./2.txt");
var rs = fs.createReadStream("./1.txt");
rs.on("data", function (data) {
    var flag = ws.write(data);
    if(!flag)
    rs.pause();
});
ws.on("drain", function () {
    rs.resume();
});
rs.on("end", function () {
    ws.end();
});
下面舉個(gè)簡(jiǎn)單的例子說明一下pipe的用法:
let fs = require("fs");
let rs = fs.createReadStream("./1.txt",{
  highWaterMark:3
});
let ws = fs.createWriteStream("./2.txt",{
    highWaterMark:3
});
rs.pipe(ws);
//移除目標(biāo)可寫流
rs.unpipe(ws);

當(dāng)監(jiān)聽可讀流data事件的時(shí)候會(huì)觸發(fā)回調(diào)函數(shù)的執(zhí)行

可以實(shí)現(xiàn)數(shù)據(jù)的生產(chǎn)者和消費(fèi)者速度的均衡

rs.on("data",function (data) {
    console.log(data);
    let flag = ws.write(data);
   if(!flag){
       rs.pause();
   }
});

監(jiān)聽可寫流緩存區(qū)清空事件,當(dāng)所有要寫入的數(shù)據(jù)寫入完成后,接著恢復(fù)從可讀流里讀取并觸發(fā)data事件

ws.on("drain",function () {
    console.log("drain");
    rs.resume();
});

unpipe

readable.unpipe()方法將之前通過stream.pipe()方法綁定的流分離

如果寫入的目標(biāo)沒有傳入, 則所有綁定的流都會(huì)被分離

如果指定了寫入的目標(biāo),但是沒有綁定流,則什么事情都不會(huì)發(fā)生

簡(jiǎn)單距離說明下unpipe的用法:
let fs = require("fs");
var from = fs.createReadStream("./1.txt");
var to = fs.createWriteStream("./2.txt");
from.pipe(to);
setTimeout(() => {
console.log("關(guān)閉向2.txt的寫入");
from.unpipe(writable);
console.log("手工關(guān)閉文件流");
to.end();
}, 1000);
pipe的簡(jiǎn)單實(shí)現(xiàn)
let fs = require("fs");
let ReadStream = require("./ReadStream");
let rs = ReadStream("./1.txt", {
    flags: "r",
    encoding: "utf8",
    highWaterMark: 3
});
let FileWriteStream = require("./WriteStream");
let ws = FileWriteStream("./2.txt",{
    flags:"w",
    encoding:"utf8",
    highWaterMark:3
});
rs.pipe(ws);
ReadStream.prototype.pipe = function (dest) {
    this.on("data", (data)=>{
        let flag = dest.write(data);
        if(!flag){
            this.pause();
        }
    });
    dest.on("drain", ()=>{
        this.resume();
    });
    this.on("end", ()=>{
        dest.end();
    });
}
ReadStream.prototype.pause = function(){
    this.flowing = false;

}
ReadStream.prototype.resume = function(){
    this.flowing = true;
    this.read();
}
自定義管道流
const stream = require("stream")

var index = 0;
const readable = stream.Readable({
    highWaterMark: 2,
    read: function () {
        process.nextTick(() => {
            console.log("push", ++index)
            this.push(index+"");
        })
    }
})
const writable = stream.Writable({
    highWaterMark: 2,
    write: function (chunk, encoding, next) {
        console.log("寫入:", chunk.toString())
    }
})
readable.pipe(writable);
可寫流的簡(jiǎn)單實(shí)現(xiàn)
let fs = require("fs");
 let FileWriteStream = require("./FileWriteStream");
 let ws = FileWriteStream("./2.txt",{
     flags:"w",
     encoding:"utf8",
     highWaterMark:3
 });
 let i = 10;
 function write(){
     let  flag = true;
     while(i&&flag){
         flag = ws.write("1","utf8",(function(i){
             return function(){
                 console.log(i);
             }
         })(i));
         i--;
         console.log(flag);
     }
 }
 write();
 ws.on("drain",()=>{
     console.log("drain");
     write();
 });
 /**
  10
  9
  8
  drain
  7
  6
  5
  drain
  4
  3
  2
  drain
  1
  **/
let EventEmitter = require("events");
let util = require("util");
let fs = require("fs");
util.inherits(WriteStream, EventEmitter);

function WriteStream(path, options) {
    EventEmitter.call(this);
    if (!(this instanceof WriteStream)) {
        return new WriteStream(path, options);
    }
    this.path = path;
    this.fd = options.fd;
    this.encoding = options.encoding||"utf8";
    this.flags = options.flags || "w";
    this.mode = options.mode || 0o666;
    this.autoClose = options.autoClose || true;
    this.start = options.start || 0;
    this.pos = this.start;//開始寫入的索引位置
    this.open();//打開文件進(jìn)行操作
    this.writing = false;//沒有在寫入過程 中
    this.buffers = [];
    this.highWaterMark = options.highWaterMark||16*1024;
    //如果監(jiān)聽到end事件,而且要求自動(dòng)關(guān)閉的話則關(guān)閉文件
    this.on("end", function () {
        if (this.autoClose) {
            this.destroy()
        }
    });
}
WriteStream.prototype.close = function(){
    fs.close(this.fd,(err)=>{
        if(err)
            this.emit("error",err);
    });
}
WriteStream.prototype.open = function () {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
        if (err)
            return this.emit("error", err);
        this.fd = fd;//把文件描述符賦給當(dāng)前實(shí)例的fd屬性
        //發(fā)射open事件
        this.emit("open", fd);
    });
}
/**
 * 會(huì)判斷當(dāng)前是后臺(tái)是否在寫入過程中,如果在寫入過程中,則把這個(gè)數(shù)據(jù)放在待處理的緩存中,如果不在寫入過程中,可以直接寫。
 */
WriteStream.prototype.write = function (chunk, encoding, cb) {
    chunk= Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,this.encoding);

    //先把數(shù)據(jù)放在緩存里
    this.buffers.push({
        chunk,
        encoding,
        cb
    });

    let isFull = this.buffers.reduce((len, item) => len + item.chunk.length, 0)>=this.highWaterMark;
    //只有當(dāng)緩存區(qū)寫滿了,那么清空緩存區(qū)的時(shí)候才會(huì)發(fā)射drain事件,否則 不發(fā)放
    this.needDrain = isFull;
    //如果說文件還沒有打開,則把寫入的方法壓入open事件的監(jiān)聽函數(shù)。等文件一旦打開,立刻執(zhí)行寫入操作
    if (typeof this.fd !== "number") {
         this.once("open", () => {
            this._write();
        });
        return !isFull;
    }else{
        if(!this.writing){
            setImmediate(()=>{
                this._write();
                this.writing = true;
            });
        }

        return !isFull;
    }
}
WriteStream.prototype._write = function () {
    let part = this.buffers.shift();
    if (part) {
        fs.write(this.fd,part.chunk,0,part.chunk.length,null,(err,bytesWritten)=>{
            if(err)return this.emit("error",err);
            part.cb && part.cb();
            this._write();
        });
    }else{
        //發(fā)射一個(gè)緩存區(qū)清空的事件
        this.emit("drain");
        this.writing = false;
    }
}
module.exports = WriteStream;
自定義可寫流
為了實(shí)現(xiàn)可寫流,我們需要使用流模塊中的Writable構(gòu)造函數(shù)。 我們只需給Writable構(gòu)造函數(shù)傳遞一些選項(xiàng)并創(chuàng)建一個(gè)對(duì)象。唯一需要的選項(xiàng)是write函數(shù),該函數(shù)揭露數(shù)據(jù)塊要往哪里寫

chunk通常是一個(gè)buffer,除非我們配置不同的流。

encoding是在特定情況下需要的參數(shù),通常我們可以忽略它。

callback是在完成處理數(shù)據(jù)塊后需要調(diào)用的函數(shù)。這是寫數(shù)據(jù)成功與否的標(biāo)志。若要發(fā)出故障信號(hào),請(qǐng)用錯(cuò)誤對(duì)象調(diào)用回調(diào)函數(shù)

var stream = require("stream");
var util = require("util");
util.inherits(Writer, stream.Writable);
let stock = [];
function Writer(opt) {
    stream.Writable.call(this, opt);
}
Writer.prototype._write = function(chunk, encoding, callback) {
    setTimeout(()=>{
        stock.push(chunk.toString("utf8"));
        console.log("增加: " + chunk);
        callback();
    },500)
};
var w = new Writer();
for (var i=1; i<=5; i++){
    w.write("項(xiàng)目:" + i, "utf8");
}
w.end("結(jié)束寫入",function(){
    console.log(stock);
});
Duplex streams可讀寫的流(雙工流)
Duplex 流是同時(shí)實(shí)現(xiàn)了 Readable 和 Writable 接口的流
雙工流的可讀性和可寫性操作完全獨(dú)立于彼此,這僅僅是將兩個(gè)特性組合成一個(gè)對(duì)象

Duplex 流的實(shí)例包括了:

TCP sockets

zlib streams

crypto streams

下面簡(jiǎn)單實(shí)現(xiàn)雙工流:
const {Duplex} = require("stream");
const inoutStream = new Duplex({
    write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback();
    },
    read(size) {
        this.push((++this.index)+"");
        if (this.index > 3) {
            this.push(null);
        }
    }
});

inoutStream.index = 0;
process.stdin.pipe(inoutStream).pipe(process.stdout);
Transform streams轉(zhuǎn)換流
變換流(Transform streams) 是一種 Duplex 流。它的輸出與輸入是通過某種方式關(guān)聯(lián)的。和所有 Duplex 流一樣,變換流同時(shí)實(shí)現(xiàn)了 Readable 和 Writable 接口

轉(zhuǎn)換流的輸出是從輸入中計(jì)算出來的
對(duì)于轉(zhuǎn)換流,我們不必實(shí)現(xiàn)read或write的方法,我們只需要實(shí)現(xiàn)一個(gè)transform方法,將兩者結(jié)合起來。它有write方法的意思,我們也可以用它來push數(shù)據(jù)

變換流的實(shí)例包括:

zlib streams

crypto streams

下面簡(jiǎn)單實(shí)現(xiàn)轉(zhuǎn)換流:
const {Transform} = require("stream");
const upperCase = new Transform({
    transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
    }
});
process.stdin.pipe(upperCase).pipe(process.stdout);
對(duì)象流
默認(rèn)情況下,流處理的數(shù)據(jù)是Buffer/String類型的值。有一個(gè)objectMode標(biāo)志,我們可以設(shè)置它讓流可以接受任何JavaScript對(duì)象
const {Transform} = require("stream");
let fs = require("fs");
let rs = fs.createReadStream("./users.json");
rs.setEncoding("utf8");
let toJson = Transform({
    readableObjectMode: true,
    transform(chunk, encoding, callback) {
        this.push(JSON.parse(chunk));
        callback();
    }
});
let jsonOut = Transform({
    writableObjectMode: true,
    transform(chunk, encoding, callback) {
        console.log(chunk);
        callback();
    }
});
rs.pipe(toJson).pipe(jsonOut);

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/107224.html

相關(guān)文章

  • Node.js 中流操作實(shí)踐

    摘要:事件的觸發(fā)頻次同樣是由實(shí)現(xiàn)者決定,譬如在進(jìn)行文件讀取時(shí),可能每行都會(huì)觸發(fā)一次而在請(qǐng)求處理時(shí),可能數(shù)的數(shù)據(jù)才會(huì)觸發(fā)一次。如果有參數(shù)傳入,它會(huì)讓可讀流停止流向某個(gè)特定的目的地,否則,它會(huì)移除所有目的地。 showImg(https://segmentfault.com/img/remote/1460000016328758?w=1967&h=821); 本文節(jié)選自 Node.js Chea...

    chaos_G 評(píng)論0 收藏0
  • Node.js中流使用

    摘要:流是基于事件的用于管理和處理數(shù)據(jù)而且有不錯(cuò)的效率借助事件和非阻塞庫(kù)流模塊允許在其可用的時(shí)候動(dòng)態(tài)處理在其不需要的時(shí)候釋放掉使用流的好處舉一個(gè)讀取文件的例子使用同步讀取一個(gè)文件程序會(huì)被阻塞所有的數(shù)據(jù)都會(huì)被讀取到內(nèi)存中換用讀取文件程序不會(huì)被阻塞但 流是基于事件的API,用于管理和處理數(shù)據(jù),而且有不錯(cuò)的效率.借助事件和非阻塞I/O庫(kù),流模塊允許在其可用的時(shí)候動(dòng)態(tài)處理,在其不需要的時(shí)候釋放掉. ...

    h9911 評(píng)論0 收藏0
  • 重讀 Gulp

    摘要:當(dāng)接收一個(gè)回調(diào)函數(shù)的時(shí)候,一定要注意回調(diào)函數(shù)中的參數(shù)。主要作用就是用來讀取文件或者文件夾中的數(shù)據(jù)。表示文件的名稱指的是發(fā)生的變化使用技巧的進(jìn)一步使用,可以參照中文官網(wǎng)中的技巧集。 Gulp 簡(jiǎn)介 Gulp 對(duì)現(xiàn)在的前端而言,是一個(gè)稍微老舊的工具了,但是,為了復(fù)習(xí)以前學(xué)過的內(nèi)容,還是把它翻出來,放在自己的博客中。說不定哪天又用到了呢。 需要說明的是,這里使用的 Gulp 版本是 3.9....

    vpants 評(píng)論0 收藏0
  • Node事件機(jī)制小記

    摘要:事件的監(jiān)聽與事件的觸發(fā)事件一事件機(jī)制的實(shí)現(xiàn)中大部分的模塊,都繼承自模塊。從另一個(gè)角度來看,事件偵聽器模式也是一種事件鉤子的機(jī)制,利用事件鉤子導(dǎo)出內(nèi)部數(shù)據(jù)或狀態(tài)給外部調(diào)用者。的核心就是事件發(fā)射與事件監(jiān)聽器功能的封裝。 nodejs事件的監(jiān)聽與事件的觸發(fā) nodejs事件(Events)showImg(https://segmentfault.com/img/bV0Sqi?w=692&h=...

    airborne007 評(píng)論0 收藏0
  • 通過源碼解析 Node.js 中導(dǎo)流(pipe)實(shí)現(xiàn)

    摘要:回調(diào)函數(shù)中檢測(cè)該次寫入是否被緩沖,若是,觸發(fā)事件。若目標(biāo)可寫流表示該寫入操作需要進(jìn)行緩沖,則立刻將源可讀流切換至?xí)和DJ?。監(jiān)聽源可讀流的事件,相應(yīng)地結(jié)束目標(biāo)可寫流。 在Node.js中,流(Stream)是其眾多原生對(duì)象的基類,它對(duì)處理潛在的大文件提供了支持,也抽象了一些場(chǎng)景下的數(shù)據(jù)處理和傳遞。在它對(duì)外暴露的接口中,最為神奇的,莫過于導(dǎo)流(pipe)方法了。鑒于近期自己正在閱讀Node...

    defcon 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<