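Producer (message producer)
This source-code walkthrough is question-driven: each question below is answered by tracing the implementation. The questions are:
How does the Producer send a synchronous message?
How does the Producer interact with the NameServer?
How does the Producer send an asynchronous message?
Are both P2P and Pub/Sub supported?
How does the Producer guarantee ordered messages?
How does the Producer balance load?
What is a Producer Group?
How does the Producer specify message priority?
How are the Producer's transactional messages implemented?
How are the Producer's scheduled messages implemented?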
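How does the Producer send a synchronous message?
The message body, Message:
    // topic
    private String topic;
    // status flag
    private int flag;
    // properties
    private Map<String, String> properties;
    // message body
    private byte[] body;
Execution logic:
1. Look up the publish information of the topic
2. Select one MessageQueue of the topic
3. Send the message to that MessageQueue over Netty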
DefaultMQProducer.send -> DefaultMQProducerImpl.send -> DefaultMQProducerImpl.sendDefaultImpl
    private SendResult sendDefaultImpl(
            Message msg,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback,
            final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // look up the topic's publish information
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // select one MessageQueue of the topic (comparable to a Kafka partition)
                MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (tmpmq != null) {
                    mq = tmpmq;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        // the method that actually sends the data
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }
                                return sendResult;
                            default:
                                break;
                        }
                    }
                    // (the excerpt omits the catch blocks that belong to this try)
                }
            }
            // (the excerpt ends here; the rest of the method is not shown)
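The attempt budget in the loop above can be looked at in isolation. The following is a minimal sketch, not RocketMQ code: `RetrySketch`, `Mode`, and `sendWithRetry` are hypothetical names, brokers are plain strings, and the "send" is a predicate that reports success or failure. It only illustrates two points from the excerpt: SYNC gets `1 + retries` attempts while the other modes get one, and each retry prefers a broker other than the one used by the previous attempt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of the attempt budget used by sendDefaultImpl.
class RetrySketch {
    enum Mode { SYNC, ASYNC, ONEWAY }

    // SYNC may retry; ASYNC and ONEWAY get exactly one attempt at this level.
    static int attemptsAllowed(Mode mode, int retryTimesWhenSendFailed) {
        return mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    // Tries brokers in rotation, skipping the broker used by the previous attempt
    // when another one exists. Returns the brokers that were tried, in order.
    // Assumes the broker list is not empty.
    static List<String> sendWithRetry(List<String> brokers, Mode mode, int retries,
                                      Predicate<String> send) {
        List<String> tried = new ArrayList<>();
        String last = null;
        int next = 0;
        int total = attemptsAllowed(mode, retries);
        for (int times = 0; times < total; times++) {
            String chosen = null;
            for (int i = 0; i < brokers.size(); i++) {
                String candidate = brokers.get((next + i) % brokers.size());
                if (!candidate.equals(last)) {
                    chosen = candidate;
                    next = (next + i + 1) % brokers.size();
                    break;
                }
            }
            if (chosen == null) {
                chosen = brokers.get(0); // only one broker exists: reuse it
            }
            tried.add(chosen);
            last = chosen;
            if (send.test(chosen)) {
                break; // success, stop retrying
            }
        }
        return tried;
    }
}
```

With two brokers and two retries, a SYNC send that keeps failing alternates between the brokers for three attempts, whereas an ASYNC send stops after the first attempt.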
Here we focus on how the message body is sent.
DefaultMQProducerImpl.sendDefaultImpl -> DefaultMQProducerImpl.sendKernelImpl
    private SendResult sendKernelImpl(final Message msg,
            final MessageQueue mq,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback,
            final TopicPublishInfo topicPublishInfo,
            final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }
        SendMessageContext context = null;
        if (brokerAddr != null) {
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
            byte[] prevBody = msg.getBody();
            try {
                // for MessageBatch, ID has been set in the generating process
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }
                int sysFlag = 0;
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                }
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }
                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }
                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }
                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }
                // build the request header
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }
                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }
                SendResult sendResult = null;
                switch (communicationMode) {
                    // asynchronous send
                    case ASYNC:
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    // synchronous send
                    case SYNC:
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }
                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }
                return sendResult;
            } catch (RemotingException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (MQBrokerException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (InterruptedException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } finally {
                msg.setBody(prevBody);
            }
        }
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
What follows is the low-level message-sending utility, which is shared across all of RocketMQ.
DefaultMQProducerImpl.sendKernelImpl -> MQClientAPIImpl.sendMessage
    public SendResult sendMessage(
            final String addr,
            final String brokerName,
            final Message msg,
            final SendMessageRequestHeader requestHeader,
            final long timeoutMillis,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback,
            final TopicPublishInfo topicPublishInfo,
            final MQClientInstance instance,
            final int retryTimesWhenSendFailed,
            final SendMessageContext context,
            final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = null;
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }
        request.setBody(msg.getBody());
        switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
            default:
                assert false;
                break;
        }
        return null;
    }
MQClientAPIImpl.sendMessage -> MQClientAPIImpl.sendMessageSync
The call is delegated to Netty.
    private SendResult sendMessageSync(
            final String addr,
            final String brokerName,
            final Message msg,
            final long timeoutMillis,
            final RemotingCommand request
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert response != null;
        return this.processSendResponse(brokerName, msg, response);
    }
MQClientAPIImpl.sendMessageSync -> NettyRemotingClient.invokeSync
The RPC hooks fire before and after the call; the real work is still further down.
    @Override
    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
            throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                if (this.rpcHook != null) {
                    this.rpcHook.doBeforeRequest(addr, request);
                }
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
                if (this.rpcHook != null) {
                    this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                }
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }
NettyRemotingClient.invokeSync -> NettyRemotingClient.invokeSyncImpl
Finally we reach the point where Netty actually sends the data.
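The method that follows turns an asynchronous channel write into a blocking call by registering a pending response under the request's `opaque` id and waiting on it. A minimal sketch of that idea in plain Java is given here, assuming string payloads and no Netty at all; `SyncCallSketch`, `PendingResponse`, `invokeSync`, and `onResponse` are hypothetical names chosen for illustration, not the RocketMQ classes.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: block the caller until a response with the same id arrives.
class SyncCallSketch {
    // Holds one pending response; the latch is released when the response is put.
    static final class PendingResponse {
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile String response;

        void put(String value) {
            this.response = value;
            this.done.countDown();
        }

        // Returns the response, or null if the wait timed out or was interrupted.
        String await(long timeoutMillis) {
            try {
                this.done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return this.response;
        }
    }

    private final ConcurrentHashMap<Integer, PendingResponse> table = new ConcurrentHashMap<>();

    // Called by the sending thread: register the pending entry, send, then wait.
    String invokeSync(int requestId, Runnable send, long timeoutMillis) {
        PendingResponse pending = new PendingResponse();
        table.put(requestId, pending);
        try {
            send.run();
            return pending.await(timeoutMillis);
        } finally {
            table.remove(requestId); // always clean up the table entry
        }
    }

    // Called by the I/O thread when a response arrives.
    void onResponse(int requestId, String response) {
        PendingResponse pending = table.get(requestId);
        if (pending != null) {
            pending.put(response);
        }
    }

    int pendingCount() {
        return table.size();
    }
}
```

In this sketch a `null` return stands for "no response in time"; the real method distinguishes a timeout from a failed write and throws a different exception for each.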
    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
            throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();
        try {
            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }
                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    PLOG.warn("send a request command to channel <" + addr + "> failed.");
                }
            });
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }
            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }
How does the Producer interact with the NameServer?
TopicPublishInfo:
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    // the MessageQueues of the topic
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    // index used by the load-balancing strategy
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    // routing information of the topic
    private TopicRouteData topicRouteData;
Execution logic:
1. When sending a message, look up the topic's publish information locally
2. If it is not found, ask the NameServer for it
3. The NameServer request goes over Netty
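The lookup order in these steps can be sketched as a small cache in front of a remote call. This is an illustrative sketch only: `RouteCacheSketch` and `findQueues` are hypothetical names, queues are plain strings, and the NameServer is replaced by a `Function`. The fallback to a default topic mirrors the second branch of `tryToFindTopicPublishInfo`, which retries with the producer's create-topic key when the topic itself has no route.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of "look in the local table first, then ask the name server".
class RouteCacheSketch {
    private final ConcurrentHashMap<String, List<String>> table = new ConcurrentHashMap<>();
    private final Function<String, List<String>> nameServer; // stands in for the remote call
    private final String defaultTopic;

    RouteCacheSketch(Function<String, List<String>> nameServer, String defaultTopic) {
        this.nameServer = nameServer;
        this.defaultTopic = defaultTopic;
    }

    // Returns the queues for a topic, or null when neither lookup finds a route.
    List<String> findQueues(String topic) {
        List<String> queues = table.get(topic);
        if (queues == null || queues.isEmpty()) {
            // not cached: query the name server by topic name
            queues = nameServer.apply(topic);
            if (queues == null || queues.isEmpty()) {
                // still nothing: borrow the route of the default topic
                queues = nameServer.apply(defaultTopic);
            }
            if (queues != null && !queues.isEmpty()) {
                table.put(topic, queues); // later sends hit the local table
            }
        }
        return queues;
    }
}
```

Once a route has been cached, later sends for the same topic do not touch the name server again in this sketch; the real client additionally refreshes routes on a timer, which is not modeled here.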
DefaultMQProducerImpl:
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            // refresh from the NameServer
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
MQClientInstance:
    public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        return updateTopicRouteInfoFromNameServer(topic, false, null);
    }

    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3);
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                }
                // (the excerpt ends here; the rest of the method is not shown)
    public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
            throws RemotingException, MQClientException, InterruptedException {
        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
        requestHeader.setTopic(topic);
        // build the request sent to the NameServer
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
        // Netty request
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.TOPIC_NOT_EXIST: {
                // TODO LOG
                break;
            }
            case ResponseCode.SUCCESS: {
                byte[] body = response.getBody();
                if (body != null) {
                    return TopicRouteData.decode(body, TopicRouteData.class);
                }
            }
            default:
                break;
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }
How does the Producer send an asynchronous message?
Asynchronous:
The difference from a synchronous send is that the caller does not wait; the follow-up logic runs in a callback.
The asynchronous send method takes an extra SendCallback parameter and has no return value.
    public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, sendCallback);
    }
SendCallback is the callback invoked after the send succeeds or fails.
    public interface SendCallback {
        public void onSuccess(final SendResult sendResult);
        public void onException(final Throwable e);
    }
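To make the callback style concrete, here is a minimal sketch that does not use RocketMQ at all. `AsyncSendSketch` and its `Callback` interface are hypothetical stand-ins with the same two-method shape as `SendCallback`, the result type is a plain `String`, and the "network" is a single-threaded executor. It shows that `send` returns immediately and that exactly one of the two callback methods runs later on another thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of callback-style asynchronous sending.
class AsyncSendSketch {
    // Same shape as SendCallback, with String standing in for SendResult.
    interface Callback {
        void onSuccess(String result);
        void onException(Throwable e);
    }

    private final ExecutorService io = Executors.newSingleThreadExecutor();

    // Returns immediately; the callback runs later on the I/O thread.
    void send(String body, Callback callback) {
        io.submit(() -> {
            String result;
            try {
                if (body == null || body.isEmpty()) {
                    throw new IllegalArgumentException("empty body");
                }
                result = "SEND_OK:" + body;
            } catch (RuntimeException e) {
                callback.onException(e);
                return;
            }
            callback.onSuccess(result);
        });
    }

    // Adapts the callback style to a CompletableFuture for callers that prefer one.
    CompletableFuture<String> sendAsFuture(String body) {
        CompletableFuture<String> future = new CompletableFuture<>();
        send(body, new Callback() {
            @Override
            public void onSuccess(String result) {
                future.complete(result);
            }

            @Override
            public void onException(Throwable e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    void shutdown() {
        io.shutdown();
    }
}
```

The future adapter is a design convenience for this sketch only; the API described in this article exposes the callback form.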
The earlier steps mirror the synchronous path, so we skip them and go straight to sendMessageAsync:
    private void sendMessageAsync(
            final String addr,
            final String brokerName,
            final Message msg,
            final long timeoutMillis,
            final RemotingCommand request,
            final SendCallback sendCallback,
            final TopicPublishInfo topicPublishInfo,
            final MQClientInstance instance,
            final int retryTimesWhenSendFailed,
            final AtomicInteger times,
            final SendMessageContext context,
            final DefaultMQProducerImpl producer
    ) throws InterruptedException, RemotingException {
        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                RemotingCommand response = responseFuture.getResponseCommand();
                if (null == sendCallback && response != null) {
                    try {
                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                        if (context != null && sendResult != null) {
                            context.setSendResult(sendResult);
                            context.getProducer().executeSendMessageHookAfter(context);
                        }
                    } catch (Throwable e) {
                        //
                    }
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                    return;
                }
                if (response != null) {
                    try {
                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                        assert sendResult != null;
                        if (context != null) {
                            context.setSendResult(sendResult);
                            context.getProducer().executeSendMessageHookAfter(context);
                        }
                        // reader's note: this duplicates the branch above; the sendCallback null check belongs here
                        try {
                            // success callback for the asynchronous send
                            sendCallback.onSuccess(sendResult);
                        } catch (Throwable e) {
                        }
                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                    } catch (Exception e) {
                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                        // exception callback
                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, e, context, false, producer);
                    }
                } else {
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                    if (!responseFuture.isSendRequestOK()) {
                        MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer);
                    } else if (responseFuture.isTimeout()) {
                        MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", responseFuture.getCause());
                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer);
                    } else {
                        MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer);
                    }
                }
            }
        });
    }
    @Override
    public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
            throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                if (this.rpcHook != null) {
                    this.rpcHook.doBeforeRequest(addr, request);
                }
                this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
            } catch (RemotingSendRequestException e) {
                log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }
    public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
            final InvokeCallback invokeCallback)
            throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        final int opaque = request.getOpaque();
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
            this.responseTable.put(opaque, responseFuture);
            try {
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            responseFuture.setSendRequestOK(true);
                            return;
                        } else {
                            responseFuture.setSendRequestOK(false);
                        }
                        responseFuture.putResponse(null);
                        responseTable.remove(opaque);
                        try {
                            // run the callback
                            executeInvokeCallback(responseFuture);
                        } catch (Throwable e) {
                            PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e);
                        } finally {
                            responseFuture.release();
                        }
                        PLOG.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
                responseFuture.release();
                PLOG.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
            } else {
                String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                        timeoutMillis,
                        this.semaphoreAsync.getQueueLength(),
                        this.semaphoreAsync.availablePermits()
                );
                PLOG.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }
Are both P2P and Pub/Sub supported?
JMS defines two models, P2P and Pub/Sub. When a message may be consumed only once, that is P2P; RocketMQ's broadcast mode corresponds to Pub/Sub, so both models can be expressed.
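How does the Producer guarantee ordered messages?
Ordered messages:
Messages must be consumed in the same order in which they were sent. In RocketMQ this mainly means partial ordering: for one class of messages to be ordered, the Producer must send them sequentially from a single thread and to the same queue, so that the Consumer can consume them in the order the Producer sent them.
Normal ordered messages:
A kind of ordered message. Under normal conditions ordering is fully preserved, but when a communication error occurs or a Broker restarts, the total number of queues changes, the queue located by hash-modulo changes with it, and messages are briefly out of order. If the business can tolerate brief disorder while the cluster is abnormal (for example a Broker going down or restarting), normal ordered messages are a good fit.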
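The hash-modulo placement described above, and why a change in the queue count disturbs it, can be shown with a few lines. This is a sketch under assumptions: `OrderedQueueSketch` and `selectQueue` are hypothetical names, and the key is any object whose `hashCode` identifies the ordering group (an order id, for instance).

```java
// Hypothetical sketch: map an ordering key to a queue index by hash-modulo.
class OrderedQueueSketch {
    // The same key always lands on the same queue while queueCount is unchanged.
    // Math.floorMod keeps the index non-negative even for negative hash codes.
    static int selectQueue(Object key, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        return Math.floorMod(key.hashCode(), queueCount);
    }
}
```

For the integer key 10, `selectQueue(10, 4)` gives queue 2, while `selectQueue(10, 3)` gives queue 1. A Broker dropping out and taking one queue with it therefore moves the key to another queue, which is the source of the short-lived disorder.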
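Strict ordered messages:
A kind of ordered message that preserves order in both normal and abnormal conditions, at the cost of distributed failover: if a single machine in the Broker cluster is unavailable, the whole cluster is unavailable, which greatly reduces service availability. If the servers are deployed in synchronous double-write mode, this weakness can be avoided by automatically promoting the standby to master, although the service is still unavailable for a few minutes. (This depends on synchronous double-write and automatic master/standby switching; automatic switching was not yet implemented at the time of writing.) The only known application that strongly depends on strict ordering is database binlog synchronization. Most other applications can tolerate brief disorder, so normal ordered messages are recommended.
A simple and workable way to achieve strict ordering:
Keep producer, MQ server, and consumer in a one-to-one-to-one relationship.
For more on ordered messages, the article "分布式开放消息系统(RocketMQ)的原理与实践" (Principles and Practice of the Distributed Open Messaging System RocketMQ) is recommended reading.
On the Producer side, strict ordering means sending messages of the same kind to the same MessageQueue. By implementing a specific MessageQueueSelector, the MessageQueue can be chosen according to the message content.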
    public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return send(msg, selector, arg, this.defaultMQProducer.getSendMsgTimeout());
    }

    public interface MessageQueueSelector {
        MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
    }
How does the Producer balance load?
In the synchronous send path, this call picks the MessageQueue:
    MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
    }
Low-latency mode:
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // this branch is taken when the lowest send latency is wanted
        // suggestion: refactor this into the strategy pattern
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                // round-robin over the queues, looking for a low-latency MessageQueue
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }
                // none found: fall back to one of the less-bad brokers
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            return tpInfo.selectOneMessageQueue();
        }
        // otherwise use plain round-robin
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
Picking a low-latency broker, based on each FaultItem's availability, latency, and start time:
    @Override
    public String pickOneAtLeast() {
        final Enumeration<FaultItem> elements = this.faultItemTable.elements();
        List<FaultItem> tmpList = new LinkedList<FaultItem>();
        while (elements.hasMoreElements()) {
            final FaultItem faultItem = elements.nextElement();
            tmpList.add(faultItem);
        }
        if (!tmpList.isEmpty()) {
            Collections.shuffle(tmpList);
            // sort by latency
            Collections.sort(tmpList);
            final int half = tmpList.size() / 2;
            if (half <= 0) {
                return tmpList.get(0).getName();
            } else {
                // rotate through the better half of the sorted list
                final int i = this.whichItemWorst.getAndIncrement() % half;
                return tmpList.get(i).getName();
            }
        }
        return null;
    }
FaultItem ordering:
    @Override
    public int compareTo(final FaultItem other) {
        if (this.isAvailable() != other.isAvailable()) {
            if (this.isAvailable())
                return -1;
            if (other.isAvailable())
                return 1;
        }
        if (this.currentLatency < other.currentLatency)
            return -1;
        else if (this.currentLatency > other.currentLatency) {
            return 1;
        }
        if (this.startTimestamp < other.startTimestamp)
            return -1;
        else if (this.startTimestamp > other.startTimestamp) {
            return 1;
        }
        return 0;
    }
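The three-level ordering in `compareTo` (available first, then lower latency, then earlier start timestamp) can also be written as a comparator chain. The sketch below assumes a simplified `Item` class with public fields; `FaultOrderSketch`, `Item`, and `sortedNames` are hypothetical names and do not reproduce the real `FaultItem`, whose `isAvailable()` is derived from the clock rather than stored.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of the FaultItem ordering as a comparator chain.
class FaultOrderSketch {
    static final class Item {
        final String name;
        final boolean available;
        final long latency;
        final long startTimestamp;

        Item(String name, boolean available, long latency, long startTimestamp) {
            this.name = name;
            this.available = available;
            this.latency = latency;
            this.startTimestamp = startTimestamp;
        }
    }

    // false sorts before true, so negating "available" puts available items first.
    static final Comparator<Item> ORDER = Comparator
            .comparing((Item i) -> !i.available)
            .thenComparingLong(i -> i.latency)
            .thenComparingLong(i -> i.startTimestamp);

    // Returns the item names from best to worst without modifying the input list.
    static List<String> sortedNames(List<Item> items) {
        List<Item> copy = new ArrayList<>(items);
        copy.sort(ORDER);
        List<String> names = new ArrayList<>();
        for (Item item : copy) {
            names.add(item.name);
        }
        return names;
    }
}
```

An unavailable broker sorts last even when its latency is the lowest, because availability is compared before latency.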
Round-robin mode:
Round-robin to a MessageQueue whose broker is not lastBrokerName.
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
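One detail in the code above deserves a note: `Math.abs(Integer.MIN_VALUE)` is still negative, so `Math.abs(index) % size` can be negative once the counter overflows, which is why the `if (pos < 0) pos = 0` guard exists. The sketch below shows the same round-robin idea with `Math.floorMod` swapped in for the abs-and-guard pair. It is an illustration with hypothetical names (`RoundRobinSketch`, `next`, `nextAvoiding`) that represents each queue only by its broker name.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: round-robin over queues, optionally avoiding one broker.
class RoundRobinSketch {
    private final List<String> brokerOfQueue; // broker name for each queue index
    private final AtomicInteger counter;

    RoundRobinSketch(List<String> brokerOfQueue, int start) {
        this.brokerOfQueue = brokerOfQueue;
        this.counter = new AtomicInteger(start);
    }

    // Plain round-robin; floorMod stays in range even after the counter overflows.
    int next() {
        return Math.floorMod(counter.getAndIncrement(), brokerOfQueue.size());
    }

    // Round-robin that skips queues on lastBroker; falls back to next() if all match.
    int nextAvoiding(String lastBroker) {
        if (lastBroker == null) {
            return next();
        }
        int start = counter.getAndIncrement();
        for (int i = 0; i < brokerOfQueue.size(); i++) {
            int pos = Math.floorMod(start + i, brokerOfQueue.size());
            if (!brokerOfQueue.get(pos).equals(lastBroker)) {
                return pos;
            }
        }
        return next();
    }
}
```

Avoiding the previous broker matters on retries: if a send to one broker just failed, the next attempt should not land on another queue of that same broker.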
When there is no lastBrokerName, simply take the next queue in round-robin order.
    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }
What is a Producer Group?
It is the collective name for a class of Producers. These Producers usually send the same kind of message and share the same sending logic.
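How does the Producer specify message priority?
Priority:
Each message has a priority, usually expressed as an integer, and higher-priority messages are delivered first. If all messages sit in a single in-memory queue, they can be sorted by priority before delivery so that high-priority messages go first. Because every RocketMQ message is persisted, sorting by priority would be very expensive, so RocketMQ does not specifically support message priority. A workaround gives a similar effect: configure one dedicated high-priority queue and one normal-priority queue, and send messages of different priority to different queues.
Priority problems fall into two categories:
1. The goal is only to achieve the effect of priority, not priority in the strict sense. Priorities are usually divided into high, medium, and low, or a few more levels. Each priority can be represented by its own topic, and the sender picks the topic that matches the priority. This solves the vast majority of priority problems, at the cost of some precision in the business's notion of priority.
2. Strict priority, expressed as an integer such as 0~65535. Using separate topics is a poor fit for this kind of problem, and having the MQ solve it would hurt MQ performance badly. The point to confirm is whether the business really needs such strict priority: if the priorities were compressed into a few levels, how much would the business be affected?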
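The topic-per-priority workaround, together with the "compress into a few levels" idea, can be sketched as follows. Everything here is illustrative: `PriorityTopicSketch`, the `_HIGH`/`_MEDIUM`/`_LOW` topic suffixes, and the thresholds 40000 and 10000 are assumptions made for the example and are not RocketMQ conventions.

```java
// Hypothetical sketch: express priority by choosing a topic per priority level.
class PriorityTopicSketch {
    enum Priority { HIGH, MEDIUM, LOW }

    // Derives the topic name for a priority level, e.g. "ORDER" -> "ORDER_HIGH".
    static String topicFor(String baseTopic, Priority priority) {
        return baseTopic + "_" + priority.name();
    }

    // Compresses a fine-grained priority (0..65535) into three levels.
    // The thresholds are arbitrary example values.
    static Priority compress(int value) {
        if (value < 0 || value > 65535) {
            throw new IllegalArgumentException("priority out of range: " + value);
        }
        if (value >= 40000) {
            return Priority.HIGH;
        } else if (value >= 10000) {
            return Priority.MEDIUM;
        }
        return Priority.LOW;
    }
}
```

A sender would then publish to `topicFor("ORDER", compress(p))`, and consumers of the high-priority topic can be given more capacity. Messages within one level are no longer ordered by their original integer priority, which is the precision trade-off mentioned above.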
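How are the Producer's transactional messages implemented?
Logic:
1. Send the prepare message
2. Execute the producer's business logic (the business code must implement LocalTransactionExecuter)
3. End the transactional message (commit, rollback, notType)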
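Before reading the real method, the decision it makes can be condensed into one small function: the outcome of the prepare send determines whether the local transaction runs at all, and the local result determines what is reported to the broker. This is a sketch with hypothetical names (`TxFlowSketch`, `decide`); the enum constants copy the spellings used in the article's code, and a `Supplier` stands in for the local transaction executer.

```java
import java.util.function.Supplier;

// Hypothetical sketch of the decision made after the prepare message is sent.
class TxFlowSketch {
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }
    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Returns the state that would be reported to the broker in the final step.
    static LocalState decide(SendStatus prepareStatus, Supplier<LocalState> localTransaction) {
        switch (prepareStatus) {
            case SEND_OK:
                try {
                    // step 2: run the business logic only if the prepare message was stored
                    LocalState state = localTransaction.get();
                    return state == null ? LocalState.UNKNOW : state;
                } catch (RuntimeException e) {
                    // a failing local transaction leaves the state unknown
                    return LocalState.UNKNOW;
                }
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                // the prepare message was not stored reliably: roll back
                return LocalState.ROLLBACK_MESSAGE;
            default:
                return LocalState.UNKNOW;
        }
    }
}
```

An unknown state is not final: the broker later asks the producer what happened to the transaction, as described at the end of this section.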
TransactionMQProducer.sendMessageInTransaction -> DefaultMQProducerImpl.sendMessageInTransaction
    public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg)
            throws MQClientException {
        if (null == tranExecuter) {
            throw new MQClientException("tranExecutor is null", null);
        }
        Validators.checkMessage(msg, this.defaultMQProducer);
        SendResult sendResult = null;
        // mark the message as a transactional (prepared) message
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            // 1. send the message, exactly like an ordinary message
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }
        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    // run the producer's business logic
                    localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }
                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            // if step 1 did not store the message successfully, roll back
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }
        try {
            // end the transaction according to the outcome of the local business logic
            this.endTransaction(sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }
        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }

    public void endTransaction(
            final SendResult sendResult,
            final LocalTransactionState localTransactionState,
            final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }
        String transactionId = sendResult.getTransactionId();
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        switch (localTransactionState) {
            // local execution succeeded: commit
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            // local execution failed: roll back
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            // local state unknown: report it as unknown
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout());
    }
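Special case: what happens if the end-transaction message fails to send?
The broker checks the message, and if it is still in the prepared state, it sends a CheckTransaction request to the Producer.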
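How are the Producer's scheduled messages implemented?
Scheduled messages:
A scheduled message is one that, after reaching the Broker, cannot be consumed by the Consumer immediately; it becomes consumable only at a specific point in time or after a specific wait.
Supporting arbitrary time precision would require sorting messages at the Broker level, and once persistence is involved, that sorting inevitably brings a huge performance cost.
RocketMQ supports scheduled messages, but not with arbitrary time precision. It supports specific levels, for example delays of 5s, 10s, 1m, and so on.
Implementation:
Set the delay level on the Message and leave the rest to the Broker.
    public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }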
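Since only fixed levels are available, a caller that wants "about seven seconds" has to map the wish onto a level. The sketch below illustrates such a mapping under stated assumptions: `DelayLevelSketch`, `parse`, and `levelFor` are hypothetical helpers, the spec string `"5s 10s 1m"` simply reuses the example delays mentioned above, and the level table actually configured on a Broker is not shown in this article and may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: map a requested delay onto a fixed table of delay levels.
class DelayLevelSketch {
    // Parses a non-empty spec such as "5s 10s 1m" into level -> milliseconds.
    // Levels are numbered from 1 in the order they appear in the spec.
    static Map<Integer, Long> parse(String spec) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = spec.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            long amount = Long.parseLong(part.substring(0, part.length() - 1));
            char unit = part.charAt(part.length() - 1);
            long unitMillis;
            switch (unit) {
                case 's': unitMillis = 1000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                default: throw new IllegalArgumentException("unknown unit: " + unit);
            }
            table.put(i + 1, amount * unitMillis);
        }
        return table;
    }

    // Picks the first level whose delay is at least the requested one,
    // or the last level if none is long enough. Assumes ascending delays.
    static int levelFor(Map<Integer, Long> table, long requestedMillis) {
        int last = 0;
        for (Map.Entry<Integer, Long> entry : table.entrySet()) {
            last = entry.getKey();
            if (entry.getValue() >= requestedMillis) {
                return last;
            }
        }
        return last;
    }
}
```

With the table parsed from `"5s 10s 1m"`, a requested delay of 7000 ms maps to level 2 (10s), and that number is what would be passed to `setDelayTimeLevel`.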
When reposting, please cite this article's address: http://systransis.cn/yun/70317.html