成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

[轉(zhuǎn)]nodejs Stream使用手冊(cè)

luffyZh / 721人閱讀

摘要:方法也可以接收一個(gè)參數(shù)表示數(shù)據(jù)請(qǐng)求著請(qǐng)求的數(shù)據(jù)大小,但是可讀流可以根據(jù)需要忽略這個(gè)參數(shù)。讀取數(shù)據(jù)大部分情況下我們只要簡(jiǎn)單的使用方法將可讀流的數(shù)據(jù)重定向到另外形式的流,但是在某些情況下也許直接從可讀流中讀取數(shù)據(jù)更有用。

介紹
本文介紹了使用 node.js streams 開發(fā)程序的基本方法。

"We should have some ways of connecting programs like garden hose--screw in
another segment when it becomes necessary to massage data in
another way. This is the way of IO also."

Doug McIlroy. October 11, 1964

最早接觸Stream是從早期的unix開始的
數(shù)十年的實(shí)踐證明Stream 思想可以很簡(jiǎn)單的開發(fā)出一些龐大的系統(tǒng)。在unix里,Stream是通過(guò)
|實(shí)現(xiàn)的;在node中,作為內(nèi)置的stream模塊,很多核心模塊和三方模塊都使用到。和unix一樣,
node Stream主要的操作也是.pipe(),使用者可以使用反壓力機(jī)制來(lái)控制讀和寫的平衡。
Stream 可以為開發(fā)者提供可以重復(fù)使用統(tǒng)一的接口,通過(guò)抽象的Stream接口來(lái)控制Stream之間的讀寫平衡。

為什么使用Stream
node中的I/O是異步的,因此對(duì)磁盤和網(wǎng)絡(luò)的讀寫需要通過(guò)回調(diào)函數(shù)來(lái)讀取數(shù)據(jù),下面是一個(gè)文件下載服務(wù)器

的簡(jiǎn)單代碼:

var http = require("http");
var fs = require("fs");

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + "/data.txt", function (err, data) {
        res.end(data);
    });
});
server.listen(8000);

這些代碼可以實(shí)現(xiàn)需要的功能,但是服務(wù)在發(fā)送文件數(shù)據(jù)之前需要緩存整個(gè)文件數(shù)據(jù)到內(nèi)存,如果"data.txt"文件很
大并且并發(fā)量很大的話,會(huì)浪費(fèi)很多內(nèi)存。因?yàn)橛脩粜枰鹊秸麄€(gè)文件緩存到內(nèi)存才能接受的文件數(shù)據(jù),這樣導(dǎo)致
用戶體驗(yàn)相當(dāng)不好。不過(guò)還好(req, res)兩個(gè)參數(shù)都是Stream,這樣我們可以用fs.createReadStream()代替fs.readFile():

var http = require("http");
var fs = require("fs");

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + "/data.txt");
    stream.pipe(res);
});
server.listen(8000);

.pipe()方法監(jiān)聽fs.createReadStream()"data""end"事件,這樣"data.txt"文件就不需要緩存整
個(gè)文件,當(dāng)客戶端連接完成之后馬上可以發(fā)送一個(gè)數(shù)據(jù)塊到客戶端。使用.pipe()另一個(gè)好處是可以解決當(dāng)客戶
端延遲非常大時(shí)導(dǎo)致的讀寫不平衡問(wèn)題。如果想壓縮文件再發(fā)送,可以使用三方模塊實(shí)現(xiàn):

var http = require("http");
var fs = require("fs");
var oppressor = require("oppressor");

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + "/data.txt");
    stream.pipe(oppressor(req)).pipe(res);
});
server.listen(8000);

這樣文件就會(huì)對(duì)支持gzipdeflate的瀏覽器進(jìn)行壓縮。oppressor 模塊會(huì)處理所有的content-encoding。

Stream使開發(fā)程序變得簡(jiǎn)單。

基礎(chǔ)概念
有五種基本的Stream: readable, writable, transform, duplex, and"classic”.

pipe

所有類型的Stream收是使用 .pipe() 來(lái)創(chuàng)建一個(gè)輸入輸出對(duì),接收一個(gè)可讀流src并將其數(shù)據(jù)輸出到可寫流dst,如下:

src.pipe(dst)    

.pipe(dst)方法為返回dst流,這樣就可以接連使用多個(gè).pipe(),如下:

a.pipe(b).pipe(c).pipe(d)

功能與下面的代碼相同:

a.pipe(b);
b.pipe(c);
c.pipe(d);

這樣的用法十分類似于unix命令下面用法:

a | b | c | d
readable streams

通過(guò)調(diào)用Readable streams的 .pipe()方法可以把Readable streams的數(shù)據(jù)寫入一個(gè)

Writable , Transform, 或者Duplex stream。

readableStream.pipe(dst)
創(chuàng)建 readable stream

這里我們創(chuàng)建一個(gè)readable stream!

var Readable = require("stream").Readable;

var rs = new Readable;
rs.push("beep ");
rs.push("boop
");
rs.push(null);

rs.pipe(process.stdout);


$ node read0.js
beep boop

rs.push(null) 通知數(shù)據(jù)接收者數(shù)據(jù)已經(jīng)發(fā)送完畢.
注意到我們?cè)趯⑺袛?shù)據(jù)內(nèi)容壓入可讀流之前并沒有調(diào)用rs.pipe(process.stdout);,但是我們壓入的所有數(shù)據(jù)
內(nèi)容還是完全的輸出了,這是因?yàn)榭勺x流在接收者沒有讀取數(shù)據(jù)之前,會(huì)緩存所有壓入的數(shù)據(jù)。但是在很多情況下,更好的方法是只有數(shù)據(jù)接收著請(qǐng)求數(shù)據(jù)的時(shí)候,才壓入數(shù)據(jù)到可讀流而不是緩存整個(gè)數(shù)據(jù)。下面我們重寫 一下
._read()函數(shù):

var Readable = require("stream").Readable;
var rs = Readable();

var c = 97;
rs._read = function () {
    rs.push(String.fromCharCode(c++));
    if (c > "z".charCodeAt(0)) rs.push(null);
};

rs.pipe(process.stdout);

$ node read1.js
abcdefghijklmnopqrstuvwxyz

上面的代碼通過(guò)重寫_read()方法實(shí)現(xiàn)了只有在數(shù)據(jù)接受者請(qǐng)求數(shù)據(jù)才向可讀流中壓入數(shù)據(jù)。_read()方法也可以接收一個(gè)size參數(shù)表示數(shù)據(jù)請(qǐng)求著請(qǐng)求的數(shù)據(jù)大小,但是可讀流可以根據(jù)需要忽略這個(gè)參數(shù)。注意我們也可以用util.inherits()繼承可讀流。為了說(shuō)明只有在數(shù)據(jù)接受者請(qǐng)求數(shù)據(jù)時(shí)_read()方法才被調(diào)用,我們?cè)谙蚩勺x流壓入數(shù)據(jù)時(shí)做一個(gè)延時(shí),如下:

var Readable = require("stream").Readable;
var rs = Readable();

var c = 97 - 1;

rs._read = function () {
    if (c >= "z".charCodeAt(0)) return rs.push(null);

    setTimeout(function () {
        rs.push(String.fromCharCode(++c));
    }, 100);
};

rs.pipe(process.stdout);

process.on("exit", function () {
    console.error("
_read() called " + (c - 97) + " times");
});
process.stdout.on("error", process.exit);

用下面的命令運(yùn)行程序我們發(fā)現(xiàn)_read()方法只調(diào)用了5次:

$ node read2.js | head -c5
abcde
_read() called 5 times

使用計(jì)時(shí)器的原因是系統(tǒng)需要時(shí)間來(lái)發(fā)送信號(hào)來(lái)通知程序關(guān)閉管道。使用process.stdout.on("error", fn) 是為了處理系統(tǒng)因?yàn)?b>header命令關(guān)閉管道而發(fā)送SIGPIPE信號(hào),因?yàn)檫@樣會(huì)導(dǎo)致process.stdout觸發(fā)EPIPE事件。如果想創(chuàng)建一個(gè)的可以壓入任意形式數(shù)據(jù)的可讀流,只要在創(chuàng)建流的時(shí)候設(shè)置參數(shù)objectModetrue即可,例如:Readable({ objectMode: true })

讀取readable stream數(shù)據(jù)

大部分情況下我們只要簡(jiǎn)單的使用pipe方法將可讀流的數(shù)據(jù)重定向到另外形式的流,但是在某些情況下也許直接從可讀流中讀取數(shù)據(jù)更有用。如下所示:

process.stdin.on("readable", function () {
    var buf = process.stdin.read();
    console.dir(buf);
});



$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js 



null

當(dāng)可讀流中有數(shù)據(jù)可讀取時(shí),流會(huì)觸發(fā)"readable" 事件,這樣就可以調(diào)用.read()方法來(lái)讀取相關(guān)數(shù)據(jù),當(dāng)可讀流中沒有數(shù)據(jù)可讀取時(shí),.read() 會(huì)返回null,這樣就可以結(jié)束.read() 的調(diào)用, 等待下一次"readable" 事件的觸發(fā)。下面是一個(gè)使用.read(n)從標(biāo)準(zhǔn)輸入每次讀取3個(gè)字節(jié)的例子:

process.stdin.on("readable", function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
});

如下運(yùn)行程序發(fā)現(xiàn),輸出結(jié)果并不完全!

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js 



這是應(yīng)為額外的數(shù)據(jù)數(shù)據(jù)留在流的內(nèi)部緩沖區(qū)里了,而我們需要通知流我們要讀取更多的數(shù)據(jù).read(0) 可以達(dá)到這個(gè)目的。

process.stdin.on("readable", function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
    process.stdin.read(0);
});

這次運(yùn)行結(jié)果如下:

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js 



我們可以使用 .unshift() 將數(shù)據(jù)重新押回流數(shù)據(jù)隊(duì)列的頭部,這樣可以接續(xù)讀取押回的數(shù)據(jù)。如下面的代碼,會(huì)按行輸出標(biāo)準(zhǔn)輸入的內(nèi)容:

var offset = 0;

process.stdin.on("readable", function () {
    var buf = process.stdin.read();
    if (!buf) return;
    for (; offset < buf.length; offset++) {
        if (buf[offset] === 0x0a) {
            console.dir(buf.slice(0, offset).toString());
            buf = buf.slice(offset + 1);
            offset = 0;
            process.stdin.unshift(buf);
            return;
        }
    }
    process.stdin.unshift(buf);
});

$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js 
"hearties"
"heartiest"
"heartily"
"heartiness"
"heartiness"s"
"heartland"
"heartland"s"
"heartlands"
"heartless"
"heartlessly"

當(dāng)然,有很多模塊可以實(shí)現(xiàn)這個(gè)功能,如:split

writable streams

重寫 ._write(chunk, enc, next) 方法就可以接受一個(gè)readable stream的數(shù)據(jù)。

var Writable = require("stream").Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
    console.dir(chunk);
    next();
};

process.stdin.pipe(ws);

$ (echo beep; sleep 1; echo boop) | node write0.js 


第一個(gè)參數(shù)chunk是數(shù)據(jù)輸入者寫入的數(shù)據(jù)。第二個(gè)參數(shù)end是數(shù)據(jù)的編碼格式。第三個(gè)參數(shù)next(err)通過(guò)回調(diào)函數(shù)通知數(shù)據(jù)寫入者可以寫入更多的時(shí)間。如果readable stream寫入的是字符串,那么字符串會(huì)默認(rèn)轉(zhuǎn)換為Buffer,如果在創(chuàng)建流的時(shí)候設(shè)置Writable({ decodeStrings: false })參數(shù),那么不會(huì)做轉(zhuǎn)換。如果readable stream寫入的數(shù)據(jù)時(shí)對(duì)象,那么需要這樣創(chuàng)建writable stream

Writable({ objectMode: true })

寫數(shù)據(jù)到 writable stream

調(diào)用writable stream的.write(data)方法即可完成數(shù)據(jù)寫入。

process.stdout.write("beep boop
");

調(diào)用.end()方法通知writable stream 數(shù)據(jù)已經(jīng)寫入完成。

var fs = require("fs");
var ws = fs.createWriteStream("message.txt");

ws.write("beep ");

setTimeout(function () {
    ws.end("boop
");
}, 1000);

$ node writing1.js 
$ cat message.txt
beep boop

如果需要設(shè)置writable stream的緩沖區(qū)的大小,那么在創(chuàng)建流的時(shí)候,需要設(shè)置opts.highWaterMark,這樣如果緩沖區(qū)里的數(shù)據(jù)超過(guò)opts.highWaterMark,.write(data)方法會(huì)返回false。當(dāng)緩沖區(qū)可寫的時(shí)候,writable stream會(huì)觸發(fā)"drain" 事件。

classic streams

Classic streams比較老的接口了,最早出現(xiàn)在node 0.4版本中,但是了解一下其運(yùn)行原理還是十分有好處的。當(dāng)一個(gè)流被注冊(cè)了"data"事件的回到函數(shù),那么流就會(huì)工作在老版本模式下,即會(huì)使用老的API。

classic readable streams

Classic readable streams事件就是一個(gè)事件觸發(fā)器,如果Classic readable streams有數(shù)據(jù)可讀取,那么其觸發(fā) "data" 事件,等到數(shù)據(jù)讀取完畢時(shí),會(huì)觸發(fā)"end" 事件。.pipe() 方法通過(guò)檢查stream.readable的值確定流是否有數(shù)據(jù)可讀。下面是一個(gè)使用Classic readable streams打印A-J字母的例子:

var Stream = require("stream");
var stream = new Stream;
stream.readable = true;

var c = 64;
var iv = setInterval(function () {
    if (++c >= 75) {
        clearInterval(iv);
        stream.emit("end");
    }
    else stream.emit("data", String.fromCharCode(c));
}, 100);

stream.pipe(process.stdout);


$ node classic0.js
ABCDEFGHIJ

如果要從classic readable stream中讀取數(shù)據(jù),注冊(cè)"data""end"兩個(gè)事件的回調(diào)函數(shù)即可,代碼如下:

process.stdin.on("data", function (buf) {
    console.log(buf);
});
process.stdin.on("end", function () {
    console.log("__END__");
});


$ (echo beep; sleep 1; echo boop) | node classic1.js 


__END__

需要注意的是如果你使用這種方式讀取數(shù)據(jù),那么會(huì)失去使用新接口帶來(lái)的好處。比如你在往一個(gè)
延遲非常大的流寫數(shù)據(jù)時(shí),需要注意讀取數(shù)據(jù)和寫數(shù)據(jù)的平衡問(wèn)題,否則會(huì)導(dǎo)致大量數(shù)據(jù)緩存在內(nèi)
存中,導(dǎo)致浪費(fèi)大量?jī)?nèi)存。一般這時(shí)候強(qiáng)烈建議使用流的.pipe()方法,這樣就不用自己監(jiān)聽"data"
"end"事件了,也不用擔(dān)心讀寫不平衡的問(wèn)題了。當(dāng)然你也可以用 through代替自己監(jiān)聽"data""end" 事件,如下面的代碼:

var through = require("through");
process.stdin.pipe(through(write, end));

function write (buf) {
    console.log(buf);
}
function end () {
    console.log("__END__");
}

$ (echo beep; sleep 1; echo boop) | node through.js 


__END__

或者也可以使用concat-stream來(lái)緩存整個(gè)流的內(nèi)容:

var concat = require("concat-stream");
process.stdin.pipe(concat(function (body) {
    console.log(JSON.parse(body));
}));


$ echo "{"beep":"boop"}" | node concat.js 
{ beep: "boop" }

當(dāng)然如果你非要自己監(jiān)聽"data""end"事件,那么你可以在寫數(shù)據(jù)的流寫的 時(shí)候使用.pause()方法暫停Classic readable streams繼續(xù)觸發(fā)"data" 事件。等到寫數(shù)據(jù)的流可寫的時(shí)候再使用.resume() 方法通知流繼續(xù)觸發(fā)"data" 事件繼續(xù)讀取數(shù)據(jù)。

classic writable streams

Classic writable streams 非常簡(jiǎn)單。只有 .write(buf), .end(buf).destroy()三個(gè)方法。.end(buf) 方法的buf參數(shù)是可選的,如果選擇該參數(shù),相當(dāng)于stream.write(buf); stream.end() 這樣的操作,需要注意的是當(dāng)流的緩沖區(qū)寫滿即流不可寫時(shí).write(buf)方法會(huì)返回false,如果流再次可寫時(shí),流會(huì)觸發(fā)drain事件。

transform

transform是一個(gè)對(duì)讀入數(shù)據(jù)過(guò)濾然輸出的流。

duplex

duplex stream是一個(gè)可讀也可寫的雙向流,如下面的a就是一個(gè)duplex stream:

a.pipe(b).pipe(a)

read more

core stream documentation

You can use the readable-stream

本文翻譯來(lái)自 https://github.com/substack/stream-handb...

本文轉(zhuǎn)載來(lái)自 http://www.open-open.com/lib/view/open13...

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/79078.html

相關(guān)文章

  • Node.js 中的緩沖區(qū)(Buffer)究竟是什么?

    摘要:在創(chuàng)建時(shí)大小已經(jīng)被確定且是無(wú)法調(diào)整的,在內(nèi)存分配這塊是由層面提供而不是具體后面會(huì)講解。在這里不知道你是否認(rèn)為這是很簡(jiǎn)單的但是上面提到的一些關(guān)鍵詞二進(jìn)制流緩沖區(qū),這些又都是什么呢下面嘗試做一些簡(jiǎn)單的介紹。 showImg(https://segmentfault.com/img/remote/1460000019894717?w=1280&h=850); 多數(shù)人都擁有自己不了解的能力和機(jī)...

    scwang90 評(píng)論0 收藏0
  • Node.js 中度體驗(yàn)

    摘要:創(chuàng)建簡(jiǎn)單應(yīng)用使用指令來(lái)載入模塊創(chuàng)建服務(wù)器使用方法創(chuàng)建服務(wù)器,并使用方法綁定端口。全局安裝將安裝包放在下。的核心就是事件觸發(fā)與事件監(jiān)聽器功能的封裝。通常我們用于從一個(gè)流中獲取數(shù)據(jù)并將數(shù)據(jù)傳遞到另外一個(gè)流中。壓縮文件為文件壓縮完成。 創(chuàng)建簡(jiǎn)單應(yīng)用 使用 require 指令來(lái)載入 http 模塊 var http = require(http); 創(chuàng)建服務(wù)器 使用 http.create...

    CastlePeaK 評(píng)論0 收藏0
  • 【開源】小程序And公眾號(hào)商城,外加后臺(tái),功能齊全!

    摘要:前言一個(gè)集微信公眾號(hào)商城小程序商城商城后臺(tái)的一個(gè)開源項(xiàng)目,后臺(tái)是基于開發(fā)的,是一個(gè)簡(jiǎn)潔而強(qiáng)大的開源微信公眾平臺(tái)開發(fā)框架,微信功能插件化開發(fā)多公眾號(hào)管理配置簡(jiǎn)單。微信小程序項(xiàng)目下載整個(gè)包之后,進(jìn)行根目錄文件夾。 前言 一個(gè)集微信公眾號(hào)商城/小程序商城/商城后臺(tái)的一個(gè)開源項(xiàng)目,后臺(tái)是基于WeiPHP5.0開發(fā)的,WeiPHP是一個(gè)簡(jiǎn)潔而強(qiáng)大的開源微信公眾平臺(tái)開發(fā)框架,微信功能插件化開發(fā),多...

    VishKozus 評(píng)論0 收藏0
  • 【開源】小程序And公眾號(hào)商城,外加后臺(tái),功能齊全!

    摘要:前言一個(gè)集微信公眾號(hào)商城小程序商城商城后臺(tái)的一個(gè)開源項(xiàng)目,后臺(tái)是基于開發(fā)的,是一個(gè)簡(jiǎn)潔而強(qiáng)大的開源微信公眾平臺(tái)開發(fā)框架,微信功能插件化開發(fā)多公眾號(hào)管理配置簡(jiǎn)單。微信小程序項(xiàng)目下載整個(gè)包之后,進(jìn)行根目錄文件夾。 前言 一個(gè)集微信公眾號(hào)商城/小程序商城/商城后臺(tái)的一個(gè)開源項(xiàng)目,后臺(tái)是基于WeiPHP5.0開發(fā)的,WeiPHP是一個(gè)簡(jiǎn)潔而強(qiáng)大的開源微信公眾平臺(tái)開發(fā)框架,微信功能插件化開發(fā),多...

    linkFly 評(píng)論0 收藏0
  • 深入淺出nodeJS - 4 - (玩轉(zhuǎn)進(jìn)程、測(cè)試、產(chǎn)品化)

    摘要:進(jìn)程間通信的目的是為了讓不同的進(jìn)程能夠互相訪問(wèn)資源,并進(jìn)程協(xié)調(diào)工作。這個(gè)過(guò)程的示意圖如下端口共同監(jiān)聽集群穩(wěn)定之路進(jìn)程事件自動(dòng)重啟負(fù)載均衡狀態(tài)共享模塊工作原理事件二測(cè)試單元測(cè)試性能測(cè)試三產(chǎn)品化項(xiàng)目工程化部署流程性能日志監(jiān)控報(bào)警穩(wěn)定性異構(gòu)共存 內(nèi)容 9.玩轉(zhuǎn)進(jìn)程10.測(cè)試11.產(chǎn)品化 一、玩轉(zhuǎn)進(jìn)程 node的單線程只不過(guò)是js層面的單線程,是基于V8引擎的單線程,因?yàn)?,V8的緣故,前后...

    henry14 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<