成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

SpringBoot集成RabbitMQ(死信隊(duì)列)

honhon / 3115人閱讀

摘要:介紹死信隊(duì)列沒有被及時消費(fèi)的消息存放的隊(duì)列,消息沒有被及時消費(fèi)有以下幾點(diǎn)原因有消息被拒絕并且隊(duì)列達(dá)到最大長度消息過期場景小時進(jìn)入初始隊(duì)列,等待分鐘后進(jìn)入分鐘隊(duì)列消息等待分鐘后進(jìn)入執(zhí)行隊(duì)列執(zhí)行失敗后重新回到分鐘隊(duì)列失敗次后,消息進(jìn)入小時隊(duì)列消

介紹

死信隊(duì)列:沒有被及時消費(fèi)的消息存放的隊(duì)列,消息沒有被及時消費(fèi)有以下幾點(diǎn)原因:
1.有消息被拒絕(basic.reject/ basic.nack)并且requeue=false
2.隊(duì)列達(dá)到最大長度
3.消息TTL過期

場景

1.小時進(jìn)入初始隊(duì)列,等待30分鐘后進(jìn)入5分鐘隊(duì)列
2.消息等待5分鐘后進(jìn)入執(zhí)行隊(duì)列
3.執(zhí)行失敗后重新回到5分鐘隊(duì)列
4.失敗5次后,消息進(jìn)入2小時隊(duì)列
5.消息等待2小時進(jìn)入執(zhí)行隊(duì)列
6.失敗5次后,將消息丟棄或做其他處理

使用

安裝MQ

使用docker方式安裝,選擇帶mangement的版本

docker pull rabbitmq:management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

訪問 localhost: 15672,默認(rèn)賬號密碼guest/guest

項(xiàng)目配置

(1)創(chuàng)建springboot項(xiàng)目
(2)在application.properties配置文件中配置mq連接信息

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

(3)隊(duì)列配置

package com.df.ps.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MqConfig {

    //time
    @Value("${spring.df.buffered.min:120}")
    private int springdfBufferedTime;

    @Value("${spring.df.high-buffered.min:5}")
    private int springdfHighBufferedTime;

    @Value("${spring.df.low-buffered.min:120}")
    private int springdfLowBufferedTime;

    // 30min Buffered Queue
    @Value("${spring.df.queue:spring-df-buffered-queue}")
    private String springdfBufferedQueue;

    @Value("${spring.df.topic:spring-df-buffered-topic}")
    private String springdfBufferedTopic;

    @Value("${spring.df.route:spring-df-buffered-route}")
    private String springdfBufferedRouteKey;

    // 5M Buffered Queue
    @Value("${spring.df.high-buffered.queue:spring-df-high-buffered-queue}")
    private String springdfHighBufferedQueue;

    @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
    private String springdfHighBufferedTopic;

    @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
    private String springdfHighBufferedRouteKey;

    // High Queue
    @Value("${spring.df.high.queue:spring-df-high-queue}")
    private String springdfHighQueue;

    @Value("${spring.df.high.topic:spring-df-high-topic}")
    private String springdfHighTopic;

    @Value("${spring.df.high.route:spring-df-high-route}")
    private String springdfHighRouteKey;

    // 2H Low Buffered Queue
    @Value("${spring.df.low-buffered.queue:spring-df-low-buffered-queue}")
    private String springdfLowBufferedQueue;

    @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
    private String springdfLowBufferedTopic;

    @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
    private String springdfLowBufferedRouteKey;

    // Low Queue
    @Value("${spring.df.low.queue:spring-df-low-queue}")
    private String springdfLowQueue;

    @Value("${spring.df.low.topic:spring-df-low-topic}")
    private String springdfLowTopic;

    @Value("${spring.df.low.route:spring-df-low-route}")
    private String springdfLowRouteKey;


    @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedQueue")
    Queue springdfBufferedQueue() {
        int bufferedTime = 1000 * 60 * springdfBufferedTime;
        return createBufferedQueue(springdfBufferedQueue, springdfHighBufferedTopic, springdfHighBufferedRouteKey, bufferedTime);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedQueue")
    Queue springdfHighBufferedQueue() {
        int highBufferedTime = 1000 * 60 * springdfHighBufferedTime;
        return createBufferedQueue(springdfHighBufferedQueue, springdfHighTopic, springdfHighRouteKey, highBufferedTime);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfHighQueue")
    Queue springdfHighQueue() {
        return new Queue(springdfHighQueue, true);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedQueue")
    Queue springdfLowBufferedQueue() {
        int lowBufferedTime = 1000 * 60 * springdfLowBufferedTime;
        return createBufferedQueue(springdfLowBufferedQueue, springdfLowTopic, springdfLowRouteKey, lowBufferedTime);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfLowQueue")
    Queue springdfLowQueue() {
        return new Queue(springdfLowQueue, true);
    }


    @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedTopic")
    TopicExchange springdfBufferedTopic() {
        return new TopicExchange(springdfBufferedTopic);
    }

    @Bean
    Binding springBuffereddf(Queue springdfBufferedQueue, TopicExchange springdfBufferedTopic) {
        return BindingBuilder.bind(springdfBufferedQueue).to(springdfBufferedTopic).with(springdfBufferedRouteKey);
    }


    @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedTopic")
    TopicExchange springdfHighBufferedTopic() {
        return new TopicExchange(springdfHighBufferedTopic);
    }

    @Bean
    Binding springHighBuffereddf(Queue springdfHighBufferedQueue, TopicExchange springdfHighBufferedTopic) {
        return BindingBuilder.bind(springdfHighBufferedQueue).to(springdfHighBufferedTopic).with(springdfHighBufferedRouteKey);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfHighTopic")
    TopicExchange springdfHighTopic() {
        return new TopicExchange(springdfHighTopic);
    }

    @Bean
    Binding springHighdf(Queue springdfHighQueue, TopicExchange springdfHighTopic) {
        return BindingBuilder.bind(springdfHighQueue).to(springdfHighTopic).with(springdfHighRouteKey);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedTopic")
    TopicExchange springdfLowBufferedTopic() {
        return new TopicExchange(springdfLowBufferedTopic);
    }

    @Bean
    Binding springLowBuffereddf(Queue springdfLowBufferedQueue, TopicExchange springdfLowBufferedTopic) {
        return BindingBuilder.bind(springdfLowBufferedQueue).to(springdfLowBufferedTopic).with(springdfLowBufferedRouteKey);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfLowTopic")
    TopicExchange springdfLowTopic() {
        return new TopicExchange(springdfLowTopic);
    }

    @Bean
    Binding springLowdf(Queue springdfLowQueue, TopicExchange springdfLowTopic) {
        return BindingBuilder.bind(springdfLowQueue).to(springdfLowTopic).with(springdfLowRouteKey);
    }


    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                             MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(springdfHighQueue, springdfLowQueue);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(IntegrationReceiver receiver) {


        MessageListenerAdapter adapter = new MessageListenerAdapter(receiver);
        adapter.setDefaultListenerMethod("receive");
        Map queueOrTagToMethodName = new HashMap<>();
        queueOrTagToMethodName.put(springdfHighQueue, "springdfHighReceive");
        queueOrTagToMethodName.put(springdfLowQueue, "springdfLowReceive");
        adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
        return adapter;

    }


    private Queue createBufferedQueue(String queueName, String topic, String routeKey, int bufferedTime) {
        Map args = new HashMap<>();
        args.put("x-dead-letter-exchange", topic);
        args.put("x-dead-letter-routing-key", routeKey);
        args.put("x-message-ttl", bufferedTime);
        // 是否持久化
        boolean durable = true;
        // 僅創(chuàng)建者可以使用的私有隊(duì)列,斷開后自動刪除
        boolean exclusive = false;
        // 當(dāng)所有消費(fèi)客戶端連接斷開后,是否自動刪除隊(duì)列
        boolean autoDelete = false;

        return new Queue(queueName, durable, exclusive, autoDelete, args);
    }
}

消費(fèi)者配置

package com.df.ps.mq;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import java.util.Map;

public class MqReceiver {

    private static Logger logger = LoggerFactory.getLogger(MqReceiver.class);

    @Value("${high-retry:5}")
    private int highRetry;

    @Value("${low-retry:5}")
    private int lowRetry;

    @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
    private String springdfHighBufferedTopic;

    @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
    private String springdfHighBufferedRouteKey;

    @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
    private String springdfLowBufferedTopic;

    @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
    private String springdfLowBufferedRouteKey;

    private final RabbitTemplate rabbitTemplate;
    @Autowired
    public MqReceiver(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void receive(Object message) {
        if (logger.isInfoEnabled()) {
            logger.info("default receiver: " + message);
        }
    }

    /**
     * 消息從初始隊(duì)列進(jìn)入5分鐘的高速緩沖隊(duì)列
     * @param message
     */
    public void highReceiver(Object message){
        ObjectMapper mapper = new ObjectMapper();
        Map msg = mapper.convertValue(message, Map.class);

        try{
            logger.info("這里做消息處理...");
        }catch (Exception e){
            int times = msg.get("times") == null ? 0 : (int) msg.get("times");
            if (times < highRetry) {
                msg.put("times", times + 1);
                rabbitTemplate.convertAndSend(springdfHighBufferedTopic,springdfHighBufferedRouteKey,message);
            } else {
                msg.put("times", 0);
                rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message);
            }
        }
    }

    /**
     * 消息從5分鐘緩沖隊(duì)列進(jìn)入2小時緩沖隊(duì)列
     * @param message
     */
    public void lowReceiver(Object message){
        ObjectMapper mapper = new ObjectMapper();
        Map msg = mapper.convertValue(message, Map.class);
        
        try {
            logger.info("這里做消息處理...");
        }catch (Exception e){
            int times = msg.get("times") == null ? 0 : (int) msg.get("times");
            if (times < lowRetry) {
                rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message);
            }else{
                logger.info("消息無法被消費(fèi)...");
            }
        } 
    }
}

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/74366.html

相關(guān)文章

  • SpringBoot+RabbitMq實(shí)現(xiàn)延時消息隊(duì)列

    背景: 在一些應(yīng)用場景中,程序并不需要同步執(zhí)行,例如用戶注冊之后的郵件或者短信通知提醒。這種場景的實(shí)現(xiàn)則是在當(dāng)前線程,開啟一個新線 程,當(dāng)前線程在開啟新線程之后會繼續(xù)往下執(zhí)行,無需等待新線程執(zhí)行完成。 但例如一些需要延時的場景則不只是開啟新線程執(zhí)行如此簡單了。譬如提交訂單后在15分鐘內(nèi)沒有完成支付,訂單需要關(guān)閉,這種情 況,是否只開啟一個異步線程就不適用了呢。 那么就單單實(shí)現(xiàn)...

    alighters 評論0 收藏0
  • 一起來學(xué)SpringBoot | 第十三篇:RabbitMQ延遲隊(duì)列

    摘要:另一種就是用中的位于包下,本質(zhì)是由和實(shí)現(xiàn)的阻塞優(yōu)先級隊(duì)列。表明了一條消息可在隊(duì)列中存活的最大時間。當(dāng)某條消息被設(shè)置了或者當(dāng)某條消息進(jìn)入了設(shè)置了的隊(duì)列時,這條消息會在時間后死亡成為。 SpringBoot 是為了簡化 Spring 應(yīng)用的創(chuàng)建、運(yùn)行、調(diào)試、部署等一系列問題而誕生的產(chǎn)物,自動裝配的特性讓我們可以更好的關(guān)注業(yè)務(wù)本身而不是外部的XML配置,我們只需遵循規(guī)范,引入相關(guān)的依賴就可...

    selfimpr 評論0 收藏0
  • rabbitmq

    摘要:一關(guān)鍵字和之間的連接關(guān)系實(shí)際存儲消息。生產(chǎn)者進(jìn)行接受應(yīng)答,用來確定這條消息是否正常的發(fā)送到了,這種方式也是消息的可靠性投遞的核心保障。支持消息的過期時間,在消息發(fā)送時可以進(jìn)行指定。可以監(jiān)聽這個隊(duì)列中消息做相應(yīng)的處理。 一、rabbitmq關(guān)鍵字 Binding:Exchange和Exchange、Queue之間的連接關(guān)系Queue:實(shí)際存儲消息。Durability:是否持久化,Du...

    Hwg 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<