RocketMQ開源是使用文件作為持久化工具,阿里內(nèi)部未開源的性能會更高,使用oceanBase作為持久化工具。
在RocketMQ1.x和2.x使用zookeeper管理集群,3.x開始使用nameserver代替zk,更輕量級,此外RocketMQ的客戶端擁有兩種的操作方式:DefaultMQPushConsumer和DefaultMQPullConsumer。
以上所說的第一次啟動(dòng)是指從來沒有消費(fèi)過的消費(fèi)者,如果該消費(fèi)者消費(fèi)過,那么會在broker端記錄該消費(fèi)者的消費(fèi)位置,如果該消費(fèi)者掛了再啟動(dòng),那么自動(dòng)從上次消費(fèi)的進(jìn)度開始
public class MQPushConsumer { public static void main(String[] args) throws MQClientException { String groupName = "rocketMqGroup1"; // 用于把多個(gè)Consumer組織到一起,提高并發(fā)處理能力 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); // 設(shè)置nameServer地址,多個(gè)以;分隔 consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setMessageModel(MessageModel.BROADCASTING); // 訂閱topic,可以對指定消息進(jìn)行過濾,例如:"TopicTest","tagl||tag2||tag3",*或null表示topic所有消息 consumer.subscribe("order-topic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List
ConsumeConcurrentlyStatus.RECONSUME_LATER boker會根據(jù)設(shè)置的messageDelayLevel發(fā)起重試,默認(rèn)16次。
DefaultMQPushConsumerImpl中各個(gè)對象的主要功能如下:
RebalancePushImpl:主要負(fù)責(zé)決定,當(dāng)前的consumer應(yīng)該從哪些Queue中消費(fèi)消息;
consumer.registerMessageListener執(zhí)行過程:
/** * Register a callback to execute on message arrival for concurrent consuming. * @param messageListener message handling callback. */ @Override public void registerMessageListener(MessageListenerConcurrently messageListener) { this.messageListener = messageListener; this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); }
通過源碼可以看出主要實(shí)現(xiàn)過程在DefaultMQPushConsumerImpl類中consumer.start后調(diào)用DefaultMQPushConsumerImpl的同步start方法
public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); this.serviceState = ServiceState.START_FAILED; this.checkConfig(); this.copySubscription(); if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { this.defaultMQPushConsumer.changeInstanceNameToPID(); } this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); this.pullAPIWrapper = new PullAPIWrapper( mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); if (this.defaultMQPushConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); } else { switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); } this.offsetStore.load(); if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } this.consumeMessageService.start(); boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; this.consumeMessageService.shutdown(); throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } mQClientFactory.start(); log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } this.updateTopicSubscribeInfoWhenSubscriptionChanged(); this.mQClientFactory.checkClientInBroker(); this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this.mQClientFactory.rebalanceImmediately(); }
通過mQClientFactory.start();發(fā)我們發(fā)現(xiàn)他調(diào)用
public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } // Start request-response channel this.mQClientAPIImpl.start(); // Start various schedule tasks this.startScheduledTask(); // Start pull service this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); // Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } } }
在這個(gè)方法中有多個(gè)start,我們主要看pullMessageService.start();通過這里我們發(fā)現(xiàn)RocketMQ的Push模式底層其實(shí)也是通過pull實(shí)現(xiàn)的,下面我們來看下pullMessageService處理了哪些邏輯:
private void pullMessage(final PullRequest pullRequest) { final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; impl.pullMessage(pullRequest); } else { log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); } }
我們發(fā)現(xiàn)其實(shí)他還是通過DefaultMQPushConsumerImpl類的pullMessage方法來進(jìn)行消息的邏輯處理.
PullRequest這里說明一下,上面我們已經(jīng)提了一下rocketmq的push模式其實(shí)是通過pull模式封裝實(shí)現(xiàn)的,pullrequest這里是通過長輪詢的方式達(dá)到push效果。
長輪詢方式既有pull的優(yōu)點(diǎn)又有push模式的實(shí)時(shí)性有點(diǎn)。
push方式是server端接收到消息后,主動(dòng)把消息推送給client端,實(shí)時(shí)性高。弊端是server端工作量大,影響性能,其次是client端處理能力不同且client端的狀態(tài)不受server端的控制,如果client端不能及時(shí)處理消息容易導(dǎo)致消息堆積已經(jīng)影響正常業(yè)務(wù)等。
pull方式是client循環(huán)從server端拉取消息,主動(dòng)權(quán)在client端,自己處理完一個(gè)消息再去拉取下一個(gè),缺點(diǎn)是循環(huán)的時(shí)間不好設(shè)定,時(shí)間太短容易忙等,浪費(fèi)CPU資源,時(shí)間間隔太長client的處理能力會下降,有時(shí)候有些消息會處理不及時(shí)。
PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion,long offset, int maxNums, int sysFlag,long commitOffset,long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)方法進(jìn)行消息拉取操作。
將回調(diào)類PullCallback傳入該方法中,當(dāng)采用異步方式拉取消息時(shí),在收到響應(yīng)之后會回調(diào)該回調(diào)類的方法。
public void pullMessage(final PullRequest pullRequest) { final ProcessQueue processQueue = pullRequest.getProcessQueue(); if (processQueue.isDropped()) { log.info("the pull request[{}] is dropped.", pullRequest.toString()); return; } pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis()); try { this.makeSureStateOK(); } catch (MQClientException e) { log.warn("pullMessage exception, consumer state not ok", e); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); return; } if (this.isPause()) { log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup()); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND); return; } long cachedMessageCount = processQueue.getMsgCount().get(); long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } if (!this.consumeOrderly) { if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { log.warn( "the queues messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, queueMaxSpanFlowControlTimes); } return; } } else { if (processQueue.isLocked()) { if (!pullRequest.isLockedFirst()) { final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue()); boolean brokerBusy = offset < pullRequest.getNextOffset(); log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", pullRequest, offset, brokerBusy); if (brokerBusy) { log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", pullRequest, offset); } pullRequest.setLockedFirst(true); pullRequest.setNextOffset(offset); } } else { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); log.info("pull message later because not locked in broker, {}", pullRequest); return; } } final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (null == subscriptionData) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); log.warn("find the consumers subscription failed, {}", pullRequest); return; } final long beginTimestamp = System.currentTimeMillis(); PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT); long firstMsgOffset = Long.MAX_VALUE; if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } } if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) { log.warn( "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", pullResult.getNextBeginOffset(), firstMsgOffset, prevRequestOffset); } break; case NO_NEW_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case NO_MATCHED_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case OFFSET_ILLEGAL: log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString()); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); pullRequest.getProcessQueue().setDropped(true); DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() { @Override public void run() { try { DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false); DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue()); log.warn("fix the pull request offset, {}", pullRequest); } catch (Throwable e) { log.error("executeTaskLater Exception", e); } } }, 10000); break; default: break; } } } @Override public void onException(Throwable e) { if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("execute the pull request exception", e); } DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); } }; boolean commitOffsetEnable = false; long commitOffsetValue = 0L; if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) { commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY); if (commitOffsetValue > 0) { commitOffsetEnable = true; } } String subExpression = null; boolean classFilter = false; SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (sd != null) { if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) { subExpression = sd.getSubString(); } classFilter = sd.isClassFilterMode(); } int sysFlag = PullSysFlag.buildSysFlag( commitOffsetEnable, // commitOffset true, // suspend subExpression != null, // subscription classFilter // class filter ); try { // 下面我們看繼續(xù)跟進(jìn)這個(gè)方法,這個(gè)方法已經(jīng)就是客戶端如何拉取消息 this.pullAPIWrapper.pullKernelImpl( pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(), sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 消息的通信方式為異步 CommunicationMode.ASYNC, pullCallback ); } catch (Exception e) { log.error("pullKernelImpl exception", e); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); } }
在MQClientAPIImpl.pullMessage方法中,根據(jù)入?yún)ommunicationMode的值分為異步拉取和同步拉取方式兩種。
無論是異步方式拉取還是同步方式拉取,在發(fā)送拉取請求之前都會構(gòu)造一個(gè)ResponseFuture對象,以請求消息的序列號為key值,存入NettyRemotingAbstract.responseTable:ConcurrentHashMap, ResponseFuture>變量中,對該變量有幾種情況會處理:
public PullResult pullMessage( final String addr, final PullMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); switch (communicationMode) { case ONEWAY: assert false; return null; case ASYNC: this.pullMessageAsync(addr, request, timeoutMillis, pullCallback); return null; case SYNC: return this.pullMessageSync(addr, request, timeoutMillis); default: assert false; break; } return null; }
對于同步發(fā)送方式,調(diào)用MQClientAPIImpl.pullMessageSync(String addr, RemotingCommand request, long timeoutMillis)方法,大致步驟如下:
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/124810.html
摘要:會根據(jù)設(shè)置的發(fā)起重試,默認(rèn)次。弊端是端工作量大,影響性能,其次是端處理能力不同且端的狀態(tài)不受端的控制,如果端不能及時(shí)處理消息容易導(dǎo)致消息堆積已經(jīng)影響正常業(yè)務(wù)等。 RocketMQ的前提回顧RocketMQ是一款分布式、隊(duì)列模型的消息中間件,具有以下特點(diǎn):能夠保證嚴(yán)格的消息順序提供豐富的消息拉取模式高效的訂閱者水...
摘要:最近對基礎(chǔ)教程系列的催更比較多,說一下最近的近況因?yàn)榇蛩阋黄鸶?。再次,對于中國用戶來說,還有一個(gè)非常特殊的意義它將曾經(jīng)紅極一時(shí)的,以及阿里巴巴的強(qiáng)力消息中間件融入體系。 最近對《Spring Cloud Alibaba基礎(chǔ)教程》系列的催更比較多,說一下最近的近況:因?yàn)榇蛩鉙pring Boot 2.x一起更新。所以一直在改博客Spring Boot專題頁和Git倉庫的組織。由于前端技...
摘要:棧長有話說其實(shí)項(xiàng)目就是為了阿里的項(xiàng)目能很好的結(jié)合融入使用,這個(gè)項(xiàng)目目前由阿里維護(hù)。對同時(shí)使用和阿里巴巴項(xiàng)目的人來說無疑帶來了巨大的便利,一方面能結(jié)合無縫接入,另一方面還能使用阿里巴巴的組件,也帶來了更多的可選擇性。 最近,Spring Cloud 發(fā)布了 Spring Cloud Alibaba 首個(gè)預(yù)覽版本:Spring Cloud for Alibaba 0.2.0. 大家都好奇,...
摘要:淘寶定制基于,是國內(nèi)第一個(gè)優(yōu)化定制且開源的服務(wù)器版虛擬機(jī)。數(shù)據(jù)庫開源數(shù)據(jù)庫是基于官方版本的一個(gè)分支,由阿里云數(shù)據(jù)庫團(tuán)隊(duì)維護(hù),目前也應(yīng)用于阿里巴巴集團(tuán)業(yè)務(wù)以及阿里云數(shù)據(jù)庫服務(wù)。淘寶服務(wù)器是由淘寶網(wǎng)發(fā)起的服務(wù)器項(xiàng)目。 Java JAVA 研發(fā)框架 SOFAStack SOFAStack(Scalable Open Financial Architecture Stack)是用于快速構(gòu)建金融...
閱讀 736·2023-04-25 19:43
閱讀 3981·2021-11-30 14:52
閱讀 3807·2021-11-30 14:52
閱讀 3871·2021-11-29 11:00
閱讀 3802·2021-11-29 11:00
閱讀 3904·2021-11-29 11:00
閱讀 3580·2021-11-29 11:00
閱讀 6183·2021-11-29 11:00