摘要:時間年月日星期六說明本文部分內(nèi)容均來自慕課網(wǎng)。這個時候,可以啟動多臺積分系統(tǒng),來同時消費這個消息中間件里面的登錄消息,達到橫向擴展的作用。
時間:2017年07月22日星期六
說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):http://www.imooc.com
教學(xué)源碼:無
學(xué)習(xí)源碼:https://github.com/zccodere/s...
Java消息中間件(入門篇)
為什么需要使用消息中間件 消息中間件概述 JMS規(guī)范 JMS代碼演練
Java消息中間件(拓展篇)
ActiveMQ集群配置 消息中間件在大型系統(tǒng)中的最佳實踐 使用其它消息中間件1-2 使用消息中間件原因
通過服務(wù)調(diào)用讓其它系統(tǒng)感知事件發(fā)生
系統(tǒng)之間高耦合 程序執(zhí)行效率低
通過消息中間件解耦服務(wù)調(diào)用
生活中的案例
微信公眾號 老師在黑板上寫字 電視機 等等
消息中間件帶來的好處
解耦:系統(tǒng)解耦 異步:異步執(zhí)行 橫向擴展 安全可靠 順序保證
橫向擴展解釋
當(dāng)?shù)卿浵到y(tǒng),需要很多用戶登錄。這些消息全部需要告知積分系統(tǒng),去增加積分,而增加積分這個處理過程可能比較麻煩、比較耗時。這個時候,可以啟動多臺積分系統(tǒng),來同時消費這個消息中間件里面的登錄消息,達到橫向擴展的作用。第二章:概述 2-1 消息中間件概述
什么是中間件
非底層操作系統(tǒng)軟件,非業(yè)務(wù)應(yīng)用軟件,不是直接給最終用戶使用的,不能直接給客戶帶來價值的軟件統(tǒng)稱為中間件
什么是消息中間件
關(guān)注于數(shù)據(jù)的發(fā)送和接受,利用高效可靠的異步消息傳遞機制集成分布式系統(tǒng)
示意圖
什么是JMS
Java消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個Java平臺中關(guān)于面向消息中間件的API,用于在兩個應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進行異步通信。Java消息服務(wù)是一個與具體平臺無關(guān)的API,絕大多數(shù)MOM提供商都對JMS提供支持。
什么是AMQP
AMQP(Advanced Message Queuing Protocol)是一個提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級消息隊列協(xié)議,基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產(chǎn)品,不同的開發(fā)語言等條件的限制。
JMS和AMQP對比
ActiveMQ
ActiveMQ是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ是一個完全支持JMS1.1和J2EE1.4規(guī)范的JMS Provider實現(xiàn),盡管JMS規(guī)范出臺已經(jīng)是很久的事情了,但是JMS在當(dāng)今J2EE應(yīng)用中間件仍然扮演者特殊的地位。
ActiveMQ特性
多種語言和協(xié)議編寫客戶端。 語言:Java、C、C++、C#、Ruby、Perl、Python、PHP 應(yīng)用協(xié)議:OpenWire、Stomp、REST、WS Notification、XMPP、AMQP 完全支持JMS1.1和J2EE1.4規(guī)范(持久化、XA消息、事務(wù)) 虛擬主題、組合目的、鏡像隊列
RabbitMQ
RabbitMQ是一個開源的AMQP實現(xiàn),服務(wù)器端用Erlang語言編寫。用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息,在易用性、擴展性、高可用性等方面表現(xiàn)不俗。
RabbitMQ特性
支持多種客戶端 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript等 AMQP的完整實現(xiàn)(vhost、Exchange、Binding、Routing Key等) 事務(wù)支持/發(fā)布確認(rèn) 消息持久化
Kafka
Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),是一個分布式、分區(qū)的、可高的分布式日志存儲服務(wù)。它通過一種獨一無二的設(shè)計提供了一個消息系統(tǒng)的功能。
Kafka特性
通過O(1)的磁盤數(shù)據(jù)結(jié)構(gòu)提供消息的持久化, 這種結(jié)構(gòu)對于即使數(shù)以TB的消息存儲也能夠保持長時間的穩(wěn)定性能 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒數(shù)百萬的消息 Partition、Consumer Group
綜合評價
第三章:JMS規(guī)范 3-1 JMS規(guī)范Java消息服務(wù)定義
Java消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個Java平臺中關(guān)于面向消息中間件的API,用于在兩個應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進行異步通信。
JMS相關(guān)概念
提供者:實現(xiàn)JMS規(guī)范的消息中間件服務(wù)器 客戶端:發(fā)送或接收消息的應(yīng)用程序 生產(chǎn)者/發(fā)布者:創(chuàng)建并發(fā)送消息的客戶端 消費者/訂閱者:接收并處理消息的客戶端 消息:應(yīng)用程序之間傳遞的數(shù)據(jù)內(nèi)容 消息模式:在客戶端之間傳遞消息的模式,JMS中定義了主題和隊列兩種模式
JMS消息模式:隊列模式
客戶端包括生產(chǎn)者和消費者 隊列中的消息只能被一個消費者消費 消費者可以隨時消費隊列中的消息
隊列模型示意圖
JMS消息模式:主題模型
客戶端包括發(fā)布者和訂閱者 主題中的消息被所有訂閱者消費 消費者不能消費訂閱之前就發(fā)送到主題中的消息
主題模型示意圖
JMS編碼接口
ConnectionFactory:用于創(chuàng)建連接到消息中間件的連接工廠 Connection:代表了應(yīng)用程序和消息服務(wù)器之間的通信鏈路 Destination:指消息發(fā)布和接收的地點,包括隊列和主題 Session:表示一個單線程的上下文,用于發(fā)送和接收消息 MessageConsumer:由會話創(chuàng)建,用戶接收發(fā)送到目標(biāo)的消息 MessageProducer:由會話創(chuàng)建,用于發(fā)送消息到目標(biāo) Message:是在消費者和生產(chǎn)者之間傳送的對象,消息頭,一組消息屬性,一個消息體
JMS編碼接口之間的關(guān)系
第四章:使用ActiveMQ 4-1 Windows安裝ActiveMQ在Windows安裝ActiveMQ
下載安裝包 直接啟動 使用服務(wù)啟動
安裝驗證
訪問地址:http://127.0.0.1:8161/ 默認(rèn)用戶:admin 默認(rèn)密碼:admin4-2 Linux安裝ActiveMQ
在Linux安裝ActiveMQ
下載并解壓安裝包 啟動
啟動驗證
進入到bin目錄,使用命令./activemq start啟動服務(wù) 使用命令ps -ef |grep activemq查看進程是否存在 使用命令./activemq stop關(guān)閉服務(wù)
安裝驗證
訪問地址:http://Linux主機IP:8161/ 默認(rèn)用戶:admin 默認(rèn)密碼:admin4-3 隊列模式的消息演示
使用JMS接口規(guī)范連接ActiveMQ
創(chuàng)建生產(chǎn)者 創(chuàng)建消費者 創(chuàng)建發(fā)布者 創(chuàng)建訂閱者
回顧JMS編碼接口之間的關(guān)系
代碼演示
1.編寫AppProducer類
package com.myimooc.jms.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * App 生產(chǎn)者-隊列模式 * @author ZhangCheng on 2017-07-22 * */ public class AppProducer { /** 指定ActiveMQ服務(wù)的地址 */ private static final String URL = "tcp://127.0.0.1:61616"; /** 指定隊列的名稱 */ private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws JMSException { // 1.創(chuàng)建ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.創(chuàng)建Connection Connection connection = connectionFactory.createConnection(); // 3.啟動連接 connection.start(); // 4.創(chuàng)建會話(第一個參數(shù):是否在事務(wù)中處理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5. 創(chuàng)建一個目標(biāo) Destination destination = session.createQueue(QUEUE_NAME); // 6.創(chuàng)建一個生產(chǎn)者 MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 100; i++) { // 7.創(chuàng)建消息 TextMessage textMessage = session.createTextMessage("test" + i); // 8.發(fā)布消息 producer.send(textMessage); System.out.println("消息發(fā)送:" + textMessage.getText()); } // 9.關(guān)閉連接 connection.close(); } }
2.編寫AppConsumer類
package com.myimooc.jms.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * App 消費者-隊列模式 * @author ZhangCheng on 2017-07-22 * */ public class AppConsumer { /** 指定ActiveMQ服務(wù)的地址 */ private static final String URL = "tcp://127.0.0.1:61616"; /** 指定隊列的名稱 */ private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws JMSException { // 1.創(chuàng)建ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.創(chuàng)建Connection Connection connection = connectionFactory.createConnection(); // 3.啟動連接 connection.start(); // 4.創(chuàng)建會話(第一個參數(shù):是否在事務(wù)中處理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.創(chuàng)建一個目標(biāo) Destination destination = session.createQueue(QUEUE_NAME); // 6.創(chuàng)建一個消費者 MessageConsumer consumer = session.createConsumer(destination); // 7.創(chuàng)建一個監(jiān)聽器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("接收消息:" + textMessage.getText()); } catch (JMSException e) { System.out.println("接收消息異常:"); e.printStackTrace(); } } }); // 8.關(guān)閉連接 //connection.close(); } }4-4 主題模式的消息演示
代碼演示
1.編寫AppProducer類
package com.myimooc.jms.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * App 生產(chǎn)者-主題模式 * @author ZhangCheng on 2017-07-22 * */ public class AppProducer { /** 指定ActiveMQ服務(wù)的地址 */ private static final String URL = "tcp://127.0.0.1:61616"; /** 指定主題的名稱 */ private static final String TOPIC_NAME = "topic-test"; public static void main(String[] args) throws JMSException { // 1.創(chuàng)建ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.創(chuàng)建Connection Connection connection = connectionFactory.createConnection(); // 3.啟動連接 connection.start(); // 4.創(chuàng)建會話(第一個參數(shù):是否在事務(wù)中處理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5. 創(chuàng)建一個目標(biāo) Destination destination = session.createTopic(TOPIC_NAME); // 6.創(chuàng)建一個生產(chǎn)者 MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 100; i++) { // 7.創(chuàng)建消息 TextMessage textMessage = session.createTextMessage("test" + i); // 8.發(fā)布消息 producer.send(textMessage); System.out.println("消息發(fā)送:" + textMessage.getText()); } // 9.關(guān)閉連接 connection.close(); } }
2.編寫AppConsumer類
package com.myimooc.jms.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * App 消費者-主題模式 * @author ZhangCheng on 2017-07-22 * */ public class AppConsumer { /** 指定ActiveMQ服務(wù)的地址 */ private static final String URL = "tcp://127.0.0.1:61616"; /** 指定主題的名稱 */ private static final String TOPIC_NAME = "topic-test"; public static void main(String[] args) throws JMSException { // 1.創(chuàng)建ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.創(chuàng)建Connection Connection connection = connectionFactory.createConnection(); // 3.啟動連接 connection.start(); // 4.創(chuàng)建會話(第一個參數(shù):是否在事務(wù)中處理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.創(chuàng)建一個目標(biāo) Destination destination = session.createTopic(TOPIC_NAME); // 6.創(chuàng)建一個消費者 MessageConsumer consumer = session.createConsumer(destination); // 7.創(chuàng)建一個監(jiān)聽器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("接收消息:" + textMessage.getText()); } catch (JMSException e) { System.out.println("接收消息異常:"); e.printStackTrace(); } } }); // 8.關(guān)閉連接 //connection.close(); } }4-5 Spring jms理論
使用Spring集成JMS連接ActiveMQ
ConnectionFactory:用于管理連接的連接工廠 JmsTemplate:用于發(fā)送和接收消息的模版類 MessageListener:消息監(jiān)聽器
ConnectionFactory
一個Spring為我們提供的連接池 JmsTemplate每次發(fā)消息都會重新創(chuàng)建連接,會話和productor Spring中提供了SingleConnectFactory和CachingConnectionFactory
JmsTemplate
是Spring提供的,只需向Spring容器內(nèi)注冊這個類就可以使用JmsTemplate方便的操作jms JmsTemplate類是線程安全的,可以在整個應(yīng)用范圍使用
MessageListener
實現(xiàn)一個onMessage方法,該方法只接收一個Message參數(shù)4-6 Spring jms演示
代碼演示
1.創(chuàng)建名為jmsspring的maven項目POM文件如下
4.0.0 com.myimooc jmsspring 0.0.1-SNAPSHOT jar jmsspring http://maven.apache.org org.springframework.boot spring-boot-starter-parent 1.5.1.RELEASE UTF-8 UTF-8 org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-web org.springframework spring-jms org.apache.activemq activemq-all 5.9.0 org.apache.maven.plugins maven-compiler-plugin 1.8 1.8
2.完成后的目錄結(jié)構(gòu)如下
源碼請到我的github地址查看
3.測試
使用Postman向ProducerController發(fā)起請求,將消息發(fā)送出去
對應(yīng)的ConsumerTopicMessageListener 和 ConsumerMessageListener接收到消息
第五章:大型系統(tǒng)最佳實踐 5-1 ActiceMQ集群為什么要對消息中間件集群
實現(xiàn)高可用,以排除單點故障引起的服務(wù)中斷 實現(xiàn)負(fù)載均衡,以提升效率為更多客戶提供服務(wù)
集群方式
客戶端集群:讓多個消費者消費同一個隊列 Broker cluster:多個Broker之間同步消息 Master Slave:實現(xiàn)高可用
ActiveMQ失效轉(zhuǎn)移(failover)-客戶端配置
允許當(dāng)其中一臺消息服務(wù)器宕機時,客戶端在傳輸層上重新連接到其它消息服務(wù)器 語法:failover:(uri1,…,uriN)?transportOptions
transportOptions參數(shù)說明
randomize默認(rèn)為true,表示在URI列表中選擇URI連接時是否采用隨機策略 initialReconnectDelay默認(rèn)為10,單位毫秒,表示第一次嘗試重連之間等待的時間 maxReconnectDelay默認(rèn)為30000,單位毫秒,最長重連的時間間隔
Broker cluster集群配置-原理
NetworkConnector(網(wǎng)絡(luò)連接器)
網(wǎng)絡(luò)連接器主要用于配置ActiveMQ服務(wù)器與服務(wù)器之間的網(wǎng)絡(luò)通訊方式,用于服務(wù)器透傳消息 網(wǎng)絡(luò)連接器分為靜態(tài)連接器和動態(tài)連接器
靜態(tài)連接器
動態(tài)連接器
5-2 ActiveMQ集群理論ActiveMQ Master Slace集群方案
Share nothing storage master/slave(已過時,5.8+后移除) Shared storage master/slave 共享存儲 Replicated LevelDB Store基于負(fù)責(zé)的LevelDB Store
共享存儲集群的原理
基于復(fù)制的LevelDB Store的原理
兩種集群方式對比
三臺服務(wù)器的完美集群方案
5-3 ActiveMQ集群實踐ActiveMQ集群配置方案
配置過程
1.節(jié)點準(zhǔn)備
mkdir activemq創(chuàng)建目錄 cp -rf apache-activemq-5.15.0 activemq/activemq-a cp -rf apache-activemq-5.15.0 activemq/activemq-b cp -rf apache-activemq-5.15.0 activemq/activemq-c cd activemq mkdir kahadb
2.配置a節(jié)點
cd activemq-a/ cd conf/ vim activemq.xmlvim jetty.xml:配置管理端口號,a節(jié)點使用默認(rèn)端口,無須配置
3.配置b節(jié)點
vim activemq.xml 配置網(wǎng)絡(luò)連接器配置持久化存儲路徑 配置服務(wù)端口 vim jetty.xml 配置管理端口號
4.配置c節(jié)點
vim activemq.xml 配置網(wǎng)絡(luò)連接器配置持久化存儲路徑 配置服務(wù)端口 vim jetty.xml 配置管理端口號
5.啟動服務(wù)
回到activemq目錄,分別啟動a,b,c三個節(jié)點
./activemq-a/bin/activemq start ./activemq-b/bin/activemq start ./activemq-c /bin/activemq start
檢查是否都啟動成功
ps -ef |grep activemq
檢查是否對外提供服務(wù),即端口是否被監(jiān)聽(占用)
netstat -anp |grep 61616 netstat -anp |grep 61617 netstat -anp |grep 61618
檢查發(fā)現(xiàn)61618即c節(jié)點沒有提供服務(wù),但是c節(jié)點的進程是啟動成功了的。因為b節(jié)點和c點擊是master/slave配置,現(xiàn)在b節(jié)點獲取到了共享文件夾的所有權(quán),所以c節(jié)點正在等待獲得資源,并且提供服務(wù)。即c節(jié)點在未獲得資源之前,是不提供服務(wù)的。
測試,把b節(jié)點殺掉,看c節(jié)點能不能提供61618的服務(wù)
./activemq-b/bin/activemq stop netstat -anp |grep 61618 ./activemq-b/bin/activemq start netstat -anp |grep 61617
檢查發(fā)現(xiàn),重新啟動b節(jié)點后,b節(jié)點61617端口并沒有提供服務(wù),是因為現(xiàn)在b節(jié)點成為了slave節(jié)點,而c節(jié)點成為了master節(jié)點。所以,現(xiàn)在b節(jié)點啟動了,但是它并不對外提供服務(wù)。只有當(dāng)c節(jié)點出現(xiàn)問題后,b節(jié)點才對外提供服務(wù)。
6.通過代碼測試集群配置是否生效
生產(chǎn)者
package com.myimooc.jms.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * App 生產(chǎn)者-隊列模式-集群配置測試 * @author ZhangCheng on 2017-07-25 * */ public class AppProducerTest { /** failover 為狀態(tài)轉(zhuǎn)移的存在部分 * 因a節(jié)點只作為消費者使用,所以這里不配置61616節(jié)點了。 * */ private static final String URL = "failover:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true"; /** 指定隊列的名稱 */ private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws JMSException { // 1.創(chuàng)建ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.創(chuàng)建Connection Connection connection = connectionFactory.createConnection(); // 3.啟動連接 connection.start(); // 4.創(chuàng)建會話(第一個參數(shù):是否在事務(wù)中處理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5. 創(chuàng)建一個目標(biāo) Destination destination = session.createQueue(QUEUE_NAME); // 6.創(chuàng)建一個生產(chǎn)者 MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 100; i++) { // 7.創(chuàng)建消息 TextMessage textMessage = session.createTextMessage("test" + i); // 8.發(fā)布消息 producer.send(textMessage); System.out.println("消息發(fā)送:" + textMessage.getText()); } // 9.關(guān)閉連接 connection.close(); } }
消費者
package com.myimooc.jms.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * App 消費者-隊列模式-集群配置測試 * @author ZhangCheng on 2017-07-22 * */ public class AppConsumerTest { /** failover 為狀態(tài)轉(zhuǎn)移的存在部分 * */ private static final String URL = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true"; /** 指定隊列的名稱 */ private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws JMSException { // 1.創(chuàng)建ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.創(chuàng)建Connection Connection connection = connectionFactory.createConnection(); // 3.啟動連接 connection.start(); // 4.創(chuàng)建會話(第一個參數(shù):是否在事務(wù)中處理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.創(chuàng)建一個目標(biāo) Destination destination = session.createQueue(QUEUE_NAME); // 6.創(chuàng)建一個消費者 MessageConsumer consumer = session.createConsumer(destination); // 7.創(chuàng)建一個監(jiān)聽器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("接收消息:" + textMessage.getText()); } catch (JMSException e) { System.out.println("接收消息異常:"); e.printStackTrace(); } } }); // 8.關(guān)閉連接 //connection.close(); } }
運行生產(chǎn)者,然后到管理界面查看消息發(fā)送到了那里
http://127.0.0.1:8161 http://127.0.0.1:8162 http://127.0.0.1:8163
查看發(fā)現(xiàn),8162無法訪問,是因為b節(jié)點是slave節(jié)點,不提供服務(wù),消息都發(fā)送到了c節(jié)點
把8163即c節(jié)點宕掉后,運行消費者,查看消息是否能夠使用
./activemq-c/bin/activemq stop5-4 企業(yè)級系統(tǒng)中的最佳實踐
實際業(yè)務(wù)場景分析
實際業(yè)務(wù)場景特點
子業(yè)務(wù)系統(tǒng)都有集群的可能性 同一個消息會廣播給關(guān)注該類消息的所有子業(yè)務(wù)系統(tǒng) 同一類消息在集群中被負(fù)載消費 業(yè)務(wù)的發(fā)生和消息的發(fā)布最終一致性
需要解決的問題
不同業(yè)務(wù)系統(tǒng)分別處理同一個消息,同一業(yè)務(wù)系統(tǒng)負(fù)載處理同類消息 解決消息發(fā)送時的一致性問題 解決消息處理的冪等性問題 基于消息機制建立事件總線
集群系統(tǒng)處理消息方案-使用JMS級聯(lián)的解決方案
集群系統(tǒng)處理消息方案-使用ActiveMQ的虛擬主題解決方案
發(fā)布者:將消息發(fā)布到一個主題中,主題名以VirtualTopic開頭,如VirtualTopic.TEST 消費者:從隊列中獲取消息,在隊列名中表名自己身份,如Consumer.A.VirtualTopic.TEST
解決消息發(fā)送時的一致性問題-使用JMS中XA系列接口保證強一致性
引入分布式事務(wù) 要求業(yè)務(wù)操作必須支持XA協(xié)議
解決消息發(fā)送時的一致性問題-使用消息表的本地事務(wù)解決方案
解決消息發(fā)送時的一致性問題-使用內(nèi)存日志的解決方案
解決消息處理的冪等性問題
所謂冪等性問題,是指多次執(zhí)行所產(chǎn)生的影響(結(jié)果)與一次執(zhí)行所產(chǎn)生的影響(結(jié)果)一樣。比如:支付成功后,支付寶會發(fā)起多次通知給業(yè)務(wù)系統(tǒng),要求業(yè)務(wù)系統(tǒng)能夠處理這些重復(fù)的消息,但是又不重復(fù)處理訂單。如果在消息處理系統(tǒng)中保證冪等性,會增加系統(tǒng)復(fù)雜度,我們可以統(tǒng)一處理冪等性后,再將消息發(fā)送給消息處理系統(tǒng)。
解決消息處理的冪等性問題-使用消息表的本地事務(wù)解決方案
解決消息處理的冪等性問題-使用內(nèi)存日志的解決方案
基于消息機制的事件總線-什么是事件驅(qū)動架構(gòu)
事件驅(qū)動架構(gòu)(Event Driven Architecture,EDA)定義了一個設(shè)計和實現(xiàn)一個應(yīng)用系統(tǒng)的方法學(xué),在這個系統(tǒng)里事件可傳輸于松散耦合的組件和服務(wù)之間。特點:有事我叫你,沒事別煩我
事件驅(qū)動架構(gòu)模型
該教師正在開發(fā)該事件總線的框架,github地址https://github.com/jovezhao/nest。
第六章:使用其它消息中間件 6-1 使用其它消息中間件分析需要做的事
解決各業(yè)務(wù)系統(tǒng)集群處理同一條消息 實現(xiàn)自己的消息提供者
常用消息中間件
ActiveMQ RabbitMQ Kafka
集成RabbitMQ
RabbitMQ:使用交換器綁定到隊列
示意圖
RabbitMQ消息提供者源碼解析
創(chuàng)建ConnectionFactory 創(chuàng)建Connection 創(chuàng)建Channel 定義Exchange 定義Queue并且綁定隊列
集成Kafka
Kafka使用group.id分組消費者
配置消息者參數(shù)group.id相對時對消息進行負(fù)載處理 配置服務(wù)器partitions參數(shù),控制同一個group.id下的consumer數(shù)量小于partitions Kafka只保證同一個partition下的消息是有序的第七章:課程總結(jié) 7-1 課程總結(jié)
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/67478.html
摘要:慕課網(wǎng)消息中間件極速入門與實戰(zhàn)學(xué)習(xí)總結(jié)時間年月日星期三說明本文部分內(nèi)容均來自慕課網(wǎng)。 慕課網(wǎng)《RabbitMQ消息中間件極速入門與實戰(zhàn)》學(xué)習(xí)總結(jié) 時間:2018年09月05日星期三 說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):https://www.imooc.com 教學(xué)源碼:無 學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:RabbitM...
摘要:時間年月日星期一說明本文部分內(nèi)容均來自慕課網(wǎng)。多用于網(wǎng)絡(luò)加密。散列函數(shù)函數(shù)或消息摘要函數(shù)主要作用散列函數(shù)用來驗證數(shù)據(jù)的完整性。 時間:2017年4月10日星期一說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):http://www.imooc.com教學(xué)示例源碼:https://github.com/zccodere/s...個人學(xué)習(xí)源碼:https://github.com/zccodere...
摘要:時間年月日星期五說明本文部分內(nèi)容均來自慕課網(wǎng)。本套課程介紹微信公眾號開發(fā),主要涉及公眾號介紹編輯模式介紹開發(fā)模式介紹等。慕課網(wǎng)是垂直的互聯(lián)網(wǎng)技能免費學(xué)習(xí)網(wǎng)站。 時間:2017年08月11日星期五說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):http://www.imooc.com教學(xué)源碼:https://github.com/zccodere/s...學(xué)習(xí)源碼:https://github...
時間:2017年4月10日星期一說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):http://www.imooc.com教學(xué)示例源碼:https://github.com/zccodere/s...個人學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:概述 1-1 Java實現(xiàn)消息摘要算法加密 消息摘要算法 MD(Message Digest) SHA(Secure H...
時間:2018年04月11日星期三 說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):https://www.imooc.com 教學(xué)源碼:https://github.com/zccodere/s... 學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:課程介紹 1-1 課程介紹 什么是Netty 高性能、事件驅(qū)動、異步非阻塞的IO Java開源框架 基于NIO的客戶...
閱讀 4189·2021-09-22 15:34
閱讀 2802·2021-09-22 15:29
閱讀 521·2019-08-29 13:52
閱讀 3375·2019-08-29 11:30
閱讀 2301·2019-08-26 10:40
閱讀 869·2019-08-26 10:19
閱讀 2283·2019-08-23 18:16
閱讀 2352·2019-08-23 17:50