摘要:本文這里主要來講述的三個版本的消息格式的演變,文章偏長,建議先關(guān)注后鑒定。消息格式版本號,此版本的值為。表示消息的的長度。實際消息體的長度。外層消息保存了內(nèi)層消息中最后一條消息的絕對位移,絕對位移是指相對于整個而言的。
摘要對于一個成熟的消息中間件而言,消息格式不僅關(guān)系到功能維度的擴展,還牽涉到性能維度的優(yōu)化。隨著Kafka的迅猛發(fā)展,其消息格式也在不斷的升級改進,從0.8.x版本開始到現(xiàn)在的1.1.x版本,Kafka的消息格式也經(jīng)歷了3個版本。本文這里主要來講述Kafka的三個版本的消息格式的演變,文章偏長,建議先關(guān)注后鑒定。
Kafka根據(jù)topic(主題)對消息進行分類,發(fā)布到Kafka集群的每條消息都需要指定一個topic,每個topic將被分為多個partition(分區(qū))。每個partition在存儲層面是追加log(日志)文件,任何發(fā)布到此partition的消息都會被追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為一個long型的數(shù)值,它唯一標記一條消息。
每一條消息被發(fā)送到Kafka中,其會根據(jù)一定的規(guī)則選擇被存儲到哪一個partition中。如果規(guī)則設(shè)置的合理,所有的消息可以均勻分布到不同的partition里,這樣就實現(xiàn)了水平擴展。如上圖,每個partition由其上附著的每一條消息組成,如果消息格式設(shè)計的不夠精煉,那么其功能和性能都會大打折扣。比如有冗余字段,勢必會使得partition不必要的增大,進而不僅使得存儲的開銷變大、網(wǎng)絡(luò)傳輸?shù)拈_銷變大,也會使得Kafka的性能下降;又比如缺少字段,在最初的Kafka消息版本中沒有timestamp字段,對內(nèi)部而言,其影響了日志保存、切分策略,對外部而言,其影響了消息審計、端到端延遲等功能的擴展,雖然可以在消息體內(nèi)部添加一個時間戳,但是解析變長的消息體會帶來額外的開銷,而存儲在消息體(參考下圖中的value字段)前面可以通過指針偏量獲取其值而容易解析,進而減少了開銷(可以查看v1版本),雖然相比于沒有timestamp字段的開銷會差一點。如此分析,僅在一個字段的一增一減之間就有這么多門道,那么Kafka具體是怎么做的呢?本文只針對Kafka 0.8.x版本開始做相應(yīng)說明,對于之前的版本不做陳述。
v0版本對于Kafka消息格式的第一個版本,我們把它稱之為v0,在Kafka 0.10.0版本之前都是采用的這個消息格式。注意如無特殊說明,我們只討論消息未壓縮的情形。
上左圖中的“RECORD”部分就是v0版本的消息格式,大多數(shù)人會把左圖中的整體,即包括offset和message size字段都都看成是消息,因為每個Record(v0和v1版)必定對應(yīng)一個offset和message size。每條消息都一個offset用來標志它在partition中的偏移量,這個offset是邏輯值,而非實際物理偏移值,message size表示消息的大小,這兩者的一起被稱之為日志頭部(LOG_OVERHEAD),固定為12B。LOG_OVERHEAD和RECORD一起用來描述一條消息。與消息對應(yīng)的還有消息集的概念,消息集中包含一條或者多條消息,消息集不僅是存儲于磁盤以及在網(wǎng)絡(luò)上傳輸(Produce & Fetch)的基本形式,而且是kafka中壓縮的基本單元,詳細結(jié)構(gòu)參考上右圖。
下面來具體陳述一下消息(Record)格式中的各個字段,從crc32開始算起,各個字段的解釋如下:
1. crc32(4B):crc32校驗值。校驗范圍為magic至value之間。
2. magic(1B):消息格式版本號,此版本的magic值為0。
3. attributes(1B):消息的屬性??偣舱?個字節(jié),低3位表示壓縮類型:0表示NONE、1表示GZIP、2表示SNAPPY、3表示LZ4(LZ4自Kafka 0.9.x引入),其余位保留。
4. key length(4B):表示消息的key的長度。如果為-1,則表示沒有設(shè)置key,即key=null。
5. key:可選,如果沒有key則無此字段。
6. value length(4B):實際消息體的長度。如果為-1,則表示消息為空。
7. value:消息體??梢詾榭眨热鐃omnstone消息。
v0版本中一個消息的最小長度(RECORD_OVERHEAD_V0)為crc32 + magic + attributes + key length + value length = 4B + 1B + 1B + 4B + 4B =14B,也就是說v0版本中一條消息的最小長度為14B,如果小于這個值,那么這就是一條破損的消息而不被接受。
這里我們來做一個測試,首先創(chuàng)建一個partition數(shù)和副本數(shù)都為1的topic,名稱為“msg_format_v0”,然后往msg_format_v0中發(fā)送一條key=”key”,value=”value”的消息,之后查看對應(yīng)的日志:
[root@node1 kafka_2.10-0.8.2.1]# bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/msg_format_v0-0/00000000000000000000.log
Dumping /tmp/kafka-logs-08/msg_format_v0-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 isvalid: true payloadsize: 5 magic: 0 compresscodec: NoCompressionCodec crc: 592888119 keysize: 3
查看消息的大小,即00000000000000000000.log文件的大小為34B,其值正好等于LOG_OVERHEAD+RECORD_OVERHEAD_V0 + 3B的key + 5B的value = 12B + 14B + 3B + 5B = 34B。
[root@node1 msg_format_v0-0]# ll *.log
-rw-r--r-- 1 root root 34 Apr 26 02:52 00000000000000000000.log
我們再發(fā)送一條key=null, value=”value”的消息,之后查看日志的大?。?/p>
[root@node3 msg_format_v0-0]# ll *.log
-rw-r--r-- 1 root root 65 Apr 26 02:56 00000000000000000000.log
日志大小為65B,減去上一條34B的消息,可以得知本條消息的大小為31B,正好等于LOG_OVERHEAD+RECORD_OVERHEAD_V0 + 5B的value = 12B + 14B+ 5B = 31B。
v1版本kafka從0.10.0版本開始到0.11.0版本之前所使用的消息格式版本為v1,其比v0版本就多了一個timestamp字段,表示消息的時間戳。v1版本的消息結(jié)構(gòu)圖如下所示:
v1版本的magic字段值為1。v1版本的attributes字段中的低3位和v0版本的一樣,還是表示壓縮類型,而第4個bit也被利用了起來:0表示timestamp類型為CreateTime,而1表示tImestamp類型為LogAppendTime,其他位保留。v1版本的最小消息(RECORD_OVERHEAD_V1)大小要比v0版本的要大8個字節(jié),即22B。如果像v0版本介紹的一樣發(fā)送一條key=”key”,value=”value”的消息,那么此條消息在v1版本中會占用42B,具體測試步驟參考v0版的相關(guān)介紹。
消息壓縮常見的壓縮算法是數(shù)據(jù)量越大壓縮效果越好,一條消息通常不會太大,這就導(dǎo)致壓縮效果并不太好。而kafka實現(xiàn)的壓縮方式是將多條消息一起進行壓縮,這樣可以保證較好的壓縮效果。而且在一般情況下,生產(chǎn)者發(fā)送的壓縮數(shù)據(jù)在kafka broker中也是保持壓縮狀態(tài)進行存儲,消費者從服務(wù)端獲取也是壓縮的消息,消費者在處理消息之前才會解壓消息,這樣保持了端到端的壓縮。
壓縮率是壓縮后的大小與壓縮前的對比。例如:把100MB的文件壓縮后是90MB,壓縮率為90/100*100%=90%,壓縮率一般是越小壓縮效果越好。一般口語化陳述時會誤描述為壓縮率越高越好,為了避免混淆,本文不引入學(xué)術(shù)上的壓縮率而引入壓縮效果,這樣容易達成共識。
講解到這里都是針對消息未壓縮的情況,而當消息壓縮時是將整個消息集進行壓縮而作為內(nèi)層消息(inner message),內(nèi)層消息整體作為外層(wrapper message)的value,其結(jié)構(gòu)圖如下所示:
壓縮后的外層消息(wrapper message)中的key為null,所以圖右部分沒有畫出key這一部分。當生產(chǎn)者創(chuàng)建壓縮消息的時候,對內(nèi)部壓縮消息設(shè)置的offset是從0開始為每個內(nèi)部消息分配offset,詳細可以參考下圖右部:
其實每個從生產(chǎn)者發(fā)出的消息集中的消息offset都是從0開始的,當然這個offset不能直接存儲在日志文件中,對offset進行轉(zhuǎn)換時在服務(wù)端進行的,客戶端不需要做這個工作。外層消息保存了內(nèi)層消息中最后一條消息的絕對位移(absolute offset),絕對位移是指相對于整個partition而言的。參考上圖,對于未壓縮的情形,圖右內(nèi)層消息最后一條的offset理應(yīng)是1030,但是被壓縮之后就變成了5,而這個1030被賦予給了外層的offset。當消費者消費這個消息集的時候,首先解壓縮整個消息集,然后找到內(nèi)層消息中最后一條消息的inner offset,然后根據(jù)如下公式找到內(nèi)層消息中最后一條消息前面的消息的absolute offset(RO表示Relative Offset,IO表示Inner Offset,而AO表示Absolute Offset):
RO = IO_of_a_message - IO_of_the_last_message AO = AO_Of_Last_Inner_Message + RO
注意這里RO是前面的消息相對于最后一條消息的IO而言的,所以其值小于等于0,0表示最后一條消息自身。
壓縮消息,英文是compress message,Kafka中還有一個compact message,常常也會被人們直譯成壓縮消息,需要注意兩者的區(qū)別。compact message是針對日志清理策略而言的(cleanup.policy=compact),是指日志壓縮(log compaction)后的消息,這個后續(xù)的系列文章中會有介紹。本文中的壓縮消息單指compress message,即采用GZIP、LZ4等壓縮工具壓縮的消息。
在講述v1版本的消息時,我們了解到v1版本比v0版的消息多了個timestamp的字段。對于壓縮的情形,外層消息的timestamp設(shè)置為:
如果timestamp類型是CreateTime,那么設(shè)置的是內(nèi)層消息中最大的時間戳(the max timestampof inner messages if CreateTime is used)。
如果timestamp類型是LogAppendTime,那么設(shè)置的是kafka服務(wù)器當前的時間戳;
內(nèi)層消息的timestamp設(shè)置為:
如果外層消息的timestamp類型是CreateTime,那么設(shè)置的是生產(chǎn)者創(chuàng)建消息時的時間戳。
如果外層消息的timestamp類型是LogAppendTime,那么所有的內(nèi)層消息的時間戳都將被忽略。
對于attributes字段而言,它的timestamp位只在外層消息(wrapper message)中設(shè)置,內(nèi)層消息(inner message)中的timestamp類型一直都是CreateTime。
v2版本kafka從0.11.0版本開始所使用的消息格式版本為v2,這個版本的消息相比于v0和v1的版本而言改動很大,同時還參考了Protocol Buffer而引入了變長整型(Varints)和ZigZag編碼。Varints是使用一個或多個字節(jié)來序列化整數(shù)的一種方法,數(shù)值越小,其所占用的字節(jié)數(shù)就越少。ZigZag編碼以一種鋸齒形(zig-zags)的方式來回穿梭于正負整數(shù)之間,以使得帶符號整數(shù)映射為無符號整數(shù),這樣可以使得絕對值較小的負數(shù)仍然享有較小的Varints編碼值,比如-1編碼為1,1編碼為2,-2編碼為3。詳細可以參考:developers.google.com/protocol-bu…。
回顧一下kafka v0和v1版本的消息格式,如果消息本身沒有key,那么key length字段為-1,int類型的需要4個字節(jié)來保存,而如果采用Varints來編碼則只需要一個字節(jié)。根據(jù)Varints的規(guī)則可以推導(dǎo)出0-63之間的數(shù)字占1個字節(jié),64-8191之間的數(shù)字占2個字節(jié),8192-1048575之間的數(shù)字占3個字節(jié)。而kafka broker的配置message.max.bytes的默認大小為1000012(Varints編碼占3個字節(jié)),如果消息格式中與長度有關(guān)的字段采用Varints的編碼的話,絕大多數(shù)情況下都會節(jié)省空間,而v2版本的消息格式也正是這樣做的。不過需要注意的是Varints并非一直會省空間,一個int32最長會占用5個字節(jié)(大于默認的4字節(jié)),一個int64最長會占用10字節(jié)(大于默認的8字節(jié))。
v2版本中消息集謂之為Record Batch,而不是先前的Message Set了,其內(nèi)部也包含了一條或者多條消息,消息的格式參見下圖中部和右部。在消息壓縮的情形下,Record Batch Header部分(參見下圖左部,從first offset到records count字段)是不被壓縮的,而被壓縮的是records字段中的所有內(nèi)容。
先來講述一下消息格式Record的關(guān)鍵字段,可以看到內(nèi)部字段大量采用了Varints,這樣Kafka可以根據(jù)具體的值來確定需要幾個字節(jié)來保存。v2版本的消息格式去掉了crc字段,另外增加了length(消息總長度)、timestamp delta(時間戳增量)、offset delta(位移增量)和headers信息,并且attributes被棄用了,筆者對此做如下分析(對于key、key length、value、value length字段和v0以及v1版本的一樣,這里不再贅述):
1. length:消息總長度。 2. attributes:棄用,但是還是在消息格式中占據(jù)1B的大小,以備未來的格式擴展。 3. timestamp delta:時間戳增量。通常一個timestamp需要占用8個字節(jié),如果像這里保存與RecordBatch的其實時間戳的差值的話可以進一步的節(jié)省占用的字節(jié)數(shù)。 4. offset delta:位移增量。保存與RecordBatch起始位移的差值,可以節(jié)省占用的字節(jié)數(shù)。 5. headers:這個字段用來支持應(yīng)用級別的擴展,而不需要像v0和v1版本一樣不得不將一些應(yīng)用級別的屬性值嵌入在消息體里面。Header的格式如上圖最有,包含key和value,一個Record里面可以包含0至多個Header。
如果對于v1版本的消息,如果用戶指定的timestamp類型是LogAppendTime而不是CreateTime,那么消息從發(fā)送端(Producer)進入broker端之后timestamp字段會被更新,那么此時消息的crc值將會被重新計算,而此值在Producer端已經(jīng)被計算過一次;再者,broker端在進行消息格式轉(zhuǎn)換時(比如v1版轉(zhuǎn)成v0版的消息格式)也會重新計算crc的值。在這些類似的情況下,消息從發(fā)送端到消費端(Consumer)之間流動時,crc的值是變動的,需要計算兩次crc的值,所以這個字段的設(shè)計在v0和v1版本中顯得比較雞肋。在v2版本中將crc的字段從Record中轉(zhuǎn)移到了RecordBatch中。
v2版本對于消息集(RecordBatch)做了徹底的修改,參考上圖左部,除了剛剛提及的crc字段,還多了如下字段:
1. first offset:表示當前RecordBatch的起始位移。 2. length:計算partition leader epoch到headers之間的長度。 3. partition leader epoch:用來確保數(shù)據(jù)可靠性。 4. magic:消息格式的版本號,對于v2版本而言,magic等于2。 5. attributes:消息屬性,注意這里占用了兩個字節(jié)。低3位表示壓縮格式,可以參考v0和v1;第4位表示時間戳類型;第5位表示此RecordBatch是否處于事務(wù)中,0表示非事務(wù),1表示事務(wù)。第6位表示是否是Control消息,0表示非Control消息,而1表示是Control消息,Control消息用來支持事務(wù)功能。 6. last offset delta:RecordBatch中最后一個Record的offset與first offset的差值。主要被broker用來確認RecordBatch中Records的組裝正確性。 7. first timestamp:RecordBatch中第一條Record的時間戳。 8. max timestamp:RecordBatch中最大的時間戳,一般情況下是指最后一個Record的時間戳,和last offset delta的作用一樣,用來確保消息組裝的正確性。 9. producer id:用來支持冪等性。 10. producer epoch:和producer id一樣,用來支持冪等性。 11. first sequence:和producer id、producer epoch一樣,用來支持冪等性。 12. records count:RecordBatch中Record的個數(shù)。
這里我們再來做一個測試,在1.0.0的kafka中創(chuàng)建一個partition數(shù)和副本數(shù)都為1的topic,名稱為“msg_format_v2”。然后同樣插入一條key=”key”,value=”value”的消息,查看日志結(jié)果如下:
[root@node1 kafka_2.12-1.0.0]# bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/msg_format_v2-0/00000000000000000000.log --print-data-log
Dumping /tmp/kafka-logs/msg_format_v2-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 0 CreateTime: 1524709879130 isvalid: true size: 76 magic: 2 compresscodec: NONE crc: 2857248333
可以看到size字段為76,我們根據(jù)上圖中的v2版本的日志格式來驗證一下,Record Batch Header部分共61B。Record部分中attributes占1B;timestamp delta值為0,占1B;offset delta值為0,占1B;key length值為3,占1B,key占3B;value length值為5,占1B,value占5B;headers count值為0,占1B, 無headers。Record部分的總長度=1B+1B+1B+1B+3B+1B+5B+1B=14B,所以Record的length字段值為14,編碼為變長整型占1B。最后推到出這條消息的占用字節(jié)數(shù)=61B+14B+1B=76B,符合測試結(jié)果。同樣再發(fā)一條key=null,value=”value”的消息的話,可以計算出這條消息占73B。
這么看上去好像v2版本的消息比之前版本的消息占用空間要大很多,的確對于單條消息而言是這樣的,如果我們連續(xù)往msg_format_v2中再發(fā)送10條value長度為6,key為null的消息,可以得到:
baseOffset: 2 lastOffset: 11 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 149 CreateTime: 1524712213771 isvalid: true size: 191 magic: 2 compresscodec: NONE crc: 820363253
本來應(yīng)該占用740B大小的空間,實際上只占用了191B,如果在v0版本中這10條消息則需要占用320B的空間,v1版本則需要占用400B的空間,這樣看來v2版本又節(jié)省了很多的空間,因為其將多個消息(Record)打包存放到單個RecordBatch中,又通過Varints編碼極大的節(jié)省了空間。
就以v1和v2版本對比而立,至于哪個消息格式占用空間大是不確定的,要根據(jù)具體情況具體分析。比如每條消息的大小為16KB,那么一個消息集中只能包含有一條消息(參數(shù)batch.size默認大小為16384),所以v1版本的消息集大小為12B+22B+16384B=16418B。而對于v2版本而言,其消息集大小為61B+11B+16384B=17086B(length值為16384+,占用3B,value length值為16384,占用大小為3B,其余數(shù)值型的字段都可以只占用1B的空間)??梢钥吹絭1版本又會比v2版本節(jié)省些許空間。
其實可以思考一下:當消息體越小,v2版本中的Record字段的占用會比v1版本的LogHeader+Record占用越小,以至于某個臨界點可以完全忽略到v2版本中Record Batch Header的61B大小的影響。就算消息體很大,v2版本的空間占用也不會比v1版本的空間占用大太多,幾十個字節(jié)內(nèi),反觀對于這種大消息體的大小而言,這幾十個字節(jié)的大小從某種程度上又可以忽略。
由此可見,v2版本的消息不僅提供了類似事務(wù)、冪等等更多的功能,還對空間占用提供了足夠的優(yōu)化,總體提升很大。也由此體現(xiàn)一個優(yōu)秀的設(shè)計是多么的重要,雖然說我們不要過度的設(shè)計和優(yōu)化,那么是否可以著眼于前來思考一下?kafka為我們做了一個很好的榜樣。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/7111.html
摘要:本文這里主要來講述的三個版本的消息格式的演變,文章偏長,建議先關(guān)注后鑒定。消息格式版本號,此版本的值為。表示消息的的長度。實際消息體的長度。外層消息保存了內(nèi)層消息中最后一條消息的絕對位移,絕對位移是指相對于整個而言的。 摘要 對于一個成熟的消息中間件而言,消息格式不僅關(guān)系到功能維度的擴展,還牽涉到性能維度的優(yōu)化。隨著Kafka的迅猛發(fā)展,其消息格式也在不斷的升級改進,從0.8.x版本開始到現(xiàn)...
摘要:本文這里主要來講述的三個版本的消息格式的演變,文章偏長,建議先關(guān)注后鑒定。消息格式版本號,此版本的值為。表示消息的的長度。實際消息體的長度。外層消息保存了內(nèi)層消息中最后一條消息的絕對位移,絕對位移是指相對于整個而言的。 摘要 對于一個成熟的消息中間件而言,消息格式不僅關(guān)系到功能維度的擴展,還牽涉到性能維度的優(yōu)化。隨著Kafka的迅猛發(fā)展,其消息格式也在不斷的升級改進,從0.8.x版本開始到現(xiàn)...
摘要:通過以上分析我們可以得出消息隊列具有很好的削峰作用的功能即通過異步處理,將短時間高并發(fā)產(chǎn)生的事務(wù)消息存儲在消息隊列中,從而削平高峰期的并發(fā)事務(wù)。 該文已加入開源項目:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識的文檔類項目,Star 數(shù)接近 16k)。地址:https://github.com/Snailclimb... 本文內(nèi)容思維導(dǎo)圖:showImg(ht...
閱讀 736·2023-04-25 19:43
閱讀 3981·2021-11-30 14:52
閱讀 3807·2021-11-30 14:52
閱讀 3871·2021-11-29 11:00
閱讀 3802·2021-11-29 11:00
閱讀 3904·2021-11-29 11:00
閱讀 3580·2021-11-29 11:00
閱讀 6184·2021-11-29 11:00