成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

慕課網(wǎng)_《Java消息中間件》學(xué)習(xí)總結(jié)

twohappy / 3084人閱讀

摘要:時間年月日星期六說明本文部分內(nèi)容均來自慕課網(wǎng)。這個時候,可以啟動多臺積分系統(tǒng),來同時消費這個消息中間件里面的登錄消息,達到橫向擴展的作用。

時間:2017年07月22日星期六
說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):http://www.imooc.com
教學(xué)源碼:無
學(xué)習(xí)源碼:https://github.com/zccodere/s...

第一章:課程介紹 1-1 課程安排

Java消息中間件(入門篇)

為什么需要使用消息中間件
消息中間件概述
JMS規(guī)范
JMS代碼演練

Java消息中間件(拓展篇)

ActiveMQ集群配置
消息中間件在大型系統(tǒng)中的最佳實踐
使用其它消息中間件
1-2 使用消息中間件原因

通過服務(wù)調(diào)用讓其它系統(tǒng)感知事件發(fā)生

系統(tǒng)之間高耦合
程序執(zhí)行效率低

通過消息中間件解耦服務(wù)調(diào)用

生活中的案例

微信公眾號
老師在黑板上寫字
電視機
等等

消息中間件帶來的好處

解耦:系統(tǒng)解耦
異步:異步執(zhí)行
橫向擴展 
安全可靠
順序保證

橫向擴展解釋

當(dāng)?shù)卿浵到y(tǒng),需要很多用戶登錄。這些消息全部需要告知積分系統(tǒng),去增加積分,而增加積分這個處理過程可能比較麻煩、比較耗時。這個時候,可以啟動多臺積分系統(tǒng),來同時消費這個消息中間件里面的登錄消息,達到橫向擴展的作用。
第二章:概述 2-1 消息中間件概述

什么是中間件

非底層操作系統(tǒng)軟件,非業(yè)務(wù)應(yīng)用軟件,不是直接給最終用戶使用的,不能直接給客戶帶來價值的軟件統(tǒng)稱為中間件

什么是消息中間件

關(guān)注于數(shù)據(jù)的發(fā)送和接受,利用高效可靠的異步消息傳遞機制集成分布式系統(tǒng)

示意圖

什么是JMS

Java消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個Java平臺中關(guān)于面向消息中間件的API,用于在兩個應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進行異步通信。Java消息服務(wù)是一個與具體平臺無關(guān)的API,絕大多數(shù)MOM提供商都對JMS提供支持。

什么是AMQP

AMQP(Advanced Message Queuing Protocol)是一個提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級消息隊列協(xié)議,基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產(chǎn)品,不同的開發(fā)語言等條件的限制。

JMS和AMQP對比

ActiveMQ

ActiveMQ是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ是一個完全支持JMS1.1和J2EE1.4規(guī)范的JMS Provider實現(xiàn),盡管JMS規(guī)范出臺已經(jīng)是很久的事情了,但是JMS在當(dāng)今J2EE應(yīng)用中間件仍然扮演者特殊的地位。

ActiveMQ特性

多種語言和協(xié)議編寫客戶端。
    語言:Java、C、C++、C#、Ruby、Perl、Python、PHP
    應(yīng)用協(xié)議:OpenWire、Stomp、REST、WS Notification、XMPP、AMQP
完全支持JMS1.1和J2EE1.4規(guī)范(持久化、XA消息、事務(wù))
虛擬主題、組合目的、鏡像隊列

RabbitMQ

RabbitMQ是一個開源的AMQP實現(xiàn),服務(wù)器端用Erlang語言編寫。用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息,在易用性、擴展性、高可用性等方面表現(xiàn)不俗。

RabbitMQ特性

支持多種客戶端
    如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript等
AMQP的完整實現(xiàn)(vhost、Exchange、Binding、Routing Key等)
事務(wù)支持/發(fā)布確認(rèn)
消息持久化

Kafka

Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),是一個分布式、分區(qū)的、可高的分布式日志存儲服務(wù)。它通過一種獨一無二的設(shè)計提供了一個消息系統(tǒng)的功能。

Kafka特性

通過O(1)的磁盤數(shù)據(jù)結(jié)構(gòu)提供消息的持久化,
    這種結(jié)構(gòu)對于即使數(shù)以TB的消息存儲也能夠保持長時間的穩(wěn)定性能
高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒數(shù)百萬的消息
Partition、Consumer Group

綜合評價

第三章:JMS規(guī)范 3-1 JMS規(guī)范

Java消息服務(wù)定義

Java消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個Java平臺中關(guān)于面向消息中間件的API,用于在兩個應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進行異步通信。

JMS相關(guān)概念

提供者:實現(xiàn)JMS規(guī)范的消息中間件服務(wù)器
客戶端:發(fā)送或接收消息的應(yīng)用程序
生產(chǎn)者/發(fā)布者:創(chuàng)建并發(fā)送消息的客戶端
消費者/訂閱者:接收并處理消息的客戶端
消息:應(yīng)用程序之間傳遞的數(shù)據(jù)內(nèi)容
消息模式:在客戶端之間傳遞消息的模式,JMS中定義了主題和隊列兩種模式

JMS消息模式:隊列模式

客戶端包括生產(chǎn)者和消費者
隊列中的消息只能被一個消費者消費
消費者可以隨時消費隊列中的消息

隊列模型示意圖

JMS消息模式:主題模型

客戶端包括發(fā)布者和訂閱者
主題中的消息被所有訂閱者消費
消費者不能消費訂閱之前就發(fā)送到主題中的消息

主題模型示意圖

JMS編碼接口

ConnectionFactory:用于創(chuàng)建連接到消息中間件的連接工廠
Connection:代表了應(yīng)用程序和消息服務(wù)器之間的通信鏈路
Destination:指消息發(fā)布和接收的地點,包括隊列和主題
Session:表示一個單線程的上下文,用于發(fā)送和接收消息
MessageConsumer:由會話創(chuàng)建,用戶接收發(fā)送到目標(biāo)的消息
MessageProducer:由會話創(chuàng)建,用于發(fā)送消息到目標(biāo)
Message:是在消費者和生產(chǎn)者之間傳送的對象,消息頭,一組消息屬性,一個消息體

JMS編碼接口之間的關(guān)系

第四章:使用ActiveMQ 4-1 Windows安裝ActiveMQ

在Windows安裝ActiveMQ

下載安裝包
直接啟動
使用服務(wù)啟動

安裝驗證

訪問地址:http://127.0.0.1:8161/
默認(rèn)用戶:admin
默認(rèn)密碼:admin
4-2 Linux安裝ActiveMQ

在Linux安裝ActiveMQ

下載并解壓安裝包
啟動

啟動驗證

進入到bin目錄,使用命令./activemq start啟動服務(wù)
使用命令ps -ef |grep activemq查看進程是否存在
使用命令./activemq stop關(guān)閉服務(wù)

安裝驗證

訪問地址:http://Linux主機IP:8161/
默認(rèn)用戶:admin
默認(rèn)密碼:admin
4-3 隊列模式的消息演示

使用JMS接口規(guī)范連接ActiveMQ

創(chuàng)建生產(chǎn)者
創(chuàng)建消費者
創(chuàng)建發(fā)布者
創(chuàng)建訂閱者

回顧JMS編碼接口之間的關(guān)系

代碼演示

1.編寫AppProducer類

package com.myimooc.jms.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * App 生產(chǎn)者-隊列模式
 * @author ZhangCheng on 2017-07-22
 *
 */
public class AppProducer {
    /** 指定ActiveMQ服務(wù)的地址 */
    private static final String URL = "tcp://127.0.0.1:61616";
    /** 指定隊列的名稱 */
    private static final String QUEUE_NAME = "queue-test";
    
    public static void main(String[] args) throws JMSException {
        
        // 1.創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        // 2.創(chuàng)建Connection
        Connection connection = connectionFactory.createConnection();
        
        // 3.啟動連接
        connection.start();
        
        // 4.創(chuàng)建會話(第一個參數(shù):是否在事務(wù)中處理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 5. 創(chuàng)建一個目標(biāo)
        Destination destination = session.createQueue(QUEUE_NAME);
        
        // 6.創(chuàng)建一個生產(chǎn)者
        MessageProducer producer = session.createProducer(destination);
        
        for (int i = 0; i < 100; i++) {
            
            // 7.創(chuàng)建消息
            TextMessage textMessage = session.createTextMessage("test" + i);
            
            // 8.發(fā)布消息
            producer.send(textMessage);
            
            System.out.println("消息發(fā)送:" + textMessage.getText());
        }
        
        // 9.關(guān)閉連接
        connection.close();
    }
    
}

2.編寫AppConsumer類

package com.myimooc.jms.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * App 消費者-隊列模式
 * @author ZhangCheng on 2017-07-22
 *
 */
public class AppConsumer {
    /** 指定ActiveMQ服務(wù)的地址 */
    private static final String URL = "tcp://127.0.0.1:61616";
    /** 指定隊列的名稱 */
    private static final String QUEUE_NAME = "queue-test";
    
    public static void main(String[] args) throws JMSException {
        
        // 1.創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        // 2.創(chuàng)建Connection
        Connection connection = connectionFactory.createConnection();
        
        // 3.啟動連接
        connection.start();
        
        // 4.創(chuàng)建會話(第一個參數(shù):是否在事務(wù)中處理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 5.創(chuàng)建一個目標(biāo)
        Destination destination = session.createQueue(QUEUE_NAME);
        
        // 6.創(chuàng)建一個消費者
        MessageConsumer consumer = session.createConsumer(destination);
        
        // 7.創(chuàng)建一個監(jiān)聽器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println("接收消息:" + textMessage.getText());
                } catch (JMSException e) {
                    System.out.println("接收消息異常:");
                    e.printStackTrace();
                }
            }
        });
        
        // 8.關(guān)閉連接
        //connection.close();
    }
    
}
4-4 主題模式的消息演示

代碼演示

1.編寫AppProducer類

package com.myimooc.jms.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * App 生產(chǎn)者-主題模式
 * @author ZhangCheng on 2017-07-22
 *
 */
public class AppProducer {
    /** 指定ActiveMQ服務(wù)的地址 */
    private static final String URL = "tcp://127.0.0.1:61616";
    /** 指定主題的名稱 */
    private static final String TOPIC_NAME = "topic-test";
    
    public static void main(String[] args) throws JMSException {
        
        // 1.創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        // 2.創(chuàng)建Connection
        Connection connection = connectionFactory.createConnection();
        
        // 3.啟動連接
        connection.start();
        
        // 4.創(chuàng)建會話(第一個參數(shù):是否在事務(wù)中處理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 5. 創(chuàng)建一個目標(biāo)
        Destination destination = session.createTopic(TOPIC_NAME);
        
        // 6.創(chuàng)建一個生產(chǎn)者
        MessageProducer producer = session.createProducer(destination);
        
        for (int i = 0; i < 100; i++) {
            
            // 7.創(chuàng)建消息
            TextMessage textMessage = session.createTextMessage("test" + i);
            
            // 8.發(fā)布消息
            producer.send(textMessage);
            
            System.out.println("消息發(fā)送:" + textMessage.getText());
        }
        
        // 9.關(guān)閉連接
        connection.close();
    }
    
}

2.編寫AppConsumer類

package com.myimooc.jms.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * App 消費者-主題模式
 * @author ZhangCheng on 2017-07-22
 *
 */
public class AppConsumer {
    /** 指定ActiveMQ服務(wù)的地址 */
    private static final String URL = "tcp://127.0.0.1:61616";
    /** 指定主題的名稱 */
    private static final String TOPIC_NAME = "topic-test";
    
    public static void main(String[] args) throws JMSException {
        
        // 1.創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        // 2.創(chuàng)建Connection
        Connection connection = connectionFactory.createConnection();
        
        // 3.啟動連接
        connection.start();
        
        // 4.創(chuàng)建會話(第一個參數(shù):是否在事務(wù)中處理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 5.創(chuàng)建一個目標(biāo)
        Destination destination = session.createTopic(TOPIC_NAME);
        
        // 6.創(chuàng)建一個消費者
        MessageConsumer consumer = session.createConsumer(destination);
        
        // 7.創(chuàng)建一個監(jiān)聽器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println("接收消息:" + textMessage.getText());
                } catch (JMSException e) {
                    System.out.println("接收消息異常:");
                    e.printStackTrace();
                }
            }
        });
        
        // 8.關(guān)閉連接
        //connection.close();
    }
    
}
4-5 Spring jms理論

使用Spring集成JMS連接ActiveMQ

ConnectionFactory:用于管理連接的連接工廠
JmsTemplate:用于發(fā)送和接收消息的模版類
MessageListener:消息監(jiān)聽器

ConnectionFactory

一個Spring為我們提供的連接池
JmsTemplate每次發(fā)消息都會重新創(chuàng)建連接,會話和productor
Spring中提供了SingleConnectFactory和CachingConnectionFactory

JmsTemplate

是Spring提供的,只需向Spring容器內(nèi)注冊這個類就可以使用JmsTemplate方便的操作jms
JmsTemplate類是線程安全的,可以在整個應(yīng)用范圍使用

MessageListener

實現(xiàn)一個onMessage方法,該方法只接收一個Message參數(shù)
4-6 Spring jms演示

代碼演示

1.創(chuàng)建名為jmsspring的maven項目POM文件如下


    4.0.0

    com.myimooc
    jmsspring
    0.0.1-SNAPSHOT
    jar

    jmsspring
    http://maven.apache.org

    
        org.springframework.boot
        spring-boot-starter-parent
        1.5.1.RELEASE
         
    

    
        UTF-8
        UTF-8
    

    
        
            org.springframework.boot
            spring-boot-starter
        

        
            org.springframework.boot
            spring-boot-starter-web
        

        
            org.springframework
            spring-jms
        

        
        
            org.apache.activemq
            activemq-all
            5.9.0
        

    

    
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                
                    1.8
                    1.8
                
            
        
    

2.完成后的目錄結(jié)構(gòu)如下

源碼請到我的github地址查看

3.測試

使用Postman向ProducerController發(fā)起請求,將消息發(fā)送出去

對應(yīng)的ConsumerTopicMessageListener 和 ConsumerMessageListener接收到消息

第五章:大型系統(tǒng)最佳實踐 5-1 ActiceMQ集群

為什么要對消息中間件集群

實現(xiàn)高可用,以排除單點故障引起的服務(wù)中斷
實現(xiàn)負(fù)載均衡,以提升效率為更多客戶提供服務(wù)

集群方式

客戶端集群:讓多個消費者消費同一個隊列
Broker cluster:多個Broker之間同步消息
Master Slave:實現(xiàn)高可用

ActiveMQ失效轉(zhuǎn)移(failover)-客戶端配置

允許當(dāng)其中一臺消息服務(wù)器宕機時,客戶端在傳輸層上重新連接到其它消息服務(wù)器
語法:failover:(uri1,…,uriN)?transportOptions

transportOptions參數(shù)說明

randomize默認(rèn)為true,表示在URI列表中選擇URI連接時是否采用隨機策略
initialReconnectDelay默認(rèn)為10,單位毫秒,表示第一次嘗試重連之間等待的時間
maxReconnectDelay默認(rèn)為30000,單位毫秒,最長重連的時間間隔

Broker cluster集群配置-原理

NetworkConnector(網(wǎng)絡(luò)連接器)

網(wǎng)絡(luò)連接器主要用于配置ActiveMQ服務(wù)器與服務(wù)器之間的網(wǎng)絡(luò)通訊方式,用于服務(wù)器透傳消息
網(wǎng)絡(luò)連接器分為靜態(tài)連接器和動態(tài)連接器

靜態(tài)連接器

動態(tài)連接器

5-2 ActiveMQ集群理論

ActiveMQ Master Slace集群方案

Share nothing storage master/slave(已過時,5.8+后移除)
Shared storage master/slave 共享存儲
Replicated LevelDB Store基于負(fù)責(zé)的LevelDB Store

共享存儲集群的原理

基于復(fù)制的LevelDB Store的原理

兩種集群方式對比

三臺服務(wù)器的完美集群方案

5-3 ActiveMQ集群實踐

ActiveMQ集群配置方案

配置過程

1.節(jié)點準(zhǔn)備

mkdir activemq創(chuàng)建目錄
cp -rf apache-activemq-5.15.0 activemq/activemq-a
cp -rf apache-activemq-5.15.0 activemq/activemq-b
cp -rf apache-activemq-5.15.0 activemq/activemq-c
cd activemq
mkdir kahadb

2.配置a節(jié)點

cd activemq-a/
cd conf/
vim activemq.xml
    
              
    
vim jetty.xml:配置管理端口號,a節(jié)點使用默認(rèn)端口,無須配置

3.配置b節(jié)點

vim activemq.xml
配置網(wǎng)絡(luò)連接器
        
      
    
配置持久化存儲路徑
    
        
    
配置服務(wù)端口

vim jetty.xml
配置管理端口號

         
    
    

4.配置c節(jié)點

vim activemq.xml
配置網(wǎng)絡(luò)連接器
        
      
    
配置持久化存儲路徑
    
        
    
配置服務(wù)端口

vim jetty.xml
配置管理端口號

         
    
    

5.啟動服務(wù)
回到activemq目錄,分別啟動a,b,c三個節(jié)點

./activemq-a/bin/activemq start
./activemq-b/bin/activemq start
./activemq-c /bin/activemq start

檢查是否都啟動成功

ps -ef |grep activemq

檢查是否對外提供服務(wù),即端口是否被監(jiān)聽(占用)

netstat -anp |grep 61616
netstat -anp |grep 61617
netstat -anp |grep 61618

檢查發(fā)現(xiàn)61618即c節(jié)點沒有提供服務(wù),但是c節(jié)點的進程是啟動成功了的。因為b節(jié)點和c點擊是master/slave配置,現(xiàn)在b節(jié)點獲取到了共享文件夾的所有權(quán),所以c節(jié)點正在等待獲得資源,并且提供服務(wù)。即c節(jié)點在未獲得資源之前,是不提供服務(wù)的。

測試,把b節(jié)點殺掉,看c節(jié)點能不能提供61618的服務(wù)

./activemq-b/bin/activemq stop
netstat -anp |grep 61618
./activemq-b/bin/activemq start
netstat -anp |grep 61617

檢查發(fā)現(xiàn),重新啟動b節(jié)點后,b節(jié)點61617端口并沒有提供服務(wù),是因為現(xiàn)在b節(jié)點成為了slave節(jié)點,而c節(jié)點成為了master節(jié)點。所以,現(xiàn)在b節(jié)點啟動了,但是它并不對外提供服務(wù)。只有當(dāng)c節(jié)點出現(xiàn)問題后,b節(jié)點才對外提供服務(wù)。

6.通過代碼測試集群配置是否生效

生產(chǎn)者

package com.myimooc.jms.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * App 生產(chǎn)者-隊列模式-集群配置測試
 * @author ZhangCheng on 2017-07-25
 *
 */
public class AppProducerTest {
    /** failover 為狀態(tài)轉(zhuǎn)移的存在部分
     * 因a節(jié)點只作為消費者使用,所以這里不配置61616節(jié)點了。
     * */
    private static final String URL = "failover:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";
    /** 指定隊列的名稱 */
    private static final String QUEUE_NAME = "queue-test";
    
    public static void main(String[] args) throws JMSException {
        
        // 1.創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        // 2.創(chuàng)建Connection
        Connection connection = connectionFactory.createConnection();
        
        // 3.啟動連接
        connection.start();
        
        // 4.創(chuàng)建會話(第一個參數(shù):是否在事務(wù)中處理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 5. 創(chuàng)建一個目標(biāo)
        Destination destination = session.createQueue(QUEUE_NAME);
        
        // 6.創(chuàng)建一個生產(chǎn)者
        MessageProducer producer = session.createProducer(destination);
        
        for (int i = 0; i < 100; i++) {
            
            // 7.創(chuàng)建消息
            TextMessage textMessage = session.createTextMessage("test" + i);
            
            // 8.發(fā)布消息
            producer.send(textMessage);
            
            System.out.println("消息發(fā)送:" + textMessage.getText());
        }
        
        // 9.關(guān)閉連接
        connection.close();
    }
    
}

消費者

package com.myimooc.jms.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * App 消費者-隊列模式-集群配置測試
 * @author ZhangCheng on 2017-07-22
 *
 */
public class AppConsumerTest {
    /** failover 為狀態(tài)轉(zhuǎn)移的存在部分
     * */
    private static final String URL = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";
    /** 指定隊列的名稱 */
    private static final String QUEUE_NAME = "queue-test";
    
    public static void main(String[] args) throws JMSException {
        
        // 1.創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        // 2.創(chuàng)建Connection
        Connection connection = connectionFactory.createConnection();
        
        // 3.啟動連接
        connection.start();
        
        // 4.創(chuàng)建會話(第一個參數(shù):是否在事務(wù)中處理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 5.創(chuàng)建一個目標(biāo)
        Destination destination = session.createQueue(QUEUE_NAME);
        
        // 6.創(chuàng)建一個消費者
        MessageConsumer consumer = session.createConsumer(destination);
        
        // 7.創(chuàng)建一個監(jiān)聽器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println("接收消息:" + textMessage.getText());
                } catch (JMSException e) {
                    System.out.println("接收消息異常:");
                    e.printStackTrace();
                }
            }
        });
        
        // 8.關(guān)閉連接
        //connection.close();
    }
    
}

運行生產(chǎn)者,然后到管理界面查看消息發(fā)送到了那里

http://127.0.0.1:8161
http://127.0.0.1:8162
http://127.0.0.1:8163

查看發(fā)現(xiàn),8162無法訪問,是因為b節(jié)點是slave節(jié)點,不提供服務(wù),消息都發(fā)送到了c節(jié)點

把8163即c節(jié)點宕掉后,運行消費者,查看消息是否能夠使用

./activemq-c/bin/activemq stop
5-4 企業(yè)級系統(tǒng)中的最佳實踐

實際業(yè)務(wù)場景分析

實際業(yè)務(wù)場景特點

子業(yè)務(wù)系統(tǒng)都有集群的可能性
同一個消息會廣播給關(guān)注該類消息的所有子業(yè)務(wù)系統(tǒng)
同一類消息在集群中被負(fù)載消費
業(yè)務(wù)的發(fā)生和消息的發(fā)布最終一致性

需要解決的問題

不同業(yè)務(wù)系統(tǒng)分別處理同一個消息,同一業(yè)務(wù)系統(tǒng)負(fù)載處理同類消息
解決消息發(fā)送時的一致性問題
解決消息處理的冪等性問題
基于消息機制建立事件總線

集群系統(tǒng)處理消息方案-使用JMS級聯(lián)的解決方案

集群系統(tǒng)處理消息方案-使用ActiveMQ的虛擬主題解決方案

發(fā)布者:將消息發(fā)布到一個主題中,主題名以VirtualTopic開頭,如VirtualTopic.TEST
消費者:從隊列中獲取消息,在隊列名中表名自己身份,如Consumer.A.VirtualTopic.TEST

解決消息發(fā)送時的一致性問題-使用JMS中XA系列接口保證強一致性

引入分布式事務(wù)
要求業(yè)務(wù)操作必須支持XA協(xié)議

解決消息發(fā)送時的一致性問題-使用消息表的本地事務(wù)解決方案

解決消息發(fā)送時的一致性問題-使用內(nèi)存日志的解決方案

解決消息處理的冪等性問題

所謂冪等性問題,是指多次執(zhí)行所產(chǎn)生的影響(結(jié)果)與一次執(zhí)行所產(chǎn)生的影響(結(jié)果)一樣。比如:支付成功后,支付寶會發(fā)起多次通知給業(yè)務(wù)系統(tǒng),要求業(yè)務(wù)系統(tǒng)能夠處理這些重復(fù)的消息,但是又不重復(fù)處理訂單。如果在消息處理系統(tǒng)中保證冪等性,會增加系統(tǒng)復(fù)雜度,我們可以統(tǒng)一處理冪等性后,再將消息發(fā)送給消息處理系統(tǒng)。

解決消息處理的冪等性問題-使用消息表的本地事務(wù)解決方案

解決消息處理的冪等性問題-使用內(nèi)存日志的解決方案

基于消息機制的事件總線-什么是事件驅(qū)動架構(gòu)

事件驅(qū)動架構(gòu)(Event Driven Architecture,EDA)定義了一個設(shè)計和實現(xiàn)一個應(yīng)用系統(tǒng)的方法學(xué),在這個系統(tǒng)里事件可傳輸于松散耦合的組件和服務(wù)之間。特點:有事我叫你,沒事別煩我

事件驅(qū)動架構(gòu)模型

該教師正在開發(fā)該事件總線的框架,github地址https://github.com/jovezhao/nest。

第六章:使用其它消息中間件 6-1 使用其它消息中間件

分析需要做的事

解決各業(yè)務(wù)系統(tǒng)集群處理同一條消息
實現(xiàn)自己的消息提供者

常用消息中間件

ActiveMQ
RabbitMQ
Kafka

集成RabbitMQ

RabbitMQ:使用交換器綁定到隊列

示意圖

RabbitMQ消息提供者源碼解析

創(chuàng)建ConnectionFactory
創(chuàng)建Connection
創(chuàng)建Channel
定義Exchange
定義Queue并且綁定隊列

集成Kafka

Kafka使用group.id分組消費者

配置消息者參數(shù)group.id相對時對消息進行負(fù)載處理
配置服務(wù)器partitions參數(shù),控制同一個group.id下的consumer數(shù)量小于partitions
Kafka只保證同一個partition下的消息是有序的
第七章:課程總結(jié) 7-1 課程總結(jié)

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/67478.html

相關(guān)文章

  • 課網(wǎng)_《RabbitMQ消息間件極速入門與實戰(zhàn)》學(xué)習(xí)總結(jié)

    摘要:慕課網(wǎng)消息中間件極速入門與實戰(zhàn)學(xué)習(xí)總結(jié)時間年月日星期三說明本文部分內(nèi)容均來自慕課網(wǎng)。 慕課網(wǎng)《RabbitMQ消息中間件極速入門與實戰(zhàn)》學(xué)習(xí)總結(jié) 時間:2018年09月05日星期三 說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):https://www.imooc.com 教學(xué)源碼:無 學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:RabbitM...

    mykurisu 評論0 收藏0
  • 課網(wǎng)_Java實現(xiàn)Base64加密》學(xué)習(xí)總結(jié)

    摘要:時間年月日星期一說明本文部分內(nèi)容均來自慕課網(wǎng)。多用于網(wǎng)絡(luò)加密。散列函數(shù)函數(shù)或消息摘要函數(shù)主要作用散列函數(shù)用來驗證數(shù)據(jù)的完整性。 時間:2017年4月10日星期一說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):http://www.imooc.com教學(xué)示例源碼:https://github.com/zccodere/s...個人學(xué)習(xí)源碼:https://github.com/zccodere...

    verano 評論0 收藏0
  • 課網(wǎng)_《初識Java微信公眾號開發(fā)》學(xué)習(xí)總結(jié)

    摘要:時間年月日星期五說明本文部分內(nèi)容均來自慕課網(wǎng)。本套課程介紹微信公眾號開發(fā),主要涉及公眾號介紹編輯模式介紹開發(fā)模式介紹等。慕課網(wǎng)是垂直的互聯(lián)網(wǎng)技能免費學(xué)習(xí)網(wǎng)站。 時間:2017年08月11日星期五說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):http://www.imooc.com教學(xué)源碼:https://github.com/zccodere/s...學(xué)習(xí)源碼:https://github...

    PrototypeZ 評論0 收藏0
  • 課網(wǎng)_Java實現(xiàn)消息摘要算法加密》學(xué)習(xí)總結(jié)

    時間:2017年4月10日星期一說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):http://www.imooc.com教學(xué)示例源碼:https://github.com/zccodere/s...個人學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:概述 1-1 Java實現(xiàn)消息摘要算法加密 消息摘要算法 MD(Message Digest) SHA(Secure H...

    zengdongbao 評論0 收藏0
  • 課網(wǎng)_《Netty入門之WebSocket初體驗》學(xué)習(xí)總結(jié)

    時間:2018年04月11日星期三 說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):https://www.imooc.com 教學(xué)源碼:https://github.com/zccodere/s... 學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:課程介紹 1-1 課程介紹 什么是Netty 高性能、事件驅(qū)動、異步非阻塞的IO Java開源框架 基于NIO的客戶...

    Noodles 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<