摘要:執(zhí)行該方法前,先進入切面編程注冊用戶等待確認直接發(fā)送有序無序默認延時級別不延時秒小時切面中,因為郵件激活發(fā)送消息類型為默認的等待確認。
用戶注冊
github 開源項目--paascloud-master:https://github.com/paascloud/...
分布式解決方案--基于可靠消息的最終一致性:https://github.com/paascloud/...
本篇文章目的是理解該項目可靠消息服務(wù)中心(TCP)發(fā)送消息、消費消息的流程,用戶注冊發(fā)送激活郵箱和激活后發(fā)送注冊成功郵箱都是利用可靠消息服務(wù)來解決分布式事務(wù),理解了該流程也就弄懂了該項目中其他業(yè)務(wù)流程。發(fā)送激活郵箱過程
消息生產(chǎn)端:UAC
可靠消息服務(wù):TPC
消息服務(wù)端:OPC
用戶注冊后,向注冊郵箱發(fā)送一封激活郵箱。消息生產(chǎn)端(UAC)
大致流程為:
本地服務(wù) UAC 先持久化 預(yù)發(fā)送消息(等待確認消息),表 pc_mq_message_data;
調(diào)用遠端可靠消息服務(wù)TPC持久化預(yù)發(fā)送消息,可靠消息表pc_tpc_mq_message;
執(zhí)行本地事務(wù)即 保存用戶信息 ;
調(diào)用遠端可靠消息服務(wù)TPC更新第2步中的等待確認狀態(tài)為發(fā)送中sending;
同時創(chuàng)建消費待確認列表,即持久化該Topic類型的消息被哪些消費者訂閱監(jiān)聽的所有消費待確認列表,狀態(tài)為未確認,表pc_tpc_mq_confirm;
完成上面操作后,發(fā)送消息到 RocketMQ。
controller層AuthRestController.java
@PostMapping(value = "/register") @ApiOperation(httpMethod = "POST", value = "注冊用戶") public Wrapper registerUser(UserRegisterDto user) { uacUserService.register(user); return WrapMapper.ok(); }service層
用戶ID生成:雪花算法生成分布式唯一 ID
用戶密碼加密:SpringSecurity BCryptPasswordEncoder 強哈希方法加密,每次加密的結(jié)果都不一樣。
bcrypt 可以有效抵御彩虹表暴力破解,其原理就是在加鹽的基礎(chǔ)上多次 hash,關(guān)于密碼參考:https://mp.weixin.qq.com/s/Dk...
Redis存儲激活郵箱token:key(active_token):email:過期時間1天,即激活接口參數(shù):activeUserToken;
生成郵件發(fā)送模板(freeMarker):activeUserTemplate.ftl
根據(jù)上面模板和發(fā)送郵件參數(shù)生成實體:MqMessageData(pc_mq_message_data)
各個子系統(tǒng)消息落地的消息表,比如用戶服務(wù)系統(tǒng)主要就是郵件消息、短信消息等。
@Override public void register(UserRegisterDto registerDto) { // 校驗注冊信息 validateRegisterInfo(registerDto); String mobileNo = registerDto.getMobileNo(); String email = registerDto.getEmail(); Date row = new Date(); String salt = String.valueOf(generateId()); // 封裝注冊信息 long id = generateId(); // id 雪花算法生成 UacUser uacUser = new UacUser(); uacUser.setLoginName(registerDto.getLoginName()); uacUser.setSalt(salt); uacUser.setLoginPwd(Md5Util.encrypt(registerDto.getLoginPwd())); uacUser.setMobileNo(mobileNo); uacUser.setStatus(UacUserStatusEnum.DISABLE.getKey()); uacUser.setUserSource(UacUserSourceEnum.REGISTER.getKey()); uacUser.setCreatedTime(row); uacUser.setUpdateTime(row); uacUser.setEmail(email); uacUser.setId(id); uacUser.setCreatorId(id); uacUser.setCreator(registerDto.getLoginName()); uacUser.setLastOperatorId(id); uacUser.setUserName(registerDto.getLoginName()); uacUser.setLastOperator(registerDto.getLoginName()); // 發(fā)送激活郵件 String activeToken = PubUtils.uuid() + super.generateId(); redisService.setKey(RedisKeyUtil.getActiveUserKey(activeToken), email, 1, TimeUnit.DAYS); Mapparam = Maps.newHashMap(); param.put("loginName", registerDto.getLoginName()); param.put("email", registerDto.getEmail()); param.put("activeUserUrl", activeUserUrl + activeToken); param.put("dateTime", DateUtil.formatDateTime(new Date())); Set to = Sets.newHashSet(); to.add(registerDto.getEmail()); MqMessageData mqMessageData = emailProducer.sendEmailMq(to, UacEmailTemplateEnum.ACTIVE_USER, AliyunMqTopicConstants.MqTagEnum.ACTIVE_USER, param); // 即下面的第6步 userManager.register(mqMessageData, uacUser); }
userManager.register() 通過注解 @MqProducerStore 發(fā)送消息服務(wù)。
執(zhí)行該方法前,先進入切面編程
@MqProducerStore public void register(final MqMessageData mqMessageData, final UacUser uacUser) { log.info("注冊用戶. mqMessageData={}, user={}", mqMessageData, uacUser); uacUserMapper.insertSelective(uacUser); }
@Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public @interface MqProducerStore { // WAIT_CONFIRM:等待確認;SAVE_AND_SEND:直接發(fā)送; MqSendTypeEnum sendType() default MqSendTypeEnum.WAIT_CONFIRM; // ORDER(1):有序;DIS_ORDER(0):無序 MqOrderTypeEnum orderType() default MqOrderTypeEnum.ORDER; // Rocketmq 默認延時級別 // ZERO(0, 不延時);ONE(1, 1秒)....EIGHTEEN(18, 2小時) DelayLevelEnum delayLevel() default DelayLevelEnum.ZERO; }
切面中,因為郵件激活發(fā)送消息類型為默認的:等待確認。
此處本地服務(wù) UAC 消息落地:保存待確認消息 MqMessageData 到 mysql,表pc_mq_message_data;
發(fā)送待確認消息到可靠消息系統(tǒng)(TPC):發(fā)送預(yù)發(fā)送狀態(tài)的消息給消息中心
// 切面 MqMessageData domain = null; for (Object object : args) { if (object instanceof MqMessageData) { domain = (MqMessageData) object; break; } } domain.setOrderType(orderType); domain.setProducerGroup(producerGroup); // 1. 等待確認 if (type == MqSendTypeEnum.WAIT_CONFIRM) { if (delayLevelEnum != DelayLevelEnum.ZERO) { domain.setDelayLevel(delayLevelEnum.delayLevel()); } // 1.1 發(fā)送待確認消息到可靠消息系統(tǒng) // 本地服務(wù)消息落地,可靠消息服務(wù)中心也持久化預(yù)發(fā)送消息,但是不發(fā)送 mqMessageService.saveWaitConfirmMessage(domain); } result = joinPoint.proceed(); // 返回注解方法,執(zhí)行業(yè)務(wù)
@Override @Transactional(rollbackFor = Exception.class) public void saveWaitConfirmMessage(final MqMessageData mqMessageData) { // 1. 持久化到本地mysql this.saveMqProducerMessage(mqMessageData); // 2. 發(fā)送預(yù)發(fā)送狀態(tài)的消息給消息中心 TpcMqMessageDto tpcMqMessageDto = mqMessageData.getTpcMqMessageDto(); // 3. 調(diào)用遠端可靠消息服務(wù)(tpc),持久化等待確認消息 tpcMqMessageFeignApi.saveMessageWaitingConfirm(tpcMqMessageDto); // 4. mqMessageData 此時為調(diào)用遠端服務(wù)返回來的數(shù)據(jù) log.info("<== saveWaitConfirmMessage - 存儲預(yù)發(fā)送消息成功. messageKey={}", mqMessageData.getMessageKey()); }
緊接著第7步,調(diào)用遠端可靠消息服務(wù)(TCP),此時只是持久化預(yù)發(fā)送消息,但是沒有發(fā)送(等執(zhí)行完本地事務(wù)即保存用戶后在發(fā)送,即第9步)
持久化 TpcMqMessage,即 pc_tpc_mq_message(可靠消息表)
@Override public void saveMessageWaitingConfirm(TpcMqMessageDto messageDto) { if (StringUtils.isEmpty(messageDto.getMessageTopic())) { throw new TpcBizException(ErrorCodeEnum.TPC10050001); } Date now = new Date(); TpcMqMessage message = new ModelMapper().map(messageDto, TpcMqMessage.class); // 消息狀態(tài):WAIT_SEND(10, "未發(fā)送");SENDING(20, "已發(fā)送");FINISH(30, "已完成"); message.setMessageStatus(MqSendStatusEnum.WAIT_SEND.sendStatus()); message.setUpdateTime(now); message.setCreatedTime(now); tpcMqMessageMapper.insertSelective(message); }
上面執(zhí)行完后,返回注解 @MqProducerStore所在方法,執(zhí)行本地事務(wù):保存用戶到 mysql。
result = joinPoint.proceed(); // 返回注解方法,執(zhí)行業(yè)務(wù)
第9步執(zhí)行完后,再次進入切面,發(fā)送確認消息給可靠消息服務(wù)中心
result = joinPoint.proceed(); // 返回注解方法,執(zhí)行業(yè)務(wù) // 2. 直接發(fā)送 if (type == MqSendTypeEnum.SAVE_AND_SEND) { mqMessageService.saveAndSendMessage(domain); // 3. XXX } else if (type == MqSendTypeEnum.DIRECT_SEND) { mqMessageService.directSendMessage(domain); } else { // type = WAIT_CONFIRM final MqMessageData finalDomain = domain; taskExecutor.execute(() -> mqMessageService.confirmAndSendMessage(finalDomain.getMessageKey())); } return result;
緊接著上面,可靠消息服務(wù)中心(TCP):根據(jù)傳過來的 messageKey 確認并發(fā)送之前已經(jīng)持久化的預(yù)發(fā)送消息。
// TpcMqMessageFeignClient.java @Override @ApiOperation(httpMethod = "POST", value = "確認并發(fā)送消息") public Wrapper confirmAndSendMessage(@RequestParam("messageKey") String messageKey) { logger.info("確認并發(fā)送消息. messageKey={}", messageKey); tpcMqMessageService.confirmAndSendMessage(messageKey); return WrapMapper.ok(); } // TpcMqMessageServiceImpl.java @Override public void confirmAndSendMessage(String messageKey) { final TpcMqMessage message = tpcMqMessageMapper.getByMessageKey(messageKey); if (message == null) { throw new TpcBizException(ErrorCodeEnum.TPC10050002); } TpcMqMessage update = new TpcMqMessage(); update.setMessageStatus(MqSendStatusEnum.SENDING.sendStatus()); update.setId(message.getId()); update.setUpdateTime(new Date()); // 1. 更新消息狀態(tài)為:SENDING tpcMqMessageMapper.updateByPrimaryKeySelective(update); // 2. 創(chuàng)建消費待確認列表(此處topic:SEND_EMAIL_TOPIC) this.createMqConfirmListByTopic(message.getMessageTopic(), message.getId(), message.getMessageKey()); // 3. 直接發(fā)送消息 this.directSendMessage(message.getMessageBody(), message.getMessageTopic(), message.getMessageTag(), message.getMessageKey(), message.getProducerGroup(), message.getDelayLevel()); }
第11步中的第2點:TCP 服務(wù)中,創(chuàng)建消費待確認列表,根據(jù)表 pc_tpc_mq_subscribe,查詢出不同 topic 下相對應(yīng)的所有 consumer_code(消費監(jiān)聽者),即設(shè)置該消息被哪些服務(wù)(CID)監(jiān)聽消費;
- ``SEND_EMAIL_TOPIC`` --> ``CID_OPC``:該消息會被 `consumerGroup` 為 `CID_OPC` 的服務(wù)監(jiān)聽并消費。 - 同時,保存**確認消息**:``TpcMqConfirm`` --> 表 ``pc_tpc_mq_confirm``
@Override public void createMqConfirmListByTopic(final String topic, final Long messageId, final String messageKey) { Listlist = Lists.newArrayList(); TpcMqConfirm tpcMqConfirm; List consumerGroupList = tpcMqConsumerService.listConsumerGroupByTopic(topic); if (PublicUtil.isEmpty(consumerGroupList)) { throw new TpcBizException(ErrorCodeEnum.TPC100500010, topic); } for (final String cid : consumerGroupList) { tpcMqConfirm = new TpcMqConfirm(UniqueIdGenerator.generateId(), messageId, messageKey, cid); list.add(tpcMqConfirm); } tpcMqConfirmMapper.batchCreateMqConfirm(list); }
第11步中的第3點:完成上面操作后,直接發(fā)送消息到中間件 RocketMQ 隊列中。
@Override public void directSendMessage(String body, String topic, String tag, String key, String pid, Integer delayLevel) { RocketMqProducer.sendSimpleMessage(body, topic, tag, key, pid, delayLevel); } // 核心方法:重試發(fā)送消息(重試次數(shù)3次) // pid:producerGroup --> 發(fā)送郵件服務(wù)是 PID_UAC // cid: consumerGroup --> 監(jiān)聽郵件消息服務(wù)是 CID_OPC private static SendResult retrySendMessage(String pid, Message msg) { int iniCount = 1; SendResult result; while (true) { try { // Message中屬性 result = MqProducerBeanFactory.getBean(pid).send(msg); break; } catch (Exception e) { log.error("發(fā)送消息失敗:", e); if (iniCount++ >= PRODUCER_RETRY_TIMES) { throw new TpcBizException(ErrorCodeEnum.TPC100500014, msg.getTopic(), msg.getKeys()); } } } log.info("<== 發(fā)送MQ SendResult={}", result); return result; }消息消費端(OPC)
大致流程:
OPC服務(wù)通過配置類AliyunMqConfiguration.java啟動DefaultMQPushConsumer RocketMQ 消費端,并設(shè)置消息邏輯處理監(jiān)聽器OptPushMessageListener;
本地服務(wù)OPC持久化消費者確認消息,表 pc_mq_message_data;
調(diào)用遠端可靠消息服務(wù)TPC,更新之前生產(chǎn)端持久化的消費確認列表狀態(tài),未確認 --> 已確認,表pc_tpc_mq_confirm;
接著就可以發(fā)送激活郵箱;
如果發(fā)送成功,調(diào)用遠端可靠消息服務(wù)TPC,繼續(xù)更新第3步表中消費確認消息的狀態(tài)為已消費;
消費端 RocketMQ 啟動配置類DefaultMQPushConsumer 根據(jù)配置信息啟動;
并 subscribe 訂閱 該服務(wù) OPC 所有的 Topic 和 tags 消息。
包括短信、郵箱激活、附件更新刪除等所有消息。
@Slf4j @Configuration public class AliyunMqConfiguration { @Resource private PaascloudProperties paascloudProperties; @Resource private OptPushMessageListener optPushConsumer; @Resource private TaskExecutor taskExecutor; /** * Default mq push consumer default mq push consumer. * * @return the default mq push consumer * * @throws MQClientException the mq client exception */ @Bean public DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException { // 1. 新建消費者組 // RocketMQ實際上都是拉模式,這里的DefaultMQPushConsumer實現(xiàn)了推模式, // 也只是對拉消息服務(wù)做了一層封裝,即拉到消息的時候觸發(fā)業(yè)務(wù)消費者注冊到這里的callback DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(paascloudProperties.getAliyun().getRocketMq().getConsumerGroup()); // 2. 指定NameServer地址,多個地址以 ; 隔開 consumer.setNamesrvAddr(paascloudProperties.getAliyun().getRocketMq().getNamesrvAddr()); // 3. 設(shè)置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費 // 如果非第一次啟動,那么按照上次消費的位置繼續(xù)消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); String[] strArray = AliyunMqTopicConstants.ConsumerTopics.OPT.split(GlobalConstant.Symbol.COMMA); for (String aStrArray : strArray) { String[] topicArray = aStrArray.split(GlobalConstant.Symbol.AT); String topic = topicArray[0]; String tags = topicArray[1]; if (PublicUtil.isEmpty(tags)) { tags = "*"; } // 4. 進行Topic訂閱,訂閱PushTopic下Tag為push的消息 consumer.subscribe(topic, tags); log.info("RocketMq OpcPushConsumer topic = {}, tags={}", topic, tags); } // 5. 設(shè)置消息處理器 consumer.registerMessageListener(optPushConsumer); consumer.setConsumeThreadMax(2); consumer.setConsumeThreadMin(2); taskExecutor.execute(() -> { try { Thread.sleep(5000); consumer.start(); log.info("RocketMq OpcPushConsumer OK."); } catch (InterruptedException | MQClientException e) { log.error("RocketMq OpcPushConsumer, 出現(xiàn)異常={}", e.getMessage(), e); } }); return consumer; } }消息邏輯處理監(jiān)聽器
consumeMessage() 上有注解 @MqConsumerStore,執(zhí)行前先進入切面編程;
@Slf4j @Component public class OptPushMessageListener implements MessageListenerConcurrently { @Resource private OptSendSmsTopicConsumer optSendSmsTopicService; @Resource private OptSendEmailTopicConsumer optSendEmailTopicService; @Resource private MdcTopicConsumer mdcTopicConsumer; @Resource private MqMessageService mqMessageService; @Resource private StringRedisTemplate srt; /** * Consume message consume concurrently status. * * @param messageExtList the message ext list * @param consumeConcurrentlyContext the consume concurrently context * * @return the consume concurrently status */ @Override @MqConsumerStore public ConsumeConcurrentlyStatus consumeMessage(ListmessageExtList, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt msg = messageExtList.get(0); String body = new String(msg.getBody()); String topicName = msg.getTopic(); String tags = msg.getTags(); String keys = msg.getKeys(); log.info("MQ消費Topic={},tag={},key={}", topicName, tags, keys); ValueOperations ops = srt.opsForValue(); // 控制冪等性使用的key try { MqMessage.checkMessage(body, topicName, tags, keys); String mqKV = null; if (srt.hasKey(keys)) { mqKV = ops.get(keys); } if (PublicUtil.isNotEmpty(mqKV)) { log.error("MQ消費Topic={},tag={},key={}, 重復(fù)消費", topicName, tags, keys); // 消費成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } if (AliyunMqTopicConstants.MqTopicEnum.SEND_SMS_TOPIC.getTopic().equals(topicName)) { optSendSmsTopicService.handlerSendSmsTopic(body, topicName, tags, keys); } if (AliyunMqTopicConstants.MqTopicEnum.SEND_EMAIL_TOPIC.getTopic().equals(topicName)) { optSendEmailTopicService.handlerSendEmailTopic(body, topicName, tags, keys); } if (AliyunMqTopicConstants.MqTopicEnum.TPC_TOPIC.getTopic().equals(topicName)) { mqMessageService.deleteMessageTopic(body, tags); } if (AliyunMqTopicConstants.MqTopicEnum.MDC_TOPIC.getTopic().equals(topicName)) { mdcTopicConsumer.handlerSendSmsTopic(body, topicName, tags, keys); } else { log.info("OPC訂單信息消 topicName={} 不存在", topicName); } } catch (IllegalArgumentException ex) { log.error("校驗MQ message 失敗 ex={}", ex.getMessage(), ex); } catch (Exception e) { log.error("處理MQ message 失敗 topicName={}, keys={}, ex={}", topicName, keys, e.getMessage(), e); // 如果消息消費失敗,例如數(shù)據(jù)庫異常等,扣款失敗,發(fā)送失敗需要重試的場景, // 返回下面代碼,RocketMQ就認為消費失敗。 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } ops.set(keys, keys, 10, TimeUnit.DAYS); // 業(yè)務(wù)實現(xiàn)消費回調(diào)的時候,當且僅當返回下面代碼時,RocketMQ才會認為這批消息是消費完成的 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
執(zhí)行方法之前先進入切面編程執(zhí)行,獲取注解方法的參數(shù)和消息;
@Around(value = "mqConsumerStoreAnnotationPointcut()") public Object processMqConsumerStoreJoinPoint(ProceedingJoinPoint joinPoint) throws Throwable { // ... MqMessageData dto = this.getTpcMqMessageDto(messageExtList.get(0)); final String messageKey = dto.getMessageKey(); if (isStorePreStatus) { // 執(zhí)行下面3、4步 mqMessageService.confirmReceiveMessage(consumerGroup, dto); } String methodName = joinPoint.getSignature().getName(); try { // 返回注解方法; result = joinPoint.proceed(); log.info("result={}", result); if (CONSUME_SUCCESS.equals(result.toString())) { mqMessageService.saveAndConfirmFinishMessage(consumerGroup, messageKey); } } catch (Exception e) { log.error("發(fā)送可靠消息, 目標方法[{}], 出現(xiàn)異常={}", methodName, e.getMessage(), e); throw e; } finally { log.info("發(fā)送可靠消息 目標方法[{}], 總耗時={}", methodName, System.currentTimeMillis() - startTime); } return result; }
confirmReceiveMessage:消費者確認收到消息;在上面目錄【發(fā)送激活郵箱的消息/service層】的第 12 步
持久化消費者確認消息 MqMessageData 到本地服務(wù) OPC mysql 中,表 pc_mq_message_data;
@Override @Transactional(rollbackFor = Exception.class) public void confirmReceiveMessage(String cid, MqMessageData messageData) { final String messageKey = messageData.getMessageKey(); log.info("confirmReceiveMessage - 消費者={}, 確認收到key={}的消息", cid, messageKey); // 持久化消費者確認消息 MqMessageData 到本地服務(wù) mysql 中,表 pc_mq_message_data messageData.setMessageType(MqMessageTypeEnum.CONSUMER_MESSAGE.messageType()); messageData.setId(UniqueIdGenerator.generateId()); mqMessageDataMapper.insertSelective(messageData); // 調(diào)用遠端服務(wù) TPC,更新確認收到消息表狀態(tài)為已確認,TpcMqConfirm,表 pc_tpc_mq_confirm; Wrapper wrapper = tpcMqMessageFeignApi.confirmReceiveMessage(cid, messageKey); log.info("tpcMqMessageFeignApi.confirmReceiveMessage result={}", wrapper); if (wrapper == null) { throw new TpcBizException(ErrorCodeEnum.GL99990002); } if (wrapper.error()) { throw new TpcBizException(ErrorCodeEnum.TPC10050004, wrapper.getMessage(), messageKey); } }
緊接著第3步,調(diào)用遠端服務(wù) TPC,更新確認收到消息狀態(tài)為已確認,TpcMqConfirm,表 pc_tpc_mq_confirm;
status:狀態(tài), 10 - 未確認 ; 20 - 已確認; 30 已消費;
consumeCount:消費的次數(shù),加 1;
@Override public void confirmReceiveMessage(final String cid, final String messageKey) { // 1. 校驗cid // 2. 校驗messageKey // 3. 校驗cid 和 messageKey Long confirmId = tpcMqConfirmMapper.getIdMqConfirm(cid, messageKey); // 3. 更新消費信息的狀態(tài) tpcMqConfirmMapper.confirmReceiveMessage(confirmId); }
第3、4步執(zhí)行后,返回切面,執(zhí)行下面代碼,再返回注解修飾的方法;
result = joinPoint.proceed();
注解修飾方法,通過參數(shù) MessageExt 獲取該消息的 topic(主題)、tag(標簽)、keys(唯一鍵)、body(消息體);
冪等性(避免重復(fù)消費):redis 中存儲消費過的該消息的 keys;
根據(jù)消息的 topic 執(zhí)行相應(yīng)的操作處理該消息
比如此流程的發(fā)送激活郵箱,使用 spring 框架的 TaskExecutor執(zhí)行郵箱發(fā)送任務(wù)。
@Override public int sendSimpleMail(String subject, String text, Setto) { log.info("sendSimpleMail - 發(fā)送簡單郵件. subject={}, text={}, to={}", subject, text, to); int result = 1; try { SimpleMailMessage message = MailEntity.createSimpleMailMessage(subject, text, to); message.setFrom(from); taskExecutor.execute(() -> mailSender.send(message)); } catch (Exception e) { log.info("sendSimpleMail [FAIL] ex={}", e.getMessage(), e); result = 0; } return result; }
第8步 如果消息消費成功,郵件發(fā)送成功,redis 中存儲該消息(冪等,過期時間 10 天),同時返回消費成功代碼;
ops.set(keys, keys, 10, TimeUnit.DAYS); // 業(yè)務(wù)實現(xiàn)消費回調(diào)的時候,當且僅當返回下面代碼時,RocketMQ 才會認為這批消息是消費完成的 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
第8步 如果消息消費失敗,比如數(shù)據(jù)庫異常,扣款失敗,郵件發(fā)送失敗等需要重試的場景,返回重試消費代碼。
} catch (IllegalArgumentException ex) { log.error("校驗MQ message 失敗 ex={}", ex.getMessage(), ex); } catch (Exception e) { log.error("處理MQ message 失敗 topicName={}, keys={}, ex={}", topicName, keys, e.getMessage(), e); // 如果消息消費失敗,例如數(shù)據(jù)庫異常等,扣款失敗,發(fā)送失敗需要重試的場景, // 返回下面代碼,RocketMQ就認為消費失敗。 return ConsumeConcurrentlyStatus.RECONSUME_LATER; }
執(zhí)行完注解修飾方法,再次返回切面中,繼續(xù)執(zhí)行,判斷返回結(jié)果;
String methodName = joinPoint.getSignature().getName(); try { result = joinPoint.proceed(); log.info("result={}", result); if (CONSUME_SUCCESS.equals(result.toString())) { mqMessageService.saveAndConfirmFinishMessage(consumerGroup, messageKey); } } catch (Exception e) { log.error("發(fā)送可靠消息, 目標方法[{}], 出現(xiàn)異常={}", methodName, e.getMessage(), e); throw e; } finally { log.info("發(fā)送可靠消息 目標方法[{}], 總耗時={}", methodName, System.currentTimeMillis() - startTime); } return result;
第11步中,如果返回 CONSUME_SUCCESS,保存并確認消息完成;
調(diào)用遠端服務(wù) TCP,更新消費確認消息列表 pc_tpc_mq_confirm,狀態(tài)為已消費;
@Override public void saveAndConfirmFinishMessage(String cid, String messageKey) { // 1. 調(diào)用遠端服務(wù)tcp,確認完成消費消息 Wrapper wrapper = tpcMqMessageFeignApi.confirmConsumedMessage(cid, messageKey); log.info("tpcMqMessageFeignApi.confirmReceiveMessage result={}", wrapper); if (wrapper == null) { throw new TpcBizException(ErrorCodeEnum.GL99990002); } if (wrapper.error()) { throw new TpcBizException(ErrorCodeEnum.TPC10050004, wrapper.getMessage(), messageKey); } }發(fā)送激活成功郵箱過程
發(fā)送激活成功郵箱同上面發(fā)送激活郵箱一樣利用可靠消息服務(wù)完成分布式事務(wù)操作。controller層
@GetMapping(value = "/activeUser/{activeUserToken}") @ApiOperation(httpMethod = "POST", value = "激活用戶") public Wrapper activeUser(@PathVariable String activeUserToken) { uacUserService.activeUser(activeUserToken); return WrapMapper.ok("激活成功"); }service 層
activeuser():
@Override public void activeUser(String activeUserToken) { Preconditions.checkArgument(!StringUtils.isEmpty(activeUserToken), "激活用戶失敗"); String activeUserKey = RedisKeyUtil.getActiveUserKey(activeUserToken); String email = redisService.getKey(activeUserKey); if (StringUtils.isEmpty(email)) { throw new UacBizException(ErrorCodeEnum.UAC10011030); } // 修改用戶狀態(tài), 綁定訪客角色 UacUser uacUser = new UacUser(); uacUser.setEmail(email); uacUser = uacUserMapper.selectOne(uacUser); if (uacUser == null) { logger.error("找不到用戶信息. email={}", email); throw new UacBizException(ErrorCodeEnum.UAC10011004, email); } UacUser update = new UacUser(); update.setId(uacUser.getId()); update.setStatus(UacUserStatusEnum.ENABLE.getKey()); LoginAuthDto loginAuthDto = new LoginAuthDto(); loginAuthDto.setUserId(uacUser.getId()); loginAuthDto.setUserName(uacUser.getLoginName()); loginAuthDto.setLoginName(uacUser.getLoginName()); update.setUpdateInfo(loginAuthDto); UacUser user = this.queryByUserId(uacUser.getId()); Mapparam = Maps.newHashMap(); param.put("loginName", user.getLoginName()); param.put("dateTime", DateUtil.formatDateTime(new Date())); Set to = Sets.newHashSet(); to.add(user.getEmail()); // 構(gòu)建激活成功消息體 MqMessageData mqMessageData = emailProducer.sendEmailMq(to, UacEmailTemplateEnum.ACTIVE_USER_SUCCESS, AliyunMqTopicConstants.MqTagEnum.ACTIVE_USER_SUCCESS, param); // 1. 可靠消息服務(wù)發(fā)送郵件 userManager.activeUser(mqMessageData, update, activeUserKey); }
調(diào)用userManager.activeUser():可以看到該方法也是注解@MqProducerStore修飾;
@MqProducerStore public void activeUser(final MqMessageData mqMessageData, final UacUser uacUser, final String activeUserKey) { log.info("激活用戶. mqMessageData={}, user={}", mqMessageData, uacUser); // 更新用戶信息 int result = uacUserMapper.updateByPrimaryKeySelective(uacUser); if (result < 1) { throw new UacBizException(ErrorCodeEnum.UAC10011038, uacUser.getId()); } // 綁定一個訪客角色默認值roleId=10000 final Long userId = uacUser.getId(); Preconditions.checkArgument(userId != null, "用戶Id不能爲空"); final Long roleId = 10000L; UacRoleUser roleUser = new UacRoleUser(); roleUser.setUserId(userId); roleUser.setRoleId(roleId); uacRoleUserMapper.insertSelective(roleUser); // 綁定一個組織 UacGroupUser groupUser = new UacGroupUser(); groupUser.setUserId(userId); groupUser.setGroupId(GlobalConstant.Sys.SUPER_MANAGER_GROUP_ID); uacGroupUserMapper.insertSelective(groupUser); // 刪除 activeUserToken redisService.deleteKey(activeUserKey); }
本地事務(wù)執(zhí)行用戶信息更新和 redis 郵箱激活token刪除,切面編程發(fā)送激活成功郵箱分析過程和上面發(fā)送激活郵箱流程是一樣的,這里不再贅述。
這兩個過程根據(jù)發(fā)送消息的 tag 不同,從而處理邏輯不同。Topic 都是 SEND_EMAIL_TOPIC;
此處具體為郵箱內(nèi)容模板不同,其余消息生產(chǎn)端和消費端流程一樣。
public enum MqTagEnum { /** * 激活用戶. */ ACTIVE_USER("ACTIVE_USER", MqTopicEnum.SEND_EMAIL_TOPIC.getTopic(), "激活用戶"), /** * 激活用戶成功. */ ACTIVE_USER_SUCCESS("ACTIVE_USER_SUCCESS", MqTopicEnum.SEND_EMAIL_TOPIC.getTopic(), "激活用戶成功"), // ...省略其他tag String tag; String topic; String tagName; MqTagEnum(String tag, String topic, String tagName) { this.tag = tag; this.topic = topic; this.tagName = tagName; } public String getTag() { return tag; } public String getTopic() { return topic; } }
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/73093.html
摘要:依次執(zhí)行下面命令本地安裝從官方安裝包下載。管理界面提供多種管理方式命令行和界面等提供一個開源的擴展項目里面包含一個子項目配置下打個包就可以用了。 前言 github 開源項目--paascloud-master:https://github.com/paascloud/... paascloud-master 官方環(huán)境搭建:http://blog.paascloud.net/20...
摘要:它采用了請求響應(yīng)模型。通信請求只能由客戶端發(fā)起,服務(wù)端對請求做出應(yīng)答處理弊端協(xié)議無法實現(xiàn)服務(wù)器主動向客戶端發(fā)起消息。如何使用客戶端創(chuàng)建對象屬性表示連接狀態(tài)可選值表示連接尚未建立。表示連接正在進行關(guān)閉。 一言不合就上效果圖演示showImg(https://segmentfault.com/img/bVbkUDl?w=1920&h=638); 項目:http://112.74.164.1...
摘要:昨天研究了網(wǎng)站的注冊流程,感興趣的可以看下從前后端分別學(xué)習(xí)注冊登錄流程今天接著研究注冊登錄流程之登錄。為解決這個問題,引入,它是由一組隨機數(shù)組合的哈希表,當用戶登錄成功,本來發(fā)放給用戶,現(xiàn)在變成發(fā)放給用戶。 昨天研究了網(wǎng)站的注冊流程,感興趣的可以看下:從前后端分別學(xué)習(xí)——注冊/登錄流程1 今天接著研究注冊/登錄流程之登錄。 登錄 首先來看一下登陸過程:showImg(https://s...
摘要:本章講如何幫助健忘癥患者,重置用戶密碼。實際上不僅內(nèi)置了密碼重置,還包括登錄登出密碼修改等功能。總結(jié)本章學(xué)習(xí)了使用第三方庫,高效完成了重置密碼的功能。有疑問請在杜賽的個人網(wǎng)站留言,我會盡快回復(fù)。 隨著技術(shù)的發(fā)展,驗證用戶身份的手段越來越多,指紋、面容、聲紋應(yīng)有盡有,但密碼依然是最重要的手段。 互聯(lián)網(wǎng)處處都有密碼的身影,甚至變成了現(xiàn)代人的一種負擔。像筆者這樣的,動輒幾十個賬號密碼,忘記其...
摘要:中為何新增來作為主要的方式運行機制是怎樣的機制有什么優(yōu)勢運行機制是怎樣的基于通信模式,除了端和端,還有兩角色一起合作完成進程間通信功能。 目錄介紹 2.0.0.1 什么是Binder?為什么要使用Binder?Binder中是如何進行線程管理的?總結(jié)binder講的是什么? 2.0.0.2 Android中進程和線程的關(guān)系?什么是IPC?為何需要進行IPC?多進程通信可能會出現(xiàn)什么問...
閱讀 2434·2021-11-23 10:04
閱讀 1507·2021-09-02 15:21
閱讀 899·2019-08-30 15:44
閱讀 1070·2019-08-30 10:48
閱讀 716·2019-08-29 17:21
閱讀 3563·2019-08-29 13:13
閱讀 1991·2019-08-23 17:17
閱讀 1795·2019-08-23 17:04