摘要:慕課網(wǎng)消息中間件極速入門與實(shí)戰(zhàn)學(xué)習(xí)總結(jié)時間年月日星期三說明本文部分內(nèi)容均來自慕課網(wǎng)。
慕課網(wǎng)《RabbitMQ消息中間件極速入門與實(shí)戰(zhàn)》學(xué)習(xí)總結(jié)
時間:2018年09月05日星期三
說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):https://www.imooc.com
教學(xué)源碼:無
學(xué)習(xí)源碼:https://github.com/zccodere/s...
第一章:RabbitMQ起步 1-1 課程導(dǎo)航課程導(dǎo)航
RabbitMQ簡介及AMQP協(xié)議
RabbitMQ安裝與使用
RabbitMQ核心概念
與SpringBoot整合
保障100%的消息可靠性投遞方案落地實(shí)現(xiàn)
1-2 RabbitMQ簡介初識RabbitMQ
RabbitMQ是一個開源的消息代理和隊(duì)列服務(wù)器
用來通過普通協(xié)議在完全不同的應(yīng)用之間共享數(shù)據(jù)
RabbitMQ是使用Erlang語言來編寫的
并且RabbitMQ是基于AMQP協(xié)議的
RabbitMQ簡介
目前很多互聯(lián)網(wǎng)大廠都在使用RabbitMQ
RabbitMQ底層采用Erlang語言進(jìn)行編寫
開源、性能優(yōu)秀,穩(wěn)定性保障
與SpringAMQP完美的整合、API豐富
集群模式豐富,表達(dá)式配置,HA模式,鏡像隊(duì)列模型
保證數(shù)據(jù)不丟失的前提做到高可靠性、可用性
AMQP全稱:Advanced Message Queuing Protocol
AMQP翻譯:高級消息隊(duì)列協(xié)議
AMQP協(xié)議模型
1-3 RabbitMQ安裝學(xué)習(xí)筆記
0.安裝準(zhǔn)備 官網(wǎng)地址:http://www.rabbitmq.com/ 安裝Linux必要依賴包1-4 RabbitMQ概念下載RabbitMQ安裝包 進(jìn)行安裝,修改相關(guān)配置文件 vim /etc/hostname vim /etc/hosts 1.安裝Erlang wget https://packages.erlang-solutions.com/erlang-solutions_1.0_all.deb sudo dpkg -i erlang-solutions_1.0_all.deb sudo apt-get install erlang erlang-nox 2.安裝RabbitMQ echo "deb http://www.rabbitmq.com/debian/ testing main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add - sudo apt-get install rabbitmq-server 3.安裝RabbitMQ web管理插件 sudo rabbitmq-plugins enable rabbitmq_management sudo systemctl restart rabbitmq-server 訪問:http://localhost:15672 默認(rèn)用戶名密碼:guest/guest 4.修改RabbitMQ配置 vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.5.7/ebin/rabbit.app 比如修改密碼、配置等等;例如:loopback_users中的<<"guest">>,只保留guest 服務(wù)啟動:rabbitmq-server start & 服務(wù)停止:rabbitmqctl app_stop
RabbitMQ的整體架構(gòu)
RabbitMQ核心概念
Server:又稱Broker,接受客戶端的連接,實(shí)現(xiàn)AMQP實(shí)體服務(wù)
Connection:連接,應(yīng)用程序與Broker的網(wǎng)絡(luò)連接
Channel:網(wǎng)絡(luò)信道
幾乎所有的操作都在Channel中進(jìn)行
Channel是進(jìn)行消息讀寫的通道
客戶端可建立多個Channel
每個Channel代表一個會話任務(wù)
Message:消息
服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù),由Properties和Body組成
Properties可以對消息進(jìn)行修飾,比如消息的優(yōu)先級、延遲等高級特性
Body則就是消息體內(nèi)容
Virtual host:虛擬機(jī)
用于進(jìn)行邏輯隔離,最上層的消息路由
一個Virtual host里面可以有若干個Exchange和Queue
同一個Virtual host里面不能有相同名稱的Exchange或Queue
Exchange:交換機(jī),接收消息,根據(jù)路由鍵轉(zhuǎn)發(fā)消息到綁定的隊(duì)列
Binding:Exchange和Queue之間的虛擬連接,binding中可以包含routing key
Routing key:一個路由規(guī)則,虛擬機(jī)可用它來確定如何路由一個特定消息
Queue:也稱為Message Queue,消息隊(duì)列,保存消息并將它們轉(zhuǎn)發(fā)給消費(fèi)者
RabbitMQ消息的流轉(zhuǎn)過程
第二章:RabbitMQ使用 2-1 發(fā)送消息SpringBoot與RabbitMQ集成
引入相關(guān)依賴
對application.properties進(jìn)行配置
創(chuàng)建名為rabbitmq-producer的maven工程pom如下
47-rabbitmq com.myimooc 1.0-SNAPSHOT 4.0.0 rabbitmq-producer 2.0.4.RELEASE org.springframework.boot spring-boot-parent ${spring.boot.version} pom import org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-amqp org.apache.commons commons-lang3 commons-io commons-io 2.5 com.alibaba fastjson 1.2.36 javax.servlet javax.servlet-api provided org.slf4j slf4j-api log4j log4j 1.2.17 org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-maven-plugin
1.編寫Order類
package com.myimooc.rabbitmq.entity; import java.io.Serializable; /** *
* 標(biāo)題: 訂單實(shí)體
* 描述: 訂單實(shí)體
* 時間: 2018/09/06
* * @author zc */ public class Order implements Serializable{ private static final long serialVersionUID = 6771608755338249746L; private String id; private String name; /** * 存儲消息發(fā)送的唯一標(biāo)識 */ private String messageId; public Order() { } public Order(String id, String name, String messageId) { this.id = id; this.name = name; this.messageId = messageId; } @Override public String toString() { return "Order{" + "id="" + id + """ + ", name="" + name + """ + ", messageId="" + messageId + """ + "}"; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId = messageId; } }
2.編寫OrderSender類
package com.myimooc.rabbitmq.producer.producer; import com.myimooc.rabbitmq.entity.Order; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** *
* 標(biāo)題: 訂單消息發(fā)送者
* 描述: 訂單消息發(fā)送者
* 時間: 2018/09/06
* * @author zc */ @Component public class OrderSender { private RabbitTemplate rabbitTemplate; @Autowired public OrderSender( RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } /** * 發(fā)送訂單 * * @param order 訂單 * @throws Exception 異常 */ public void send(Order order) throws Exception { CorrelationData correlationData = new CorrelationData(); correlationData.setId(order.getMessageId()); // exchange:交換機(jī) // routingKey:路由鍵 // message:消息體內(nèi)容 // correlationData:消息唯一ID this.rabbitTemplate.convertAndSend("order-exchange", "order.a", order, correlationData); } }
3.編寫application.properties類
# RabbitMQ配置 spring.rabbitmq.addresses=192.168.0.105:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000 # Server配置 server.servlet.context-path=/ server.port=8080 spring.http.encoding.charset=UTF-8 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss spring.jackson.time-zone=GMT+8 spring.jackson.default-property-inclusion=NON_NULL
4.編寫Application類
package com.myimooc.rabbitmq.producer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** *
* 標(biāo)題: 啟動類
* 描述: 啟動類
* 時間: 2018/09/06
* * @author zc */ @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
5.編寫OrderSenderTest類
package com.myimooc.rabbitmq.producer.producer; import com.myimooc.rabbitmq.entity.Order; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.UUID; /** *2-2 處理消息
* 標(biāo)題: 訂單消息發(fā)送者測試
* 描述: 訂單消息發(fā)送者測試
* 時間: 2018/09/06
* * @author zc */ @RunWith(SpringRunner.class) @SpringBootTest public class OrderSenderTest { @Autowired private OrderSender orderSender; @Test public void testSend1() throws Exception { Order order = new Order(); order.setId("201809062009010001"); order.setName("測試訂單1"); order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString().replaceAll("-","")); this.orderSender.send(order); } }
創(chuàng)建名為rabbitmq-consumer的maven工程pom如下
47-rabbitmq com.myimooc 1.0-SNAPSHOT 4.0.0 rabbitmq-consumer 2.0.4.RELEASE org.springframework.boot spring-boot-parent ${spring.boot.version} pom import org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-amqp org.apache.commons commons-lang3 commons-io commons-io 2.5 com.alibaba fastjson 1.2.36 javax.servlet javax.servlet-api provided org.slf4j slf4j-api log4j log4j 1.2.17 org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-maven-plugin
1.編寫Order類
package com.myimooc.rabbitmq.entity; import java.io.Serializable; /** *
* 標(biāo)題: 訂單實(shí)體
* 描述: 訂單實(shí)體
* 時間: 2018/09/06
* * @author zc */ public class Order implements Serializable{ private static final long serialVersionUID = 6771608755338249746L; private String id; private String name; /** * 存儲消息發(fā)送的唯一標(biāo)識 */ private String messageId; public Order() { } public Order(String id, String name, String messageId) { this.id = id; this.name = name; this.messageId = messageId; } @Override public String toString() { return "Order{" + "id="" + id + """ + ", name="" + name + """ + ", messageId="" + messageId + """ + "}"; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId = messageId; } }
2.編寫OrderReceiver類
package com.myimooc.rabbitmq.consumer.consumer; import com.rabbitmq.client.Channel; import com.myimooc.rabbitmq.entity.Order; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.util.Map; /** *
* 標(biāo)題: 訂單接收者
* 描述: 訂單接收者
* 時間: 2018/09/06
* * @author zc */ @Component public class OrderReceiver { /** * 接收消息 * * @param order 消息體內(nèi)容 * @param headers 消息頭內(nèi)容 * @param channel 網(wǎng)絡(luò)信道 * @throws Exception 異常 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "order-queue",durable = "true"), exchange = @Exchange(name = "order-exchange",type = "topic"), key = "order.*" )) @RabbitHandler public void onOrderMessage(@Payload Order order, @Headers Mapheaders, Channel channel) throws Exception { // 消費(fèi)者操作 System.out.println("收到消息:"); System.out.println("訂單信息:" + order.toString()); // 手動簽收消息 Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTag, false); } }
3.編寫application.properties類
# RabbitMQ連接配置 spring.rabbitmq.addresses=192.168.0.105:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000 # RabbitMQ消費(fèi)配置 # 基本并發(fā):5 spring.rabbitmq.listener.simple.concurrency=5 # 最大并發(fā):10 spring.rabbitmq.listener.simple.max-concurrency=10 # 簽收模式:手動簽收 spring.rabbitmq.listener.simple.acknowledge-mode=manual # 限流策略:同一時間只有1條消息發(fā)送過來消費(fèi) spring.rabbitmq.listener.simple.prefetch=1 # Server配置 server.servlet.context-path=/ server.port=8082 spring.http.encoding.charset=UTF-8 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss spring.jackson.time-zone=GMT+8 spring.jackson.default-property-inclusion=NON_NULL
4.編寫Application類
package com.myimooc.rabbitmq.consumer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** *第三章:可靠性投遞 3-1 設(shè)計方案
* 標(biāo)題: 啟動類
* 描述: 啟動類
* 時間: 2018/09/06
* * @author zc */ @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
保障100%消息投遞成功設(shè)計方案(一)
3-2 代碼詳解因篇幅限制,源碼請到github地址查看,這里僅展示核心關(guān)鍵類
1.編寫OrderSender類
package com.myimooc.rabbitmq.ha.producer; import com.myimooc.rabbitmq.entity.Order; import com.myimooc.rabbitmq.ha.constant.Constants; import com.myimooc.rabbitmq.ha.dao.mapper.BrokerMessageLogMapper; import com.myimooc.rabbitmq.ha.dao.po.BrokerMessageLogPO; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** *
* 標(biāo)題: 訂單消息發(fā)送者
* 描述: 訂單消息發(fā)送者
* 時間: 2018/09/06
* * @author zc */ @Component public class OrderSender { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private BrokerMessageLogMapper brokerMessageLogMapper; /** * 回調(diào)方法:confirm確認(rèn) */ private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("correlationData:" + correlationData); String messageId = correlationData.getId(); if (ack) { // 如果confirm返回成功,則進(jìn)行更新 BrokerMessageLogPO messageLogPO = new BrokerMessageLogPO(); messageLogPO.setMessageId(messageId); messageLogPO.setStatus(Constants.OrderSendStatus.SEND_SUCCESS); brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLogPO); } else { // 失敗則進(jìn)行具體的后續(xù)操作:重試或者補(bǔ)償?shù)? System.out.println("異常處理..."); } } }; /** * 發(fā)送訂單 * * @param order 訂單 */ public void send(Order order) { // 設(shè)置回調(diào)方法 this.rabbitTemplate.setConfirmCallback(confirmCallback); // 消息ID CorrelationData correlationData = new CorrelationData(order.getMessageId()); // 發(fā)送消息 this.rabbitTemplate.convertAndSend("order-exchange", "order.a", order, correlationData); } }
2.編寫OrderService類
package com.myimooc.rabbitmq.ha.service; import com.myimooc.rabbitmq.entity.Order; import com.myimooc.rabbitmq.ha.constant.Constants; import com.myimooc.rabbitmq.ha.dao.mapper.BrokerMessageLogMapper; import com.myimooc.rabbitmq.ha.dao.mapper.OrderMapper; import com.myimooc.rabbitmq.ha.dao.po.BrokerMessageLogPO; import com.myimooc.rabbitmq.ha.producer.OrderSender; import com.myimooc.rabbitmq.ha.util.FastJsonConvertUtils; import org.apache.commons.lang3.time.DateUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.Date; /** *
* 標(biāo)題: 訂單服務(wù)
* 描述: 訂單服務(wù)
* 時間: 2018/09/07
* * @author zc */ @Service public class OrderService { @Autowired private OrderMapper orderMapper; @Autowired private BrokerMessageLogMapper brokerMessageLogMapper; @Autowired private OrderSender orderSender; /** * 創(chuàng)建訂單 * * @param order 訂單 */ public void create(Order order) { // 當(dāng)前時間 Date orderTime = new Date(); // 業(yè)務(wù)數(shù)據(jù)入庫 this.orderMapper.insert(order); // 消息日志入庫 BrokerMessageLogPO messageLogPO = new BrokerMessageLogPO(); messageLogPO.setMessageId(order.getMessageId()); messageLogPO.setMessage(FastJsonConvertUtils.convertObjectToJson(order)); messageLogPO.setTryCount(0); messageLogPO.setStatus(Constants.OrderSendStatus.SENDING); messageLogPO.setNextRetry(DateUtils.addMinutes(orderTime, Constants.ORDER_TIMEOUT)); this.brokerMessageLogMapper.insert(messageLogPO); // 發(fā)送消息 this.orderSender.send(order); } }
3.編寫RetryMessageTask類
package com.myimooc.rabbitmq.ha.task; import com.myimooc.rabbitmq.entity.Order; import com.myimooc.rabbitmq.ha.constant.Constants; import com.myimooc.rabbitmq.ha.dao.mapper.BrokerMessageLogMapper; import com.myimooc.rabbitmq.ha.dao.po.BrokerMessageLogPO; import com.myimooc.rabbitmq.ha.producer.OrderSender; import com.myimooc.rabbitmq.ha.util.FastJsonConvertUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.List; /** *
* 標(biāo)題: 重發(fā)消息定時任務(wù)
* 描述: 重發(fā)消息定時任務(wù)
* 時間: 2018/09/07
* * @author zc */ @Component public class RetryMessageTask { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private OrderSender orderSender; @Autowired private BrokerMessageLogMapper brokerMessageLogMapper; /** * 啟動完成3秒后開始執(zhí)行,每隔10秒執(zhí)行一次 */ @Scheduled(initialDelay = 3000, fixedDelay = 10000) public void retrySend() { logger.debug("重發(fā)消息定時任務(wù)開始"); // 查詢 status = 0 和 timeout 的消息日志 Listpos = this.brokerMessageLogMapper.listSendFailureAndTimeoutMessage(); for (BrokerMessageLogPO po : pos) { logger.debug("處理消息日志:{}",po); if (po.getTryCount() >= Constants.MAX_RETRY_COUNT) { // 更新狀態(tài)為失敗 BrokerMessageLogPO messageLogPO = new BrokerMessageLogPO(); messageLogPO.setMessageId(po.getMessageId()); messageLogPO.setStatus(Constants.OrderSendStatus.SEND_FAILURE); this.brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLogPO); } else { // 進(jìn)行重試,重試次數(shù)+1 this.brokerMessageLogMapper.updateRetryCount(po); Order reSendOrder = FastJsonConvertUtils.convertJsonToObject(po.getMessage(), Order.class); try { this.orderSender.send(reSendOrder); } catch (Exception ex) { // 異常處理 logger.error("消息發(fā)送異常:{}", ex); } } } logger.debug("重發(fā)消息定時任務(wù)結(jié)束"); } }
4.編寫ApplicationTest類
package com.myimooc.rabbitmq.ha; import com.myimooc.rabbitmq.entity.Order; import com.myimooc.rabbitmq.ha.service.OrderService; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.UUID; /** *
* 標(biāo)題: 訂單創(chuàng)建測試
* 描述: 訂單創(chuàng)建測試
* 時間: 2018/09/07
* * @author zc */ @RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTest { @Autowired private OrderService orderService; @Test public void testCreateOrder(){ Order order = new Order(); order.setId(String.valueOf(System.currentTimeMillis())); order.setName("測試創(chuàng)建訂單"); order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString().replaceAll("-","")); this.orderService.create(order); } }
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/77035.html
摘要:時間年月日星期六說明本文部分內(nèi)容均來自慕課網(wǎng)。這個時候,可以啟動多臺積分系統(tǒng),來同時消費(fèi)這個消息中間件里面的登錄消息,達(dá)到橫向擴(kuò)展的作用。 時間:2017年07月22日星期六說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):http://www.imooc.com教學(xué)源碼:無學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:課程介紹 1-1 課程安排 Java...
時間:2018年04月11日星期三 說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):https://www.imooc.com 教學(xué)源碼:https://github.com/zccodere/s... 學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:課程介紹 1-1 課程介紹 什么是Netty 高性能、事件驅(qū)動、異步非阻塞的IO Java開源框架 基于NIO的客戶...
摘要:時間年月日星期六說明本文部分內(nèi)容均來自慕課網(wǎng)。必填用于執(zhí)行命令,當(dāng)執(zhí)行完畢后,將產(chǎn)生一個新的文件層。可選指定此鏡像啟動時默認(rèn)執(zhí)行命令??蛇x用于指定需要暴露的網(wǎng)絡(luò)端口號??蛇x向鏡像中掛載一個卷組。 時間:2017年09月16日星期六說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):http://www.imooc.com 教學(xué)源碼:無 學(xué)習(xí)源碼:無 第一章:課程簡介 1-1 課程介紹 Docke...
摘要:入門篇學(xué)習(xí)總結(jié)時間年月日星期三說明本文部分內(nèi)容均來自慕課網(wǎng)。主要的功能是日志記錄,性能統(tǒng)計,安全控制,事務(wù)處理,異常處理等等。 《Spring入門篇》學(xué)習(xí)總結(jié) 時間:2017年1月18日星期三說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):http://www.imooc.com教學(xué)示例源碼:https://github.com/zccodere/s...個人學(xué)習(xí)源碼:https://git...
閱讀 2566·2023-04-26 01:44
閱讀 2577·2021-09-10 10:50
閱讀 1420·2019-08-30 15:56
閱讀 2286·2019-08-30 15:44
閱讀 524·2019-08-29 11:14
閱讀 3429·2019-08-26 11:56
閱讀 3025·2019-08-26 11:52
閱讀 920·2019-08-26 10:27