成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

讓你看懂的RocketMQ事務(wù)消息源碼分析(干貨)

zsirfs / 2049人閱讀

摘要:但是服務(wù)器又確實(shí)是收到了這條消息的,只是給客戶端的響應(yīng)丟失了,所以導(dǎo)致的結(jié)果就是扣款失敗,成功發(fā)貨。所有的半消息都會(huì)寫在為的半消息隊(duì)列里,并且每條半消息,在整個(gè)鏈路里會(huì)被寫多次,如果并發(fā)很大且大部分消息都是事務(wù)消息的話,可靠性會(huì)存在問(wèn)題。

前言

得益于MQ削峰填谷,系統(tǒng)解耦,操作異步等功能特性,在互聯(lián)網(wǎng)行業(yè),可以說(shuō)有分布式服務(wù)的地方,MQ都往往不會(huì)缺席。由阿里自研的RocketMQ更是經(jīng)歷了多年的雙十一高并發(fā)挑戰(zhàn),其中4.3.0版本推出了事務(wù)消息的新特性,本文對(duì)RocketMQ 4.5.0版本事務(wù)消息相關(guān)的源碼跟蹤介紹,通過(guò)閱讀讀者可以知道:

事務(wù)消息解決什么樣的問(wèn)題

事務(wù)消息的實(shí)現(xiàn)原理及其設(shè)計(jì)亮點(diǎn)

解決什么問(wèn)題

假設(shè)我所在的系統(tǒng)現(xiàn)在有這樣一個(gè)場(chǎng)景:

本地開啟數(shù)據(jù)庫(kù)事務(wù)進(jìn)行扣款操作,成功后發(fā)送MQ消息給庫(kù)存中心進(jìn)行發(fā)貨。

有人會(huì)想到開啟mybatis事務(wù)實(shí)現(xiàn),把本地事務(wù)和MQ消息放在一起不就行了嗎?如果MQ發(fā)送成功,就提交事務(wù),發(fā)送失敗就回滾事務(wù),整套操作一氣呵成。

transaction{
  扣款();
  boolean success = 發(fā)送MQ();
    if(success){
    commit();
  }else{
    rollBack();
  }
}

看似沒(méi)什么問(wèn)題,但是網(wǎng)絡(luò)是不可靠的。

假設(shè)MQ返回過(guò)來(lái)的響應(yīng)因?yàn)榫W(wǎng)絡(luò)原因遲遲沒(méi)有收到,所以在面對(duì)不確定的MQ返回結(jié)果只好進(jìn)行回滾。但是MQ 服務(wù)器又確實(shí)是收到了這條消息的,只是給客戶端的響應(yīng)丟失了,所以導(dǎo)致的結(jié)果就是扣款失敗,成功發(fā)貨。

既然MQ消息的發(fā)送不能和本地事務(wù)寫在一起,那如何來(lái)保證其整體具有原子性的需求呢?答案就是今天我們介紹的主角:事務(wù)消息

概覽

總體而言RocketMQ事務(wù)消息分為兩條主線

定時(shí)任務(wù)發(fā)送流程:發(fā)送half message(半消息),執(zhí)行本地事務(wù),發(fā)送事務(wù)執(zhí)行結(jié)果

定時(shí)任務(wù)回查流程:MQ服務(wù)器回查本地事務(wù),發(fā)送事務(wù)執(zhí)行結(jié)果

因此本文也通過(guò)這兩條主線對(duì)源碼進(jìn)行分析

源碼分析 半消息發(fā)送流程 本地應(yīng)用(client)

在本地應(yīng)用發(fā)送事務(wù)消息的核心類是TransactionMQProducer,該類通過(guò)繼承DefaultMQProducer來(lái)復(fù)用大部分發(fā)送消息相關(guān)的邏輯,這個(gè)類的代碼量非常少只有100來(lái)行,下面是這個(gè)類的sendMessageTransaction方法

@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
    final Object arg) throws MQClientException {
    if (null == this.transactionListener) {
        throw new MQClientException("TransactionListener is null", null);
    }

    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}

這個(gè)方法做了兩件事,

檢查transactionListener是否存在

調(diào)用父類執(zhí)行事務(wù)消息發(fā)送

TransactionListener在事務(wù)消息流程中起到至關(guān)重要的作用,一起看看這個(gè)接口

public interface TransactionListener {
    /**
     * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
     *
     * @param msg Half(prepare) message
     * @param arg Custom business parameter
     * @return Transaction state
     */
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    /**
     * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
     * method will be invoked to get local transaction status.
     *
     * @param msg Check message
     * @return Transaction state
     */
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

接口注釋說(shuō)的很明白,配合上面的概覽圖來(lái)看就是,executeLocalTransaction方法對(duì)應(yīng)的就是執(zhí)行本地事務(wù)操作,checkLocalTransaction對(duì)應(yīng)的就是回查本地事務(wù)操作。

下面是DefaultMQProducer類的sendMessageInTransaction方法源碼

public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                      final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    ...
    SendResult sendResult = null;
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
                            ...
        sendResult = this.send(msg);
                            ...
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
                    ...
        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                ...
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

                            ...
        this.endTransaction(sendResult, localTransactionState, localException);
                                ...
}

為了使源碼的邏輯更加直觀,筆者精簡(jiǎn)了核心代碼。sendMessageInTransaction方法主要做了以下事情

給消息打上事務(wù)消息相關(guān)的標(biāo)記,用于MQ服務(wù)端區(qū)分普通消息和事務(wù)消息

發(fā)送半消息(half message)

發(fā)送成功則由transactionListener執(zhí)行本地事務(wù)

執(zhí)行endTransaction方法,如果半消息發(fā)送失敗本地事務(wù)執(zhí)行失敗告訴服務(wù)端是刪除半消息,半消息發(fā)送成功本地事務(wù)執(zhí)行成功則告訴服務(wù)端生效半消息。

發(fā)送半消息流程,Client端代碼到這里差不多就結(jié)束了,接下來(lái)看看RocketMQ Server端是如何處理的

RocketMQ Server

Server在接收到消息過(guò)后會(huì)進(jìn)行一些領(lǐng)域?qū)ο蟮霓D(zhuǎn)化和是否支持事務(wù)消息的權(quán)限校驗(yàn),對(duì)理解事務(wù)消息用處不大,此處就省略對(duì)旁枝末節(jié)的介紹了。下面是TransactionalMessageBridge類處理half message的源碼

public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
    return store.putMessage(parseHalfMessageInner(messageInner));
}

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

這兩個(gè)方法主要做了以下事情:

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;

    private String topic;
    private int flag;
    private Map properties;
    private byte[] body;
    private String transactionId;
}

將消息的topic,queueId放進(jìn)消息體自身的map里進(jìn)行緩存

將消息的topic 設(shè)置為“RMQ_SYS_TRANS_OP_HALF_TOPIC”,queueId設(shè)置為0

將消息寫入磁盤持久化

可以看到所有的事務(wù)半消息都會(huì)被放進(jìn)同一個(gè)topic的同一個(gè)queue里面,通過(guò)對(duì)topic的區(qū)分,從而避免了半消息被consumer給消費(fèi)到

Server將半消息持久化后然后會(huì)發(fā)送結(jié)果給我們本地的應(yīng)用程序。到了這里Server端對(duì)半消息的處理就結(jié)束了,緊接著的是定時(shí)任務(wù)的登場(chǎng)。

定時(shí)任務(wù)回查流程 RocketMQ Server

定時(shí)任務(wù)是一個(gè)叫TransactionalMessageService類的線程,下面是該類的check方法

@Override
public void check(long transactionTimeout, int transactionCheckMax,
    AbstractTransactionalMessageCheckListener listener) {
                  ...
     if (!putBackHalfMsgQueue(msgExt, i)) {
        continue;
     }
       listener.resolveHalfMsg(msgExt);
   } 
                                    ...
}

check方法非常長(zhǎng),省略的代碼大致都是對(duì)半消息進(jìn)行過(guò)濾(如超過(guò)72小時(shí)的事務(wù)消息,就被算作過(guò)期),只保留符合條件的半消息對(duì)其進(jìn)行回查。

其中很有意思的是putBackHalfMsgQueue方法,因?yàn)槊看伟寻胂拇疟P拉到內(nèi)存里進(jìn)行處理都會(huì)對(duì)其屬性進(jìn)行改變(例如TRANSACTION_CHECK_TIMES,這是是否丟棄事務(wù)消息的關(guān)鍵信息),所以在發(fā)送回查消息之前需要對(duì)半消息再次放進(jìn)磁盤。RocketMQ采取的方法是基于最新的物理偏移量重新寫入,而不是對(duì)原有的半消息進(jìn)行修改,其中的目的就是RocketMQ的存儲(chǔ)設(shè)計(jì)采用順序?qū)懀绻バ薷南?,無(wú)法做到高性能。

下面是resolveHalfMsg方法,主要就是開啟一個(gè)線程然后發(fā)送check消息。

public void resolveHalfMsg(final MessageExt msgExt) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                sendCheckMessage(msgExt);
            } catch (Exception e) {
                LOGGER.error("Send check message error!", e);
            }
        }
    });
}
本地應(yīng)用(client)

下面是DefaultMQProducerImpl的checkTransactionState方法,是本地應(yīng)用對(duì)回查消息的處理邏輯

@Override
public void checkTransactionState(final String addr, final MessageExt msg,
    final CheckTransactionStateRequestHeader header) {
    Runnable request = new Runnable() {
        ...
        @Override
        public void run() {
            ...
     TransactionListener transactionListener = getCheckListener();
            ...
     localTransactionState = transactionListener.checkLocalTransaction(message);
               ...
                 
      this.processTransactionState(
                    localTransactionState,
                    group,
                    exception);        
        }
      
        private void processTransactionState(
           ...
 DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
                    3000);
           ...
        }
    };
    this.checkExecutor.submit(request);
}

精簡(jiǎn)代碼邏輯后可以清晰的看到

開啟一個(gè)線程來(lái)執(zhí)行回查的邏輯

執(zhí)行transactionListener的checkLocalTransaction方法來(lái)獲取本地事務(wù)執(zhí)行的結(jié)果

RocketMQ Server

RocketMQ 服務(wù)器在收到Client發(fā)過(guò)來(lái)的Commit消息后會(huì)

讀出半消息——>恢復(fù)topic等原消息體的信息——>和普通消息一樣再次寫入磁盤——>刪除之前的半消息

如果是Rollback消息則直接刪除之前的半消息

到此,整條RocketMQ 事務(wù)消息的調(diào)用鏈就結(jié)束了

思考

1. 分布式事務(wù)等于事務(wù)消息嗎?

兩者并沒(méi)有關(guān)系,事務(wù)消息僅僅保證本地事務(wù)和MQ消息發(fā)送形成整體的原子性,而投遞到MQ服務(wù)器后,消費(fèi)者是否能一定消費(fèi)成功是無(wú)法保證的。

2. 源碼設(shè)計(jì)上有什么亮點(diǎn)嗎?

通過(guò)對(duì)整條鏈路源碼的學(xué)習(xí)理解發(fā)現(xiàn)還是有不少亮點(diǎn)的

server端回查消息的發(fā)送,client端回查消息邏輯的處理,client端commit/rollback消息的提交都是用了異步進(jìn)行,可以說(shuō)能異步的地方都用了異步,通過(guò)異步+重試的方式保證了在分布式環(huán)境中即使短暫的網(wǎng)絡(luò)狀況不良好,也不會(huì)影響整體邏輯。

引入TransactionListener,真正做到了開閉原則以及依賴倒置原則,面向接口編程。整體擴(kuò)展性做得非常好,使用者只需要編寫自己的Listener就可以做到事務(wù)消息的發(fā)送,非常方便

TransactionMQProducer通過(guò)繼承DefaultMQProducer極大地復(fù)用了關(guān)于發(fā)送消息相關(guān)的邏輯

3. 源碼設(shè)計(jì)上有什么不足嗎?

RocketMQ作為一款極其成功的消息中間件,要發(fā)現(xiàn)不足不是那么容易了,筆者談幾點(diǎn)看法

sendMessageIntransaction等事務(wù)相關(guān)的方法被劃分在了DefaultMQProducer里面,從內(nèi)聚的角度來(lái)說(shuō)這是跟事務(wù)相關(guān)的發(fā)送消息方法應(yīng)該被劃分在TransactionMQProducer。

所有topic的半消息都會(huì)寫在topic為RMQ_SYS_TRANS_OP_HALF_TOPIC的半消息隊(duì)列里,并且每條半消息,在整個(gè)鏈路里會(huì)被寫多次,如果并發(fā)很大且大部分消息都是事務(wù)消息的話,可靠性會(huì)存在問(wèn)題。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/75354.html

相關(guān)文章

  • 一定能懂的RocketMQ事務(wù)消息源碼分析(干貨)

    摘要:但是服務(wù)器又確實(shí)是收到了這條消息的,只是給客戶端的響應(yīng)丟失了,所以導(dǎo)致的結(jié)果就是扣款失敗,成功發(fā)貨。既然消息的發(fā)送不能和本地事務(wù)寫在一起,那如何來(lái)保證其整體具有原子性的需求呢答案就是今天我們介紹的主角事務(wù)消息。 前言 得益于MQ削峰填谷,系統(tǒng)解耦,操作異步等功能特性,在互聯(lián)網(wǎng)行業(yè),可以說(shuō)有分布式服務(wù)的地方,MQ都往往不會(huì)缺席。由阿里自研的RocketMQ更是經(jīng)歷了多年的雙十一高并發(fā)挑戰(zhàn)...

    myshell 評(píng)論0 收藏0
  • 后端經(jīng)驗(yàn)

    摘要:在結(jié)構(gòu)上引入了頭結(jié)點(diǎn)和尾節(jié)點(diǎn),他們分別指向隊(duì)列的頭和尾,嘗試獲取鎖入隊(duì)服務(wù)教程在它提出十多年后的今天,已經(jīng)成為最重要的應(yīng)用技術(shù)之一。隨著編程經(jīng)驗(yàn)的日積月累,越來(lái)越感覺(jué)到了解虛擬機(jī)相關(guān)要領(lǐng)的重要性。 JVM 源碼分析之 Jstat 工具原理完全解讀 http://click.aliyun.com/m/8315/ JVM 源碼分析之 Jstat 工具原理完全解讀 http:...

    i_garfileo 評(píng)論0 收藏0
  • 【備戰(zhàn)春招/秋招系列】美團(tuán)Java面經(jīng)總結(jié)進(jìn)階篇 (附詳解答案)

    摘要:我在前面的文章中也提到了應(yīng)該怎么做自我介紹與項(xiàng)目介紹,詳情可以查看這篇文章備戰(zhàn)春招秋招系列初出茅廬的程序員該如何準(zhǔn)備面試。因此基于事件消息對(duì)象驅(qū)動(dòng)的業(yè)務(wù)架構(gòu)可以是一系列流程。 showImg(https://user-gold-cdn.xitu.io/2018/11/14/16711ac29c2ae52c?w=928&h=531&f=png&s=798562); 一 消息隊(duì)列MQ的...

    chengjianhua 評(píng)論0 收藏0
  • Java相關(guān)

    摘要:本文是作者自己對(duì)中線程的狀態(tài)線程間協(xié)作相關(guān)使用的理解與總結(jié),不對(duì)之處,望指出,共勉。當(dāng)中的的數(shù)目而不是已占用的位置數(shù)大于集合番一文通版集合番一文通版垃圾回收機(jī)制講得很透徹,深入淺出。 一小時(shí)搞明白自定義注解 Annotation(注解)就是 Java 提供了一種元程序中的元素關(guān)聯(lián)任何信息和著任何元數(shù)據(jù)(metadata)的途徑和方法。Annotion(注解) 是一個(gè)接口,程序可以通過(guò)...

    wangtdgoodluck 評(píng)論0 收藏0
  • 新手也能看懂消息隊(duì)列其實(shí)很簡(jiǎn)單

    摘要:通過(guò)以上分析我們可以得出消息隊(duì)列具有很好的削峰作用的功能即通過(guò)異步處理,將短時(shí)間高并發(fā)產(chǎn)生的事務(wù)消息存儲(chǔ)在消息隊(duì)列中,從而削平高峰期的并發(fā)事務(wù)。 該文已加入開源項(xiàng)目:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識(shí)的文檔類項(xiàng)目,Star 數(shù)接近 16k)。地址:https://github.com/Snailclimb... 本文內(nèi)容思維導(dǎo)圖:showImg(ht...

    Clect 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<