摘要:而在進(jìn)程執(zhí)行把進(jìn)程添加到調(diào)度器中時(shí)添加了一個(gè)回調(diào)函數(shù),回調(diào)函數(shù)了一個(gè)帶的消息,并且為,就是這個(gè)消息觸發(fā)了發(fā)送的函數(shù)的執(zhí)行。
最近做了點(diǎn)nodejs項(xiàng)目,對(duì)nodejs的cluster怎么利用多進(jìn)程處理請(qǐng)求產(chǎn)生了疑問(wèn),于是著手進(jìn)行了研究,之后發(fā)現(xiàn)這其中竟大有文章!一切還是先從遙遠(yuǎn)的TCP說(shuō)起吧。。。
TCP與Socket說(shuō)到TCP,相信很多人都相當(dāng)了解了,大學(xué)已經(jīng)教過(guò),但是又相信有很多人也不是很了解,要不是當(dāng)時(shí)沒(méi)聽(tīng),要不也可能是自身的編程能力不足以去實(shí)踐相關(guān)內(nèi)容,寫(xiě)到這我還特意去翻了一下大學(xué)的計(jì)算機(jī)網(wǎng)絡(luò)教材,內(nèi)容是很豐富的,但教人實(shí)踐的內(nèi)容還是太少了,里面的內(nèi)容都把學(xué)生當(dāng)成了有相當(dāng)?shù)腖inux編程能力的人了,所以結(jié)果就是大部分只上了一年編程課剛學(xué)會(huì)幾個(gè)Hello world程序的大二學(xué)生,聽(tīng)了這門(mén)課后一臉懵逼,即使記住了也因?yàn)闆](méi)什么實(shí)踐很快忘了,當(dāng)年我就是這么懵逼過(guò)來(lái)的。
所以,扯了這些,結(jié)果是什么呢,結(jié)果就是我們要多動(dòng)手!而要?jiǎng)邮纸⒁粭lTCP連接可以用socket來(lái)實(shí)現(xiàn),不過(guò)這里不是要說(shuō)socket用法,只是來(lái)簡(jiǎn)單聊一聊他們之間的一點(diǎn)小聯(lián)系,以便于理解后面的內(nèi)容。
應(yīng)用層通過(guò)傳輸層進(jìn)行TCP通信時(shí),有時(shí)TCP需要為多個(gè)應(yīng)用程序進(jìn)程提供并發(fā)服務(wù)。多個(gè)TCP連接或多個(gè)應(yīng)用程序進(jìn)程可能需要通過(guò)同一個(gè)TCP協(xié)議端口傳輸數(shù)據(jù)。為了區(qū)別不同的應(yīng)用程序進(jìn)程和連接,許多計(jì)算機(jī)操作系統(tǒng)為應(yīng)用程序與TCP協(xié)議交互提供了稱為套接字 (Socket)的接口,區(qū)分不同應(yīng)用程序進(jìn)程間的網(wǎng)絡(luò)通信和連接。
我們可以用一個(gè)四元組來(lái)確定一條TCP連接(源ip,源端口,目標(biāo)ip,目標(biāo)端口),而連接是通過(guò)socket來(lái)建立的(服務(wù)端進(jìn)行bind和listen->客戶端發(fā)起connect->服務(wù)端accept),計(jì)算機(jī)系統(tǒng)就是通過(guò)socket來(lái)區(qū)分不同的TCP連接的。所以我們可以看出來(lái),只要目標(biāo)ip/端口不同,服務(wù)端可以用同一個(gè)端口生成多個(gè)socket,建立多條連接。
但是,一個(gè)進(jìn)程只能監(jiān)聽(tīng)一個(gè)端口,一個(gè)端口怎么生成多個(gè)socket呢?其實(shí)服務(wù)器端程序一般會(huì)把socket和服務(wù)器某個(gè)端口(ip+端口)bind起來(lái), 這樣構(gòu)成了一個(gè)特殊的socket, 這個(gè)socket沒(méi)有目標(biāo)ip和端口。socket進(jìn)行l(wèi)isten之后當(dāng)有新的連接進(jìn)來(lái)時(shí), 系統(tǒng)將請(qǐng)求存進(jìn)隊(duì)列(此時(shí)TCP握三次手完成), 后續(xù)可以再調(diào)用accept拿到隊(duì)列的請(qǐng)求,返回一個(gè)新的socket, 這個(gè)socket是由四元組建立的, 也就對(duì)應(yīng)了一個(gè)唯一的連接。
說(shuō)完這些,可以來(lái)聊一聊nodejs是怎樣建立一個(gè)TCP服務(wù)的了。
nodejs createServer啟動(dòng)TCP服務(wù)小解析一般我們用nodejs啟動(dòng)一個(gè)TCP服務(wù)可能是這樣的:
require("net").createServer(function(sock) { sock.on("data", function(data) { sock.write("Hello world"); }); }).listen(8080, "127.0.0.1");
進(jìn)到createServer一看(代碼都在net模塊中),里面return了一個(gè)Server對(duì)象,Server繼承EventEmitter,將createServer的參數(shù)做為connection事件的回調(diào)函數(shù),這塊比較簡(jiǎn)單就不貼代碼了。我們需要關(guān)注的是Server的listen方法,其不同的參數(shù)最終都會(huì)調(diào)用到listenInCluster方法。cluster!是的這和cluster有關(guān),但先不管它,我們先管在主進(jìn)程中它的執(zhí)行:
function listenInCluster(server, address, port, addressType, backlog, fd, exclusive) { // ... if (cluster.isMaster || exclusive) { // ... server._listen2(address, port, addressType, backlog, fd); return; } // ... }
從代碼我們可以看到listenInCluster最終是調(diào)用了_listen2方法,它就是服務(wù)啟動(dòng)的關(guān)鍵,其定義如下:
function setupListenHandle(address, port, addressType, backlog, fd) { // ... var rval = null; // ... if (rval === null) rval = createServerHandle(address, port, addressType, fd); // ... this._handle = rval; // ... this._handle.onconnection = onconnection; this._handle.owner = this; var err = this._handle.listen(backlog || 511); // ... }
其中createServerHandle方法就不展開(kāi)了,它就如之前所說(shuō)的:把socket和服務(wù)器某個(gè)端口(ip+端口)bind起來(lái), 這樣構(gòu)成了一個(gè)特殊的socket, 這個(gè)socket沒(méi)有目標(biāo)ip和端口。它綁定了address+port并返回了一個(gè)特殊socket(句柄)rval,可以看到最后它調(diào)用了listen對(duì)端口進(jìn)行監(jiān)聽(tīng),并且指定了一個(gè)回調(diào)函數(shù)onconnection,函數(shù)會(huì)在C++層當(dāng)accept請(qǐng)求時(shí)觸發(fā),其回調(diào)參數(shù)之一就是前面提到的accept后與客戶端連接的新socket句柄。到這里再看一下onconnection的代碼:
function onconnection(err, clientHandle) { // ... var self = handle.owner; var socket = new Socket({ handle: clientHandle, allowHalfOpen: self.allowHalfOpen, pauseOnCreate: self.pauseOnConnect }); socket.readable = socket.writable = true; // ... self.emit("connection", socket); }
可以看到nodejs在對(duì)socket句柄進(jìn)一步封裝后(封裝成nodejs的Socket對(duì)象),再觸發(fā)server(由createServer創(chuàng)建)的connection事件。這時(shí)我們?cè)倩氐角懊?b>createServer的介紹,其監(jiān)聽(tīng)了connection事件,所以最終流程走下來(lái)createServer的的方法參數(shù)將被觸發(fā),并且可以拿到一個(gè)nodejs的Socket對(duì)象進(jìn)行write與read操作,與客戶端進(jìn)行通信。
至此我們已經(jīng)對(duì)nodejs啟動(dòng)一個(gè)TCP服務(wù)的流程有了了解,接下來(lái)就到主題cluster了。
cluster為我們做了什么開(kāi)始說(shuō)代碼之前,先來(lái)聊一聊喂鴿子吧。假設(shè)你坐在布拉格廣場(chǎng)前靜靜地坐著,然后往前面撒了一把狗糧,喔不對(duì)是鴿糧,然后周圍的一群鴿子都震驚了并往你這邊飛搶東西吃。這個(gè)現(xiàn)象可以用一個(gè)詞來(lái)形容就是“驚群“。然而這只是我的瞎掰,我們程序員理解的驚群應(yīng)該是:多個(gè)進(jìn)程/線程同時(shí)阻塞等待某個(gè)事件,當(dāng)事件發(fā)生時(shí)喚醒了所有等待的進(jìn)程/線程,但最終只有一個(gè)能對(duì)事件進(jìn)行處理。很明顯這對(duì)cpu造成了浪費(fèi),而cluster的多進(jìn)程模型對(duì)此做了處理:只用一個(gè)master進(jìn)程等待請(qǐng)求,然后有請(qǐng)求到來(lái)時(shí)使用round-robin輪詢分配請(qǐng)求給各個(gè)子進(jìn)程進(jìn)行處理,這塊后面提到的源碼會(huì)涉及到,這里就不深入了。除了round-robin,還有其他的一些cluster為我們做的,就用代碼來(lái)talk吧:
const cluster = require("cluster"); const http = require("http"); if (cluster.isMaster) { const numCPUs = require("os").cpus().length; for (let i = 0; i < numCPUs; i++) { cluster.fork(); } } else { // Worker processes have a http server. http.Server((req, res) => { res.writeHead(200); res.end("hello world "); }).listen(8000); }
以上代碼就是cluster的典型用法,在nodejs啟動(dòng)文件判斷當(dāng)前進(jìn)程,如果當(dāng)前進(jìn)程是master進(jìn)程,那么就根據(jù)cpu的核數(shù)fork出相同數(shù)量的進(jìn)程,否則(worker進(jìn)程)就啟動(dòng)一個(gè)http服務(wù),所以一般這樣會(huì)給一個(gè)核心分配一個(gè)worker進(jìn)程來(lái)啟動(dòng)一個(gè)服務(wù),搭起一個(gè)小服務(wù)集群。但是問(wèn)題來(lái)了,為什么這里可以有多個(gè)進(jìn)程同時(shí)監(jiān)聽(tīng)一個(gè)端口呢,是因?yàn)閘isten做的一些文章,下面再一步步深入解析。由于http.Server其實(shí)是繼承了net.Server,所以跟前面創(chuàng)建TCP服務(wù)一樣,listen最終也是調(diào)用到listenInCluster,我們從這里重新開(kāi)始。
function listenInCluster(server, address, port, addressType, backlog, fd, exclusive) { // ... const serverQuery = { address: address, port: port, addressType: addressType, fd: fd, flags: 0 }; // Get the master"s server handle, and listen on it cluster._getServer(server, serverQuery, listenOnMasterHandle); // ... }
listenInCluster在worker進(jìn)程中調(diào)用cluster._getServer,并且傳入了一個(gè)函數(shù)listenOnMasterHandle。這里還不知道它做了什么,所以再進(jìn)入cluster._getServer看看(由于當(dāng)前是在worker進(jìn)程,cluster模塊文件是lib/internal/cluster/child.js):
cluster._getServer = function(obj, options, cb) { // ... const message = util._extend({ act: "queryServer", index: indexes[indexesKey], data: null }, options); send(message, (reply, handle) => { if (typeof obj._setServerData === "function") obj._setServerData(reply.data); if (handle) shared(reply, handle, indexesKey, cb); // Shared listen socket. else rr(reply, indexesKey, cb); // Round-robin. }); // ... };
關(guān)注send方法,它調(diào)用了sendHelper方法,該方法是在internal/cluster/utils定義的,相當(dāng)一個(gè)消息轉(zhuǎn)發(fā)器處理進(jìn)程間通信,它發(fā)送一個(gè)“進(jìn)程內(nèi)部消息“(internalMessage),而worker進(jìn)程在master進(jìn)程被fork出來(lái)的時(shí)候監(jiān)聽(tīng)了internalMessage:
// lib/internal/cluster/master.js worker.process.on("internalMessage", internal(worker, onmessage));
所以最終在worker進(jìn)程發(fā)送的消息,觸發(fā)了master進(jìn)程執(zhí)行了onmessage方法,onmessage判斷message.act === "queryServer"執(zhí)行queryServer,而就是在這個(gè)方法中,新建了一個(gè)RoundRobinHandle調(diào)度器,就是這個(gè)東西分配請(qǐng)求做了負(fù)載均衡。這里用地址和端口號(hào)作為key將調(diào)度器存儲(chǔ)起來(lái),調(diào)度器不會(huì)被worker創(chuàng)建兩次,最后將worker進(jìn)程add到隊(duì)列。相關(guān)代碼如下:
// lib/internal/cluster/master.js function queryServer(worker, message) { // ... var handle = handles[key]; if (handle === undefined) { var constructor = RoundRobinHandle; // ... handles[key] = handle = new constructor(key, message.address, message.port, message.addressType, message.fd, message.flags); } // ... // Set custom server data handle.add(worker, (errno, reply, handle) => { // ... }); }
然后我們?cè)賮?lái)看看RoundRobinHandle,它里面調(diào)用net.createServer方法新建了一個(gè)server,并且開(kāi)始監(jiān)聽(tīng),這塊可以看前面內(nèi)容。不過(guò)與前面不同的是,server在listening事件完成時(shí)拿到監(jiān)聽(tīng)端口的那個(gè)特殊socket句柄,重置了onconnection方法,當(dāng)新的連接建立時(shí)方法被調(diào)用,將accept連接的socket句柄分發(fā)到隊(duì)列里的worker進(jìn)行處理(distribute)。對(duì)于listening事件,它在Server.listen執(zhí)行后就會(huì)觸發(fā),代碼就在setupListenHandle方法里面。RoundRobinHandle代碼如下:
// lib/internal/cluster/round_robin_handle.js function RoundRobinHandle(key, address, port, addressType, fd) { // ... this.server = net.createServer(assert.fail); if (fd >= 0) this.server.listen({ fd }); else if (port >= 0) this.server.listen(port, address); else this.server.listen(address); // UNIX socket path. this.server.once("listening", () => { this.handle = this.server._handle; this.handle.onconnection = (err, handle) => this.distribute(err, handle); // ... }); } RoundRobinHandle.prototype.distribute = function(err, handle) { this.handles.push(handle); const worker = this.free.shift(); if (worker) this.handoff(worker); }; RoundRobinHandle.prototype.handoff = function(worker) { // ... const message = { act: "newconn", key: this.key }; sendHelper(worker.process, message, handle, (reply) => { // ... }); };
從代碼上看到最終調(diào)度器調(diào)用handoff方法,通過(guò)sendHelper向worker進(jìn)程發(fā)送一個(gè)新連接到達(dá)的消息newconn,執(zhí)行worker進(jìn)程的server的onconnection方法,worker進(jìn)程相關(guān)代碼如下:
// lib/internal/cluster/child.js cluster._setupWorker = function() { // ... process.on("internalMessage", internal(worker, onmessage)); send({ act: "online" }); function onmessage(message, handle) { if (message.act === "newconn") onconnection(message, handle); else if (message.act === "disconnect") _disconnect.call(worker, true); } }; // Round-robin connection. function onconnection(message, handle) { const key = message.key; const server = handles[key]; const accepted = server !== undefined; send({ ack: message.seq, accepted }); if (accepted) server.onconnection(0, handle); }
走到這里worker進(jìn)程的server就拿到了連接的socket句柄可以進(jìn)行處理,但是好像有點(diǎn)問(wèn)題,worker進(jìn)程的server好像還沒(méi)起起來(lái)啊,前面講的只是在master進(jìn)程的調(diào)度器啟動(dòng)了一個(gè)server,worker進(jìn)程并沒(méi)有server。我們又得翻回前面的內(nèi)容看一看了,看看之前提到的workder進(jìn)程的cluster._getServer,里面send方法發(fā)送了一個(gè)函數(shù),函數(shù)里面的rr(reply, indexesKey, cb);就是創(chuàng)建了workder進(jìn)程server的代碼。
先來(lái)看看cluster._getServer中發(fā)送的函數(shù)怎么被調(diào)用的。這里需要來(lái)了解一下之前出現(xiàn)了幾次的sendHelper,它是cluster模塊用來(lái)做進(jìn)程間通信的,另外還有一個(gè)internal方法用來(lái)處理通信的回調(diào)。cluster._getServer的send會(huì)調(diào)用sendHelper,它會(huì)用message.seq當(dāng)key把send的函數(shù)存儲(chǔ)起來(lái)。然后在internal方法處理通信的回調(diào)時(shí)判斷message是否有這個(gè)key,是否能找到這個(gè)函數(shù),可以的話就執(zhí)行。而在master進(jìn)程執(zhí)行queryServer把worker進(jìn)程添加到調(diào)度器中時(shí)添加了一個(gè)回調(diào)函數(shù),回調(diào)函數(shù)send了一個(gè)帶seq的消息,并且handle為null,就是這個(gè)消息觸發(fā)了cluster._getServer發(fā)送的函數(shù)的執(zhí)行。相關(guān)代碼如下:
// `internal/cluster/utils.js` const callbacks = {}; var seq = 0; function sendHelper(proc, message, handle, cb) { // ... if (typeof cb === "function") callbacks[seq] = cb; message.seq = seq; // ... } function internal(worker, cb) { return function onInternalMessage(message, handle) { // ... var fn = cb; if (message.ack !== undefined && callbacks[message.ack] !== undefined) { fn = callbacks[message.ack]; delete callbacks[message.ack]; } // ... }; } // lib/internal/cluster/master.js function queryServer(worker, message) { // ... // Set custom server data handle.add(worker, (errno, reply, handle) => { reply = util._extend({ // ... ack: message.seq, // ... }, reply); // ... send(worker, reply, handle); });
最終,rr(reply, indexesKey, cb);執(zhí)行,它構(gòu)造了一個(gè)假的socket句柄,句柄設(shè)置了一個(gè)不做操作的listen方法。然后執(zhí)行cb,這個(gè)cb也就是前面提到過(guò)的listenOnMasterHandle,它會(huì)把假socket句柄賦值給worker進(jìn)程的server._handle,隨后由于server._handle的存在,server._listen2(address, port, addressType, backlog, fd);也不會(huì)做任何操作,也就是說(shuō)worker進(jìn)程創(chuàng)建的server是不會(huì)對(duì)端口進(jìn)行監(jiān)聽(tīng)的。相關(guān)代碼如下:
// lib/internal/cluster/child.js function rr(message, indexesKey, cb) { function listen(backlog) { // ... return 0; } // ... cb(0, handle); } // lib/net.js function listenOnMasterHandle(err, handle) { // ... server._handle = handle; server._listen2(address, port, addressType, backlog, fd); } // setupListenHandle就是_listen2 function setupListenHandle(address, port, addressType, backlog, fd) { // ... if (this._handle) { debug("setupListenHandle: have a handle already"); } // ...
至此,cluster模塊如何建立多進(jìn)程服務(wù)的就算講完了。畫(huà)個(gè)草圖總結(jié)下吧:
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/89327.html
摘要:嚴(yán)格來(lái)說(shuō),并不是單線程的。其他異步和事件驅(qū)動(dòng)相關(guān)的線程通過(guò)來(lái)實(shí)現(xiàn)內(nèi)部的線程池和線程調(diào)度。線程是最小的進(jìn)程,因此也是單進(jìn)程的。子進(jìn)程中執(zhí)行的是非程序,提供一組參數(shù)后,執(zhí)行的結(jié)果以回調(diào)的形式返回。在子進(jìn)程中通過(guò)和的機(jī)制來(lái)接收和發(fā)送消息。 ??node遵循的是單線程單進(jìn)程的模式,node的單線程是指js的引擎只有一個(gè)實(shí)例,且在nodejs的主線程中執(zhí)行,同時(shí)node以事件驅(qū)動(dòng)的方式處理IO...
摘要:一般由客戶端發(fā)送,用來(lái)表示報(bào)文段中第一個(gè)數(shù)據(jù)字節(jié)在數(shù)據(jù)流中的序號(hào),主要用來(lái)解決網(wǎng)絡(luò)包亂序的問(wèn)題。為有效,為無(wú)效表示,當(dāng)數(shù)據(jù)包得到后,立馬給應(yīng)用程序使用到最頂端用來(lái)確保連接的安全。親,那進(jìn)程和線程區(qū)別是什么嘞這算是計(jì)算機(jī)的基本知識(shí)吧。 在正文之前,我想問(wèn)大家一個(gè)問(wèn)題:問(wèn):親,你有基礎(chǔ)嗎?答: 有啊,你說(shuō)前端嗎? 不就是HTML,JS,CSS 嗎? so easy~問(wèn): oh-my-zsh...
摘要:通常的解決方案,便是使用中自帶的模塊,以模式啟動(dòng)多個(gè)應(yīng)用實(shí)例。最后中的模塊除了上述提到的功能外,其實(shí)還提供了非常豐富的供和進(jìn)程之前通信,對(duì)于不同的操作系統(tǒng)平臺(tái),也提供了不同的默認(rèn)行為。如果大家有閑,非常推薦完整領(lǐng)略一下模塊的代碼實(shí)現(xiàn)。 眾所周知,Node.js中的JavaScript代碼執(zhí)行在單線程中,非常脆弱,一旦出現(xiàn)了未捕獲的異常,那么整個(gè)應(yīng)用就會(huì)崩潰。這在許多場(chǎng)景下,尤其是web...
摘要:通過(guò)將的給出來(lái)的進(jìn)程。恩吞吐率關(guān)于吞吐率有多種解讀,一種是描繪服務(wù)器單位時(shí)間處理請(qǐng)求的能力。而根據(jù)這個(gè)描述的話他的單位就為而這個(gè)指標(biāo)就是上面數(shù)據(jù)中的當(dāng)然,肯定是越大越好了吞吐量這個(gè)和上面的吞吐率很有點(diǎn)關(guān)系的。 首先鄭重聲明:nodeJS 是一門(mén)單線程!異步!非阻塞語(yǔ)言!nodeJS 是一門(mén)單線程!異步!非阻塞語(yǔ)言!nodeJS 是一門(mén)單線程!異步!非阻塞語(yǔ)言! 重要的事情說(shuō)3遍。 因?yàn)?..
閱讀 2248·2021-11-24 11:15
閱讀 3099·2021-11-24 10:46
閱讀 1400·2021-11-24 09:39
閱讀 3933·2021-08-18 10:21
閱讀 1488·2019-08-30 15:53
閱讀 1402·2019-08-30 11:19
閱讀 3335·2019-08-29 18:42
閱讀 2333·2019-08-29 16:58