成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

RocketMQ源碼學習(二)-Producer

chengtao1633 / 2498人閱讀

摘要:每個優(yōu)先級可以用不同的表示,發(fā)消息時,指定不同的來表示優(yōu)先級,這種方式可以解決絕大部分的優(yōu)先級問題,但是對業(yè)務的優(yōu)先級精確性做了妥協(xié)。支持定時消息,但是不支持任意時間精度,支持特定的,例如定時,,等。

Producer 生產(chǎn)者

這次源碼學習的方法是帶著問題學習源碼實現(xiàn),問題列表如下

Producer 同步消息怎么發(fā)送?

Producer 是與NameServer什么交互?

Producer 異步消息怎么發(fā)送?

P2P,Pub/Sub 都支持嗎?

Producer 怎么保證順序消息?

Producer 負載均衡?

Producer Group是什么概念?

Producer 怎么制定消息優(yōu)先級?

Producer 的事務消息怎么實現(xiàn)?

Producer 定時消息怎么實現(xiàn)?

Producer 同步消息怎么發(fā)送? 消息體Message
    //主題
    private String topic;
    //狀態(tài)
    private int flag;
    //屬性
    private Map properties;
    //消息體
    private byte[] body;
執(zhí)行邏輯
邏輯:
1. 查找topic中的發(fā)布信息
2. 選擇topic中的某個MessageQueue
3. 使用Netty發(fā)送消息至MessageQueue
代碼走讀,重點邏輯在代碼中注釋(省略與主邏輯無關的代碼)
DefaultMQProducer.send
->DefaultMQProducerImple.send
->DefaultMQProducerImple.sendDefaultImpl
private SendResult sendDefaultImpl(//
        Message msg, //
        final CommunicationMode communicationMode, //
        final SendCallback sendCallback, //
        final long timeout//
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);

        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        //獲取topic信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                //選擇topic中的中的某個MessageQueue(相當于kafka的partition)
                MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (tmpmq != null) {
                    mq = tmpmq;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        //真正的發(fā)送數(shù)據(jù)方法
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } 
            }
    }

本次重點關注消息體的發(fā)送
DefaultMQProducerImple.sendDefaultImpl->DefaultMQProducerImpl.sendKernelImpl

private SendResult sendKernelImpl(final Message msg, //
        final MessageQueue mq, //
        final CommunicationMode communicationMode, //
        final SendCallback sendCallback, //
        final TopicPublishInfo topicPublishInfo, //
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            byte[] prevBody = msg.getBody();
            try {
                //for MessageBatch,ID has been set in the generating process
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }

                int sysFlag = 0;
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                }

                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }

                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }

                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }
                //構建請求頭部
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

                SendResult sendResult = null;
                switch (communicationMode) {
                //異步模式發(fā)送
                    case ASYNC:
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
                            brokerAddr, // 1
                            mq.getBrokerName(), // 2
                            msg, // 3
                            requestHeader, // 4
                            timeout, // 5
                            communicationMode, // 6
                            sendCallback, // 7
                            topicPublishInfo, // 8
                            this.mQClientFactory, // 9
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
                            context, //
                            this);
                        break;
                    case ONEWAY:
                    //同步模式發(fā)送
                    case SYNC:
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }

                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
            } catch (RemotingException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (MQBrokerException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (InterruptedException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } finally {
                msg.setBody(prevBody);
            }
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

后面就是底層的消息發(fā)送工具,整個RocketMQ共用
DefaultMQProducerImpl.sendKernelImpl->MQClientAPIImpl.sendMessage

public SendResult sendMessage(//
        final String addr, // 1
        final String brokerName, // 2
        final Message msg, // 3
        final SendMessageRequestHeader requestHeader, // 4
        final long timeoutMillis, // 5
        final CommunicationMode communicationMode, // 6
        final SendCallback sendCallback, // 7
        final TopicPublishInfo topicPublishInfo, // 8
        final MQClientInstance instance, // 9
        final int retryTimesWhenSendFailed, // 10
        final SendMessageContext context, // 11
        final DefaultMQProducerImpl producer // 12
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = null;
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }

        request.setBody(msg.getBody());

        switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
            default:
                assert false;
                break;
        }

        return null;
    }

MQClientAPIImpl.sendMessage->MQClientAPIImpl.sendMessageSync
委托給Netty

 private SendResult sendMessageSync(//
                                       final String addr, //
                                       final String brokerName, //
                                       final Message msg, //
                                       final long timeoutMillis, //
                                       final RemotingCommand request//
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert response != null;
        return this.processSendResponse(brokerName, msg, response);
    }

MQClientAPIImpl.sendMessageSync->NettyRomotingClient.invokeSync
在調(diào)用前后觸發(fā)hook,真正的還在后面

  @Override
    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                if (this.rpcHook != null) {
                    this.rpcHook.doBeforeRequest(addr, request);
                }
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
                if (this.rpcHook != null) {
                    this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                }
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

NettyRomotingClient.invokeSync->NettyRomotingClient.invokeSyncImpl
終于到了Netty真正發(fā)數(shù)據(jù)的時候了

    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();

        try {
            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }

                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    PLOG.warn("send a request command to channel <" + addr + "> failed.");
                }
            });

            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }
Producer是與NameServer什么交互? TopicPublishInfo
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    //topic中MessgeQueuee信息
    private List messageQueueList = new ArrayList();
    //負責均衡策略的索引
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    //topic包含的路由信息
    private TopicRouteData topicRouteData;
執(zhí)行邏輯
邏輯:
1. 發(fā)送消息時查找topic中的發(fā)布信息
2. 如果沒找到,通過NameServer去尋找
3. 通過Netty請求NameServer
代碼走讀,重點邏輯在代碼中注釋(省略與主邏輯無關的代碼)

DefaultMQProducerImpl

 private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            //從NameServer更新
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

MQClientInstance

 public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        return updateTopicRouteInfoFromNameServer(topic, false, null);
    }
    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
    }
                   
 public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
        throws RemotingException, MQClientException, InterruptedException {
        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
        requestHeader.setTopic(topic);
        //構建向NameServer的消息體
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
        //Netty 請求
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.TOPIC_NOT_EXIST: {
                // TODO LOG
                break;
            }
            case ResponseCode.SUCCESS: {
                byte[] body = response.getBody();
                if (body != null) {
                    return TopicRouteData.decode(body, TopicRouteData.class);
                }
            }
            default:
                break;
        }

        throw new MQClientException(response.getCode(), response.getRemark());
    }
Producer 異步消息怎么發(fā)送?

異步:

異步與同步的區(qū)別,是無需等待,后續(xù)邏輯透過回調(diào).

代碼走讀,重點邏輯在代碼中注釋(省略與主邏輯無關的代碼)

異步發(fā)送方法參數(shù)多了SendCallback參數(shù),且沒有返回值

   public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, sendCallback);
    }

SendCallback是發(fā)送成功和失敗后的回調(diào)函數(shù)

public interface SendCallback {
    public void onSuccess(final SendResult sendResult);

    public void onException(final Throwable e);
}

前面的和同步類似都跳過,直接看

  private void sendMessageAsync(//
        final String addr, //
        final String brokerName, //
        final Message msg, //
        final long timeoutMillis, //
        final RemotingCommand request, //
        final SendCallback sendCallback, //
        final TopicPublishInfo topicPublishInfo, //
        final MQClientInstance instance, //
        final int retryTimesWhenSendFailed, //
        final AtomicInteger times, //
        final SendMessageContext context, //
        final DefaultMQProducerImpl producer //
    ) throws InterruptedException, RemotingException {
        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                RemotingCommand response = responseFuture.getResponseCommand();
                if (null == sendCallback && response != null) {

                    try {
                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                        if (context != null && sendResult != null) {
                            context.setSendResult(sendResult);
                            context.getProducer().executeSendMessageHookAfter(context);
                        }
                    } catch (Throwable e) {
                        //
                    }

                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                    return;
                }

                if (response != null) {
                    try {
                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                        assert sendResult != null;
                        if (context != null) {
                            context.setSendResult(sendResult);
                            context.getProducer().executeSendMessageHookAfter(context);
                        }
        // 上面重復了,sendCallBack的判斷應該放在這
                        try {
        // 異步成功回調(diào)點
                            sendCallback.onSuccess(sendResult);
                        } catch (Throwable e) {
                        }

                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                    } catch (Exception e) {
                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                        //異常回調(diào)點
                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, e, context, false, producer);
                    }
                } else {
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                    if (!responseFuture.isSendRequestOK()) {
                        MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                    } else if (responseFuture.isTimeout()) {
                        MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
                            responseFuture.getCause());
                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                    } else {
                        MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                    }
                }
            }
        });
    }
    @Override
    public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException {
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                if (this.rpcHook != null) {
                    this.rpcHook.doBeforeRequest(addr, request);
                }
                this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
            } catch (RemotingSendRequestException e) {
                log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }
   public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        final int opaque = request.getOpaque();
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);

            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
            this.responseTable.put(opaque, responseFuture);
            try {
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            responseFuture.setSendRequestOK(true);
                            return;
                        } else {
                            responseFuture.setSendRequestOK(false);
                        }

                        responseFuture.putResponse(null);
                        responseTable.remove(opaque);
                        try {
                        //執(zhí)行回調(diào)
                            executeInvokeCallback(responseFuture);
                        } catch (Throwable e) {
                            PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e);
                        } finally {
                            responseFuture.release();
                        }

                        PLOG.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
                responseFuture.release();
                PLOG.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
            } else {
                String info =
                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
                        timeoutMillis, //
                        this.semaphoreAsync.getQueueLength(), //
                        this.semaphoreAsync.availablePermits()//
                    );
                PLOG.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }
P2P,Pub/Sub都支持嗎?

JMS系統(tǒng)有P2P和Pub/Sub兩種模式,一條消息只能被消費一次時即為P2P,RocketMQ的廣播模式即為了Pub/Sub模式.

Producer怎么保證順序消息?

順序消息:

消費消息的順序要同發(fā)送消息的順序一致,在 RocketMQ 中,主要指的是局部順序,即一類消息為滿足順序性,必須 Producer 單線程順序發(fā)送,且發(fā)送到同一個隊列,這樣 Consumer 就可以按照 Producer 發(fā)送的順序去消費消息。

普通順序消息:

順序消息的一種,正常情況下可以保證完全的順序消息,但是一旦發(fā)生通信異常,Broker 重啟,由于隊列總數(shù)發(fā)生變化,哈希取模后定位的隊列會變化,產(chǎn)生短暫的消息順序不一致。 如果業(yè)務能容忍在集群異常情況(如某個 Broker 宕機或者重啟)下,消息短暫的亂序,使用普通順序方式比較合適。

嚴格順序消息:

順序消息的一種,無論正常異常情況都能保證順序,但是犧牲了分布式 Failover 特性,即 Broker 集群中只 要有一臺機器不可用,則整個集群都不可用,服務可用性大大降低。 如果服務器部署為同步雙寫模式,此缺陷可通過備機自動切換為主避免,不過仍然會存在幾分鐘的服務不可用。(依賴同步雙寫,主備自動切換,自動切換功能目前還未實現(xiàn))目前已知的應用只有數(shù)據(jù)庫binlog同步強依賴嚴格順序消息,其他應用絕大部分都可以容忍短暫亂序,推薦使用普通的順序消息。

要實現(xiàn)嚴格的順序消息,簡單且可行的辦法就是:

保證生產(chǎn)者 - MQServer - 消費者是一對一對一的關系

關于順序消息建議閱讀分布式開放消息系統(tǒng)(RocketMQ)的原理與實踐
放在Producer就是要保證需要嚴格順序消息就是把同類型的消息發(fā)送到同一MessageQueue,通過實現(xiàn)特定的MessageQueueSelector,可以定制跟消息體相關的MessageQueue

    public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return send(msg, selector, arg, this.defaultMQProducer.getSendMsgTimeout());
    }
public interface MessageQueueSelector {
    MessageQueue select(final List mqs, final Message msg, final Object arg);
}
Producer 負載均衡?

同步發(fā)送消息中,有這樣一個方法來選定MessageQueue

MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
   public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
    }

保證低延遲

   public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
   //如果要保證最低發(fā)送延遲走此邏輯
   //建議:這里改成策略模式
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                //輪詢看能不能有低延遲的MessageQueue
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }
        //如果沒找到就隨機找一個
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }
        
        //不然走此邏輯,輪詢模式
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

選最低延遲的broker,根據(jù)FaultItem的可用性和開始時間選擇

    @Override
    public String pickOneAtLeast() {
        final Enumeration elements = this.faultItemTable.elements();
        List tmpList = new LinkedList();
        while (elements.hasMoreElements()) {
            final FaultItem faultItem = elements.nextElement();
            tmpList.add(faultItem);
        }

        if (!tmpList.isEmpty()) {
        
                @Override
    public String pickOneAtLeast() {
        final Enumeration elements = this.faultItemTable.elements();
        List tmpList = new LinkedList();
        while (elements.hasMoreElements()) {
            final FaultItem faultItem = elements.nextElement();
            tmpList.add(faultItem);
        }

        if (!tmpList.isEmpty()) {
            Collections.shuffle(tmpList);
            //按延遲排序
            Collections.sort(tmpList);

            final int half = tmpList.size() / 2;
            if (half <= 0) {
                return tmpList.get(0).getName();
            } else {
            //從最爛的后一個開始選
                final int i = this.whichItemWorst.getAndIncrement() % half;
                return tmpList.get(i).getName();
            }
        }

        return null;
    }

FaultItem排序策略

        @Override
        public int compareTo(final FaultItem other) {
            if (this.isAvailable() != other.isAvailable()) {
                if (this.isAvailable())
                    return -1;

                if (other.isAvailable())
                    return 1;
            }

            if (this.currentLatency < other.currentLatency)
                return -1;
            else if (this.currentLatency > other.currentLatency) {
                return 1;
            }

            if (this.startTimestamp < other.startTimestamp)
                return -1;
            else if (this.startTimestamp > other.startTimestamp) {
                return 1;
            }

            return 0;
        }

輪詢模式:

輪詢?nèi)〔坏扔趌astBrokerName的一個MessageQueue
  public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }

沒有l(wèi)astBrokerName,就輪詢?nèi)“?/p>

   public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }
Producer Group是什么概念?

一類 Producer 的集合名稱,這類Producer通常發(fā)送一類消息,且發(fā)送邏輯一致。

Producer 怎么制定消息優(yōu)先級?

優(yōu)先級:

每條消息都有不同的優(yōu)先級,一般用整數(shù)來描述,優(yōu)先級高的消 息先投遞,如果消息完全在一個內(nèi)存隊列中,那么在投遞前可以按照優(yōu)先級排序,令優(yōu)先級高的先投遞。由于RocketMQ所有消息都是持久化的,所以如果按照優(yōu)先級來排序,開銷會非常大,因此RocketMQ沒有特意支持消息優(yōu)先級,但是可以通過變通的方式實現(xiàn)類似功能,即多帶帶配置一個優(yōu)先級高的隊列,和一個普通優(yōu)先級 的隊列, 將不同優(yōu)先級發(fā)送到不同隊列即可。

對于優(yōu)先級問題,可以歸納為 2 類

只要達到優(yōu)先級目的即可,不是嚴格意義上的優(yōu)先級,通常將優(yōu)先級劃分為高、中、低,或者再多幾個級別。每個優(yōu)先級可以用不同的topic表示,發(fā)消息時,指定不同的topic來表示優(yōu)先級,這種方式可以解決絕大部分的優(yōu)先級問題,但是對業(yè)務的優(yōu)先級精確性做了妥協(xié)。

嚴格的優(yōu)先級,優(yōu)先級用整數(shù)表示,例如0~65535,這種優(yōu)先級問題一般使用不同 topic 解決就非常不合適。如果要讓 MQ 解決此問題,會對 MQ 的性能造成非常大的影響。這里要確保一點,業(yè)務上是否確實需要這種嚴格的優(yōu)先級,如果將優(yōu)先級壓縮成幾個,對業(yè)務的影響有多大?

Producer 的事務消息怎么實現(xiàn)?

邏輯:

1. 發(fā)送prepare消息
2. 執(zhí)行生產(chǎn)者業(yè)務邏輯(業(yè)務邏輯代碼需實現(xiàn)LocalTransactionExecuter)
3. 結束事務消息(commit,rollback,notType)

TransactionMQProducer.sendMessageInTransaction->DefaultMQProducerImpl.sendMessageInTransaction

    public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg)
        throws MQClientException {
        if (null == tranExecuter) {
            throw new MQClientException("tranExecutor is null", null);
        }
        Validators.checkMessage(msg, this.defaultMQProducer);

        SendResult sendResult = null;
        // 設置事務消息屬性
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            1. 發(fā)送消息,同普通消息一樣
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }

        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    //執(zhí)行生產(chǎn)者業(yè)務邏輯
                    localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            //如果1就發(fā)送失敗,就回滾
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

        try {
        //結束事務狀態(tài),by本地業(yè)務邏輯執(zhí)行情況
            this.endTransaction(sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }

        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }
    public void endTransaction(//
        final SendResult sendResult, //
        final LocalTransactionState localTransactionState, //
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }
        String transactionId = sendResult.getTransactionId();
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        switch (localTransactionState) {
        //如果本地執(zhí)行成功就提交
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
        //如果本地失敗,就回滾
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
        //如果本地未知,那就打死未知
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }

        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }

特殊情況:如果事務結束消息發(fā)送失敗,該怎么辦?
broker會檢查如果是prepared狀態(tài),則會向Producer發(fā)起CheckTransaction請求.

Producer 延時消息怎么實現(xiàn)?

定時消息:

指消息發(fā)到 Broker 后,不能立刻被 Consumer 消費,要到特定的時間點或者等待特定的時間后才能 被消費。
如果要支持任意的時間精度,在Broker層面,必須要做消息排序,如果再涉及到持久化,那么消息排序要不 可避免的產(chǎn)生巨大性能開銷。

RocketMQ 支持定時消息,但是不支持任意時間精度,支持特定的 level,例如定時 5s,10s,1m 等。

實現(xiàn):
在消息體中設置Message延遲級別,剩下的就交給Broker實現(xiàn)吧

    public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }

文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉載請注明本文地址:http://systransis.cn/yun/70317.html

相關文章

  • RocketMQ源碼學習(一)-概述

    摘要:每個與集群中的所有節(jié)點建立長連接,定時注冊信息到所有。完全無狀態(tài),可集群部署。本系列源碼解析主要參照原理簡介來追尋其代碼實現(xiàn)雖然版本不太一致但這也是能找到的最詳細的資料了接下來根據(jù)其模塊來源碼閱讀目錄如下 為什么選擇讀RocketMQ? 對MQ的理解一直不深,上周看了,還是覺得不夠深入,找個成熟的產(chǎn)品來學習吧,RabbitMQ是erLang寫的,Kafka是Scala寫的,非Java寫...

    godlong_X 評論0 收藏0
  • RocketMQ源碼學習(六)-Name Server

    摘要:完全無狀態(tài),可集群部署與集群中的其中一個節(jié)點隨機選擇建立長連接,定期從取路由信息,并向提供服務的建立長連接,且定時向發(fā)送心跳。既可以從訂閱消息,也可以從訂閱消息,訂閱規(guī)則由配置決定。 問題列表: Name Server 的作用是什么? Name Server 存儲了Broker的什么信息? Name Server 為Producer的提供些什么信息? Name Server 為Co...

    Joyven 評論0 收藏0
  • SpringBoot RocketMQ 整合使用和監(jiān)控

    摘要:前提通過前面兩篇文章可以簡單的了解和安裝,今天就將和整合起來使用。然后我運行之前的整合項目,查看監(jiān)控信息如下總結整篇文章講述了與整合和監(jiān)控平臺的搭建。 showImg(https://segmentfault.com/img/remote/1460000013232432?w=1920&h=1277); 前提 通過前面兩篇文章可以簡單的了解 RocketMQ 和 安裝 RocketMQ...

    Jacendfeng 評論0 收藏0
  • 高并發(fā)異步解耦利器:RocketMQ究竟強在哪里?

    摘要:它是阿里巴巴于年開源的第三代分布式消息中間件。是一個分布式消息中間件,具有低延遲高性能和可靠性萬億級別的容量和靈活的可擴展性,它是阿里巴巴于年開源的第三代分布式消息中間件。上篇文章消息隊列那么多,為什么建議深入了解下RabbitMQ?我們講到了消息隊列的發(fā)展史:并且詳細介紹了RabbitMQ,其功能也是挺強大的,那么,為啥又要搞一個RocketMQ出來呢?是重復造輪子嗎?本文我們就帶大家來詳...

    tainzhi 評論0 收藏0
  • 本地RocketMQ的安裝與調(diào)試

    摘要:本地的安裝與調(diào)試標簽啟動進入的源碼項目。消息發(fā)送的高性能與低延遲。強大的消息堆積能力與消息處理能力。嚴格的順序消息存儲。保證消息至少被消費一次,但不承諾消息不會被消費者多次消費。其消息的冪等由消費者自己實現(xiàn)。 本地RocketMQ的安裝與調(diào)試 標簽:【RocketMQ】 1. 啟動 進入RocketMQ-ALL的源碼項目。 執(zhí)行maven打包: mvn -Prelease-all ...

    icattlecoder 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<