成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

SpringBoot+RabbitMq實現(xiàn)延時消息隊列

alighters / 1512人閱讀

背景:
   在一些應用場景中,程序并不需要同步執(zhí)行,例如用戶注冊之后的郵件或者短信通知提醒。這種場景的實現(xiàn)則是在當前線程,開啟一個新線
 程,當前線程在開啟新線程之后會繼續(xù)往下執(zhí)行,無需等待新線程執(zhí)行完成。
   但例如一些需要延時的場景則不只是開啟新線程執(zhí)行如此簡單了。譬如提交訂單后在15分鐘內(nèi)沒有完成支付,訂單需要關(guān)閉,這種情
 況,是否只開啟一個異步線程就不適用了呢。

那么就單單實現(xiàn)在提交訂單后的15分鐘內(nèi),如果沒有完成支付,系統(tǒng)關(guān)閉訂單。有哪些可行的方案呢。

方案:

使用定時任務輪詢訂單表,查詢出訂單創(chuàng)建了15分鐘以上并且未支付的訂單,如果有查詢出此類訂單則執(zhí)行關(guān)閉。

缺點:假設(shè)每1分鐘輪詢一次,則會存在秒級誤差,如果秒級輪詢,則會極其消耗性能,影響程序的健壯性。

提交訂單時開啟一個新線程,而新線程直接休眠15分鐘,休眠結(jié)束后開始執(zhí)行訂單關(guān)閉

缺點:如果在線程休眠時,重啟了整個服務,那么會怎樣呢?

使用延時消息隊列

缺點:需要額外部署消息中間件

綜上考慮:使用延時消息隊列則成為最佳選擇,消息延時發(fā)布之后,保存在消息中間件中,在15分鐘后才會正式發(fā)布至隊列,延時隊列監(jiān)聽器在15分鐘后監(jiān)聽到消息時,才開始執(zhí)行,而這期間,即使項目重啟也沒有關(guān)系。

以springboot為基礎(chǔ)框架,集成rabbitmq實現(xiàn)延時隊列
注意:這里不采用網(wǎng)上流傳的死信隊列轉(zhuǎn)發(fā),而是采用rabbitmq3.7+版本的延時隊列插件,所以務必安裝3.7+版本并啟用延時隊列插件。
增加amqp依賴
      
             org.springframework.boot
             spring-boot-starter-parent
             1.5.4.RELEASE
     
     
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.boot
            spring-boot-starter-amqp
        
    
修改application.yml文件,配置rabbitmq,并且開啟消息的手動應答
spring:
    rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: admin
        password: admin
        listener:
            direct:
                acknowledge-mode: MANUAL
            simple:
                acknowledge-mode: MANUAL
配置隊列,路由,交換機
package cn.rongyuan.config;

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;



/**
 * @title rabbitmq配置類
 * @author zengzp
 * @time 2018年8月20日 上午10:46:43
 * @Description 
 */
@Configuration
public class RabbitConfig {
    
    // 支付超時延時交換機
    public static final String Delay_Exchange_Name = "delay.exchange";

    // 超時訂單關(guān)閉隊列
    public static final String Timeout_Trade_Queue_Name = "close_trade";

    
    @Bean
    public Queue delayPayQueue() {
        return new Queue(RabbitConfig.Timeout_Trade_Queue_Name, true);
    }
    
    
    // 定義廣播模式的延時交換機 無需綁定路由
    @Bean
    FanoutExchange delayExchange(){
        Map args = new HashMap();
        args.put("x-delayed-type", "direct");
        FanoutExchange topicExchange = new FanoutExchange(RabbitConfig.Delay_Exchange_Name, true, false, args);
        topicExchange.setDelayed(true);
        return topicExchange;
    }
    
    // 綁定延時隊列與交換機
    @Bean  
    public Binding delayPayBind() {  
        return BindingBuilder.bind(delayPayQueue()).to(delayExchange());  
    }
    
    // 定義消息轉(zhuǎn)換器
    @Bean
    Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    
    // 定義消息模板用于發(fā)布消息,并且設(shè)置其消息轉(zhuǎn)換器
    @Bean
    RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
    @Bean
    RabbitAdmin rabbitAdmin(final ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

}
在提交訂單時發(fā)布消息至延時隊列并且指定延時時長
    @Autowired
    RabbitTemplate rabbitTemplate;
    // 通過廣播模式發(fā)布延時消息 延時30分鐘 持久化消息 消費后銷毀 這里無需指定路由,會廣播至每個綁定此交換機的隊列
    rabbitTemplate.convertAndSend(RabbitConfig.Delay_Exchange_Name, "", trade.getTradeCode(), message ->{
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        message.getMessageProperties().setDelay(30 * (60*1000));   // 毫秒為單位,指定此消息的延時時長
        return message;
    });
消費端監(jiān)聽延時隊列
package cn.rongyuan.mq.consumer;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

import cn.rongyuan.config.RabbitConfig;
import cn.rongyuan.service.TradeService;
import cn.rongyuan.util.ExceptionUtil;


/**
 * @title 消息消費端
 * @author zengzp
 * @time 2018年8月20日 上午11:00:26
 * @Description 
 */
@Component
public class PayTimeOutConsumer {
    
    @Autowired
    TradeService tradeService;
    
    private Logger logger = LoggerFactory.getLogger(getClass());
    
    @RabbitListener(queues = RabbitConfig.Timeout_Trade_Queue_Name)
    public void process(String tradeCode, Message message, Channel channel) throws IOException{
        try {
            logger.info("開始執(zhí)行訂單[{}]的支付超時訂單關(guān)閉......", tradeCode);
            tradeService.cancelTrade(tradeCode);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            logger.info("超時訂單處理完畢");
        } catch (Exception e) {
            logger.error("超時訂單處理失敗:{}", ExceptionUtil.getMessage(e));
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        } 
    }

}
參考資料:
  1、spring amqp 官方文檔:https://docs.spring.io/spring-amqp/docs/2.0.0.M2/reference/htmlsingle/#delayed-message-exchange
  2、rabbitmq 官方文檔:http://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/76788.html

相關(guān)文章

  • 一起來學SpringBoot | 第十三篇:RabbitMQ延遲隊列

    摘要:另一種就是用中的位于包下,本質(zhì)是由和實現(xiàn)的阻塞優(yōu)先級隊列。表明了一條消息可在隊列中存活的最大時間。當某條消息被設(shè)置了或者當某條消息進入了設(shè)置了的隊列時,這條消息會在時間后死亡成為。 SpringBoot 是為了簡化 Spring 應用的創(chuàng)建、運行、調(diào)試、部署等一系列問題而誕生的產(chǎn)物,自動裝配的特性讓我們可以更好的關(guān)注業(yè)務本身而不是外部的XML配置,我們只需遵循規(guī)范,引入相關(guān)的依賴就可...

    selfimpr 評論0 收藏0
  • SpringBoot RabbitMQ 整合使用

    摘要:可以在地址看到如何使用講解下上面命令行表示控制臺端口號,可以在瀏覽器中通過控制臺來執(zhí)行的相關(guān)操作。同時從控制臺可以看到發(fā)送的速率多線程測試性能開了個線程,每個線程發(fā)送條消息。 showImg(http://ww2.sinaimg.cn/large/006tNc79ly1g5jjb62t88j30u00gwdi2.jpg); 前提 上次寫了篇文章,《SpringBoot Kafka 整合...

    yuanxin 評論0 收藏0
  • springboot 集成rabbitmq 實例

    摘要:集成實例個人在學習時發(fā)現(xiàn)網(wǎng)上很少有系統(tǒng)性介紹和如何集成的,其他人總結(jié)的都片段化,所以結(jié)合個人調(diào)研過程,整理此篇文章。 springboot 集成rabbitmq 實例 個人在學習rabbitmq時發(fā)現(xiàn)網(wǎng)上很少有系統(tǒng)性介紹springboot和rabbitmq如何集成的,其他人總結(jié)的都片段化,所以結(jié)合個人調(diào)研過程,整理此篇文章。 本文章共分為以下部分: rabbitmq簡介 sprin...

    springDevBird 評論0 收藏0
  • RabbitMQ發(fā)布訂閱實戰(zhàn)-實現(xiàn)延時重試隊列

    摘要:本文將會講解如何使用實現(xiàn)延時重試和失敗消息隊列,實現(xiàn)可靠的消息消費,消費失敗后,自動延時將消息重新投遞,當達到一定的重試次數(shù)后,將消息投遞到失敗消息隊列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發(fā)的開源消息隊列。本文假設(shè)讀者對RabbitMQ是什么已經(jīng)有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網(wǎng)的 RabbitMQ Tutorials 入門...

    Heier 評論0 收藏0
  • RabbitMQ發(fā)布訂閱實戰(zhàn)-實現(xiàn)延時重試隊列

    摘要:本文將會講解如何使用實現(xiàn)延時重試和失敗消息隊列,實現(xiàn)可靠的消息消費,消費失敗后,自動延時將消息重新投遞,當達到一定的重試次數(shù)后,將消息投遞到失敗消息隊列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發(fā)的開源消息隊列。本文假設(shè)讀者對RabbitMQ是什么已經(jīng)有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網(wǎng)的 RabbitMQ Tutorials 入門...

    vslam 評論0 收藏0

發(fā)表評論

0條評論

閱讀需要支付1元查看
<