摘要:啟動一個線程,獲取阻塞隊列的元素,當通道發(fā)生事件時,隊列會被放入事件對象啟動一個定時器,每個執(zhí)行一次,掃描,超時沒有獲取結(jié)果的會被移除掉客戶端跟服務(wù)器端差不多。而這個對象會在傳輸之前進行編碼,消息接收到進行解碼。
rocketMQ通信模塊
Rocketmq的通信層是基于通信框架netty 4.0.21.Final之上做了簡單的協(xié)議封裝,基本的類圖如下:
通訊模塊是怎么進行的消息傳輸?shù)?/b>先來看看服務(wù)器端啟動做了什么:
netty服務(wù)器啟動,監(jiān)聽在8888;netty設(shè)置了一個心跳檢測器IdleStateHandler,讀寫超時時間為120s,在120s后都沒有讀寫操作將會觸發(fā)相應(yīng)事件。
啟動一個線程,獲取阻塞隊列eventQueue的元素,當netty channel通道發(fā)生CONNECT, CLOSE,IDLE,EXCEPTION事件時,隊列會被放入事件對象
啟動一個定時器Timer,每個1s執(zhí)行一次,掃描ResponseFuture,超時沒有獲取結(jié)果的會被移除掉
客戶端跟服務(wù)器端差不多。
rocketmq提供了三種通信方式:
一、invokeSyncImpl 同步調(diào)用(主要實現(xiàn)參見NettyRemotingAbstract.invokeSyncImpl)
同步調(diào)用是指客戶端發(fā)起遠程調(diào)用后,當前線程會被阻塞,直到服務(wù)器端返回結(jié)果或發(fā)生超時異常,我們在發(fā)送消息時需要同步知道消息發(fā)送成功還是失敗,一般使用這種方式。
我們知道,netty是異步基于事件驅(qū)動的,當我們使用netty向遠程服務(wù)器發(fā)送消息是通過channel.writeAndFlush方法,此方法是異步的,那我們?nèi)绾瓮降墨@取服務(wù)器的返回結(jié)果呢?這里的做法是在向服務(wù)器發(fā)送消息時設(shè)置一個唯一的序列號,本地會通過上下文保存一個ResponseFuture對象在Map中,key就是這個唯一的序列號,value就是這個ResponseFuture對象,ResponseFuture對象會設(shè)置一個CountDownLatch,每當發(fā)送完消息后,就會調(diào)用CountDownLatch的await方法掛起當前線程;當服務(wù)器返回結(jié)果時也會攜帶之前客戶端傳遞過去的唯一序列號,這樣就可以找到ResponseFuture對象,再調(diào)用CountDownLatch的countDown方法,此時客戶端之前掛起的線程就會蘇醒過來,完成一次同步調(diào)用。
二、invokeAsyncImpl異步調(diào)用(主要實現(xiàn)參見NettyRemotingAbstract.invokeAsyncImpl)
客戶端發(fā)起遠程調(diào)用前會先設(shè)置一個InvokeCallback類,當然也是設(shè)置在ResponseFuture對象中,調(diào)用結(jié)束后不會等待結(jié)果,當服務(wù)器返回時也是跟同步調(diào)用一樣會在新的線程里面先找到ResponseFuture,然后執(zhí)行回調(diào)接口也就是InvokeCallback的operationComplete方法。如果服務(wù)器返回結(jié)果超時,也會進行回調(diào),客戶端可以根據(jù)相關(guān)的狀態(tài)來執(zhí)行相關(guān)邏輯。
異步調(diào)用不會阻塞線程,調(diào)用后會立即返回,調(diào)用結(jié)果會在異步線程里面執(zhí)行回調(diào)來獲取,使用Async需要控制好節(jié)奏,不能發(fā)送的太快以防止壓垮服務(wù)器端。所以在invokeAsyncImpl方法里面設(shè)置了一個信號量,默認是64個,只有獲取到許可的請求才能真正發(fā)起遠程調(diào)用。
三、invokeOnewayImpl 單向調(diào)用(主要實現(xiàn)參見NettyRemotingAbstract.invokeOnewayImpl)
客戶端發(fā)送請求后不會等待服務(wù)端返回的結(jié)果,并且會忽略服務(wù)端的處理結(jié)果;當前線程調(diào)用完畢,調(diào)用方并不關(guān)心服務(wù)器端的處理結(jié)果,也不會被阻塞,跟異步調(diào)用一樣需要控制好節(jié)奏以防壓垮服務(wù)器端。在invokeOnewayImpl方法里面也設(shè)置了一個信號量,默認是256個,只有獲取到許可的請求才能真正發(fā)起遠程調(diào)用。
三種通信方式的對比
調(diào)用方式 | 特點 | 使用場景 |
---|---|---|
Sync | 同步阻塞 | 需要同步獲取結(jié)果的場景 |
Async | 異步不阻塞 | 當前不需要結(jié)果,但是當服務(wù)器處理完后,需要做一些其他事情 |
Oneway | 異步不阻塞 | 不要需要結(jié)果,不保證消息一定發(fā)送成功 |
RemotingCommand是rocketMQ消息傳輸?shù)拿浇椋械南⒍紩b成RemotingCommand來進行傳輸。而這個對象會在netty傳輸之前進行編碼,消息接收到進行解碼。
RemotingCommand是由頭部(header)和消息體(body)組成,消息發(fā)送的時候,頭部和消息體會分開進行編碼。那么RemotingCommand是如何組成的呢?
RemotingCommand的核心字段:
public class RemotingCommand{ private int code; private LanguageCode language = LanguageCode.JAVA; private int version = 0; private int opaque = requestId.getAndIncrement(); private int flag = 0; private String remark; private HashMap頭部(header)extFields; private transient CommandCustomHeader customHeader; private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer; private transient byte[] body; }
請求頭接收方和發(fā)起方的含義略有不同,下面的表格詳細的說明:
字段名 | 類型 | Request | Resposne |
---|---|---|---|
code | int | 請求操作代碼,接收方根據(jù)不同的代碼做不同的操作 | 應(yīng)答結(jié)果代碼,0表示成功,非0表示各種錯誤代碼 |
language | 枚舉 | 請求方實現(xiàn)的語言,默認Java | 接收方實現(xiàn)的語言 |
version | int | 請求方版本 | 接收方版本 |
opaque | int | 請求方在同一連接上不同的請求標識代碼,多線程連接服用使用 | 接收方不做修改,直接返回 |
flag | int | 通信層的標志位 | 通信層的標志位 |
remark | String | 傳輸自定義文本信息 | 錯誤詳細描述 |
extFields | Map | 自定義字段 | 自定義字段 |
頭信息里面還包括了CommandCustomHeader的自定義的一些頭信息,會被通過反射的方式放在extFields字段里面
消息體消息體是直接變?yōu)閎yte數(shù)組,由客戶端自己序列化,這兩部分后一起放入netty傳輸?shù)腂yteBuffer中,一起傳輸?shù)浇邮斩?/p> 報文格式與序列化
length | header length | headerData | bodyData |
---|---|---|---|
4個字節(jié) | 4個字節(jié)(高一位字節(jié)表示序列化類型,低三位字節(jié)表示長度) |
length:表示整個數(shù)據(jù)包的長度 占4個字節(jié)
header length:表示header的長度(高一位字節(jié)表示序列化類型,低三位字節(jié)表示長度)
headerData的序列化有兩種方式:
json:使用fastjson進行序列化
自定義:使用bytebuffer自定義序列化
Netty服務(wù)器端在啟動時設(shè)置了TCP參數(shù)的含義SO_BACKLOG:1024
指定全連接隊列數(shù),linux系統(tǒng)在文件/proc/sys/net/core/somaxconn指定,默認128;
還有一個半連接隊列數(shù),linux在文件/proc/sys/net/ipv4/tcp_max_syn_backlog指定
SO_REUSEADDR:true
重用處于time_wait狀態(tài)下的連接
SO_KEEPALIVE:false
?;顧C制
TCP_NODELAY:true
關(guān)閉Nagle算法,Nagle算法可以降低網(wǎng)絡(luò)里小包的數(shù)量,從而提升網(wǎng)絡(luò)性能,關(guān)閉可以提高實時性
SO_SNDBUF:65535
發(fā)送緩存區(qū)大小
SO_RCVBUF:65535
接受緩存區(qū)大小
SO_RCVLOWAT:接收緩存水位線
SO_SNDLOWAT:發(fā)送緩存水位線
它們一般被I/O復(fù)用系統(tǒng)調(diào)用用來判斷socket是否可讀或可寫。當TCP接收緩沖區(qū)中可讀數(shù)據(jù)的總數(shù)大于其低水位標記時,I/O復(fù)用系統(tǒng)調(diào)用將通知應(yīng)用程序可以從對應(yīng)的socket上讀取數(shù)據(jù);當TCP發(fā)送緩沖區(qū)中的空閑空間(可以寫入數(shù)據(jù)的空間)大于其低水位標記時,I/O復(fù)用系統(tǒng)調(diào)用將通知應(yīng)用程序可以往對應(yīng)的socket上寫入數(shù)據(jù)
在netty中好像沒有看到有設(shè)置這兩個參數(shù)
CONNECT_TIMEOUT_MILLIS:3000
連接超時時間
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/77012.html
摘要:具體可以參考消息隊列之具體可以參考實戰(zhàn)之快速入門十分鐘入門阿里中間件團隊博客是一個分布式的可分區(qū)的可復(fù)制的基于發(fā)布訂閱的消息系統(tǒng)主要用于大數(shù)據(jù)領(lǐng)域當然在分布式系統(tǒng)中也有應(yīng)用。目前市面上流行的消息隊列就是阿里借鑒的原理用開發(fā)而得。 我自己總結(jié)的Java學習的系統(tǒng)知識點以及面試問題,目前已經(jīng)開源,會一直完善下去,歡迎建議和指導歡迎Star: https://github.com/Snail...
摘要:它是阿里巴巴于年開源的第三代分布式消息中間件。是一個分布式消息中間件,具有低延遲高性能和可靠性萬億級別的容量和靈活的可擴展性,它是阿里巴巴于年開源的第三代分布式消息中間件。上篇文章消息隊列那么多,為什么建議深入了解下RabbitMQ?我們講到了消息隊列的發(fā)展史:并且詳細介紹了RabbitMQ,其功能也是挺強大的,那么,為啥又要搞一個RocketMQ出來呢?是重復(fù)造輪子嗎?本文我們就帶大家來詳...
摘要:分布式高并發(fā)微服務(wù)問阿里京東螞蟻等大廠面試真題解析道跳槽漲薪必備精選面試題最新版大廠面試真題集點擊這里免費領(lǐng)取點擊這里免費領(lǐng)取 估計很多Java程序員平時主要的工作就是一些Web系統(tǒng)的業(yè)務(wù)開發(fā),對于服務(wù)端IO程序以及網(wǎng)絡(luò)通信編程做得并不多,但是對于高級或者資深程序員來說,IO通信以及服務(wù)端編...
閱讀 1856·2021-10-20 13:49
閱讀 1389·2019-08-30 15:52
閱讀 2894·2019-08-29 16:37
閱讀 1063·2019-08-29 10:55
閱讀 3103·2019-08-26 12:14
閱讀 1683·2019-08-23 17:06
閱讀 3260·2019-08-23 16:59
閱讀 2571·2019-08-23 15:42