摘要:的都是從消息來消費(fèi),但是為了能做到實(shí)時(shí)收消息,使用長(zhǎng)輪詢方式,可以保證消息實(shí)時(shí)性同方式一致。這種情況建議應(yīng)用,再消費(fèi)下一條消息,這樣可以減輕重試消息的壓力。邏輯請(qǐng)求按參數(shù)返回按照重置消費(fèi)從而實(shí)現(xiàn)回溯消費(fèi)
這次源碼學(xué)習(xí)的方法是帶著問題學(xué)習(xí)源碼實(shí)現(xiàn),問題列表如下
Consumer Group的概念是什么?
Consumer pull過程是怎樣的?
Consumer 支持push嗎?
Consumer 怎么實(shí)現(xiàn)單隊(duì)列并行消費(fèi)?
Consumer 怎么過濾消息?
Consumer 怎么保證一條消息只被Group中的一個(gè)服務(wù)消費(fèi)?
Consumer 負(fù)載均衡怎么實(shí)現(xiàn)?
Consumer 消費(fèi)失敗怎么辦?
Consumer 可以回溯消費(fèi)嗎?
Consumer消息消費(fèi)者,負(fù)責(zé)消費(fèi)消息,一般是后臺(tái)系統(tǒng)負(fù)責(zé)異步消費(fèi)。
Consumer Group的概念是什么?一類 Consumer 的集合名稱,這類 Consumer 通常消費(fèi)一類消息,且消費(fèi)邏輯一致。一般情況下group中Consumer的數(shù)量不能超過訂閱的topic中queue的數(shù)量,不然會(huì)有閑置的Consumer.
Consumer pull過程是怎樣的?分析過Producer,看Consumer有種似曾相識(shí)的感覺
主要邏輯
1. 根據(jù)mq信息去找broker路由信息 2. 根據(jù)相關(guān)參數(shù)構(gòu)建請(qǐng)求頭 3. 委托netty去broker獲取消息
代碼走讀
MQPullConsumer.pull的參數(shù)需指定MessageQueue,和offset(位置偏移)的.
PullResult pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
再看Pull操作的返回,有本次獲取的數(shù)據(jù)信息MessageExt,即位置信息offset
public class PullResult { //pull狀態(tài) private final PullStatus pullStatus; //下次pull的偏移量 private final long nextBeginOffset; //最小偏移量 private final long minOffset; //最大偏移量 private final long maxOffset; //獲取到的消息 private ListmsgFoundList; }
MQPullConsumer.pull
-> DefaultMQPullConsumer.pull
-> DefaultMQPullConsumerImpl.pull
-> DefaultMQPullConsumerImpl.pullSyncImpl
-> DefaultMQPullConsumerImpl.pullKernelImpl
public PullResult pullKernelImpl( final MessageQueue mq, final String subExpression, final String expressionType, final long subVersion, final long offset, final int maxNums, final int sysFlag, final long commitOffset, final long brokerSuspendMaxTimeMillis, final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { //獲取broker信息 FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false); if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false); } if (findBrokerResult != null) { { // check version if (!ExpressionType.isTagType(expressionType) && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) { throw new MQClientException("The broker[" + mq.getBrokerName() + ", " + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null); } } int sysFlagInner = sysFlag; if (findBrokerResult.isSlave()) { sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner); } // 構(gòu)建pull請(qǐng)求頭 PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); requestHeader.setConsumerGroup(this.consumerGroup); requestHeader.setTopic(mq.getTopic()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setQueueOffset(offset); requestHeader.setMaxMsgNums(maxNums); requestHeader.setSysFlag(sysFlagInner); requestHeader.setCommitOffset(commitOffset); requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); requestHeader.setSubscription(subExpression); requestHeader.setSubVersion(subVersion); requestHeader.setExpressionType(expressionType); String brokerAddr = findBrokerResult.getBrokerAddr(); if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr); } //委托Netty去獲取信息 PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback); return pullResult; } throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); }
pull消息比較簡(jiǎn)單,一次請(qǐng)求返回,由Consumer管理offset.一般來說一個(gè)Consumer Group中Consumer的數(shù)量不能大于MessageQueue的數(shù)量.
Consumer 支持push嗎?Push Consumer
Consumer 的一種,應(yīng)用通常向 Consumer 對(duì)象注冊(cè)一個(gè) Listener 接口,一旦收到消息,Consumer 對(duì)象立
刻回調(diào) Listener 接口方法。JMS標(biāo)準(zhǔn)中為MessageListener類的onMessage方法.
Pull Consumer
Consumer 的一種,應(yīng)用通常主動(dòng)調(diào)用 Consumer 的拉消息方法從 Broker 拉消息,主動(dòng)權(quán)由應(yīng)用控制。
RocketMQ的Consumer都是從Broker pull消息來消費(fèi),但是為了能做到實(shí)時(shí)收消息,RocketMQ 使用長(zhǎng)輪詢方式,可以保證消息實(shí)時(shí)性同Push方式一致。這種長(zhǎng)輪詢方式類似于WebQQ收發(fā)消息機(jī)制。請(qǐng)參考以下信息了解更多Comet:基于 HTTP 長(zhǎng)連接的“服務(wù)器推”技術(shù)
雖然RocketMQ的consumer都是通過pull來實(shí)現(xiàn)的但是其封裝了push接口,我們先來看其使用方法
public static void main(String[] args) throws InterruptedException, MQClientException { /** * 一個(gè)應(yīng)用創(chuàng)建一個(gè)Consumer,由應(yīng)用來維護(hù)此對(duì)象,可以設(shè)置為全局對(duì)象或者單例 * 注意:ConsumerGroupName需要由應(yīng)用來保證唯一 */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testmerchantLeagueConsumerGroup"); consumer.setNamesrvAddr("ip:port"); /** * 訂閱指定topic下tags分別等于TagA或TagB */ consumer.subscribe("broker-a", "TagB || TagA"); /** * 設(shè)置Consumer第一次啟動(dòng)是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi) * 如果非第一次啟動(dòng),那么按照上次消費(fèi)的位置繼續(xù)消費(fèi) */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //真正的處理消息邏輯在這里 consumer.registerMessageListener(new MessageListenerConcurrently() { /** * 默認(rèn)msgs里只有一條消息,可以通過設(shè)置consumeMessageBatchMaxSize參數(shù)來批量接收消息 */ @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); MessageExt msg = msgs.get(0); if (msg.getTopic().equals("broker-a")) { // 執(zhí)行TopicTest1的消費(fèi)邏輯 if (msg.getTags() != null && msg.getTags().equals("TagA")) { // 執(zhí)行TagA的消費(fèi) String message = new String(msg.getBody()); System.out.println(message); } else if (msg.getTags() != null && msg.getTags().equals("TagB")) { // 執(zhí)行TagB的消費(fèi) String message = new String(msg.getBody()); System.out.println(message); } } //消費(fèi)者向mq服務(wù)器返回消費(fèi)成功的消息 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //Consumer對(duì)象在使用之前必須要調(diào)用start初始化,初始化一次即可 consumer.start(); }
RocketMQ push的實(shí)現(xiàn) :
消息的拉取邏輯
維護(hù)一個(gè)pullRequestQueue,先放入一個(gè)pullRequest,當(dāng)pullResult為成功時(shí),再構(gòu)建新的pullRequest放入pullRequestQueue,另起一個(gè)線程監(jiān)測(cè)pullRequestQueue,當(dāng)起不為空時(shí),輪詢pull消息
DefaultMQPushConsumer.start
-> DefaultMQPushConsumerImpl.start
-> MQClientInstance.start
-> PullMessageService.start
我們來看PullMessageService的run方法,
//請(qǐng)求消息阻塞鏈表 private final LinkedBlockingQueuepullRequestQueue = new LinkedBlockingQueue (); @Override public void run() { log.info(this.getServiceName() + " service started"); //只要有請(qǐng)求就去pull消息 while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); if (pullRequest != null) { this.pullMessage(pullRequest); } } catch (InterruptedException e) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
pullRequestQueue在在哪里put呢?
在class里找到在executePullRequestLater方法內(nèi)會(huì)put
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { this.scheduledExecutorService.schedule(new Runnable() { @Override public void run() { PullMessageService.this.executePullRequestImmediately(pullRequest); } }, timeDelay, TimeUnit.MILLISECONDS); } public void executePullRequestImmediately(final PullRequest pullRequest) { try { this.pullRequestQueue.put(pullRequest); } catch (InterruptedException e) { log.error("executePullRequestImmediately pullRequestQueue.put", e); } }
查看此方法的調(diào)用關(guān)系,發(fā)現(xiàn)在run中的pullMessage方法中onSuccess回調(diào)中會(huì)構(gòu)建下一次的pullRequestQueue待下次請(qǐng)求
PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); //請(qǐng)求成功就構(gòu)建新的pullRequest pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT); long firstMsgOffset = Long.MAX_VALUE; if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(// pullResult.getMsgFoundList(), // processQueue, // pullRequest.getMessageQueue(), // dispathToConsume); //放到pullRequestQueue if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } } if (pullResult.getNextBeginOffset() < prevRequestOffset// || firstMsgOffset < prevRequestOffset) { log.warn( "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", // pullResult.getNextBeginOffset(), // firstMsgOffset, // prevRequestOffset); } break; case NO_NEW_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case NO_MATCHED_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case OFFSET_ILLEGAL: log.warn("the pull request offset illegal, {} {}", // pullRequest.toString(), pullResult.toString()); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); pullRequest.getProcessQueue().setDropped(true); DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() { @Override public void run() { try { DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false); DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue()); log.warn("fix the pull request offset, {}", pullRequest); } catch (Throwable e) { log.error("executeTaskLater Exception", e); } } }, 10000); break; default: break; } } }
至此獲取消息已經(jīng)搞定,再看怎么觸發(fā)MessageListener的消費(fèi)方法.
還是在DefaultMQPushConsumerImpl.pullMessage方法內(nèi)的回調(diào),有下列代碼,把消息提供給consumeMessageService處理.
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(// pullResult.getMsgFoundList(), // processQueue, // pullRequest.getMessageQueue(), // dispathToConsume);
構(gòu)建ConsumeRequest,然后提交至線程池消費(fèi)
@Override public void submitConsumeRequest(// final Listmsgs, // final ProcessQueue processQueue, // final MessageQueue messageQueue, // final boolean dispatchToConsume) { final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); if (msgs.size() <= consumeBatchSize) { ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { this.submitConsumeRequestLater(consumeRequest); } } else { for (int total = 0; total < msgs.size(); ) { List msgThis = new ArrayList (consumeBatchSize); for (int i = 0; i < consumeBatchSize; i++, total++) { if (total < msgs.size()) { msgThis.add(msgs.get(total)); } else { break; } } ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { for (; total < msgs.size(); total++) { msgThis.add(msgs.get(total)); } this.submitConsumeRequestLater(consumeRequest); } } } }
終于在ConsumeRequest的run方法中找到了listner的consumeMessage
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
這下整個(gè)pull邏輯就完成了.
Consumer 怎么實(shí)現(xiàn)單隊(duì)列并行消費(fèi)上節(jié)代碼就是取得并行的例子,簡(jiǎn)單來說就是把消息提交給線程池,而不阻塞,就單隊(duì)列并行消費(fèi)了
Consumer 怎么過濾消息?入口還是在DefaultMQPushConsumerImpl.pullMessage
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);
往里面看,發(fā)現(xiàn)有過濾消息的邏輯
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult, final SubscriptionData subscriptionData) { PullResultExt pullResultExt = (PullResultExt) pullResult; this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId()); if (PullStatus.FOUND == pullResult.getPullStatus()) { ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary()); ListConsumer 怎么保證一條消息只被Group中的一個(gè)服務(wù)消費(fèi)?msgList = MessageDecoder.decodes(byteBuffer); List msgListFilterAgain = msgList; if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) { msgListFilterAgain = new ArrayList (msgList.size()); for (MessageExt msg : msgList) { if (msg.getTags() != null) { if (subscriptionData.getTagsSet().contains(msg.getTags())) { msgListFilterAgain.add(msg); } } } } //消息過濾 if (this.hasHook()) { FilterMessageContext filterMessageContext = new FilterMessageContext(); filterMessageContext.setUnitMode(unitMode); filterMessageContext.setMsgList(msgListFilterAgain); this.executeHook(filterMessageContext); } for (MessageExt msg : msgListFilterAgain) { MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET, Long.toString(pullResult.getMinOffset())); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET, Long.toString(pullResult.getMaxOffset())); } pullResultExt.setMsgFoundList(msgListFilterAgain); } pullResultExt.setMessageBinary(null); return pullResult; }
因?yàn)閠opic的MessageQueue只能對(duì)應(yīng)Group中的一個(gè)Consumer,所以一條消息只被Group中的一個(gè)服務(wù)消費(fèi)
Consumer 負(fù)載均衡怎么實(shí)現(xiàn)?概念:
consumer同時(shí)消費(fèi)多個(gè)MessageQueue,當(dāng)topic中的MessageQueue變更時(shí),動(dòng)態(tài)調(diào)整消費(fèi)MessageQueue的數(shù)量
//RebalanceImpl public void doRebalance(final boolean isOrder) { MapsubTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry entry : subTable.entrySet()) { final String topic = entry.getKey(); try { this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } } this.truncateMessageQueueNotMyTopic(); }
我們只關(guān)心集群模式
主要邏輯:
1. 獲取topic所有MessageQueue 2. 獲取同ConsumerGroup組所有Consumer信息 3. 根據(jù)制定策略分配給此Consumer
private void rebalanceByTopic(final String topic, final boolean isOrder) { switch (messageModel) { case BROADCASTING: { SetmqSet = this.topicSubscribeInfoTable.get(topic); if (mqSet != null) { boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); if (changed) { this.messageQueueChanged(topic, mqSet, mqSet); log.info("messageQueueChanged {} {} {} {}", // consumerGroup, // topic, // mqSet, // mqSet); } } else { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } break; } case CLUSTERING: { //獲取該topic所有MessageQueue Set mqSet = this.topicSubscribeInfoTable.get(topic); //獲取同consumerGroup信息 List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); if (null == mqSet) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } } if (null == cidAll) { log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); } if (mqSet != null && cidAll != null) { List mqAll = new ArrayList (); mqAll.addAll(mqSet); Collections.sort(mqAll); Collections.sort(cidAll); AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List allocateResult = null; try { //根據(jù)分配策略分配MessageQueue給當(dāng)前Consumer allocateResult = strategy.allocate(// this.consumerGroup, // this.mQClientFactory.getClientId(), // mqAll, // cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), e); return; } Set allocateResultSet = new HashSet (); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); } boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); if (changed) { log.info( "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet); this.messageQueueChanged(topic, mqSet, allocateResultSet); } } break; } default: break; } }
RocketMQ提供了幾種策略供使用
實(shí)現(xiàn)類 | 策略名 |
---|---|
AllocateMessageQueueAveragelyByCircle | 輪詢平均分配策略 |
AllocateMessageQueueByMachineRoom | 根據(jù)機(jī)房分配策略 |
AllocateMessageQueueConsistentHash | 一致Hash分配策略 |
本節(jié)編寫參考分布式消息隊(duì)列RocketMQ源碼分析之4 -- Consumer負(fù)載均衡與Kafka的Consumer負(fù)載均衡之不同點(diǎn)
Consumer 消費(fèi)失敗怎么辦Consumer 消費(fèi)消息失敗后,要提供一種重試機(jī)制,令消息再消費(fèi)一次。Consumer 消費(fèi)消息失敗通??梢哉J(rèn)為 有以下幾種情況
由于消息本身的原因,例如反序列化失敗,消息數(shù)據(jù)本身無法處理(例如話費(fèi)充值,當(dāng)前消息的手機(jī)號(hào)被注銷,無法充值)等。這種錯(cuò)誤通常需要跳過這條消息,再消費(fèi)其他消息,而這條失敗的消息即使立刻重試消費(fèi),99%也不成功,所以最好提供一種定時(shí)重試機(jī)制,即過 10s 秒后再重試。
由于依賴的下游應(yīng)用服務(wù)不可用,例如db連接不可用,外系統(tǒng)網(wǎng)絡(luò)不可達(dá)等。遇到這種錯(cuò)誤,即使跳過當(dāng)前失敗的消息,消費(fèi)其他消息同樣也會(huì)報(bào)錯(cuò)。這種情況建議應(yīng)用 sleep 30s,再 消費(fèi)下一條消息,這樣可以減輕 Broker 重試消息的壓力。
具體到代碼實(shí)現(xiàn),會(huì)根據(jù)消費(fèi)狀態(tài)進(jìn)行處理,當(dāng)無返回時(shí)會(huì)重試.
if (null == status) { log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); //設(shè)置狀態(tài)為重試 status = ConsumeConcurrentlyStatus.RECONSUME_LATER; }
public void processConsumeResult(// final ConsumeConcurrentlyStatus status, // final ConsumeConcurrentlyContext context, // final ConsumeRequest consumeRequest// ) { int ackIndex = context.getAckIndex(); if (consumeRequest.getMsgs().isEmpty()) return; switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); break; case RECONSUME_LATER: ackIndex = -1; this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), consumeRequest.getMsgs().size()); break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString()); } break; case CLUSTERING: ListConsumer 可以回溯消費(fèi)嗎?msgBackFailed = new ArrayList (consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); //請(qǐng)求重試消費(fèi) this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); } break; default: break; } long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
回溯消費(fèi)是指Consumer已經(jīng)消費(fèi)成功的消息,由于業(yè)務(wù)上需求需要重新消費(fèi),要支持此功能,Broker在向Consumer投遞成功消息后,消息仍然需要保留。并且重新消費(fèi)一般是按照時(shí)間維度,例如由于Consumer系統(tǒng)故障,恢復(fù)后需要重新消費(fèi) 1 小時(shí)前的數(shù)據(jù),那么Broker要提供一種機(jī)制,可以按照時(shí)間維度來回退消費(fèi)進(jìn)度。RocketMQ支持按照時(shí)間回溯消費(fèi),時(shí)間維度精確到毫秒,可以向前回溯,也可以向后回溯。
邏輯: 請(qǐng)求broker按參數(shù)返回offset,按照offset重置消費(fèi)offset,從而實(shí)現(xiàn)回溯消費(fèi)
public MapinvokeBrokerToResetOffset(final String addr, final String topic, final String group, final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC) throws RemotingException, MQClientException, InterruptedException { ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader(); requestHeader.setTopic(topic); requestHeader.setGroup(group); requestHeader.setTimestamp(timestamp); requestHeader.setForce(isForce); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader); if (isC) { request.setLanguage(LanguageCode.CPP); } RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { if (response.getBody() != null) { ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class); return body.getOffsetTable(); } } default: break; } throw new MQClientException(response.getCode(), response.getRemark()); }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/70344.html
摘要:本篇著重聊聊為什么會(huì)消息堆積。默認(rèn)的策略很好理解,將平均的分配給。那么最后時(shí),本來不同的,會(huì)取到相同的舉個(gè)例子,和都取到了前個(gè),從而造成有些如果有的話沒有對(duì)其消費(fèi),而沒有被消費(fèi),消息也在不停的投遞進(jìn)來,就會(huì)造成消息的大量堆積。 首先,造成這個(gè)問題的 BUG RocketMQ 官方已經(jīng)在 3月16號(hào) 的這個(gè)提交中...
摘要:每個(gè)與集群中的所有節(jié)點(diǎn)建立長(zhǎng)連接,定時(shí)注冊(cè)信息到所有。完全無狀態(tài),可集群部署。本系列源碼解析主要參照原理簡(jiǎn)介來追尋其代碼實(shí)現(xiàn)雖然版本不太一致但這也是能找到的最詳細(xì)的資料了接下來根據(jù)其模塊來源碼閱讀目錄如下 為什么選擇讀RocketMQ? 對(duì)MQ的理解一直不深,上周看了,還是覺得不夠深入,找個(gè)成熟的產(chǎn)品來學(xué)習(xí)吧,RabbitMQ是erLang寫的,Kafka是Scala寫的,非Java寫...
摘要:完全無狀態(tài),可集群部署與集群中的其中一個(gè)節(jié)點(diǎn)隨機(jī)選擇建立長(zhǎng)連接,定期從取路由信息,并向提供服務(wù)的建立長(zhǎng)連接,且定時(shí)向發(fā)送心跳。既可以從訂閱消息,也可以從訂閱消息,訂閱規(guī)則由配置決定。 問題列表: Name Server 的作用是什么? Name Server 存儲(chǔ)了Broker的什么信息? Name Server 為Producer的提供些什么信息? Name Server 為Co...
摘要:發(fā)送消息階段,不允許發(fā)送重復(fù)的消息。雖然不能嚴(yán)格保證不重復(fù),但是正常情況下很少會(huì)出現(xiàn)重復(fù)發(fā)送消費(fèi)情況,只有網(wǎng)絡(luò)異常,啟停等異常情況下會(huì)出現(xiàn)消息重復(fù)。 問題列表 Broker 怎么響應(yīng)Consumer請(qǐng)求? Broker 怎么維護(hù)ConsumeQueue? Broker 怎么處理事務(wù)消息的 ConsumeQueue ? Broker 怎么處理定時(shí)消息的 ConsumeQueue? B...
摘要:前提通過前面兩篇文章可以簡(jiǎn)單的了解和安裝,今天就將和整合起來使用。然后我運(yùn)行之前的整合項(xiàng)目,查看監(jiān)控信息如下總結(jié)整篇文章講述了與整合和監(jiān)控平臺(tái)的搭建。 showImg(https://segmentfault.com/img/remote/1460000013232432?w=1920&h=1277); 前提 通過前面兩篇文章可以簡(jiǎn)單的了解 RocketMQ 和 安裝 RocketMQ...
閱讀 2351·2019-08-30 15:44
閱讀 1274·2019-08-30 13:01
閱讀 3317·2019-08-30 11:22
閱讀 3104·2019-08-29 15:23
閱讀 1623·2019-08-29 12:22
閱讀 3385·2019-08-26 13:58
閱讀 3451·2019-08-26 12:17
閱讀 3488·2019-08-26 12:16