摘要:從零構建類高性能消息隊列所謂消息隊列,直觀來看有點像蓄水池,能夠在生產(chǎn)者與消費者之間完成解耦,并且平衡生產(chǎn)者與消費者之間的計算量與可計算時間之間的差異目前主流的消息隊列有著名的等等。
LocalMQ:從零構建類 RocketMQ 高性能消息隊列本文記錄了月前筆者參與阿里云中間件比賽中,實現(xiàn)的簡要具有持久化功能的消息隊列的設計與實現(xiàn)過程。需要聲明的是,LocalMQ 借鑒了 RocketMQ 在 Broker 部分的核心設計思想,最早的源碼也是基于 RocketMQ 源碼改造而來。本文涉及引用以及其他消息隊列相關資料參考這里,源代碼放于 LocalMQ 倉庫;另外筆者水平有限,后來因為畢業(yè)旅行也未繼續(xù)優(yōu)化,本文很多內容可能存在謬誤與不足,請批評指正。
所謂消息隊列,直觀來看有點像蓄水池,能夠在生產(chǎn)者與消費者之間完成解耦,并且平衡生產(chǎn)者與消費者之間的計算量與可計算時間之間的差異;目前主流的消息隊列有著名的 Kafka、RabbitMQ、RocketMQ 等等。在筆者實現(xiàn)的 LocalMQ 中,從簡到復依次實現(xiàn)了 MemoryMessageMQ、EmbeddedMessageQueue 與 LocalMessageQueue 這三個版本;需要說明的是,在三個版本的消息隊列中,都是采取所謂的拉模式,即消費者主動向消息隊列請求拉取消息的模式。在 wx.demo.* 包下提供了很多的內部功能與性能測試用例,
// 首先在這里:https://parg.co/beX 下載代碼 // 然后修改 DefaultProducer 對應的繼承類 // 測試 MemoryMessageQueue,則繼承 MemoryProducer; // 測試 EmbeddedMessageQueue,則繼承 EmbeddedProducer; // 默認測試 LocalMessageQueue,注意,需要對 DefaultPullConsumer 進行同樣修改 public class DefaultProducer extends LocalProducer // 使用 mvn 運行測試用例,也可以在 Eclipse 或者 Intellij 中打開 mvn clean package -U assembly:assembly -Dmaven.test.skip=true java -Xmx2048m -Xms2048m -cp open-messaging-wx.demo-1.0.jar wx.demo.benchmark.ProducerBenchmark
最簡單的 MemoryMessageQueue 即是將消息數(shù)據(jù)按照選定主題存放在內存中,其主要結構如下圖所示:
MemoryMessageQueue 提供了同步的消息提交與拉取操作,其利用 HashMap 堆上存儲來緩存所有的消息;并且在內存中維護了另一個所謂的 QueueOffsets 來記錄每個主題對應隊列的消費偏移量。相較于 MemoryMessageQueue 實現(xiàn)的簡單的不能進行持久化存儲的消息隊列,EmbeddedMessageQueue 則提供了稍微復雜點的支持磁盤持久化的消息隊列。EmbeddedMessageQueue 構建了基于 Java NIO 提供的 MappedByteBuffer 的 MappedPartitionQueue。每個 MappedPartitionQueue 對應磁盤上的多個物理文件,并且為上層應用抽象提供了邏輯上的單一文件。EmbeddedMessageQueue 結構如下圖所示:
EmbeddedMessageQueue 的主要流程為生產(chǎn)者同步地像 Bucket Queue 中提交消息,每個 Bucket 可以視作某個主題(Topic)或者隊列(Queue)。而 EmbeddedMessageQueue 還包含著負責定期將 MappedPartitionQueue 中數(shù)據(jù)持久化寫入到磁盤的異步線程,該線程會定期地完成 Flush 操作。EmbeddedMessageQueue 假設某個 BucketQueue 被分配給某個 Consumer 之后就被其占用,該 Consumer 會消費其中全部的緩存消息;每個 Consumer 會包含獨立地 Consumer Offset Table 來記錄當前某個隊列地消費情況。EmbeddedMessageQueue 的缺陷在于:
混合處理與標記位:EmbeddedMessageQueue 僅提供了最簡單的消息序列化模型,無法記錄額外的消息屬性;
持久化存儲到磁盤的時機:EmbeddedMessageQueue 僅使用了一級緩存,并且僅在某個 Partition 寫滿時才進行文件的持久化操作;
添加消息的后處理:EmbeddedMessageQueue 是將消息直接寫入到 BucketQueue 包含的 MappedPartitionQueue 中,無法動態(tài)地進行索引、篩選等消息后處理,其可擴展性較差。
未考慮斷續(xù)拉取的情況:EmbeddedMessageQueue 中是假設 Consumer 能夠單次處理完某個 BucketQueue 中的單個 Partition 的全部消息,因此記錄其處理值時也僅是記錄了文件級別的位移,如果存在某次是僅拉取了單個 Partition 中部分內容,則下次的起始拉取點還是下個文件首。
EmbeddedMessageQueue 中我們可以在各 Producer 線程中多帶帶將消息持久化入文件中,而在 LocalMessageQueue 中,我們是將消息統(tǒng)一寫入 MessageStore 中,然后又 PostPutMessageService 進行二次處理。 LocalMessageQueue 的結構如下所示:
LocalMessageQueue 最大的變化在于將消息統(tǒng)一存儲在獨立地 MessageStore 中(類似于 RocketMQ 中的 CommitLog),然后針對 Topic-queueId 將消息劃分到不同的 ConsumeQueue 中;這里的 queueId 是由對應的 Producer 專屬編號決定的,每個 Consumer 即會被分配占用某個 ConsumeQueue(類似于 RocketMQ 中的 consumequeue),從而保證某個 Producer 生產(chǎn)的某個主題下的消息被專一的 Consumer 消費。LocalMessageQueue 同樣使用 MappedPartitionQueue 提供底層文件系統(tǒng)抽象,并且構建了獨立的 ConsumerOffsetManager 對消費者的消費進度進行管理,從而方便異?;謴汀?/p> 設計概要 順序消費
本部分圖來源于分布式開放消息系統(tǒng)(RocketMQ)的原理與實踐
消息產(chǎn)品的一個重要特性是順序保證,也就是消息消費的順序要與發(fā)送的時間順序保持一致;在多發(fā)送端的情況下,保證全局順序代價比較大,只要求各個發(fā)送端的順序有保障即可; 舉個例子 P1 發(fā)送 M11, M12, M13,P2 發(fā)送 M21, M22, M23,在消費的時候,只要求保證 M11, M12, M13(M21,M22,M23)的順序,也就是說,實際消費順序為: M11, M21, M12, M13, M22, M23 正確; M11, M21, M22, M12, M13, M23 正確 M11, M13, M21, M22, M23, M12 錯誤,M12 與 M13 的順序顛倒了;假如生產(chǎn)者產(chǎn)生了 2 條消息:M1、M2,要保證這兩條消息的順序,最直觀的方式就是采取類似于 TCP 中的確認消息:
不過該模型中如果 M1 與 M2 分別被發(fā)送到了兩臺不同的消息服務器上,我們無法控制消息服務器發(fā)送 M1 與 M2 的先后時機;有可能 M2 已經(jīng)被發(fā)送到了消費者,M1 才被發(fā)送到了消息服務器上。針對這個問題改進版的思路即是將 M1 與 M2 發(fā)送到單一消息服務器中,然后根據(jù)先到達先消費的原則發(fā)送給對應的消費者:
不過在實際情況下往往因為網(wǎng)絡延遲或其他問題導致在 M1 發(fā)送耗時大于 M2 的情況下,M2 會先于 M1 被消費。因此如果我們要保證嚴格的順序消息,那么必須要保證生產(chǎn)者、消息服務器與消費者之間的一對一對應關系。在 LocalMQ 的實現(xiàn)中,我們首先會將消息按照生產(chǎn)者劃分到唯一的 Topic-queueId 隊列中;并且保證同一時刻該消費隊列只會被某個消費者獨占。如果某個消費者在消費完該隊列之前意外中斷,那么在保留窗口期內不會將該隊列重新分配;在窗口期之外則將該隊列分配給新的消費者,并且即使原有消費者恢復工作也無法繼續(xù)拉取該隊列中包含的消息。
數(shù)據(jù)存儲LocalMQ 中目前是實現(xiàn)了基于文件系統(tǒng)的持久化存儲,主要功能實現(xiàn)在 MappedPartition 與 MappedPartitionQueue 這兩個類中,筆者也會在下文中詳細介紹這兩個類的實現(xiàn)。本部分我們討論下數(shù)據(jù)存儲的文件格式,對于 LocalMessageQueue 而言,其文件存儲如下:
* messageStore * -- MapFile1 * -- MapFile2 * consumeQueue * -- Topic1 * ---- queueId1 * ------ MapFile1 * ------ MapFile2 * ---- queueId2 * ------ MapFile1 * ------ MapFile2 * -- Queue1 * ---- queueId1 * ------ MapFile1 * ------ MapFile2 * ---- queueId2 * ------ MapFile1 * ------ MapFile2
LocalMessageQueue 中采用了消息統(tǒng)一存儲的方案,因此所有的消息實際內容會被存放在 messageStore 目錄下。而 consumeQueue 中則存放了消息的索引,即在 messageStore 中的偏移地址。LocalMQ 中使用 MappedPartitionQueue 來管理某個邏輯上單一的文件,而根據(jù)不同的單文件大小限制會自動將其切割為多個物理上獨立的 Mapped File。每個 MappedPartition 使用 offset,即該文件首地址的全局偏移量命名;而使用 pos / position 統(tǒng)一表示單文件中局部偏移量,使用 index 表示某個文件在其文件夾中的下標。
性能優(yōu)化在編寫的過程中,筆者發(fā)現(xiàn)對于執(zhí)行流的優(yōu)化、避免重復計算與額外變量、選擇使用合適的并發(fā)策略都會對結果造成極大的影響,譬如筆者從 SpinLock 切換到重入鎖之后,本地測試 TPS 增加了約 5%。另外筆者也統(tǒng)計了消費者工作中不同階段的時間占比,其中構建(包括消息屬性的序列化)與發(fā)送操作(寫入到 MappedFileQueue 中,未使用二級緩存)都是同步進行,二者的時間占比也是最多。
[2017-06-01 12:13:21,802] INFO: 構建耗時占比:0.471270,發(fā)送耗時占比:0.428567,持久化耗時占比:0.100163 [2017-06-01 12:25:31,275] INFO: 構建耗時占比:0.275170,發(fā)送耗時占比:0.573520,持久化耗時占比:0.151309代碼級別優(yōu)化
筆者在實現(xiàn) LocalMQ 的過程中感觸最深的就是實現(xiàn)相同功能的不同代碼在性能上的差異可能會很大。在實現(xiàn)過程中應該避免冗余變量聲明與創(chuàng)建、避免額外空間申請與垃圾回收、避免冗余的執(zhí)行過程;另外盡可能選用合適的數(shù)據(jù)結構,譬如筆者在部分實現(xiàn)中從 ArrayList 遷移到了 LinkedList,從 ConcurrentHashMap 遷移到了 HashMap,都帶來了一定的評測指標提升。
異步 IO異步 IO,順序 Flush;筆者發(fā)現(xiàn),如果多個線程進行并發(fā) Flush 操作,反而不如單線程進行順序 Flush。
并發(fā)控制盡量減少鎖控制的范圍。
并發(fā)計算優(yōu)化,將所有的耗時計算放到可以并發(fā)的 Producer 中。
使用合理的鎖,重入鎖相較于自旋鎖有近 5 倍的 TPS 提升。
MemoryMessageQueue源代碼參考這里
MemoryMessageQueue 是最簡易的實現(xiàn),不過其代碼能夠反映出某個消息隊列的基本流程,首先在生產(chǎn)者我們需要創(chuàng)建消息并且發(fā)送給消息隊列:
// 創(chuàng)建消息 BytesMessage message = messageFactory.createBytesMessageToTopic(topic, body); // 發(fā)送消息 messageQueue.putMessage(topic, message);
在 putMessage 函數(shù)中則將消息存入內存存儲中:
// 存放所有消息 private Map> messageBuckets = new HashMap<>(); // 添加消息 public synchronized PutMessageResult putMessage(String bucket, Message message) { if (!messageBuckets.containsKey(bucket)) { messageBuckets.put(bucket, new ArrayList<>(1024)); } ArrayList bucketList = messageBuckets.get(bucket); bucketList.add(message); return new PutMessageResult(PutMessageStatus.PUT_OK, null); }
而 Consumer 則根據(jù)指定的 Bucket 與 queueId 來拉取消息,如果存在多個 Bucket 需要拉取則進行輪詢:
//use Round Robin int checkNum = 0; while (++checkNum <= bucketList.size()) { String bucket = bucketList.get((++lastIndex) % (bucketList.size())); Message message = messageQueue.pullMessage(queue, bucket); if (message != null) { return message; } }
而 MemoryMessageQueue 的 pullMessage 函數(shù)則首先判斷目標 Bucket 是否存在,并且根據(jù)內置的 queueOffset 中記錄的拉取偏移量來判斷是否拉取完畢。若沒有拉取完畢則返回消息并且更新本地偏移量;
private MapEmbeddedMessageQueue> queueOffsets = new HashMap<>(); ... public synchronized Message pullMessage(String queue, String bucket) { ... ArrayList bucketList = messageBuckets.get(bucket); if (bucketList == null) { return null; } HashMap offsetMap = queueOffsets.get(queue); if (offsetMap == null) { offsetMap = new HashMap<>(); queueOffsets.put(queue, offsetMap); } int offset = offsetMap.getOrDefault(bucket, 0); if (offset >= bucketList.size()) { return null; } Message message = bucketList.get(offset); offsetMap.put(bucket, ++offset); ... }
源代碼參考這里
EmbeddedMessageQueue 中引入了消息持久化支持,本部分我們也主要討論消息序列化與底層的 MappedPartitionQueue 實現(xiàn)。
消息序列化EmbeddedMessageQueue 中定義的消息格式如下:
序號 | 消息存儲結構 | 備注 | 長度(字節(jié)數(shù)) |
---|---|---|---|
1 | TOTALSIZE | 消息大小 | 4 |
2 | MAGICCODE | 消息的 MAGIC CODE | 4 |
3 | BODY | 前 4 個字節(jié)存放消息體大小值,后 bodyLength 大小的空間存儲消息體內容 | 4 + bodyLength |
4 | headers* | 前 2 個字節(jié)(short)存放頭部大小,后存放 headersLength 大小的頭部數(shù)據(jù) | 2 + headersLength |
5 | properties* | 前 2 個字節(jié)(short)存放屬性值大小,后存放 propertiesLength 大小的屬性數(shù)據(jù) | 2 + propertiesLength |
EmbeddedMessageSerializer 是繼承自 MessageSerializer 的主要負責消息持久化的類,其提供了消息長度的計算函數(shù):
/** * Description 計算某個消息的長度,注意,headersByteArray 與 propertiesByteArray 在發(fā)送消息時完成轉換 * @param message * @param headersByteArray * @param propertiesByteArray * @return */ public static int calMsgLength(DefaultBytesMessage message, byte[] headersByteArray, byte[] propertiesByteArray) { // 消息體 byte[] body = message.getBody(); int bodyLength = body == null ? 0 : body.length; // 計算頭部長度 short headersLength = (short) headersByteArray.length; // 計算屬性長度 short propertiesLength = (short) propertiesByteArray.length; // 計算消息體總長度 return calMsgLength(bodyLength, headersLength, propertiesLength); }
而 EmbeddedMessageEncoder 的 encode 函數(shù)負責具體的消息序列化操作:
/** * Description 執(zhí)行消息的編碼操作 * @param message 消息對象 * @param msgStoreItemMemory 內部緩存句柄 * @param msgLen 計算的消息長度 * @param headersByteArray 消息頭字節(jié)序列 * @param propertiesByteArray 消息屬性字節(jié)序列 */ public static final void encode( DefaultBytesMessage message, final ByteBuffer msgStoreItemMemory, int msgLen, byte[] headersByteArray, byte[] propertiesByteArray ) { // 消息體 byte[] body = message.getBody(); int bodyLength = body == null ? 0 : body.length; // 計算頭部長度 short headersLength = (short) headersByteArray.length; // 計算屬性長度 short propertiesLength = (short) propertiesByteArray.length; // 初始化存儲空間 resetByteBuffer(msgStoreItemMemory, msgLen); // 1 TOTALSIZE msgStoreItemMemory.putInt(msgLen); // 2 MAGICCODE msgStoreItemMemory.putInt(MESSAGE_MAGIC_CODE); // 3 BODY msgStoreItemMemory.putInt(bodyLength); if (bodyLength > 0) msgStoreItemMemory.put(message.getBody()); // 4 HEADERS msgStoreItemMemory.putShort((short) headersLength); if (headersLength > 0) msgStoreItemMemory.put(headersByteArray); // 5 PROPERTIES msgStoreItemMemory.putShort((short) propertiesLength); if (propertiesLength > 0) msgStoreItemMemory.put(propertiesByteArray); }
對應的反序列化操作則是由 EmbeddedMessageDecoder 完成,其主要從某個 ByteBuffer 中讀取數(shù)據(jù):
/** * Description 從輸入的 ByteBuffer 中反序列化消息對象 * * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure */ public static DefaultBytesMessage readMessageFromByteBuffer(ByteBuffer byteBuffer) { // 1 TOTAL SIZE int totalSize = byteBuffer.getInt(); // 2 MAGIC CODE int magicCode = byteBuffer.getInt(); switch (magicCode) { case MESSAGE_MAGIC_CODE: break; case BLANK_MAGIC_CODE: return null; default: // log.warning("found a illegal magic code 0x" + Integer.toHexString(magicCode)); return null; } byte[] bytesContent = new byte[totalSize]; // 3 BODY int bodyLen = byteBuffer.getInt(); byte[] body = new byte[bodyLen]; if (bodyLen > 0) { // 讀取并且校驗消息體內容 byteBuffer.get(body, 0, bodyLen); } // 4 HEADERS short headersLength = byteBuffer.getShort(); KeyValue headers = null; if (headersLength > 0) { byteBuffer.get(bytesContent, 0, headersLength); String headersStr = new String(bytesContent, 0, headersLength, EmbeddedMessageDecoder.CHARSET_UTF8); headers = string2KeyValue(headersStr); } // 5 PROPERTIES // 獲取 properties 尺寸 short propertiesLength = byteBuffer.getShort(); KeyValue properties = null; if (propertiesLength > 0) { byteBuffer.get(bytesContent, 0, propertiesLength); String propertiesStr = new String(bytesContent, 0, propertiesLength, EmbeddedMessageDecoder.CHARSET_UTF8); properties = string2KeyValue(propertiesStr); } // 返回讀取到的消息 return new DefaultBytesMessage( totalSize, headers, properties, body ); }消息寫入
EmbeddedMessageQueue 中消息的寫入實際上是由 BucketQueue 的 putMessage/putMessages 函數(shù)完成的,這里的某個 BucketQueue 就對應著 Topic-queueId 這個唯一的標識。這里以批量寫入消息為例,首先我們從 BucketQueue 包含的 MappedPartitionQueue 中獲取到最新可用的某個 MappedPartition:
mappedPartition = this.mappedPartitionQueue.getLastMappedFileOrCreate(0);
然后調用 MappedPartition 的 appendMessages 方法,該方法會在下文介紹;這里則是要討論添加消息的幾種結果對應的處理。如果添加成功,則直接返回成功;如果該 MappedPartition 剩余空間不足以寫入消息隊列中的某條消息,則需要調用 MappedPartitionQueue 創(chuàng)建新的 MappedPartition,并且重新計算待寫入的消息序列:
... // 調用對應的 MappedPartition 追加消息 // 注意,這里經(jīng)過填充之后,會逆向地將消息在 MessageStore 中的偏移與 QueueOffset 中偏移添加進去 result = mappedPartition.appendMessages(messages, this.appendMessageCallback); // 根據(jù)追加結果進行不同的操作 switch (result.getStatus()) { case PUT_OK: break; case END_OF_FILE: this.messageQueue.getFlushAndUnmapPartitionService().putPartition(mappedPartition); // 如果已經(jīng)到了文件最后,則創(chuàng)建新文件 mappedPartition = this.mappedPartitionQueue.getLastMappedFileOrCreate(0); if (null == mappedPartition) { // XXX: warn and notify me log.warning("創(chuàng)建 MappedPartition 錯誤, topic: " + messages.get(0).getTopicOrQueueName()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); } // 否則重新進行添加操作 // 從結果中獲取處理完畢的消息數(shù) int appendedMessageNum = result.getAppendedMessageNum(); // 創(chuàng)建臨時的 LeftMessages ArrayList邏輯文件存儲 Mapped PartitionleftMessages = new ArrayList<>(); // 添加所有未消費的消息 for (int i = appendedMessageNum; i < messages.size(); i++) { leftMessages.add(messages.get(i)); } result = mappedPartition.appendMessages(leftMessages, this.appendMessageCallback); break; case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result); case UNKNOWN_ERROR: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); default: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); } ...
某個 MappedPartition 映射物理上的單個文件,其初始化時如下傳入文件名與文件尺寸屬性:
/** * Description 初始化某個內存映射文件 * * @param fileName 文件名 * @param fileSize 文件尺寸 * @throws IOException 打開文件出現(xiàn)異常 */ private void init(final String fileName, final int fileSize) throws IOException { ... // 從文件名中獲取到當前文件的全局偏移量 this.fileFromOffset = Long.parseLong(this.file.getName()); ... // 嘗試打開文件 this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); // 將文件映射到內存中 this.mappedByteBuffer = this.fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize); }
初始化階段即打開文件映射,而后在寫入消息或者其他內容時,其會調用傳入的消息編碼回調(即是我們上文中介紹的消息序列化的包裹對象)將對象編碼為字節(jié)流并且寫入:
public AppendMessageResult appendMessage(final DefaultBytesMessage message, final AppendMessageCallback cb) { ... // 獲取當前的寫入位置 int currentPos = this.wrotePosition.get(); // 如果當前還是可寫的 if (currentPos < this.fileSize) { // 獲取到實際的寫入句柄 ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); // 調整當前寫入位置 byteBuffer.position(currentPos); // 記錄信息 AppendMessageResult result = null; // 調用回調函數(shù)中的實際寫入操作 result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, message); this.wrotePosition.addAndGet(result.getWroteBytes()); this.storeTimestamp = result.getStoreTimestamp(); return result; } ... }MappedPartitionQueue
MappedPartitionQueue 用來管理多個物理上的映射文件,其構造函數(shù)如下:
// 存放所有的映射文件 private final CopyOnWriteArrayListmappedPartitions = new CopyOnWriteArrayList (); ... /** * Description 默認構造函數(shù) * * @param storePath 傳入的存儲文件目錄,有可能傳入 MessageStore 目錄或者 ConsumeQueue 目錄 * @param mappedFileSize * @param allocateMappedPartitionService */ public MappedPartitionQueue(final String storePath, int mappedFileSize, AllocateMappedPartitionService allocateMappedPartitionService) { this.storePath = storePath; this.mappedFileSize = mappedFileSize; this.allocateMappedPartitionService = allocateMappedPartitionService; }{}
這里以 load 函數(shù)為例說明其加載過程:
/** * Description 加載內存映射文件序列 * * @return */ public boolean load() { // 讀取存儲路徑 File dir = new File(this.storePath); // 列舉目錄下所有文件 File[] files = dir.listFiles(); // 如果文件不為空,則表示有必要加載 if (files != null) { // 重排序 Arrays.sort(files); // 遍歷所有的文件 for (File file : files) { // 如果碰到某個文件尚未填滿,則返回加載完畢 if (file.length() != this.mappedFileSize) { log.warning(file + " " + file.length() + " length not matched message store config value, ignore it"); return true; } // 否則加載文件 try { // 實際讀取文件 MappedPartition mappedPartition = new MappedPartition(file.getPath(), mappedFileSize); // 設置當前文件指針到文件尾 mappedPartition.setWrotePosition(this.mappedFileSize); mappedPartition.setFlushedPosition(this.mappedFileSize); // 將文件放置到 MappedFiles 數(shù)組中 this.mappedPartitions.add(mappedPartition); // log.info("load " + file.getPath() + " OK"); } catch (IOException e) { log.warning("load file " + file + " error"); return false; } } } return true; }異步預創(chuàng)建文件
處于性能的考慮,MappedPartitionQueue 還會提前創(chuàng)建文件,在 getLastMappedFileOrCreate 函數(shù)中,當 allocateMappedPartitionService 存在的情況下則會調用該異步服務預創(chuàng)建文件:
/** * Description 根據(jù)起始偏移量查找最后一個文件 * * @param startOffset * @return */ public MappedPartition getLastMappedFileOrCreate(final long startOffset) { ... // 如果有必要創(chuàng)建文件 if (createOffset != -1) { // 獲取到下一個文件的路徑與文件名 String nextFilePath = this.storePath + File.separator + FSExtra.offset2FileName(createOffset); // 以及下下個文件的路徑與文件名 String nextNextFilePath = this.storePath + File.separator + FSExtra.offset2FileName(createOffset + this.mappedFileSize); // 指向待創(chuàng)建的映射文件句柄 MappedPartition mappedPartition = null; // 判斷是否存在創(chuàng)建映射文件的服務 if (this.allocateMappedPartitionService != null) { // 使用服務創(chuàng)建 mappedPartition = this.allocateMappedPartitionService.putRequestAndReturnMappedFile(nextFilePath, nextNextFilePath, this.mappedFileSize); // 進行預熱處理 } else { // 否則直接創(chuàng)建 try { mappedPartition = new MappedPartition(nextFilePath, this.mappedFileSize); } catch (IOException e) { log.warning("create mappedPartition exception"); } } ... return mappedPartition; } return mappedPartitionLast; }
這里的 AllocateMappedPartitionService 則會不間斷地執(zhí)行創(chuàng)建文件的請求:
@Override public void run() { ... // 循環(huán)執(zhí)行文件分配請求 while (!this.isStopped() && this.mmapOperation()) {} ... } /** * Description 循環(huán)執(zhí)行映射文件預分配 * * @Exception Only interrupted by the external thread, will return false */ private boolean mmapOperation() { ... // 執(zhí)行操作 try { // 取出最新的執(zhí)行對象 req = this.requestQueue.take(); // 取得待執(zhí)行對象在請求表中的實例 AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath()); ... // 判斷是否已經(jīng)存在創(chuàng)建好的對象 if (req.getMappedPartition() == null) { // 記錄起始創(chuàng)建時間 long beginTime = System.currentTimeMillis(); // 構建內存映射文件對象 MappedPartition mappedPartition = new MappedPartition(req.getFilePath(), req.getFileSize()); ... // 進行文件預熱,僅預熱 MessageStore if (mappedPartition.getFileSize() >= mapedFileSizeCommitLog && isWarmMappedFileEnable) { mappedPartition.warmMappedFile(); } // 將創(chuàng)建好的對象回寫到請求中 req.setMappedPartition(mappedPartition); // 異常設置為 false this.hasException = false; // 成功設置為 true isSuccess = true; } ... }異步 Flush
EmbeddedMessageQueue 中還包含了某個 flushAndUnmapPartitionServices 用于異步 Flush 文件并且完成不用映射文件的關閉操作。該服務的核心代碼如下:
private final ConcurrentLinkedQueuemappedPartitions = new ConcurrentLinkedQueue<>(); ... @Override public void run() { while (!this.isStopped()) { int interval = 100; try { if (this.mappedPartitions.size() > 0) { long startTime = now(); // 取出待處理的 MappedPartition MappedPartition mappedPartition = this.mappedPartitions.poll(); // 將當前內容寫入到磁盤 mappedPartition.flush(0); // 釋放當前不需要使用的空間 mappedPartition.cleanup(); long past = now() - startTime; // EmbeddedProducer.flushEclipseTime.addAndGet(past); if (past > 500) { log.info("Flush data to disk and unmap MappedPartition costs " + past + " ms:" + mappedPartition.getFileName()); } } else { // 定時進行 Flush 操作 this.waitForRunning(interval); } } catch (Throwable e) { log.warning(this.getServiceName() + " service has exception. "); } } }
這里的 mappedPartitions 即是在上文介紹的當添加消息且返回為 END_OF_FILE 時候添加進來的。
LocalMessageQueue消息存儲源代碼參考這里
LocalMessageQueue 中采用了中心化的消息存儲方案,其提供的 putMessage / putMessages 函數(shù)實際上會調用內置 MessageStore 對象的消息寫入函數(shù):
// 使用 MessageStore 進行提交 PutMessageResult result = this.messageStore.putMessage(message);
而 MessageStore 即是存放所有真實消息的中心存儲,LocalMessageQueue 中支持更為復雜的消息屬性:
序號 | 消息存儲結構 | 備注 | 長度(字節(jié)數(shù)) |
---|---|---|---|
1 | TOTALSIZE | 消息大小 | 4 |
2 | MAGICCODE | 消息的 MAGIC CODE | 4 |
3 | BODYCRC | 消息體 BODY CRC,用于重啟時校驗 | 4 |
4 | QUEUEID | 隊列編號,queueID | 4 |
5 | QUEUEOFFSET | 自增值,不是真正的 consume queue 的偏移量,可以代表這個隊列中消息的個數(shù),要通過這個值查找到 consume queue 中數(shù)據(jù),QUEUEOFFSET * 12 才是偏移地址 | 8 |
6 | PHYSICALOFFSET | 消息在 commitLog 中的物理起始地址偏移量 | 8 |
7 | STORETIMESTAMP | 存儲時間戳 | 8 |
8 | BODY | 前 4 個字節(jié)存放消息體大小值,后 bodyLength 大小的空間存儲消息體內容 | 4 + bodyLength |
9 | TOPICORQUEUENAME | 前 1 個字節(jié)存放 Topic 大小,后存放 topicOrQueueNameLength 大小的主題名 | 1 + topicOrQueueNameLength |
10 | headers* | 前 2 個字節(jié)(short)存放頭部大小,后存放 headersLength 大小的頭部數(shù)據(jù) | 2 + headersLength |
11 | properties* | 前 2 個字節(jié)(short)存放屬性值大小,后存放 propertiesLength 大小的屬性數(shù)據(jù) | 2 + propertiesLength |
其構造函數(shù)中初始化創(chuàng)建的 MappedPartitionQueue 是按照固定大?。J單文件 1G)的映射文件組:
// 構造映射文件類 this.mappedPartitionQueue = new MappedPartitionQueue( ((LocalMessageQueueConfig) this.messageStore.getMessageQueueConfig()).getStorePathCommitLog(), mapedFileSizeCommitLog, messageStore.getAllocateMappedPartitionService(), this.flushMessageStoreService );構建 ConsumeQueue
不同于 EmbeddedMessageQueue,LocalMessageQueue 并沒有在初次提交消息時就直接寫入按照 Topic-queueId 劃分的存儲內;而是依賴于內置的 PostPutMessageService :
/** * Description 執(zhí)行消息后操作 */ private void doReput() { for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { ... // 讀取當前的消息 SelectMappedBufferResult result = this.messageStore.getMessageStore().getData(reputFromOffset); // 如果消息不存在,則停止當前操作 if (result == null) { doNext = false; continue; } try { // 獲取當前消息的起始位置 this.reputFromOffset = result.getStartOffset(); // 順序讀取所有消息 for (int readSize = 0; readSize < result.getSize() && doNext; ) { // 讀取當前位置的消息 PostPutMessageRequest postPutMessageRequest = checkMessageAndReturnSize(result.getByteBuffer()); int size = postPutMessageRequest.getMsgSize(); readSpendTime.addAndGet(now() - startTime); startTime = now(); // 如果處理成功 if (postPutMessageRequest.isSuccess()) { if (size > 0) { // 執(zhí)行消息寫入到 ConsumeQueue 的操作 this.messageStore.putMessagePositionInfo(postPutMessageRequest); // 修正當前讀取的位置 this.reputFromOffset += size; readSize += size; } else if (size == 0) { this.reputFromOffset = this.messageStore.getMessageStore().rollNextFile(this.reputFromOffset); readSize = result.getSize(); } putSpendTime.addAndGet(now() - startTime); } else if (!postPutMessageRequest.isSuccess()) { ... } } } finally { result.release(); } } }
而在 putMessagePositionInfo 函數(shù)中即進行實際的 ConsumeQueue 創(chuàng)建:
/** * Description 將消息的位置放置到 ConsumeQueue 中 * * @param postPutMessageRequest */ public void putMessagePositionInfo(PostPutMessageRequest postPutMessageRequest) { // 尋找或者創(chuàng)建 ConsumeQueue ConsumeQueue cq = this.findConsumeQueue(postPutMessageRequest.getTopic(), postPutMessageRequest.getQueueId()); // 將消息放置到 ConsumeQueue 中合適的位置 cq.putMessagePositionInfoWrapper(postPutMessageRequest.getCommitLogOffset(), postPutMessageRequest.getMsgSize(), postPutMessageRequest.getConsumeQueueOffset()); } /** * Description 根據(jù)主題與 QueueId 查找 ConsumeQueue,如果不存在則創(chuàng)建 * * @param topic * @param queueId * @return */ public ConsumeQueue findConsumeQueue(String topic, int queueId) { ConcurrentHashMapmap = consumeQueueTable.get(topic); ... // 判斷該主題下是否存在 queueId,不存在則創(chuàng)建 ConsumeQueue logic = map.get(queueId); // 如果獲取為空,則創(chuàng)建新的 ConsumeQueue if (null == logic) { ConsumeQueue newLogic = new ConsumeQueue(// topic, // 主題 queueId, // queueId LocalMessageQueueConfig.mapedFileSizeConsumeQueue, // 映射文件尺寸 this); ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic); ... } return logic; }
而在 ConsumeQueue 的構造函數(shù)中完成實際的文件映射與讀取:
/** * Description 主要構造函數(shù) * * @param topic * @param queueId * @param mappedFileSize * @param localMessageStore */ public ConsumeQueue( final String topic, final int queueId, final int mappedFileSize, final LocalMessageQueue localMessageStore) { ... // 當前隊列的路徑 String queueDir = this.storePath + File.separator + topic + File.separator + queueId; // 初始化內存映射隊列 this.mappedPartitionQueue = new MappedPartitionQueue(queueDir, mappedFileSize, null); this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE); }
ConsumeQueue 的文件格式則相對簡單:
// ConsumeQueue 文件內存放的單條 Message 尺寸 // 1 | MessageStore Offset | int 8 Byte // 2 | Size | short 8 Byte消息拉取
在 LocalPullConsumer 拉取消息時,設置的批量拉取機制;即一次性從 LocalMessageQueue 拉取多條消息到本地,然后再批次返回給本地進行處理(假設處理也有一定耗時)。在批次拉取的函數(shù)中,我們首先需要獲取當前 Consumer 處理的主題與隊列編號對應的 ConsumeQueue 是否包含數(shù)據(jù),然后再申請具體的讀取句柄并且占用該隊列:
/** * Description 批量抓取消息,注意,這里只進行預抓取,僅當消費者真正獲取后才會修正讀取偏移量 */ private void batchPoll() { // 如果是 LocalMessageQueue // 執(zhí)行預抓取 LocalMessageQueue localMessageStore = (LocalMessageQueue) this.messageQueue; // 獲取當前待抓取的桶名 String bucket = bucketList.get((lastIndex) % (bucketList.size())); // 首先獲取待抓取的隊列和偏移 long offsetInQueue = localMessageStore.getConsumerScheduler().queryOffsetAndLock("127.0.0.1:" + this.refId, bucket, this.getQueueId()); // 如果當前待抓取的 queueId 已經(jīng)被占用,則直接切換到下一個主題 if (offsetInQueue == -2) { // 將當前主題設置為 true this.isFinishedTable.put(bucket, true); // 重置當前的 LastIndex 或者 RefOffset,即 queueId this.resetLastIndexOrRefOffsetWhenNotFound(); } else { // 獲取到了有效的隊列偏移量之后,開始嘗試獲取消息 consumerOffsetTable.put(bucket, new AtomicLong(offsetInQueue)); // 設置每次最多抓一個文件內包含的消息數(shù),等價于變相的一次性讀完,注意,這里的數(shù)目還受到單個文件尺寸的限制 GetMessageResult getMessageResult = localMessageStore.getMessage(bucket, this.getQueueId(), this.consumerOffsetTable.get(bucket).get() + 1, mapedFileSizeConsumeQueue / ConsumeQueue.CQ_STORE_UNIT_SIZE); // 如果沒有找到數(shù)據(jù),則切換到下一個 if (getMessageResult.getStatus() != GetMessageStatus.FOUND) { // 將當前主題設置為 true this.isFinishedTable.put(bucket, true); this.resetLastIndexOrRefOffsetWhenNotFound(); } else { // 這里不考慮 Consumer 被惡意干掉的情況,因此直接更新遠端的 Offset 值 localMessageStore.getConsumerScheduler().updateOffset("127.0.0.1:" + this.refId, bucket, this.getQueueId(), consumerOffsetTable.get(bucket).addAndGet(getMessageResult.getMessageCount())); // 首先從文件系統(tǒng)中一次性讀出所有的消息 ArrayList消費者調度messages = readMessagesFromGetMessageResult(getMessageResult); // 將消息添加到隊列中 this.messages.addAll(messages); // 本次抓取成功后才開始抓取下一個 lastIndex++; } } }
ConsumerScheduler 為我們提供了核心的消費者調度功能,其內置的 ConsumerOffsetManager 包含了兩個核心存儲:
// 存放映射到內存中 private ConcurrentHashMap> offsetTable = new ConcurrentHashMap >(512); // 存放某個 Topic 下面的某個 Queue 被某個 Consumer 占用的信息 private ConcurrentHashMap > queueIdOccupiedByConsumerTable = new ConcurrentHashMap >(512);
分別對應了某個 ConsumeQueue 被消費的進度和被消費者的占用信息。同時 ConsumerOffsetManager 還提供了基于 JSON 格式的持久化功能,并且通過 ConsumerScheduler 中的定期服務 scheduledExecutorService 進行自動定期持久化。在消息提交階段,LocalMessageQueue 會自動調用 updateOffset 函數(shù)更初始化某個 ConsumeQueue 的偏移情況(在恢復時也會使用):
public void updateOffset(final String topic, final int queueId, final long offset) { this.consumerOffsetManager.commitOffset("Broker Inner", topic, queueId, offset); }
而某個 Consumer 在初次拉取時,會調用 queryOffsetAndLock 函數(shù)來查詢某個 ConsumeQueue 的可拉取情況:
/** * Description 修正某個 ConsumerOffset 隊列中的值 * * @param topic * @param queueId * @return */ public long queryOffsetAndLock(final String clientHostAndPort, final String topic, final int queueId) { String key = topic; // 首先判斷該 Topic-queueId 是否被占用 if (this.queueIdOccupiedByConsumerTable.containsKey(topic)) { ... } // 如果沒有被占用,則此時宣告占用 ConcurrentHashMapconsumerQueueIdMap = this.queueIdOccupiedByConsumerTable.get(key); ... // 真實進行查找操作 ConcurrentHashMap map = this.offsetTable.get(key); if (null != map) { Long offset = map.get(queueId); if (offset != null) return offset; } // 默認返回值為 -1 return -1; }
并且在拉取完畢后調用 updateOffset 函數(shù)來更新拉取進度。
消息讀取在某個 Consumer 通過 ConsumerManager 獲取可用的拉取偏移量之后,即從 LocalMessageQueue 中進行真實地消息讀取操作:
/** * Description Consumer 從存儲中讀取數(shù)據(jù)的接口 * * @param topic * @param queueId * @param offset 下一個開始抓取的起始下標 * @param maxMsgNums * @return */ public GetMessageResult getMessage(final String topic, final int queueId, final long offset, final int maxMsgNums) { ... // 根據(jù) Topic 與 queueId 構建消費者隊列 ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); // 保證當前 ConsumeQueue 存在 if (consumeQueue != null) { // 獲取當前 ConsumeQueue 中包含的最小的消息在 MessageStore 中的位移 minOffset = consumeQueue.getMinOffsetInQueue(); // 注意,最大的位移地址即是不可達地址,是當前所有消息的下一個消息的下標 maxOffset = consumeQueue.getMaxOffsetInQueue(); // 如果 maxOffset 為零,則表示沒有可用消息 if (maxOffset == 0) { status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; nextBeginOffset = 0; } else if (offset < minOffset) { status = GetMessageStatus.OFFSET_TOO_SMALL; nextBeginOffset = minOffset; } else if (offset == maxOffset) { status = GetMessageStatus.OFFSET_OVERFLOW_ONE; nextBeginOffset = offset; } else if (offset > maxOffset) { status = GetMessageStatus.OFFSET_OVERFLOW_BADLY; if (0 == minOffset) { nextBeginOffset = minOffset; } else { nextBeginOffset = maxOffset; } } else { // 根據(jù)偏移量獲取當前 ConsumeQueue 的緩存 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); if (bufferConsumeQueue != null) { try { status = GetMessageStatus.NO_MATCHED_MESSAGE; long nextPhyFileStartOffset = Long.MIN_VALUE; long maxPhyOffsetPulling = 0; int i = 0; // 設置每次獲取的最大消息數(shù) final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE); // 遍歷所有的 Consume Queue 中的消息指針 for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); maxPhyOffsetPulling = offsetPy; if (nextPhyFileStartOffset != Long.MIN_VALUE) { if (offsetPy < nextPhyFileStartOffset) continue; } boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy); if (isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) { break; } // 從 MessageStore 中獲取消息 SelectMappedBufferResult selectResult = this.messageStore.getMessage(offsetPy, sizePy); // 如果沒有獲取到數(shù)據(jù),則切換到下一個文件繼續(xù) if (null == selectResult) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.MESSAGE_WAS_REMOVING; } nextPhyFileStartOffset = this.messageStore.rollNextFile(offsetPy); continue; } // 如果獲取到了,則返回結果 getResult.addMessage(selectResult); status = GetMessageStatus.FOUND; nextPhyFileStartOffset = Long.MIN_VALUE; } nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); long diff = maxOffsetPy - maxPhyOffsetPulling; // 獲取當前內存情況 long memory = (long) (getTotalPhysicalMemorySize() * (LocalMessageQueueConfig.accessMessageInMemoryMaxRatio / 100.0)); getResult.setSuggestPullingFromSlave(diff > memory); } finally { bufferConsumeQueue.release(); } } else { status = GetMessageStatus.OFFSET_FOUND_NULL; nextBeginOffset = consumeQueue.rollNextFile(offset); log.warning("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: " + maxOffset + ", but access logic queue failed."); } } } else { ... } ... }
注意,這里返回的其實只是消息在 MessageStore 中的存放地址,真實地消息讀取還需要通過 readMessagesFromGetMessageResult 函數(shù):
/** * Description 從 GetMessageResult 中抓取全部的消息 * * @param getMessageResult * @return */ public static ArrayList后記readMessagesFromGetMessageResult(final GetMessageResult getMessageResult) { ArrayList messages = new ArrayList<>(); try { List messageBufferList = getMessageResult.getMessageBufferList(); for (ByteBuffer bb : messageBufferList) { messages.add(readMessageFromByteBuffer(bb)); } } finally { getMessageResult.release(); } // 獲取字節(jié)數(shù)組 return messages; } /** * Description 從輸入的 ByteBuffer 中反序列化消息對象 * * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure */ public static DefaultBytesMessage readMessageFromByteBuffer(java.nio.ByteBuffer byteBuffer) { // 1 TOTAL SIZE int totalSize = byteBuffer.getInt(); // 2 MAGIC CODE int magicCode = byteBuffer.getInt(); switch (magicCode) { case MESSAGE_MAGIC_CODE: break; case BLANK_MAGIC_CODE: return null; default: log.warning("found a illegal magic code 0x" + Integer.toHexString(magicCode)); return null; } byte[] bytesContent = new byte[totalSize]; ... }
端午前后即已停止代碼編寫,原以為周把時間可以完成文檔編寫;可惜畢業(yè)旅行和畢業(yè)聚會一直拖到了七月,最后也是匆匆寫完,也是我個人拖延癌晚期,不由感慨啊。
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://systransis.cn/yun/67284.html
摘要:它是阿里巴巴于年開源的第三代分布式消息中間件。是一個分布式消息中間件,具有低延遲高性能和可靠性萬億級別的容量和靈活的可擴展性,它是阿里巴巴于年開源的第三代分布式消息中間件。上篇文章消息隊列那么多,為什么建議深入了解下RabbitMQ?我們講到了消息隊列的發(fā)展史:并且詳細介紹了RabbitMQ,其功能也是挺強大的,那么,為啥又要搞一個RocketMQ出來呢?是重復造輪子嗎?本文我們就帶大家來詳...
摘要:和之間的關系通過來綁定,來定義,即相同的,等于表示節(jié)點,非表示節(jié)點。所有的節(jié)點與集群的所有節(jié)點保持長連接,定時注冊信息到所有的。對磁盤的訪問串行化,避免磁盤竟爭,不會因為隊列增加導致增高。要保證與完全的一致,增加了編程的復雜度。 Apache RocketMQ?是一個開源的分布式消息和流數(shù)據(jù)平臺。 1、既然是消息系統(tǒng),最核心的功能就是要提供消息的發(fā)布與訂閱功能,最簡單的概念模型如下: ...
閱讀 2688·2023-04-25 20:19
閱讀 1954·2021-11-24 09:38
閱讀 1639·2021-11-16 11:44
閱讀 4378·2021-09-02 15:40
閱讀 1361·2019-08-30 15:55
閱讀 2030·2019-08-30 15:52
閱讀 3770·2019-08-29 17:20
閱讀 2283·2019-08-29 13:48