摘要:傳的最后一次參數(shù)是一個回調(diào)函數(shù),當(dāng)命令成功或失敗之后會立即被調(diào)用?;卣{(diào)函數(shù)中,我們明確地處理連接錯誤的情況,設(shè)置狀態(tài)為,并再次調(diào)用重連。如果沒有發(fā)生錯誤,調(diào)用回調(diào)函數(shù)結(jié)束當(dāng)前工作項目。嘗試連接的時候,使用增加每次重連的時間間隔。
Node.js 中的隊列
本文轉(zhuǎn)載自:眾成翻譯
譯者:文藺
鏈接:http://www.zcfy.cc/article/662
原文:http://blog.yld.io/2016/05/10/introducing-queues/
這是深入探索 Node.js 中使用工作隊列(work queues)管理異步工作流的系列文章的第一篇,來自the Node Patterns series。
開始享受吧!
很常見的是,在應(yīng)用程序流中,應(yīng)用有著可以異步處理的工作負(fù)載。一個常見的例子是發(fā)送郵件。比方說,新用戶注冊時,可能需要給 Ta 發(fā)送一封確認(rèn)郵件來確認(rèn)用戶剛剛輸入的 email 地址是 Ta 自己的。這包括從模板中生成消息,向電子郵件服務(wù)提供商發(fā)送請求,解析結(jié)果,處理任何可能發(fā)送的最終錯誤,重試,等等…… 這個流程可能比較復(fù)雜,容易出錯,或者在 HTTP 服務(wù)器的周期中花費(fèi)太長時間。不過也有另外一個選擇,可以向持久化存儲中插入一個文檔,該文檔描述我們有一條待發(fā)送給這個用戶的消息。另一個進(jìn)程可能拿到這個文檔,做一些比較重的工作:從模板生成消息,向服務(wù)器發(fā)送請求,解析錯誤,并在必要的情況下重排這個工作。
此外,系統(tǒng)需要和其他系統(tǒng)整合的情況也很常見。在我曾做過的一些項目中,需要不同的系統(tǒng)之間的進(jìn)行用戶配置文件的雙向同步:當(dāng)用戶在一個系統(tǒng)中更新了個人資料,這些變化需要傳遞給其他系統(tǒng),反之亦然。如果兩個系統(tǒng)之間不需要很強(qiáng)的一致性,資料同步之間有一個小的延遲也許是可接受的,那這項工作就可以使用另一個進(jìn)程異步處理。
更一般地說,在整個系統(tǒng)中有一個工作隊列將工作生產(chǎn)者和消費(fèi)者分開,這是一種常見的模式。生產(chǎn)者往工作隊列中插入工作,消費(fèi)者從隊列中拿到工作并執(zhí)行需要的任務(wù)。
使用這樣的拓?fù)浣Y(jié)構(gòu)有許多原因和優(yōu)點,如:
解耦工作生產(chǎn)者和消費(fèi)者
使重試邏輯更易于實現(xiàn)
跨時間分配工作負(fù)載
跨空間(nodes 節(jié)點)分配工作負(fù)載
異步工作
使外部系統(tǒng)更容易整合(最終的一致性)
讓我們來分析一下其中的一些問題吧。
獨(dú)立 (Isolate)發(fā)送郵件是許多應(yīng)用需要做的工作。一個例子是,用戶修改了密碼,一些應(yīng)用很友好地發(fā)送郵件通知用戶有人(最好不是其他人)修改了密碼。現(xiàn)在發(fā)送郵件,通常是通過調(diào)用第三方郵件提供商提供的 HTTP API來完成的。如果服務(wù)緩慢或無法訪問時候會怎樣?你可不想就因為一封郵件發(fā)布出去就把密碼給回滾了。當(dāng)然,你也不想就因為在處理請求失敗時碰到了工作中的一個非重要的部分,使得密碼更改請求就這樣崩掉了。密碼修改后希望可以很快就發(fā)送出這封郵件,但不能有如此的代價。
重試 (Retry)還有,修改密碼意味著,你要為這個用戶在兩個系統(tǒng)中都做更改:一個中央用戶數(shù)據(jù)庫和一個遺留系統(tǒng)(legacy system)。(我知道這很惡心啊,不過我可不止見過一次 —— 現(xiàn)實就這么骨感。)假如第一個成功了、第二個失敗了,咋辦?
在這些情形下,你可以想一直重試直至成功:在遺留系統(tǒng)中更改密碼是一個可以多次重復(fù)的結(jié)果相同的操作,而郵件也可以重復(fù)發(fā)送多次。
分布及規(guī)模 (Distribute and scale)舉例子,假如遺留系統(tǒng)修改密碼了但未能成功返回通知,如果操作是冪等的,你可以稍后重試。
甚至,非冪等操作也可以從工作隊列處理中嘗到甜頭。比如,你可以將一次貨幣交易插入到工作隊列中 :給每次交易一個通用唯一標(biāo)識符(UUID, universal unique identifier),稍后接收交易請求的系統(tǒng)可以保證不會發(fā)生重復(fù)交易。
在這個例子中,你基本只需要擔(dān)心工作隊列提供的必要的持久性保證:如果系統(tǒng)故障,你希望將交易丟失的風(fēng)險降到最低。
另一個將工作生產(chǎn)者和消費(fèi)者解耦的原因是,你可能想將工作集群規(guī)?;喝绻蝿?wù)消耗大量資源,如果任務(wù)是重 CPU 型的或者需要大量內(nèi)存或操作系統(tǒng)資源,你可以將其與應(yīng)用其他部分分離出來,放到工作隊列中。
在任何應(yīng)用中,一些操作比其他的要重。這可能會在整個節(jié)點引入有差異的工作負(fù)載:一個不幸的節(jié)點可能因處理太多的高并發(fā)業(yè)務(wù)而負(fù)擔(dān)過重,而其它節(jié)點卻被閑置。使用工作隊列,將具體工作平均分配,可以將影響最小化。
工作隊列的另一個效果是吸收工作峰(absorb work peaks):你可以為工作集群計劃給定的最大容量,并確保容量永遠(yuǎn)不會超過。如果工作數(shù)量在短時間內(nèi)急劇上升,工作隊列完全可以解決,遠(yuǎn)離工作峰的壓力。
防止崩潰 (Survive crashes)系統(tǒng)監(jiān)控在這里起到重要作用:你應(yīng)當(dāng)持續(xù)監(jiān)控工作隊列的長度,工作時間(完成一項任務(wù)的時間),工作占用,以及容量,以確定在高峰時間保證令人滿意的操作時間需要的最佳、最小的資源。
如果你不需要以上任何一點東西,使用持久化工作隊列的一個理由是防止崩潰。即使是同一個進(jìn)程中的內(nèi)存隊列也能滿足你的應(yīng)用需求,持續(xù)的隊列使你的應(yīng)用在進(jìn)程重啟的時候更具彈性。
好了,理論講得差不多了 —— 我們來看具體實現(xiàn)。
最簡單的案例:內(nèi)存工作隊列(In-Memory Work Queue)可以設(shè)計出的最簡單的工作隊列是一個內(nèi)存隊列。實現(xiàn)內(nèi)存隊列可能是個學(xué)校的練習(xí)(留給讀者)。這里我們使用 Async 的 queue。
假設(shè)你在做的這個演示應(yīng)用和一個控制你的房子的硬件單元相連接。你的 Node.js 應(yīng)用和該單元通過一個串行端口對話,且有線協(xié)議只能同時接受一個掛起的命令。
這個協(xié)議被包裝在我們的 domotic.js 模塊中,模塊暴露三個函數(shù):
.connect() - 連接 domotic 模塊
.command() - 發(fā)送命令,等待響應(yīng)
.disconnect() - 切斷與模塊的連接
下面的代碼模擬了這樣一個模塊:
domotic.js:
exports.connect = connect; exports.command = command; exports.disconnect = disconnect; function connect(cb) { setTimeout(cb, 100); // simulate connection } function command(cmd, options, cb) { if (succeeds()) { setTimeout(cb, 100); // simulate command } else { setTimeout(function() { var err = Error("error connecting"); err.code = "ECONN"; cb(err); }, 100); } } function disconnect(cb) { if (cb) setTimeout(cb, 100); // simulate disconnection } function succeeds() { return Math.random() > 0.5; }
注意我們并沒有和任何 domotic 模塊交互;我們只是假裝,100 毫秒后成功調(diào)用回調(diào)函數(shù)。
同樣, .command 函數(shù)模擬了連接錯誤: 如果 succeeds() 返回 false,連接失敗,命令失敗,這有 50% 的可能性(我們的 domotic 串行連接很容易出錯)。這使我們能夠測試在發(fā)生失敗之后,我們的應(yīng)用是否會成功重連并重試命令。
然后我們新建另一個模塊,可以在隊列后面發(fā)出命令。
domotic_queue.js:
var async = require("async"); var Backoff = require("backoff"); var domotic = require("./domotic"); var connected = false; var queue = async.queue(work, 1); function work(item, cb) { ensureConnected(function() { domotic.command(item.command, item.options, callback); }); function callback(err) { if (err && err.code == "ECONN") { connected = false; work(item); } else cb(err); } } /// command exports.command = pushCommand; function pushCommand(command, options, cb) { var work = { command: command, options: options }; console.log("pushing command", work); queue.push(work, cb); } function ensureConnected(cb) { if (connected) { return cb(); } else { var backoff = Backoff.fibonacci(); backoff.on("backoff", connect); backoff.backoff(); } function connect() { domotic.connect(connected); } function connected(err) { if (err) { backoff.backoff(); } else { connected = true; cb(); } } } /// disconnect exports.disconnect = disconnect; function disconnect() { if (! queue.length()) { domotic.disconnect(); } else { console.log("waiting for the queue to drain before disonnecting"); queue.drain = function() { console.log("disconnecting"); domotic.disconnect(); }; } }
做了不少工作 —— 我們來一段段地分析。
var async = require("async"); var Backoff = require("backoff"); var domotic = require("./domotic");
這里我們引入了一些包:
async - 提供內(nèi)存隊列的實現(xiàn)
backoff - 讓我們增加每一次失敗后嘗試重新連接的時間間隔
./domotic - 模擬 domotic 的模塊
我們的模塊從連接斷開狀態(tài)開始啟動:
`var connected = false;`
建立我們的 async 隊列:
`var queue = async.queue(work, 1);`
這里提供一個叫做 worker 的工作函數(shù)(在代碼中進(jìn)一步定義的)和一個最大并發(fā)量 1。我們在這里強(qiáng)制設(shè)置,是因為我們定義了 domotic 模塊協(xié)議一次只允許一個命令。
然后定義 worker 函數(shù),它每次處理一個隊列元素:
function work(item, cb) { ensureConnected(function() { domotic.command(item.command, item.options, callback); }); function callback(err) { if (err && err.code == "ECONN") { connected = false; work(item); } else cb(err); } }
當(dāng)我們的 async 隊列加入另一個工作項目,會調(diào)用 work 函數(shù),傳遞該工作項目和一個當(dāng)工作完成時候為我們所調(diào)用的回調(diào)函數(shù)。
對每個工作項目來說,我們要確認(rèn)已經(jīng)連接了。一旦連接上,使用工作項目中會有的 command 和 options 屬性,來用 domotic 模塊來執(zhí)行命令。傳的最后一次參數(shù)是一個回調(diào)函數(shù),當(dāng)命令成功或失敗之后會立即被調(diào)用。
回調(diào)函數(shù)中,我們明確地處理連接錯誤的情況,設(shè)置 connected 狀態(tài)為 false,并再次調(diào)用 work重連。
如果沒有發(fā)生錯誤,調(diào)用回調(diào)函數(shù) cb 結(jié)束當(dāng)前工作項目。
function ensureConnected(cb) { if (connected) { return cb(); } else { var backoff = Backoff.fibonacci(); backoff.on("backoff", connect); backoff.backoff(); } function connect() { domotic.connect(connected); } function connected(err) { if (err) { backoff.backoff(); } else { connected = true; cb(); } } }
ensureConnected 函數(shù)現(xiàn)在負(fù)責(zé)處于連接狀態(tài)時調(diào)用回調(diào)或相反情況下嘗試連接。嘗試連接的時候,使用 backoff 增加每次重連的時間間隔。 每次 domotic.connect 函數(shù)帶著錯誤被調(diào)用,在 backoff 事件觸發(fā)之前增加間隔時間。觸發(fā) backoff 時,嘗試連接。一旦連接成功,調(diào)用 cb 回調(diào);否則保持重試。
這個模塊暴露一個 .command 函數(shù):
/// command exports.command = pushCommand; function pushCommand(command, options, cb) { var work = { command: command, options: options }; console.log("pushing command", work); queue.push(work, cb); }
這個命令簡單的解析一個工作項目并將其推入隊列。
最后,這個模塊同樣暴露出 .disconnect 函數(shù)。
/// disconnect exports.disconnect = disconnect; function disconnect() { if (! queue.length()) { domotic.disconnect(); } else { console.log("waiting for the queue to drain before disonnecting"); queue.drain = function() { console.log("disconnecting"); domotic.disconnect(); }; } }
這里我們只是確保在調(diào)用 domotic 模塊的 disconnected 方法之前隊列是空的。如果隊列非空,在真正斷開連接之前會等待其耗盡(drain)。
可選:在隊列未被耗盡的情況下,您可以設(shè)置一個超時時間,然后強(qiáng)制斷開連接。
然后我們來新建一個 domotic 客戶端:
client.js:
var domotic = require("./domotic_queue"); for(var i = 0 ; i < 20; i ++) { domotic.command("toggle light", i, function(err) { if (err) throw err; console.log("command finished"); }); } domotic.disconnect();
這里我們并行得向 domotic 模塊添加了 20 個 settime 命令,同時傳遞了回調(diào)函數(shù),當(dāng)命令完成時就會被調(diào)用。如果有命令出錯,簡單地拋出錯誤并中斷執(zhí)行。
添加所有命令之后我們馬上斷開連接,不過模塊會等待所有命令被執(zhí)行之后才會真正將其斷開。
讓我們在命令行中試一下:
$ node client.js pushing command { command: "toggle light", options: 0 } pushing command { command: "toggle light", options: 1 } pushing command { command: "toggle light", options: 2 } pushing command { command: "toggle light", options: 3 } pushing command { command: "toggle light", options: 4 } pushing command { command: "toggle light", options: 5 } pushing command { command: "toggle light", options: 6 } pushing command { command: "toggle light", options: 7 } pushing command { command: "toggle light", options: 8 } pushing command { command: "toggle light", options: 9 } pushing command { command: "toggle light", options: 10 } pushing command { command: "toggle light", options: 11 } pushing command { command: "toggle light", options: 12 } pushing command { command: "toggle light", options: 13 } pushing command { command: "toggle light", options: 14 } pushing command { command: "toggle light", options: 15 } pushing command { command: "toggle light", options: 16 } pushing command { command: "toggle light", options: 17 } pushing command { command: "toggle light", options: 18 } pushing command { command: "toggle light", options: 19 } waiting for the queue to drain before disonnecting command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished disconnecting
這里我們可以看到,所有命令被立即放到隊列中,并且命令是被一些隨機(jī)時間間隔著有序完成的。最后,所有命令完成之后連接切斷。
下一篇文章本系列的下一篇文章,我們將探索如何避免崩潰以及通過持久化工作項目來限制內(nèi)存影響。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/79779.html
摘要:文件系統(tǒng)請求和相關(guān)請求都會放進(jìn)這個線程池處理其他的請求,如網(wǎng)絡(luò)平臺特性相關(guān)的請求會分發(fā)給相應(yīng)的系統(tǒng)處理單元參見設(shè)計概覽。 譯者按:在 Medium 上看到這篇文章,行文脈絡(luò)清晰,闡述簡明利落,果斷點下翻譯按鈕。第一小節(jié)背景鋪陳略啰嗦,可以略過。剛開始我給這部分留了個 blah blah blah 直接翻后面的,翻完之后回頭看,考慮完整性才把第一節(jié)給補(bǔ)上。接下來的內(nèi)容干貨滿滿,相信對 N...
摘要:正在失業(yè)中的課多周刊第期我們的微信公眾號,更多精彩內(nèi)容皆在微信公眾號,歡迎關(guān)注。若有幫助,請把課多周刊推薦給你的朋友,你的支持是我們最大的動力。是一種禍害譯本文淺談了在中關(guān)于的不好之處。淺談超時一運(yùn)維的排查方式。 正在失業(yè)中的《課多周刊》(第3期) 我們的微信公眾號:fed-talk,更多精彩內(nèi)容皆在微信公眾號,歡迎關(guān)注。 若有幫助,請把 課多周刊 推薦給你的朋友,你的支持是我們最大的...
摘要:正在失業(yè)中的課多周刊第期我們的微信公眾號,更多精彩內(nèi)容皆在微信公眾號,歡迎關(guān)注。若有幫助,請把課多周刊推薦給你的朋友,你的支持是我們最大的動力。是一種禍害譯本文淺談了在中關(guān)于的不好之處。淺談超時一運(yùn)維的排查方式。 正在失業(yè)中的《課多周刊》(第3期) 我們的微信公眾號:fed-talk,更多精彩內(nèi)容皆在微信公眾號,歡迎關(guān)注。 若有幫助,請把 課多周刊 推薦給你的朋友,你的支持是我們最大的...
原文 先說1.1總攬: Reactor模式 Reactor模式中的協(xié)調(diào)機(jī)制Event Loop Reactor模式中的事件分離器Event Demultiplexer 一些Event Demultiplexer處理不了的復(fù)雜I/O接口比如File I/O、DNS等 復(fù)雜I/O的解決方案 未完待續(xù) 前言 nodejs和其他編程平臺的區(qū)別在于如何去處理I/O接口,我們聽一個人介紹nodejs,總是...
閱讀 2097·2021-10-08 10:21
閱讀 2490·2021-09-29 09:34
閱讀 3505·2021-09-22 15:51
閱讀 4946·2021-09-22 15:46
閱讀 2324·2021-08-09 13:42
閱讀 3447·2019-08-30 15:52
閱讀 2733·2019-08-29 17:13
閱讀 1564·2019-08-29 11:30