摘要:官方鏡像倉(cāng)庫(kù)地址本地運(yùn)行訪(fǎng)問(wèn)可視化面板地址默認(rèn)賬號(hào)默認(rèn)密碼集成基本參數(shù)配置配置配置定義優(yōu)先級(jí)隊(duì)列定義交換器定義參考官方文檔應(yīng)用啟動(dòng)后,會(huì)自動(dòng)創(chuàng)建和,并相互綁定,優(yōu)先級(jí)隊(duì)列會(huì)有如圖所示標(biāo)識(shí)。
Docker With RabbitMQ
官方 Docker 鏡像倉(cāng)庫(kù)地址
https://hub.docker.com/_/rabb...
本地運(yùn)行 RabbitMQ
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
訪(fǎng)問(wèn)可視化面板
地址:http://127.0.0.1:15672/
默認(rèn)賬號(hào):guest
默認(rèn)密碼:guest
Spring Boot With RabbitMQSpring Boot 集成 RabbitMQ
org.springframework.boot spring-boot-starter-amqp
基本參數(shù)配置
# host & port spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672
Queue / Exchange / Routing 配置
/** * RabbitMQ 配置 */ @Configuration public class RabbitMQConfig { private static final String EXCHANGE = "priority-exchange"; public static final String QUEUE = "priority-queue"; private static final String ROUTING_KEY = "priority.queue.#"; /** * 定義優(yōu)先級(jí)隊(duì)列 */ @Bean Queue queue() { Mapargs= new HashMap<>(); args.put("x-max-priority", 100); return new Queue(QUEUE, false, false, false, args); } /** * 定義交換器 */ @Bean TopicExchange exchange() { return new TopicExchange(EXCHANGE); } @Bean Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); } }
priority queue 定義參考官方文檔:https://www.rabbitmq.com/priority.html
Spring Boot 應(yīng)用啟動(dòng)后,會(huì)自動(dòng)創(chuàng)建 Queue 和 Exchange ,并相互綁定,優(yōu)先級(jí)隊(duì)列會(huì)有如圖所示標(biāo)識(shí)。RabbitMQ Publisher
Spring Boot 相關(guān)配置
# 是否開(kāi)啟消息發(fā)送到交換器(Exchange)后觸發(fā)回調(diào) spring.rabbitmq.publisher-confirms=false # 是否開(kāi)啟消息發(fā)送到隊(duì)列(Queue)后觸發(fā)回調(diào) spring.rabbitmq.publisher-returns=false # 消息發(fā)送失敗重試相關(guān)配置 spring.rabbitmq.template.retry.enabled=true spring.rabbitmq.template.retry.initial-interval=3000ms spring.rabbitmq.template.retry.max-attempts=3 spring.rabbitmq.template.retry.max-interval=10000ms spring.rabbitmq.template.retry.multiplier=1
發(fā)送消息
@Component @AllArgsConstructor public class FileMessageSender { private static final String EXCHANGE = "priority-exchange"; private static final String ROUTING_KEY_PREFIX = "priority.queue."; private final RabbitTemplate rabbitTemplate; /** * 發(fā)送設(shè)置有優(yōu)先級(jí)的消息 * * @param priority 優(yōu)先級(jí) */ public void sendPriorityMessage(String content, Integer priority) { rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY_PREFIX + "test", content, message -> { message.getMessageProperties().setPriority(priority); return message; }); } }RabbitMQ Consumer
Spring Boot 相關(guān)配置
# 消息接收確認(rèn),可選模式:NONE(不確認(rèn))、AUTO(自動(dòng)確認(rèn))、MANUAL(手動(dòng)確認(rèn)) spring.rabbitmq.listener.simple.acknowledge-mode=AUTO # 最小線(xiàn)程數(shù)量 spring.rabbitmq.listener.simple.concurrency=10 # 最大線(xiàn)程數(shù)量 spring.rabbitmq.listener.simple.max-concurrency=10 # 每個(gè)消費(fèi)者可能未完成的最大未確認(rèn)消息數(shù)量 spring.rabbitmq.listener.simple.prefetch=1
消費(fèi)者執(zhí)行耗時(shí)較長(zhǎng)的話(huà),建議 spring.rabbitmq.listener.simple.prefetch 設(shè)置為較小數(shù)值,讓優(yōu)先級(jí)越高的消息更快加入到消費(fèi)者線(xiàn)程。
監(jiān)聽(tīng)消息
@Slf4j @Component public class MessageListener { /** * 處理消息 */ @RabbitListener(queues = "priority-queue") public void listen(String message) { log.info(message); } }番外補(bǔ)充
1、自定義消息發(fā)送確認(rèn)的回調(diào)
配置如下:
# 開(kāi)啟消息發(fā)送到交換器(Exchange)后觸發(fā)回調(diào) spring.rabbitmq.publisher-confirms=true # 開(kāi)啟消息發(fā)送到隊(duì)列(Queue)后觸發(fā)回調(diào) spring.rabbitmq.publisher-returns=true
自定義 RabbitTemplate.ConfirmCallback 實(shí)現(xiàn)類(lèi)
@Slf4j public class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback{ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("消息唯一標(biāo)識(shí): {}", correlationData); log.info("確認(rèn)狀態(tài): {}", ack); log.info("造成原因: {}", cause); } }
自定義 RabbitTemplate.ConfirmCallback 實(shí)現(xiàn)類(lèi)
@Slf4j public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息主體: {}", message); log.info("回復(fù)編碼: {}", replyCode); log.info("回復(fù)內(nèi)容: {}", replyText); log.info("交換器: {}", exchange); log.info("路由鍵: {}", routingKey); } }
配置 rabbitTemplate
@Component @AllArgsConstructor public class RabbitTemplateInitializingBean implements InitializingBean { private final RabbitTemplate rabbitTemplate; @Override public void afterPropertiesSet() { rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack()); rabbitTemplate.setReturnCallback(new RabbitReturnCallback()); } }
2、RabbitMQ Exchange 類(lèi)型
中文:RabbitMQ Exchange類(lèi)型詳解
English: RabbitMQ Tutorials
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/74706.html
摘要:另一種就是用中的位于包下,本質(zhì)是由和實(shí)現(xiàn)的阻塞優(yōu)先級(jí)隊(duì)列。表明了一條消息可在隊(duì)列中存活的最大時(shí)間。當(dāng)某條消息被設(shè)置了或者當(dāng)某條消息進(jìn)入了設(shè)置了的隊(duì)列時(shí),這條消息會(huì)在時(shí)間后死亡成為。 SpringBoot 是為了簡(jiǎn)化 Spring 應(yīng)用的創(chuàng)建、運(yùn)行、調(diào)試、部署等一系列問(wèn)題而誕生的產(chǎn)物,自動(dòng)裝配的特性讓我們可以更好的關(guān)注業(yè)務(wù)本身而不是外部的XML配置,我們只需遵循規(guī)范,引入相關(guān)的依賴(lài)就可...
摘要:慕課網(wǎng)消息中間件極速入門(mén)與實(shí)戰(zhàn)學(xué)習(xí)總結(jié)時(shí)間年月日星期三說(shuō)明本文部分內(nèi)容均來(lái)自慕課網(wǎng)。 慕課網(wǎng)《RabbitMQ消息中間件極速入門(mén)與實(shí)戰(zhàn)》學(xué)習(xí)總結(jié) 時(shí)間:2018年09月05日星期三 說(shuō)明:本文部分內(nèi)容均來(lái)自慕課網(wǎng)。@慕課網(wǎng):https://www.imooc.com 教學(xué)源碼:無(wú) 學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:RabbitM...
摘要:可以在地址看到如何使用講解下上面命令行表示控制臺(tái)端口號(hào),可以在瀏覽器中通過(guò)控制臺(tái)來(lái)執(zhí)行的相關(guān)操作。同時(shí)從控制臺(tái)可以看到發(fā)送的速率多線(xiàn)程測(cè)試性能開(kāi)了個(gè)線(xiàn)程,每個(gè)線(xiàn)程發(fā)送條消息。 showImg(http://ww2.sinaimg.cn/large/006tNc79ly1g5jjb62t88j30u00gwdi2.jpg); 前提 上次寫(xiě)了篇文章,《SpringBoot Kafka 整合...
摘要:用于控制活動(dòng)人數(shù),將超過(guò)此一定閥值的訂單直接丟棄。緩解短時(shí)間的高流量壓垮應(yīng)用。目前比較推薦的就是我們手動(dòng)然后將消費(fèi)錯(cuò)誤的消息轉(zhuǎn)移到其它的消息隊(duì)列中,做補(bǔ)償處理消費(fèi)者該方案是默認(rèn)的方式不太推薦。 SpringBoot 是為了簡(jiǎn)化 Spring 應(yīng)用的創(chuàng)建、運(yùn)行、調(diào)試、部署等一系列問(wèn)題而誕生的產(chǎn)物,自動(dòng)裝配的特性讓我們可以更好的關(guān)注業(yè)務(wù)本身而不是外部的XML配置,我們只需遵循規(guī)范,引入相...
摘要:它還為具有偵聽(tīng)器容器的消息驅(qū)動(dòng)的提供支持。接收消息當(dāng)存在基礎(chǔ)結(jié)構(gòu)時(shí),可以使用任何來(lái)注釋以創(chuàng)建偵聽(tīng)器端點(diǎn)。默認(rèn)情況下,如果禁用重試并且偵聽(tīng)器拋出異常,則會(huì)無(wú)限期地重試傳遞。 Spring-amqp-tutorial Spring AMQP項(xiàng)目將核心Spring概念應(yīng)用于基于AMQP的消息傳遞解決方案的開(kāi)發(fā)。它提供了一個(gè)模板作為發(fā)送和接收消息的高級(jí)抽象。它還為具有偵聽(tīng)器容器的消息驅(qū)動(dòng)的PO...
閱讀 1808·2021-09-03 10:50
閱讀 1339·2019-08-30 15:55
閱讀 3382·2019-08-30 15:52
閱讀 1242·2019-08-30 15:44
閱讀 954·2019-08-30 15:44
閱讀 3326·2019-08-30 14:23
閱讀 3559·2019-08-28 17:51
閱讀 2298·2019-08-26 13:52