摘要:并且指定收到消息,以及端口的監(jiān)聽方法。四代碼示例多房間實時聊天室配置版本須在里配置定義,并設(shè)置。使同一個的請求能夠落在同一個機器同一個進程中。通過主進程統(tǒng)一管理維護子進程,每個進程監(jiān)聽一個端口。
一、相關(guān)技術(shù)介紹:
消息實時推送,指的是將消息實時地推送到瀏覽器,用戶不需要刷新瀏覽器就可以實時獲取最新的消息,實時聊天室的技術(shù)原理也是如此。傳統(tǒng)的Web站點為了實現(xiàn)推送技術(shù),所用的技術(shù)都是輪詢,這種傳統(tǒng)的模式帶來很明顯的缺點,即瀏覽器需要不斷的向服務(wù)器發(fā)出請求。
短輪詢(Polling)
客戶端需要定時往瀏覽器輪詢發(fā)送請求,且只有當服務(wù)有數(shù)據(jù)更新后,客戶端的下一次輪詢請求才能拿到更新后的數(shù)據(jù),在數(shù)據(jù)更新前的多次請求相當于無效。這對帶寬資源造成了極大的浪費,若提高輪詢定時器時間,又會有數(shù)據(jù)更新不及時的煩惱。
commet
為了解決短輪詢的弊端,一種基于http長連接的"服務(wù)器推"方式被hack出來。其與短輪詢的區(qū)別主要是,采用commet時,客戶端與服務(wù)端保持一個長連接,當數(shù)據(jù)發(fā)生改變時,服務(wù)端主動將數(shù)據(jù)推送到客戶端。Comet 又可以被細分為兩種實現(xiàn)方式,一種是長輪詢機制,一種是流技術(shù)。
長輪詢
長輪詢跟短輪詢不同的地方是,客戶端往服務(wù)端發(fā)送請求后,服務(wù)端判斷是否有數(shù)據(jù)更新,若沒有,則將請求hold住,等待數(shù)據(jù)更新時,才返回響應(yīng)。這樣則避免了大量無效的http請求,但即使采用長輪詢方式,接受數(shù)據(jù)更新的最小時間間隔還是為2*RTT(往返時間)。
流技術(shù)
流技術(shù)(http stream)基于iframe實現(xiàn)。通過HTML標簽iframe src指向服務(wù)端,建立一個長連接。當有數(shù)據(jù)推送,則往客戶端返回,無須再請求。但流技術(shù)有個缺點就是,在瀏覽器頂部會一直出現(xiàn)頁面未加載完成的loading標示。
websocket
為了解決服務(wù)端如何更快地實時推送數(shù)據(jù)到客戶端以及以上推送方式技術(shù)的不足,HTML5中定義了Websocket協(xié)議,它是一種在單個TCP連接上進行全雙工通訊的協(xié)議。與http協(xié)議不同的請求/響應(yīng)模式不同,Websocket在建立連接之前有一個Handshake(Opening Handshake)過程,建立連接之后,雙方即可雙向通信。當然,由于websocket是html5新特性,在部分瀏覽器(IE10以下)是不支持的。
我們來看下websocket的握手報文:
請求報文:
GET /chat HTTP/1.1 Host: server.example.com Upgrade: websocket Connection: Upgrade Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ== Sec-WebSocket-Protocol: chat Sec-WebSocket-Version: 13 Origin: http://example.com
"Upgrade "、"Connection": 告訴服務(wù)器這個請求是一個websocket協(xié)議,需要區(qū)別處理
"Upgrade: websocket": 表明這是一個 WebSocket 類型請求,意在告訴 server 需要將通信協(xié)議切換到 WebSocket
"Sec-WebSocket-Key": 是 client 發(fā)送的一個 base64 編碼的密文,要求 server 必須返回一個對應(yīng)加密的 "Sec-WebSocket-Accept" 應(yīng)答,否則 client 會拋出 "Error during WebSocket handshake" 錯誤,并關(guān)閉連接
"Sec-WebSocket-Protocol":一個用戶定義的字符串,用來區(qū)分同URL下,不同的服務(wù)所需要的協(xié)議
"Sec-WebSocket-Version":Websocket Draft (協(xié)議版本)
響應(yīng)報文:
HTTP/1.1 101 Switching Protocols Upgrade: websocket Connection: Upgrade Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo= Sec-WebSocket-Protocol: chat
"Sec-WebSocket-Accept": 這個則是經(jīng)過服務(wù)器確認,并且加密過后的 Sec-WebSocket-Key。加密方式為將Sec-WebSocket-Key與一段固定的 GUID 字符串進行連接,然后進行SHA-1 hash,接著base64編碼得到。
socket.io(http://socket.io)
是一個完全由JavaScript實現(xiàn),基于Node.js、支持WebSocket的協(xié)議用于實時通信、跨平臺的開源框架。Socket.IO除了支持WebSocket通訊協(xié)議外,還支持許多種輪詢機制以及其它實時通信方式,并封裝成了通用的接口,并能夠根據(jù)瀏覽器對通訊機制的支持情況自動地選擇最佳的方式來實現(xiàn)網(wǎng)絡(luò)實時應(yīng)用。
首先,我們創(chuàng)建一個socket.io server對象,指定監(jiān)聽80端口。并且指定收到message消息,以及socket端口的監(jiān)聽方法。接著,當socket建立連接后,通過socket.emit方法,可以往客戶端發(fā)送消息。
var io = require("socket.io")(); io.on("connection", function(socket) { //接受消息 socket.on("message", function (msg) { console.log("receive messge : " + msg ); }); //發(fā)送消息 socket.emit("message", "hello"); //斷開連接回調(diào) socket.on("disconnect", function () { console.log("socket disconnect"); }); }); io.listen(80);
客戶端的代碼也非常簡單,只要引入socket.io對應(yīng)的客戶端庫(https://github.com/socketio/s...。
在socket建立連接的回調(diào)中,使用socket.emit以及socket.on就可以分別做消息的發(fā)送以及監(jiān)聽了。
若只是單機部署應(yīng)用,單純使用socket.io的消息事件監(jiān)聽處理即可滿足我們的需求。但隨著業(yè)務(wù)的擴大,我們需要考慮多機集群部署,客戶端可以連接到任一節(jié)點,并發(fā)送消息。如何做到多節(jié)點的同時推送,我們需要建立一套多節(jié)點之間的消息分發(fā)/訂閱架構(gòu)。這時我們引入redis的pub/sub功能。
redis
redis是一個key-value存儲系統(tǒng),在該項目中主要起到一個消息分發(fā)中心(publish/subscribe)的作用。用戶通過socket.io namespace 訂閱房間號后,socket.io server則往redis訂閱(subscribe)該房間號channel。當在該房間中的某一用戶發(fā)送消息時,則通過redis的publish功能往redis該房間號channel publish消息。這樣所有訂閱該房間號channel的websocket連接則會收到消息回調(diào),然后推送給客戶端。
nginx
由于采用了集群架構(gòu),則需要nginx來做反向代理。需要注意的是,websocket的支持需要nginx1.3以上版本。并且我們需要通過配置ip_hash做粘性會話(ip_hash)處理,避免在低版本瀏覽器socket.io使用兼容方案輪詢請求,請求到不同機器,造成session異常。
####三、架構(gòu)設(shè)計圖
客戶端通過socket.io namespace 指定對應(yīng)roomid,請求到nginx。nginx根據(jù)ip_hash反向代理到對應(yīng)機器的某一端口的socket.io server 進程。建立websocket連接,并往redis訂閱對應(yīng)到房間(roomid)channel。到這個時候,一個訂閱了某一房間的websocket通道建立完成。
當用戶發(fā)送消息時,socket.io server捕獲到該房間到消息后,即往redis對應(yīng)房間id的channel publish消息。這時所有訂閱了該房間id channel的socket.io server就會收到訂閱響應(yīng),接著找到對應(yīng)房間id的webscoket通道,并將消息推送到客戶端。
nginx配置(nginx版本須>1.3):
在http{}里配置定義upstream,并設(shè)置ip_hash。使同一個ip的請求能夠落在同一個機器同一個進程中。 如果改節(jié)點掛了,則自動重連到另外一個節(jié)點,該方案對于后期擴容也非常方便。
upstream io_nodes { ip_hash; server 127.0.0.1:6001; server 127.0.0.1:6002; server 127.0.0.1:6003; server 127.0.0.1:6004; server 127.0.0.1:6005; server 127.0.0.1:6006; server 127.0.0.1:6007; server 127.0.0.1:6008; server 10.x.x.x:6001; server 10.x.x.x:6002; server 10.x.x.x:6003; server 10.x.x.x:6004; server 10.x.x.x:6005; server 10.x.x.x:6006; server 10.x.x.x:6007; server 10.x.x.x:6008; }
在server中,配置location:
location / { proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header Host $host; proxy_http_version 1.1; proxy_pass http://io_nodes; proxy_redirect off; }
cluster.js
我們采用了多進程的設(shè)計,充分利用cpu多核優(yōu)勢。通過主進程統(tǒng)一管理維護子進程,每個進程監(jiān)聽一個端口。
var cupNum = require("os").cpus().length, workerArr = [], roomInfo = []; var connectNum = 0; for (var i = 0; i < cupNum; i++) { workerArr.push(fork("./fork_server.js", [6001 + i])); workerArr[i].on("message", function(msg) { if (msg.cmd && msg.cmd === "client connect") { connectNum++; console.log("socket server connectnum:" + connectNum); } if (msg.cmd && msg.cmd === "client disconnect") { connectNum--; console.log("socket server connectnum:" + connectNum); } });
fork_server.js
var process = require("process"); var io = require("socket.io")(); var num = 0; var redis = require("redis"); var redisClient = redis.createClient; //建立redis pub、sub連接 var pub = redisClient({port:13800, host: "127.0.0.1", password:"xxxx"}); var sub = redisClient({port: 13800, host:"127.0.0.1", password:"xxxx"}); var roomSet = {}; //獲取父進程傳遞端口 var port = parseInt(process.argv[2]); //當websocket連接時 io.on("connection", function(socket) { //客戶端請求ws URL: http://127.0.0.1:6001?roomid=k12_webcourse_room_1 var roomid = socket.handshake.query.roomid; console.log("worker pid: " + process.pid + " join roomid: "+ roomid); socket.on("join", function (data) { socket.join(roomid); //加入房間 // 往redis訂閱房間id if(!roomSet[roomid]){ roomSet[roomid] = {}; console.log("sub channel " + roomid); sub.subscribe(roomid); } roomSet[roomid][socket.id] = {}; reportConnect(); console.log(data.username + " join, IP: " + socket.client.conn.remoteAddress); roomSet[roomid][socket.id].username = data.username; // 往該房間id的reids channel publish用戶進入房間消息 pub.publish(roomid, JSON.stringify({"event":"join","data": data})); }); //用戶發(fā)言 推送消息到redis socket.on("say", function (data) { console.log("Received Message: " + data.text); pub.publish(roomid, JSON.stringify({"event":"broadcast_say","data": { username: roomSet[roomid][socket.id].username, text: data.text }})); }); socket.on("disconnect", function() { num--; console.log("worker pid: " + process.pid + " clien disconnection num:" + num); process.send({ cmd: "client disconnect" }); if (roomSet[roomid] && roomSet[roomid][socket.id] && roomSet[roomid][socket.id].username) { console.log(roomSet[roomid][socket.id].username + " quit"); pub.publish(roomid, JSON.stringify({"event":"broadcast_quit","data": { username: roomSet[roomid][socket.id].username }})); } roomSet[roomid] && roomSet[roomid][socket.id] && (delete roomSet[roomid][socket.id]); }); }); /** * 訂閱redis 回調(diào) * @param {[type]} channel [頻道] * @param {[type]} count [數(shù)量] * @return {[type]} [description] */ sub.on("subscribe", function (channel, count) { console.log("worker pid: " + process.pid + " subscribe: " + channel); }); /** * 收到redis publish 對應(yīng)channel的消息 * @param {[type]} channel [description] * @param {[type]} message * @return {[type]} [description] */ sub.on("message", function (channel, message) { console.log("message channel " + channel + ": " + message); //往對應(yīng)房間廣播消息 io.to(channel).emit("message", JSON.parse(message)); }); /** * 上報連接到master進程 * @return {[type]} [description] */ var reportConnect = function(){ num++; console.log("worker pid: " + process.pid + " client connect connection num:" + num); process.send({ cmd: "client connect" }); }; io.listen(port); console.log("worker pid: " + process.pid + " listen port:" + port);
客戶端:
gihub源碼地址:https://github.com/493326889/...
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/91657.html
摘要:同時借助實現(xiàn)在非接口中推送消息流。每分秒鐘最多的彈幕數(shù)目彈幕數(shù)量過多時優(yōu)先加載最新的。 項目起始原因 源于數(shù)據(jù)庫課設(shè)和以前的一次突發(fā)奇想。其實還有其他微信公眾號的彈幕系統(tǒng),但是我發(fā)現(xiàn)使用體驗不佳,因為那種彈幕系統(tǒng)都是私用,并且只支持同時進行一個房間的使用。所以便萌生了自己寫一個的想法。(第一次寫md,有點不會,希望諒解--) 主要技術(shù)點 Redis(結(jié)合socket實現(xiàn)在非socke...
摘要:項目簡介主要是通過做一個多人在線多房間群聊的小項目來練手全棧技術(shù)的結(jié)合運用。編譯運行開啟服務(wù),新建命令行窗口啟動服務(wù)端,新建命令行窗口啟動前端頁面然后在瀏覽器多個窗口打開,注冊不同賬號并登錄即可進行多用戶多房間在線聊天。 項目簡介 主要是通過做一個多人在線多房間群聊的小項目、來練手全棧技術(shù)的結(jié)合運用。 項目源碼:chat-vue-node 主要技術(shù): vue2全家桶 + socket....
閱讀 1714·2021-11-02 14:47
閱讀 3661·2019-08-30 15:44
閱讀 1350·2019-08-29 16:42
閱讀 1743·2019-08-26 13:53
閱讀 945·2019-08-26 10:41
閱讀 3476·2019-08-23 17:10
閱讀 615·2019-08-23 14:24
閱讀 1729·2019-08-23 11:59