摘要:本文仍以該實(shí)例為例,探討該自定義通信協(xié)議的具體工作流程,以及如何以注冊(cè)的形式靈活插拔通信消息對(duì)象。進(jìn)行二進(jìn)制數(shù)據(jù)幀的解碼操作時(shí),數(shù)據(jù)幀中已包含了消息的功能位,據(jù)此可獲取相應(yīng)的編解碼器,而后可以對(duì)該數(shù)據(jù)幀進(jìn)行解析,生成相應(yīng)的消息對(duì)象。
本文為該系列的第三篇文章,設(shè)計(jì)需求為:服務(wù)端程序和眾多客戶端程序通過(guò) TCP 協(xié)議進(jìn)行通信,通信雙方需通信的消息種類眾多。上一篇文章以一個(gè)具體的需求為例,探討了指定的 Java 消息對(duì)象與其相應(yīng)的二進(jìn)制數(shù)據(jù)幀相互轉(zhuǎn)換的方法。本文仍以該實(shí)例為例,探討該自定義通信協(xié)議的具體工作流程,以及如何以注冊(cè)的形式靈活插拔通信消息對(duì)象。
1. 以注冊(cè)的形式實(shí)現(xiàn)通信消息對(duì)象的統(tǒng)一管理通過(guò)該系列的第二篇文章可知,各個(gè)消息對(duì)象的編解碼器類均擁有一個(gè)靜態(tài)工廠方法,用于手動(dòng)傳入功能位及功能文字描述,進(jìn)而生成包含這些參數(shù)的編解碼器。如此設(shè)計(jì),使得所有消息的功能位和文字描述均能夠統(tǒng)一管理,降低維護(hù)成本。
根據(jù)上述需求,可通過(guò) Map 容器管理所有的編解碼器,有如下優(yōu)點(diǎn):
進(jìn)行消息對(duì)象生成操作時(shí),可直接使用相應(yīng)編解碼器的消息對(duì)象靜態(tài)創(chuàng)建方法。
進(jìn)行消息對(duì)象的編碼操作時(shí),已擁有該 Java 消息對(duì)象,即可知道消息對(duì)象的功能位,據(jù)此可獲取相應(yīng)的編解碼器;或者,每個(gè) Java 消息對(duì)象均內(nèi)含相應(yīng)編解碼器的引用,故可直接對(duì)該消息對(duì)象進(jìn)行編碼操作。
進(jìn)行二進(jìn)制數(shù)據(jù)幀的解碼操作時(shí),數(shù)據(jù)幀中已包含了消息的功能位,據(jù)此可獲取相應(yīng)的編解碼器,而后可以對(duì)該數(shù)據(jù)幀進(jìn)行解析,生成相應(yīng)的 Java 消息對(duì)象。
通信消息對(duì)象注冊(cè)方法如下所示:
/** * 消息對(duì)象的注冊(cè) * * @param toolkit 消息對(duì)象編解碼器容器的工具類 */ private void initialMsg() { saveNormalMsgCodec(new MsgCodecDeviceUnlock(0x10, 0x11, "客戶端解鎖")); saveNormalMsgCodec(new MsgCodecDeviceClear(0x10, 0x13, "客戶端初始化")); saveNormalMsgCodec(new MsgCodecDeviceId(0x10, 0x1B, "客戶端ID設(shè)置")); saveNormalMsgCodec(new MsgCodecEmployeeName(0x10, 0x1C, "客戶端別名設(shè)置")); ... ... } /** * 將普通消息對(duì)象及其回復(fù)消息對(duì)象的編解碼器均保存到 HashMap 中 * * @param baseMsgCodec 特定的消息對(duì)象編解碼器 */ private void saveNormalMsgCodec(BaseMsgCodec baseMsgCodec) { saveSpecialMsgCodec(baseMsgCodec); baseMsgCodec = new MsgCodecReplyNormal(baseMsgCodec.getMajorMsgId() + 0x10, baseMsgCodec.getSubMsgId(), baseMsgCodec.getDetail()); saveSpecialMsgCodec(baseMsgCodec); } /** * 將消息對(duì)象的編解碼器保存到 HashMap 中 * * @param baseMsgCodec 特定的編解碼器 */ private void saveSpecialMsgCodec(BaseMsgCodec baseMsgCodec) { HASH_MAP.put(figureFrameId(baseMsgCodec.getMajorMsgId(), baseMsgCodec.getSubMsgId()), baseMsgCodec); }
上述代碼表明,如果有新的業(yè)務(wù)需求,需要增刪「插拔」業(yè)務(wù)消息對(duì)象,只需在 initialMsg() 方法中,對(duì)相應(yīng)編解碼器的注冊(cè)語(yǔ)句進(jìn)行增刪即可。
saveNormalMsgCodec(BaseMsgCodec) 方法可以同時(shí)注冊(cè)特定業(yè)務(wù)消息對(duì)象及其通用回復(fù)消息對(duì)象,操作方法清晰、簡(jiǎn)潔。
所以,在啟動(dòng)該 Java 程序時(shí),只需要在啟動(dòng)過(guò)程中,執(zhí)行上述 initialMsg() 方法,即可完成所有業(yè)務(wù)消息對(duì)象的注冊(cè)。
2. 多個(gè)消息對(duì)象自由組合進(jìn)同一個(gè)數(shù)據(jù)幀的實(shí)現(xiàn)原理由該系列的第一篇文章可知,如果某二進(jìn)制數(shù)據(jù)幀所要傳輸?shù)臄?shù)據(jù)體部分內(nèi)容很少,導(dǎo)致一個(gè)幀的大部分容量均被幀頭占據(jù),導(dǎo)致有效數(shù)據(jù)的占比很小,這就產(chǎn)生了巨大的浪費(fèi),故數(shù)據(jù)幀的數(shù)據(jù)體部分由子幀組成,同一類子幀均可被組裝進(jìn)同一個(gè)數(shù)據(jù)幀。如此做法,整個(gè)通信鏈路的數(shù)據(jù)量會(huì)明顯減少,IO 負(fù)擔(dān)也會(huì)因此減輕。
該需求的實(shí)現(xiàn)原理如下所示:
/** * 啟動(dòng)一個(gè)Channel的定時(shí)任務(wù),用于間隔指定的時(shí)間對(duì)消息隊(duì)列進(jìn)行輪詢,并發(fā)送指定數(shù)據(jù)幀 * * @param deque 指定的消息發(fā)送隊(duì)列 * @param channelId 指定 Channel 的序號(hào) */ private void startMessageQueueTask(LinkedBlockingDequedeque, Integer channelId) { executorService.scheduleWithFixedDelay(() -> { try { BaseMsg baseMsg = deque.take(); // 從隊(duì)列中取出一個(gè)消息對(duì)象,隊(duì)列為空時(shí)阻塞 Thread.sleep(AWAKE_TO_PROCESS_INTERVAL);// 等待極短的時(shí)間,保證隊(duì)列中緩存盡可能多的對(duì)象 Channel channel = touchChannel(channelId); // 獲取指定的待發(fā)送的 Channel List dataList = new ArrayList<>();// 子幀容器 ByteBuf data = baseMsg.subFrameEncode(channel.alloc().buffer());// 編碼一個(gè)子幀 dataList.add(data); touchNeedReplyMsg(baseMsg); // 對(duì)該子幀設(shè)置檢錯(cuò)重發(fā)任務(wù) int length = data.readableBytes(); int flag = baseMsg.combineFrameFlag(); // 獲取消息對(duì)象標(biāo)識(shí) while (true) { BaseMsg subMsg = deque.peek(); // 查看隊(duì)列中的第一個(gè)消息對(duì)象 if (subMsg == null || subMsg.combineFrameFlag() != flag) { break; // 消息對(duì)象標(biāo)識(shí)不同,即欲生成的主幀幀頭不同,不能組合進(jìn)同一主幀 } data = subMsg.subFrameEncode(channel.alloc().buffer()); if (length + data.readableBytes() > FrameSetting.MAX_DATA_LENGTH) { break; } length += data.readableBytes(); dataList.add(data); // 組合進(jìn)了同一主幀 deque.poll(); // 從隊(duì)列中移除該消息對(duì)象 touchNeedReplyMsg(subMsg); } FrameMajorHeader frameHeader = new FrameMajorHeader( baseMsg.getMajorMsgId(), baseMsg.getGroupId(), baseMsg.getDeviceId(), length); // 生成主幀幀頭消息對(duì)象 channel.writeAndFlush(new SendableMsgContainer(frameHeader, dataList)); // 送入Channel進(jìn)行發(fā)送 } catch (InterruptedException e) { logger.warn("消息隊(duì)列定時(shí)發(fā)送任務(wù)被中斷"); } }, channelId, CommSetting.FRAME_SEND_INTERVAL, TimeUnit.MILLISECONDS); }
由代碼可知,待發(fā)送的消息對(duì)象均被送入指定的發(fā)送隊(duì)列進(jìn)行緩存,某客戶端相應(yīng)的線程對(duì)隊(duì)列進(jìn)行操作,取出消息對(duì)象并進(jìn)行編碼、組裝、發(fā)送等。當(dāng)然,當(dāng)客戶端數(shù)量較多時(shí),上述的線程實(shí)現(xiàn)方式可采用 Netty 的 NIO 方式進(jìn)行優(yōu)化,以降低系統(tǒng)開(kāi)銷。
由上述描述可知,欲發(fā)送一個(gè)消息對(duì)象,只需將該消息對(duì)象送入相應(yīng)的發(fā)送隊(duì)列即可。
3. 實(shí)際業(yè)務(wù)消息對(duì)象的編解碼 3.1 消息對(duì)象的編碼方式由于每個(gè) Java 消息對(duì)象均內(nèi)含相應(yīng)編解碼器的引用,故可直接對(duì)該消息對(duì)象進(jìn)行編碼操作,代碼如下:
public abstract class BaseMsg implements Cloneable { private final BaseMsgCodec msgCodec; ... ... /** * 將 java 消息對(duì)象編碼為 TCP 子幀 * * @param buffer 空白的 TCP 子幀的容器 * @return 保存有 TCP 子幀的容器 */ public ByteBuf subFrameEncode(ByteBuf buffer) { return msgCodec.code(this, buffer); } }3.2 消息對(duì)象的解碼方式
首先根據(jù)數(shù)據(jù)幀的幀頭,即可解析出 FrameMajorHeader 對(duì)象,然后即可調(diào)用如下方法完成子幀的解析工作。實(shí)現(xiàn)原理文章開(kāi)頭已指出。
/** * TCP 幀解碼為 Java 消息對(duì)象 * * @param head 主幀頭 * @param subMsgId 子幀功能位 * @param data 子幀數(shù)據(jù) * @return 已解碼的 Java 對(duì)象 */ public BaseMsg decode(FrameMajorHeader head, int subMsgId, byte[] data) { BaseMsgCodec msgCodec = MsgCodecToolkit.getMsgCodec(head.getMsgId(), subMsgId); return msgCodec.decode(head.getGroupId(), head.getDeviceId(), data); }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/69231.html
摘要:基本消息對(duì)象的設(shè)計(jì)消息對(duì)象的設(shè)計(jì)主要由兩部分組成特定數(shù)據(jù)幀對(duì)應(yīng)的特定消息對(duì)象。該類包含上節(jié)數(shù)據(jù)幀主幀及子幀的所有公共信息,僅僅未包含子幀中的數(shù)據(jù)體信息,該需求由基本消息對(duì)象的子類實(shí)現(xiàn)。 開(kāi)發(fā)工程中,有一個(gè)常見(jiàn)的需求:服務(wù)端程序和多個(gè)客戶端程序通過(guò) TCP 協(xié)議進(jìn)行通信,通信雙方需通信的消息種類眾多,并且客戶端的數(shù)量可能有數(shù)萬(wàn)個(gè)。為此,雙方需要約定盡可能豐富、靈活的數(shù)據(jù)幀「數(shù)據(jù)包」協(xié)議,...
摘要:而實(shí)際兩者之間的通信使用的是基于的自定義二進(jìn)制數(shù)據(jù)幀,對(duì)象與數(shù)據(jù)幀之間需進(jìn)行轉(zhuǎn)換。該類實(shí)現(xiàn)了編碼解碼方法,故可對(duì)消息對(duì)象進(jìn)行編碼或?qū)?shù)據(jù)幀進(jìn)行解碼。該類的靜態(tài)方法可通過(guò)指定功能消息對(duì)象生成相應(yīng)的回復(fù)對(duì)象。 本文為該系列的第二篇文章,設(shè)計(jì)需求為:服務(wù)端程序和眾多客戶端程序通過(guò) TCP 協(xié)議進(jìn)行通信,通信雙方需通信的消息種類眾多。上一篇文章詳細(xì)描述了該通信協(xié)議的二進(jìn)制數(shù)據(jù)幀格式以及基本 J...
項(xiàng)目地址 showImg(https://segmentfault.com/img/remote/1460000019380071); 什么是 Puzzle Puzzle 是基于 Vue 和 Webpack4 實(shí)現(xiàn)的一種項(xiàng)目結(jié)構(gòu);業(yè)務(wù)模塊可以像拼圖一樣與架構(gòu)模塊組合,形成不同的系統(tǒng),而這一切都是可以在生產(chǎn)環(huán)境熱插拔的;這意味著你可以隨時(shí)向你的系統(tǒng)添加新的功能模塊,甚至改版整個(gè)系統(tǒng),而不需要全量替換...
摘要:的服務(wù)治理平臺(tái)發(fā)源于早期的個(gè)人項(xiàng)目??蛻舳税l(fā)現(xiàn)模式要求客戶端負(fù)責(zé)查詢注冊(cè)中心,獲取服務(wù)提供者的列表信息,使用負(fù)載均衡算法選擇一個(gè)合適的服務(wù)提供者,發(fā)起接口調(diào)用請(qǐng)求。系統(tǒng)和系統(tǒng)之間,少不了數(shù)據(jù)的互聯(lián)互通。隨著微服務(wù)的流行,一個(gè)系統(tǒng)內(nèi)的不同應(yīng)用進(jìn)行互聯(lián)互通也是常態(tài)。 PowerDotNet的服務(wù)治理平臺(tái)發(fā)源于早期的個(gè)人項(xiàng)目Power.Apix。這個(gè)項(xiàng)目借鑒了工作過(guò)的公司的服務(wù)治理方案,站在...
閱讀 3698·2021-11-22 15:24
閱讀 1606·2021-09-26 09:46
閱讀 1919·2021-09-14 18:01
閱讀 2614·2019-08-30 15:45
閱讀 3532·2019-08-30 14:23
閱讀 1881·2019-08-30 12:43
閱讀 2919·2019-08-30 10:56
閱讀 805·2019-08-29 12:20