成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

RocketMQ架構原理解析(二):消息存儲

番茄西紅柿 / 3348人閱讀

摘要:此處補充說明下,不論是還是都不提供指定區(qū)間的刷盤策略,只提供一個方法,所以無法精確控制落盤數(shù)據(jù)的大小。

一、概述

由前文可知,RocketMQ有幾個非常重要的概念:

  • broker 服務端,負責存儲、收發(fā)消息
  • producer 客戶端1,負責產生消息
  • consumer 客服端2,負責消費消息

既然是消息隊列,那消息的存儲的重要程度不言而喻,本節(jié)我們聚焦broker服務端,看下消息在broker端是如何存儲的,它的落盤策略是怎樣的,又是如何保證高效

另:后文的RocketMQ都是基于版本4.9.3

二、寫入流程

RocketMQ的普通單消息寫入流程如下
消息寫入流程

簡單可以分為三大塊:

  • 寫入前準備
  • 加鎖后消息寫入
  • 消息落盤及集群同步

2.1 準備

其實消息的寫入準備工作也比較好理解,主要是消息狀態(tài)的檢查以及各類存儲狀態(tài)的檢查,可以參看上圖中的流程

根據(jù)上圖,在準備階段前,RocketMQ會判斷操作系統(tǒng)的Page Cache是否繁忙,他是怎么做到的呢?其實Java本身沒有提供接口或函數(shù)來查看Page Cache的狀態(tài),但如果磁盤帶寬已經打滿,在Page Cache要將數(shù)據(jù)刷disk時,很有可能便陷入了阻塞,導致Page Cache資源緊張。而當我們的程序又有新的消息要寫入Page Cache時,反向阻塞寫入請求,我們說這時Page Cache就產生了回壓,也就是Page Cache相當繁忙,請求已經不能及時處理了。RocketMQ判斷Page Cache是否繁忙的條件也很簡單,就是監(jiān)控某個請求加鎖后,寫入是否超過1秒,如果超時的話,新的請求會快速失敗

2.2 消息協(xié)議

RocketMQ有一套相對復雜的消息協(xié)議編碼,大部分協(xié)議中的內容都是在加鎖前拼接生成
rmq消息存儲格式

大部分消息協(xié)議項都是定長字段,變長字段如下:

  • 1、born inet 產生消息的producer的IP信息 ipv4占用4byte,ipv6占16byte
  • 2、broker inet 接收消息的broker的IP信息 ipv4占用4byte,ipv6占16byte
  • 3、msg content 消息內容 變長字段(1-21億)byte
  • 4、topic content 消息內容 變長字段(1-127)byte
  • 5、properties content 屬性內容 變長字段(0-32767)byte

2.3 加鎖

此處rmq提供了2種加鎖方式

  • 1、基于AQS的ReentrantLock (默認方式)
  • 2、基于CAS的自旋鎖,加鎖不成功的話,會無限重試

無論采用哪種策略,都是獨占鎖,即同一時刻只允許一個線程加鎖成功。具體采用哪種方式,可通過配置修改。

兩種加鎖適用不同的場景,方式1在高并發(fā)場景下,能保持平穩(wěn)的系統(tǒng)性能,但在低并發(fā)下表現(xiàn)一般;而方式二正好相反,在高并發(fā)場景下,因為采用自旋,會浪費大量的cpu,但在低并發(fā)時,卻可以獲得很高的性能。

所以官方文檔中,為了提高性能,建議用戶在同步刷盤的時候采用獨占鎖,異步刷盤的時候采用自旋鎖。這個是根據(jù)加鎖時間長短決定的

2.4 鎖內操作

上文提到,寫入消息的鎖是獨占鎖,也就意味著同一時刻,只能有一個線程進入,我們看一下鎖內都做了哪些操作

  • 1、拿到或創(chuàng)建文件操作對象MappedFile
    • 此處涉及點較多,我們在文件寫入大節(jié)詳細展開
  • 2、二次整理要落盤的消息格式
    • 之前已經整理過消息協(xié)議了,為什么此處還要進行二次整理?因為之前一些消息協(xié)議在沒有加鎖的時候,還無法確定。主要是以下三項內容:
      • a、queueOffset 隊列偏移量,此值需要最終返回,且需要保證嚴格遞增,所以需要在鎖內進行
      • b、physicalOffset 物理偏移量,也就是全局文件的位置,注:此位置是全局文件的偏移量,不是當前文件的偏移量,所以其值可能會大于1G
      • c、storeTimestamp 存儲時間戳,此處在鎖內進行,主要是為了保證消息投遞的時間嚴格保序
  • 3、記錄寫入信息
    • 記錄當前文件寫入情況:比如已寫入字節(jié)數(shù)、存儲時間等

三、文件開辟及寫入

3.1 文件開辟

文件的開辟是異步進行,有獨立的線程專門負責開辟文件。我們可以先看下文件開辟的簡單模型
異步創(chuàng)建MappFile

也就是putMsg的線程會將開辟文件的請求委托給allocate file線程,然后進入阻塞,待allocate file線程將文件開辟完畢后,再喚醒putMsg線程

那此處我們便產生了2點疑問:

  • 1、putMsg把開辟文件的請求交給了allocate file線程,直到allocate file線程開辟完畢后才會喚醒putMsg線程,其實并沒有起到異步開辟節(jié)省時間的目的,直接在putMsg線程中開辟文件不好嗎?
  • 2、創(chuàng)建文件本身感覺并不耗時,不管是拿到文件的FileChannnel還是MappedByteBuffer,都是一件很快的操作,費盡周章的異步開辟真的有必要嗎?

這兩個疑問將逐步說明

3.1.1 開啟堆外緩沖池

至此我們要引入一個非常重要的配置變量transientStorePoolEnable,該配置項只在異步刷盤(FlushDiskType == AsyncFlush)的場景下,才會生效

如果配置項中,將transientStorePoolEnable置為false,便稱為“開啟堆外緩沖池”。那么這個變量到底起到什么作用呢?

transientStorePoolEnable類型創(chuàng)建MappFile

系統(tǒng)啟動時,會默認開辟5個(參數(shù)transientStorePoolSize控制)堆外內存DirectByteBuffer,循環(huán)利用。寫消息時,消息都暫存至此,通過線程CommitRealTimeService將數(shù)據(jù)定時刷到page cache,當數(shù)據(jù)flush到disk后,再將DirectByteBuffer歸還給緩沖池

而開辟過程是在broker啟動時進行的;如上圖所示,空間一旦開辟完畢后,文件都是預先創(chuàng)建好的,使用時直接返回文件引用即可,相當高效。但首次啟動需要大量開辟堆外內存空間,會拉長broker的啟動時長。我們看一下這塊開辟的源碼

/** * Its a heavy init method. */public void init() {    for (int i = 0; i < poolSize; i++) {        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);        ......        availableBuffers.offer(byteBuffer);    }}

注釋中也標識了這是個重量級的方法,主要耗時點在ByteBuffer.allocateDirect(fileSize),其實開辟內存并不耗時,耗時集中在為內存區(qū)域賦0操作,以下是JDK中DirectByteBuffer源碼:

DirectByteBuffer(int cap) {                   // package-private    super(-1, 0, cap, cap);    ......    long base = 0;    try {        base = unsafe.allocateMemory(size);    } catch (OutOfMemoryError x) {        Bits.unreserveMemory(size, cap);        throw x;    }    unsafe.setMemory(base, size, (byte) 0);    ......}

我們發(fā)現(xiàn)在開辟完內存后,開始執(zhí)行了賦0操作unsafe.setMemory(base, size, 0)。其實可以利用反射巧妙地繞過這個耗時點

private static Field addr;private static Field capacity;static {    try {        addr = Buffer.class.getDeclaredField("address");        addr.setAccessible(true);        capacity = Buffer.class.getDeclaredField("capacity");        capacity.setAccessible(true);    } catch (NoSuchFieldException e) {        e.printStackTrace();    }}public static ByteBuffer newFastByteBuffer(int cap) {    long address = unsafe.allocateMemory(cap);    ByteBuffer bb = ByteBuffer.allocateDirect(0).order(ByteOrder.nativeOrder());    try {        addr.setLong(bb, address);        capacity.setInt(bb, cap);    } catch (IllegalAccessException e) {        return null;    }    bb.clear();    return bb;}

3.1.2 關閉堆外緩沖池

關閉堆外內存池的話,就會啟動MappedByteBuffer

常規(guī)類型創(chuàng)建MappFile

  • a、首次啟動
    • 第一次啟動的時候,allocate線程會先后創(chuàng)建2個文件,第一個文件創(chuàng)建完畢后,便會返回putMsg線程并喚醒它,然后allocate線程進而繼續(xù)異步創(chuàng)建下一個文件
  • b、后續(xù)啟動
    • 后續(xù)請求allocate線程都會將已經創(chuàng)建好的文件直接返回給putMsg線程,然后繼續(xù)異步創(chuàng)建下一個文件,這樣便真正實現(xiàn)了異步創(chuàng)建文件的效果

3.1.3 文件預熱

我們再回顧一下本章剛開始提出的2個疑問:

  • 1、putMsg把開辟文件的請求交給了allocate file線程,直到allocate file線程開辟完畢后才會喚醒putMsg線程,其實并沒有起到異步開辟節(jié)省時間的目的,直接在putMsg線程中開辟文件不好嗎?
  • 2、創(chuàng)建文件本身感覺并不耗時,不管是拿到文件的FileChannnel還是MappedByteBuffer,都是一件很快的操作,費盡周章的異步開辟真的有必要嗎?

第一個問題已經迎刃而解,即allocate線程通過異步創(chuàng)建下一個文件的方式,實現(xiàn)真正異步

本節(jié)討論的便是第二個問題,其實如果只是單純創(chuàng)建文件的話,的確是非??斓模恢劣谠偈褂卯惒讲僮?。但RocketMQ對于新建文件有個文件預熱(通過配置warmMapedFileEnable啟停)功能,當然目的是為了磁盤提速,我么先看下源碼

org.apache.rocketmq.store.MappedFile#warmMappedFile

for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {    byteBuffer.put(i, (byte) 0);    // force flush when flush disk type is sync    if (type == FlushDiskType.SYNC_FLUSH) {        if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {            flush = i;            mappedByteBuffer.force();        }    }}

簡單來說,就是將MappedByteBuffer每隔4K就寫入一個0 byte,然后將整個文件撐滿;如果刷盤策略是同步刷盤的話,還需要調用mappedByteBuffer.force(),當然這個操作是相當相當耗時的,所以也就需要我們進行異步處理。這樣也就解釋了第二個問題

但文件預熱真的有效嗎?我們不妨做個簡單的基準測試

public class FileWriteCompare {    private static String filePath = "/Users/likangning/test/index3.data";    private static int fileSize = 1024 * 1024 * 1024;    private static boolean warmFile = true;    private static int batchSize = 4096;    @Test    public void test() throws Exception {        File file = new File(filePath);        if (file.exists()) {            file.delete();        }        file.createNewFile();        FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.READ);        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(batchSize);        long beginTime = System.currentTimeMillis();        mappedByteBuffer.position(0);        while (mappedByteBuffer.remaining() >= batchSize) {            byteBuffer.position(batchSize);            byteBuffer.flip();            mappedByteBuffer.put(byteBuffer);        }        System.out.println("time cost is : " + (System.currentTimeMillis() - beginTime));    }}

簡單來說就是通過MappedByteBuffer寫入1G文件,在我本地電腦上,平均耗時在 550ms 左右

然后在MappedByteBuffer寫文件前加入預熱操作

private void warmFile(MappedByteBuffer mappedByteBuffer) {    if (!warmFile) {        return;    }    int pageSize = 4096;    long begin = System.currentTimeMillis();    for (int i = 0, j = 0; i < fileSize; i += pageSize, j++) {        mappedByteBuffer.put(i, (byte) 0);    }    System.out.println("warm file time cost " + (System.currentTimeMillis() - begin));}

耗時情況如下:

warm file time cost 492time cost is : 125

預熱后,寫文件的耗時縮短了很多,但預熱本身的耗時也幾乎等同于文件寫入的耗時了

以上是沒有強制刷盤的測試效果,如果強制刷盤(#force)的話,個人經驗是文件預熱一定會帶來性能的提升。從前兩天結束的第二屆中間件性能挑戰(zhàn)賽來看,文件預熱至少帶來10%以上的提升。但是同非強制刷盤一樣,文件預熱操作實在是太重了

整體來看,文件預熱后的寫入操作,確實能帶來性能上的提升,但是如果在系統(tǒng)壓力較大、磁盤吞吐緊張的場景下,勢必導致broker抖動,甚至請求超時,反而得不償失。明白了此層概念后,再通過大量benchmark來決定是否開啟此配置,做到有的放矢

3.2 文件寫入

經過以上整理分析后,文件寫入將變得非常輕;不論是DirectByteBuffer還是MappedByteBuffer都可以抽象為ByteBuffer,進而直接調用ByteBuffer.write()

四、刷盤策略

4.1 異步刷盤

異步刷盤策略

4.1.1 異步+關閉寫緩沖

對應如下配置

FlushDiskType == AsyncFlush && transientStorePoolEnable == false

異步刷盤,且關閉緩沖池,對應的異步刷盤線程是FlushRealTimeService

上文可知,次策略是通過MappedByteBuffer寫入的數(shù)據(jù),所以此時數(shù)據(jù)已經在 page cache 中了

我們總結一下刷盤的策略:

  • 1、固定頻率刷盤

不響應中斷,固定500ms(可配置)刷盤,但刷盤的時候,如果發(fā)現(xiàn)未落盤數(shù)據(jù)不足16K(可配置),那么將進入下一個循環(huán),如果滿16K的話,會將所有未落盤的數(shù)據(jù)落盤。此處補充說明下,不論是FileChannel還是MappedByteBuffer都不提供指定區(qū)間的刷盤策略,只提供一個force()方法,所以無法精確控制落盤數(shù)據(jù)的大小。

如果數(shù)據(jù)寫入量很少,一直沒有填充滿16K,就不會落盤了嗎?不是的,此處兜底的方案是,線程發(fā)現(xiàn)距離上次無條件全量刷盤已經超過10000ms(可配置),那么此時就會無條件觸發(fā)全量刷盤

  • 2、非固定頻率刷盤

與「固定頻率刷盤」比較相似,唯一不同點是,當前刷盤策略是響應中斷的,即每次有新的消息到來的時候,都會發(fā)送喚醒信號,如果刷盤線程正好處在500ms等待期間的話,將被喚醒。但此處的喚醒并非嚴謹?shù)膯拘?,有可能發(fā)送了喚醒信號,但刷盤線程并未成功響應,兜底方案便是500ms的重試。下面簡單黏貼一下等待、喚醒的代碼,不再贅述

org.apache.rocketmq.common.ServiceThread

// 喚醒public void wakeup() {    if (hasNotified.compareAndSet(false, true)) {        waitPoint.countDown(); // notify    }}// 睡眠并響應喚醒protected void waitForRunning(long interval) {    if (hasNotified.compareAndSet(true, false)) {        this.onWaitEnd();        return;    }    //entry to wait    waitPoint.reset();    try {        waitPoint.await(interval, TimeUnit.MILLISECONDS);    } catch (InterruptedException e) {        log.error("Interrupted", e);    } finally {        hasNotified.set(false);        this.onWaitEnd();    }}

綜上,數(shù)據(jù)在page cache中最長的等待時間為(10000+500)ms

4.1.2 異步+開啟寫緩沖

對應如下配置

FlushDiskType == AsyncFlush && transientStorePoolEnable == true

異步刷盤,且開啟緩沖池,對應的異步刷盤線程是CommitRealTimeService

首先需要明確一點的是,當前配置下,在寫入階段,數(shù)據(jù)是直接寫入DirectByteBuffer的,這樣做的好處及弊端也非常鮮明。

  • 好處:數(shù)據(jù)不用寫page cache,放入DirectByteBuffer后便很快返回,減少了用戶態(tài)與內核態(tài)的切換開銷,性能非常高
  • 弊端:數(shù)據(jù)可靠性降為最低級別,即進程掛掉的話,就會丟數(shù)據(jù)。因為數(shù)據(jù)即沒有寫入page cache,也沒有落盤至disk,僅僅是在進程內部維護了一塊臨時緩存,進程重啟或crash掉的話,數(shù)據(jù)一定會丟失

值得一提的是,此種刷盤模式,寫入動作使用的是FileChannel,且僅僅調用FileChannel.write()方法將數(shù)據(jù)寫入page cache,并沒有直接強制刷盤,而是將強制落盤的任務轉交給FlushRealTimeService線程來操作,而FlushRealTimeService線程最終也會調用FileChannel進行強制刷盤

在RocketMQ內部,無論采用什么刷盤策略,都是單一操作對象在寫入/讀取文件;即如果使用MappedByteBuffer寫文件,那一定會通過MappedByteBuffer刷盤,如果使用FileChannel寫文件,那一定會通過FileChannel 刷盤,不存在混合操作的情況

疑問:為什么RocketMQ不依賴操作系統(tǒng)的異步刷盤,而費勁周章的設計如此刷盤策略呢?

個人理解,作為一個成熟開源的組件,數(shù)據(jù)的安全性至關重要,還是要盡可能保證數(shù)據(jù)穩(wěn)步有序落盤;OS的異步刷盤固然好使,但RocketMQ對其把控較弱,當操作系統(tǒng)crash或者斷電的時候,造成的數(shù)據(jù)丟失影響不可控

4.2 同步刷盤

需要說明的是,如果FlushDiskType配置的是同步刷盤的話,那么此處數(shù)據(jù)一定已經被MappedByteBuffer寫入了pageCache,接下來要做的便是真正的落盤操作。與異步落盤相似,同步落盤要根據(jù)配置項Message.isWaitStoreMsgOK()(等待消息落盤)來分別說明

同步刷盤的落盤線程統(tǒng)一都是GroupCommitService

同步刷盤策略

4.2.1 不等待落盤ack

當前模式如圖所示,整體流程比較簡單,寫入線程僅僅負責喚醒落盤線程,然后便執(zhí)行后續(xù)邏輯,線程不阻塞;落盤線程每次休息10ms(可被寫入線程喚醒)后,如果發(fā)現(xiàn)有數(shù)據(jù)未落盤,便將page cache中的數(shù)據(jù)強制force到磁盤

我們發(fā)現(xiàn),其實相比較異步刷盤來說,同步刷盤輪訓的時間只有10ms,遠小于異步刷盤的500ms,也是比較好理解的。但當前模式寫入線程不會阻塞,也就是不會等待消息真正存儲到disk后再返回,如果此時反生操作系統(tǒng)crash或者斷電,那未落盤的數(shù)據(jù)便會丟失

個人感覺,將FlushDiskType已經設置為Sync,表明數(shù)據(jù)會強制落盤,卻又引入Message.isWaitStoreMsgOK(),來左右落盤策略,多多少少會給使用者造成使用及理解上的困惑

4.2.2 等待落盤ack

相比較上文,本小節(jié)便是數(shù)據(jù)需要真正存儲到disk后才進行返回。寫入線程在喚醒落盤線程后便進入阻塞,直至落盤線程將數(shù)據(jù)刷到disk后再將其喚醒

不過這里需要處理一個邊界問題,即舊CommitLog的tail,及新CommitLog的head。例如現(xiàn)在有2個寫入線程將數(shù)據(jù)寫入了page cache,而這2個請求一個落在前CommitLog的尾部,另外一個落在新CommitLog的頭部,這個時候,落盤線程需要檢測到這兩個消息的分布,然后依次將兩個CommitLog數(shù)據(jù)落盤

五、線程模型

2_線程模型

RocketMQ中所有的異步處理線程都繼承自抽象類org.apache.rocketmq.common.ServiceThread,此類定義了簡單的喚醒、通知模型,但并不嚴格保證喚醒,而是通過輪訓作為兜底方案。實測發(fā)現(xiàn)喚醒動作在數(shù)據(jù)量較大時,存在性能損耗,改為簡單的輪詢落盤模式,性能提高明顯

六、結束語

本章我們聚焦分析了一條消息在broker端落地的全過程,但整個流程還是比較復雜的,不過有些部分沒有提及(比如說消息在master落地后是如何同步至salve端的),主要是考慮這些部分跟存儲關聯(lián)度不是很強,放在一起思路容易發(fā)散,這些部分會放在后文專門開標題闡述

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉載請注明本文地址:http://systransis.cn/yun/123639.html

相關文章

  • 高并發(fā)異步解耦利器:RocketMQ究竟強在哪里?

    摘要:它是阿里巴巴于年開源的第三代分布式消息中間件。是一個分布式消息中間件,具有低延遲高性能和可靠性萬億級別的容量和靈活的可擴展性,它是阿里巴巴于年開源的第三代分布式消息中間件。上篇文章消息隊列那么多,為什么建議深入了解下RabbitMQ?我們講到了消息隊列的發(fā)展史:并且詳細介紹了RabbitMQ,其功能也是挺強大的,那么,為啥又要搞一個RocketMQ出來呢?是重復造輪子嗎?本文我們就帶大家來詳...

    tainzhi 評論0 收藏0
  • 消息中間件——RabbitMQ()各大主流消息中間件綜合對比介紹!

    摘要:主流消息中間件介紹是由出品,是一個完全支持和規(guī)范的實現(xiàn)。主流消息中間件介紹是阿里開源的消息中間件,目前也已經孵化為頂級項目。 showImg(https://img-blog.csdnimg.cn/20190509221741422.gif);showImg(https://img-blog.csdnimg.cn/20190718204938932.png?x-oss-process=...

    hiyang 評論0 收藏0
  • RocketMQ我們學到了什么之NameServer

    摘要:故事中的下屬們,就是消息生產者角色,屋子右面墻根那塊地就是消息持久化,呂秀才就是消息調度中心,而你就是消息消費者角色。下屬們匯報的消息,應該疊放在哪里,這個消息又應該在哪里才能找到,全靠呂秀才的驚人記憶力,才可以讓消息準確的被投放以及消費。 微信公眾號:IT一刻鐘大型現(xiàn)實非嚴肅主義現(xiàn)場一刻鐘與你分享優(yōu)質技術架構與見聞,做一個有劇情的程序員關注可了解更多精彩內容。問題或建議,請公眾號留言...

    wangbjun 評論0 收藏0
  • RocketMQ我們學到了什么之NameServer

    摘要:故事中的下屬們,就是消息生產者角色,屋子右面墻根那塊地就是消息持久化,呂秀才就是消息調度中心,而你就是消息消費者角色。下屬們匯報的消息,應該疊放在哪里,這個消息又應該在哪里才能找到,全靠呂秀才的驚人記憶力,才可以讓消息準確的被投放以及消費。 微信公眾號:IT一刻鐘大型現(xiàn)實非嚴肅主義現(xiàn)場一刻鐘與你分享優(yōu)質技術架構與見聞,做一個有劇情的程序員關注可了解更多精彩內容。問題或建議,請公眾號留言...

    Arno 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<