摘要:當(dāng)用戶注銷或退出時(shí),釋放連接,清空對(duì)象中的登錄狀態(tài)。聊天管理模塊系統(tǒng)的核心模塊,這部分主要使用框架實(shí)現(xiàn),功能包括信息文件的單條和多條發(fā)送,也支持表情發(fā)送。描述讀取完連接的消息后,對(duì)消息進(jìn)行處理。
0.前言
最近一段時(shí)間在學(xué)習(xí)Netty網(wǎng)絡(luò)框架,又趁著計(jì)算機(jī)網(wǎng)絡(luò)的課程設(shè)計(jì),決定以Netty為核心,以WebSocket為應(yīng)用層通信協(xié)議做一個(gè)互聯(lián)網(wǎng)聊天系統(tǒng),整體而言就像微信網(wǎng)頁版一樣,但考慮到這個(gè)聊天系統(tǒng)的功能非常多,因此只打算實(shí)現(xiàn)核心的聊天功能,包括單發(fā)、群發(fā)、文件發(fā)送,然后把項(xiàng)目與Spring整合做成開源、可拓展的方式,給大家參考、討論、使用,歡迎大家的指點(diǎn)。
關(guān)于Netty
Netty 是一個(gè)利用 Java 的高級(jí)網(wǎng)絡(luò)的能力,隱藏其背后的復(fù)雜性而提供一個(gè)易于使用的 API 的客戶端/服務(wù)器框架。
這里借用《Essential Netty In Action》的一句話來簡(jiǎn)單介紹Netty,詳細(xì)的可參考閱讀該書的電子版
Essential Netty in Action 《Netty 實(shí)戰(zhàn)(精髓)》
關(guān)于WebSocket通信協(xié)議
簡(jiǎn)單說一下WebSocket通信協(xié)議,WebSocket是為了解決HTTP協(xié)議中通信只能由客戶端發(fā)起這個(gè)弊端而出現(xiàn)的,WebSocket基于HTTP5協(xié)議,借用HTTP進(jìn)行握手、升級(jí),能夠做到輕量的、高效的、雙向的在客戶端和服務(wù)端之間傳輸文本數(shù)據(jù)。詳細(xì)可參考以下文章:
WebSocket 是什么原理?為什么可以實(shí)現(xiàn)持久連接?
WebSocket 教程 - 阮一峰的網(wǎng)絡(luò)日志
1. 技術(shù)準(zhǔn)備操作平臺(tái):Windows 7, 64bit 8G
IDE:MyEclipse 2016
JDK版本:1.8.0_121
瀏覽器:谷歌瀏覽器、360瀏覽器(極速模式)(涉及網(wǎng)頁前端設(shè)計(jì),后端開發(fā)表示很苦悶)
涉及技術(shù):
Netty 4
WebSocket + HTTP
Spring MVC + Spring
JQuery
Bootstrap 3 + Bootstrap-fileinput
Maven 3.5
Tomcat 8.0
2. 整體說明 2.1 設(shè)計(jì)思想整個(gè)通信系統(tǒng)以Tomcat作為核心服務(wù)器運(yùn)行,其下另開一個(gè)線程運(yùn)行Netty WebSocket服務(wù)器,Tomcat服務(wù)器主要處理客戶登錄、個(gè)人信息管理等的HTTP類型請(qǐng)求(通常的業(yè)務(wù)類型),端口為8080,Netty WebSockt服務(wù)器主要處理用戶消息通信的WebSocket類型請(qǐng)求,端口為3333。用戶通過瀏覽器登錄后,瀏覽器會(huì)維持一個(gè)Session對(duì)象(有效時(shí)間30分鐘)來保持登錄狀態(tài),Tomcat服務(wù)器會(huì)返回用戶的個(gè)人信息,同時(shí)記錄在線用戶,根據(jù)用戶id建立一條WebSocket連接并保存在后端以便進(jìn)行實(shí)時(shí)通信。當(dāng)一個(gè)用戶向另一用戶發(fā)起通信,服務(wù)器會(huì)根據(jù)消息內(nèi)容中的對(duì)話方用戶id,找到保存的WebSocket連接,通過該連接發(fā)送消息,對(duì)方就能夠收到即時(shí)收到消息。當(dāng)用戶注銷或退出時(shí),釋放WebSocket連接,清空Session對(duì)象中的登錄狀態(tài)。
事實(shí)上Netty也可以用作一個(gè)HTTP服務(wù)器,而這里使用Spring MVC處理HTTP請(qǐng)求是出于熟悉的緣故,也比較接近傳統(tǒng)開發(fā)的方式。
2.2 系統(tǒng)結(jié)構(gòu)系統(tǒng)采用B/S(Browser/Server),即瀏覽器/服務(wù)器的結(jié)構(gòu),主要事務(wù)邏輯在服務(wù)器端(Server)實(shí)現(xiàn)。借鑒MVC模式的思想,從上至下具體又分為視圖層(View)、控制層(Controller)、業(yè)務(wù)層(Service)、模型層(Model)、數(shù)據(jù)訪問層(Data Access)
2.3 項(xiàng)目結(jié)構(gòu)項(xiàng)目后端結(jié)構(gòu):
項(xiàng)目前端結(jié)構(gòu):
系統(tǒng)只包括兩個(gè)模塊:登錄模塊和聊天管理模塊。
登錄模塊:既然作為一個(gè)系統(tǒng),那么登錄的角色認(rèn)證是必不可少的,這里使用簡(jiǎn)單、傳統(tǒng)的Session方式維持登錄狀態(tài),當(dāng)然也有對(duì)應(yīng)的注銷功能,但這里的注銷除了清空Session對(duì)象,還要釋放WebSocket連接,否則造成內(nèi)存泄露。
聊天管理模塊:系統(tǒng)的核心模塊,這部分主要使用Netty框架實(shí)現(xiàn),功能包括信息、文件的單條和多條發(fā)送,也支持表情發(fā)送。
其他模塊:如好友管理模塊、聊天記錄管理、注冊(cè)模塊等,我并沒有實(shí)現(xiàn),有興趣的話可以自行實(shí)現(xiàn),與傳統(tǒng)的開發(fā)方式類似。
到這里,可能會(huì)有人出現(xiàn)疑問了,首先是前面的涉及技術(shù)中沒有ORM框架(Mybatis或Hibernate),這里又沒有實(shí)現(xiàn)好友管理的功能,那用戶如何確定自己的好友并發(fā)送信息呢?
其實(shí),這里我在dao層的實(shí)現(xiàn)里并沒有連接數(shù)據(jù)庫(kù),而是用硬編碼的方式固定好系統(tǒng)的用戶以及用戶的好友表、群組表,之所以這么做是因?yàn)楫?dāng)初考慮到這個(gè)課程設(shè)計(jì)要是連接數(shù)據(jù)庫(kù)就太麻煩了光是已有的模塊(包括前后端)就差不多3k行代碼了,時(shí)間上十分劃不來,于是就用了硬編碼的方式偷懶,后面會(huì)再說明系統(tǒng)用戶的情況。
由于本系統(tǒng)涉及多個(gè)用戶狀態(tài),有必要進(jìn)行說明,下面給出本系統(tǒng)的用戶狀態(tài)轉(zhuǎn)換圖。
系統(tǒng)聊天界面如下:
不得不說的是,當(dāng)關(guān)閉Tomcat服務(wù)器時(shí),也要釋放Netty相關(guān)資源,否則會(huì)造成內(nèi)存泄漏,關(guān)閉方法如下面的close(),如果只是使用shutdownGracefully()方法的話,關(guān)閉時(shí)會(huì)報(bào)內(nèi)存泄露Memory Leak異常(但I(xiàn)DE可能來不及輸出到控制臺(tái))
/** * 描述: Netty WebSocket服務(wù)器 * 使用獨(dú)立的線程啟動(dòng) * @author Kanarien * @version 1.0 * @date 2018年5月18日 上午11:22:51 */ public class WebSocketServer implements Runnable{ private final Logger logger = LoggerFactory.getLogger(WebSocketServer.class); @Autowired private EventLoopGroup bossGroup; @Autowired private EventLoopGroup workerGroup; @Autowired private ServerBootstrap serverBootstrap; private int port; private ChannelHandler childChannelHandler; private ChannelFuture serverChannelFuture; // 構(gòu)造方法少了會(huì)報(bào)錯(cuò) public WebSocketServer() {} @Override public void run() { build(); } /** * 描述:?jiǎn)?dòng)Netty Websocket服務(wù)器 */ public void build() { try { long begin = System.currentTimeMillis(); serverBootstrap.group(bossGroup, workerGroup) //boss輔助客戶端的tcp連接請(qǐng)求 worker負(fù)責(zé)與客戶端之前的讀寫操作 .channel(NioServerSocketChannel.class) //配置客戶端的channel類型 .option(ChannelOption.SO_BACKLOG, 1024) //配置TCP參數(shù),握手字符串長(zhǎng)度設(shè)置 .option(ChannelOption.TCP_NODELAY, true) //TCP_NODELAY算法,盡可能發(fā)送大塊數(shù)據(jù),減少充斥的小塊數(shù)據(jù) .childOption(ChannelOption.SO_KEEPALIVE, true)//開啟心跳包活機(jī)制,就是客戶端、服務(wù)端建立連接處于ESTABLISHED狀態(tài),超過2小時(shí)沒有交流,機(jī)制會(huì)被啟動(dòng) .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(592048))//配置固定長(zhǎng)度接收緩存區(qū)分配器 .childHandler(childChannelHandler); //綁定I/O事件的處理類,WebSocketChildChannelHandler中定義 long end = System.currentTimeMillis(); logger.info("Netty Websocket服務(wù)器啟動(dòng)完成,耗時(shí) " + (end - begin) + " ms,已綁定端口 " + port + " 阻塞式等候客戶端連接"); serverChannelFuture = serverBootstrap.bind(port).sync(); } catch (Exception e) { logger.info(e.getMessage()); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); e.printStackTrace(); } } /** * 描述:關(guān)閉Netty Websocket服務(wù)器,主要是釋放連接 * 連接包括:服務(wù)器連接serverChannel, * 客戶端TCP處理連接bossGroup, * 客戶端I/O操作連接workerGroup * * 若只使用 * bossGroupFuture = bossGroup.shutdownGracefully(); * workerGroupFuture = workerGroup.shutdownGracefully(); * 會(huì)造成內(nèi)存泄漏。 */ public void close(){ serverChannelFuture.channel().close(); Future> bossGroupFuture = bossGroup.shutdownGracefully(); Future> workerGroupFuture = workerGroup.shutdownGracefully(); try { bossGroupFuture.await(); workerGroupFuture.await(); } catch (InterruptedException ignore) { ignore.printStackTrace(); } } public ChannelHandler getChildChannelHandler() { return childChannelHandler; } public void setChildChannelHandler(ChannelHandler childChannelHandler) { this.childChannelHandler = childChannelHandler; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } }3.1.2 Netty服務(wù)器處理鏈
獨(dú)立出處理器鏈類,方便修改與注入,免得混在一起顯得混亂。
@Component public class WebSocketChildChannelHandler extends ChannelInitializer3.1.3 Netty服務(wù)器HTTP請(qǐng)求處理器{ @Resource(name = "webSocketServerHandler") private ChannelHandler webSocketServerHandler; @Resource(name = "httpRequestHandler") private ChannelHandler httpRequestHandler; @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("http-codec", new HttpServerCodec()); // HTTP編碼解碼器 ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); // 把HTTP頭、HTTP體拼成完整的HTTP請(qǐng)求 ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); // 分塊,方便大文件傳輸,不過實(shí)質(zhì)上都是短的文本數(shù)據(jù) ch.pipeline().addLast("http-handler", httpRequestHandler); ch.pipeline().addLast("websocket-handler",webSocketServerHandler); } }
值得一提的是,當(dāng)在處理鏈中使用Spring注入處理器的bean的時(shí)候,如果處理器類不使用@Sharable標(biāo)簽的話,會(huì)出現(xiàn)錯(cuò)誤。如果不使用Spring注入bean的方式,那么應(yīng)該new一個(gè)新的處理器對(duì)象,如ch.pipeline().addLast("http-handler", new HttpRequestHandler())。另外,判斷HTTP請(qǐng)求還是WebSocket請(qǐng)求的方式稍微不太優(yōu)雅,但我按照《Essential Netty in Action》中的方法去試,結(jié)果有問題的,只好用下面的if語句判斷。
@Component @Sharable public class HttpRequestHandler extends SimpleChannelInboundHandler3.1.4 Netty服務(wù)器WebSocket請(qǐng)求處理器
考慮到規(guī)范性與可維護(hù)性,switch語句中的case常量應(yīng)該放在常量類中聲明比較好。另外說下群發(fā)的邏輯(屬于業(yè)務(wù)邏輯,這里沒有給出代碼),群發(fā)也就是在一個(gè)群中發(fā)言,后端會(huì)掃描群中在線的用戶,逐一發(fā)送信息。用戶的WebSocket連接(即ChannelHandlerContext對(duì)象),會(huì)保存在全局變量onlineUserMap中,以用戶id作鍵,方便操作連接。關(guān)于表情的發(fā)送邏輯,與單發(fā)邏輯相同,不同的是發(fā)送內(nèi)容為對(duì)應(yīng)的img標(biāo)簽字符串。
@Component @Sharable public class WebSocketServerHandler extends SimpleChannelInboundHandler3.1.5 文件上傳{ private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServerHandler.class); @Autowired private ChatService chatService; /** * 描述:讀取完連接的消息后,對(duì)消息進(jìn)行處理。 * 這里主要是處理WebSocket請(qǐng)求 */ @Override protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception { handlerWebSocketFrame(ctx, msg); } /** * 描述:處理WebSocketFrame * @param ctx * @param frame * @throws Exception */ private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { // 關(guān)閉請(qǐng)求 if (frame instanceof CloseWebSocketFrame) { WebSocketServerHandshaker handshaker = Constant.webSocketHandshakerMap.get(ctx.channel().id().asLongText()); if (handshaker == null) { sendErrorMessage(ctx, "不存在的客戶端連接!"); } else { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); } return; } // ping請(qǐng)求 if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } // 只支持文本格式,不支持二進(jìn)制消息 if (!(frame instanceof TextWebSocketFrame)) { sendErrorMessage(ctx, "僅支持文本(Text)格式,不支持二進(jìn)制消息"); } // 客服端發(fā)送過來的消息 String request = ((TextWebSocketFrame)frame).text(); LOGGER.info("服務(wù)端收到新信息:" + request); JSONObject param = null; try { param = JSONObject.parseObject(request); } catch (Exception e) { sendErrorMessage(ctx, "JSON字符串轉(zhuǎn)換出錯(cuò)!"); e.printStackTrace(); } if (param == null) { sendErrorMessage(ctx, "參數(shù)為空!"); return; } String type = (String) param.get("type"); switch (type) { case "REGISTER": chatService.register(param, ctx); break; case "SINGLE_SENDING": chatService.singleSend(param, ctx); break; case "GROUP_SENDING": chatService.groupSend(param, ctx); break; case "FILE_MSG_SINGLE_SENDING": chatService.FileMsgSingleSend(param, ctx); break; case "FILE_MSG_GROUP_SENDING": chatService.FileMsgGroupSend(param, ctx); break; default: chatService.typeError(ctx); break; } } /** * 描述:客戶端斷開連接 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { chatService.remove(ctx); } /** * 異常處理:關(guān)閉channel */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } private void sendErrorMessage(ChannelHandlerContext ctx, String errorMsg) { String responseJson = new ResponseJson() .error(errorMsg) .toString(); ctx.channel().writeAndFlush(new TextWebSocketFrame(responseJson)); } }
文件上傳的思路是先把文件上傳到服務(wù)器上,再返回給對(duì)方文件的url以及相關(guān)信息。文件上傳并沒有使用WebSocket連接來上傳,而是直接使用HTTP連接結(jié)合Spring的接口簡(jiǎn)單的實(shí)現(xiàn)了,可自行修改實(shí)現(xiàn)使用WebSocket連接來上傳文件。另外,文件保存的路徑是http://localhost:8080/WebSocket/UploadFile,如果Tomcat端口不是8080或者想改變存儲(chǔ)路徑的話,請(qǐng)注意修改常量。
@Service public class FileUploadServiceImpl implements FileUploadService{ private final static String SERVER_URL_PREFIX = "http://localhost:8080/WebSocket/"; private final static String FILE_STORE_PATH = "UploadFile"; @Override public ResponseJson upload(MultipartFile file, HttpServletRequest request) { // 重命名文件,防止重名 String filename = getRandomUUID(); String suffix = ""; String originalFilename = file.getOriginalFilename(); String fileSize = FileUtils.getFormatSize(file.getSize()); // 截取文件的后綴名 if (originalFilename.contains(".")) { suffix = originalFilename.substring(originalFilename.lastIndexOf(".")); } filename = filename + suffix; String prefix = request.getSession().getServletContext().getRealPath("/") + FILE_STORE_PATH; System.out.println("存儲(chǔ)路徑為:" + prefix + "" + filename); Path filePath = Paths.get(prefix, filename); try { Files.copy(file.getInputStream(), filePath); } catch (IOException e) { e.printStackTrace(); return new ResponseJson().error("文件上傳發(fā)生錯(cuò)誤!"); } return new ResponseJson().success() .setData("originalFilename", originalFilename) .setData("fileSize", fileSize) .setData("fileUrl", SERVER_URL_PREFIX + FILE_STORE_PATH + "" + filename); } private String getRandomUUID() { return UUID.randomUUID().toString().replace("-", ""); } }3.2 WebSocket客戶端 3.2.1 瀏覽器客戶端代碼
下面只展示核心的websocket連接代碼。補(bǔ)充說明:考慮到瀏覽器的兼容性,經(jīng)測(cè)試,建議使用谷歌瀏覽器和360瀏覽器(極速模式),火狐瀏覽器和IE11的界面有點(diǎn)問題。也說明一下,UI設(shè)計(jì)的排版是從網(wǎng)上找的,由修改了下,自己嘔心瀝血的用JS補(bǔ)充了動(dòng)態(tài)功能,包括:
新消息紅標(biāo)簽提醒
新消息置頂
客戶端保存已發(fā)聊天記錄
用戶己方聊天信息靠左,接收信息靠右
聊天信息框的寬度動(dòng)態(tài)計(jì)算
詳細(xì)可見chatroom.js文件
4. 效果及操作演示 4.1 登錄操作登錄入口為:http://localhost:8080/WebSocket/login 或 http://localhost:8080/WebSocket/
當(dāng)前系統(tǒng)用戶固定為9個(gè),群組1個(gè),包括9人用戶。
用戶1 用戶名:Member001 密碼:001
用戶2 用戶名:Member002 密碼:002
······
用戶9 用戶名:Member009 密碼:009
4.2 聊天演示 4.3 文件上傳演示 5.補(bǔ)充為了使項(xiàng)目具有更好的可拓展性、可讀性、可維護(hù)性,很多地方都使用Spring的Bean進(jìn)行注入,也運(yùn)用了面向接口編程的思想,當(dāng)運(yùn)用上Mybatis等ORM框架的時(shí)候,直接修改dao層實(shí)現(xiàn)即可,無需改動(dòng)其他地方,同時(shí)也在適當(dāng)?shù)牡胤郊由狭俗⑨尅?/p>
最后附上git源碼地址:Kanarien GitHub
Copyright ? 2018, GDUT CSCW back-end Kanarien, All Rights Reserved
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/71654.html
摘要:是一個(gè)持久化的協(xié)議,相對(duì)于這種非持久的協(xié)議來說。最大的特點(diǎn)就是實(shí)現(xiàn)全雙工通信客戶端能夠?qū)崟r(shí)推送消息給服務(wù)端,服務(wù)端也能夠?qū)崟r(shí)推送消息給客戶端。參考鏈接知乎問題原理原理知乎問題編碼什么用如果文章有錯(cuò)的地方歡迎指正,大家互相交流。 前言 今天在慕課網(wǎng)上看到了Java的新教程(Netty入門之WebSocket初體驗(yàn)):https://www.imooc.com/learn/941 WebS...
摘要:廣播這是最簡(jiǎn)單的集群通訊解決方案。實(shí)現(xiàn)方法在治理中心監(jiān)聽集群服務(wù)事件,并及時(shí)更新哈希環(huán)。 問題起因 最近做項(xiàng)目時(shí)遇到了需要多用戶之間通信的問題,涉及到了WebSocket握手請(qǐng)求,以及集群中WebSocket Session共享的問題。 期間我經(jīng)過了幾天的研究,總結(jié)出了幾個(gè)實(shí)現(xiàn)分布式WebSocket集群的辦法,從zuul到spring cloud gateway的不同嘗試,總結(jié)出了...
摘要:大家明天一起去唱吧關(guān)于數(shù)據(jù)庫(kù)設(shè)計(jì)當(dāng)前一版不會(huì)固定大家的數(shù)據(jù)庫(kù)設(shè)計(jì),大家可以自己自由設(shè)計(jì),同時(shí)搭上自己的項(xiàng)目,構(gòu)建一個(gè)附帶的自項(xiàng)目。 InChat 一個(gè)IM通訊框架 一個(gè)輕量級(jí)、高效率的支持多端(應(yīng)用與硬件Iot)的異步網(wǎng)絡(luò)應(yīng)用通訊框架。(核心底層Netty) Github:InChat 版本目標(biāo):完成基本的消息通訊(僅支持文本消息),離線消息存儲(chǔ),歷史消息查詢,一對(duì)一聊天、自我聊天、群...
閱讀 873·2021-11-19 11:29
閱讀 3363·2021-09-26 10:15
閱讀 2874·2021-09-22 10:02
閱讀 2447·2021-09-02 15:15
閱讀 1983·2019-08-30 15:56
閱讀 2426·2019-08-30 15:54
閱讀 2929·2019-08-29 16:59
閱讀 646·2019-08-29 16:20