成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

消息隊列ActiveMQ的使用詳解

niceforbear / 3468人閱讀

摘要:學(xué)習(xí)消息隊列的使用之前,我們先來搞清。是操作消息的接口。消息生產(chǎn)者由創(chuàng)建,并用于將消息發(fā)送到。接收消息打印結(jié)果這是接收到的消息消費者啟動。。。。

通過上一篇文章 《消息隊列深入解析》,我們已經(jīng)消息隊列是什么、使用消息隊列的好處以及常見消息隊列的簡單介紹。

這一篇文章,主要帶大家詳細(xì)了解一下消息隊列ActiveMQ的使用。

學(xué)習(xí)消息隊列ActiveMQ的使用之前,我們先來搞清JMS

JMS 1. JMS基本概念

JMS(JAVA Message Service,java消息服務(wù))是java的消息服務(wù),JMS的客戶端之間可以通過JMS服務(wù)進(jìn)行異步的消息傳輸。JMS(JAVA Message Service,java消息服務(wù))API是一個消息服務(wù)的標(biāo)準(zhǔn)或者說是規(guī)范,允許應(yīng)用程序組件基于JavaEE平臺創(chuàng)建、發(fā)送、接收和讀取消息。它使分布式通信耦合度更低,消息服務(wù)更加可靠以及異步性。

2. JMS五種不同的消息正文格式

JMS定義了五種不同的消息正文格式,以及調(diào)用的消息類型,允許你發(fā)送并接收以一些不同形式的數(shù)據(jù),提供現(xiàn)有消息格式的一些級別的兼容性。

StreamMessage -- Java原始值的數(shù)據(jù)流

MapMessage--一套名稱-值對

TextMessage--一個字符串對象

ObjectMessage--一個序列化的 Java對象

BytesMessage--一個字節(jié)的數(shù)據(jù)流

3.JMS兩種消息模型 1 .點到點(P2P)模型


使用隊列(Queue)作為消息通信載體;滿足生產(chǎn)者與消費者模式,一條消息只能被一個消費者使用,未被消費的消息在隊列中保留直到被消費或超時。比如:我們生產(chǎn)者發(fā)送100條消息的話,兩個消費者來消費一般情況下兩個消費者會按照消息發(fā)送的順序各自消費一半(也就是你一個我一個的消費。)后面我們會通過代碼演示來驗證。

2. 發(fā)布/訂閱(Pub/Sub)模型


發(fā)布訂閱模型(Pub/Sub) 使用主題(Topic)作為消息通信載體,類似于廣播模式;發(fā)布者發(fā)布一條消息,該消息通過主題傳遞給所有的訂閱者,在一條消息廣播之后才訂閱的用戶則是收不到該條消息的。

4.JMS編碼接口之間的關(guān)系

ConnectionFactory:創(chuàng)建Connection對象的工廠,針對兩種不同的jms消息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種??梢酝ㄟ^JNDI來查找ConnectionFactory對象。

Connection:Connection表示在客戶端和JMS系統(tǒng)之間建立的鏈接(對TCP/IP socket的包裝)。Connection可以產(chǎn)生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種類型:QueueConnection和TopicConnection。

Session:Session是操作消息的接口。可以通過session創(chuàng)建生產(chǎn)者、消費者、消息等。Session提供了事務(wù)的功能。當(dāng)需要使用session發(fā)送/接收多個消息時,可以將這些發(fā)送/接收動作放到一個事務(wù)中。同樣,也分QueueSession和TopicSession。

MessageProducer:消息生產(chǎn)者由Session創(chuàng)建,并用于將消息發(fā)送到Destination。同樣,消息生產(chǎn)者分兩種類型:QueueSender和TopicPublisher。可以調(diào)用消息生產(chǎn)者的方法(send或publish方法)發(fā)送消息。

MessageConsumer :消息消費者由Session創(chuàng)建,用于接收被發(fā)送到Destination的消息。兩種類型:QueueReceiver和TopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來創(chuàng)建。當(dāng)然,也可以session的creatDurableSubscriber方法來創(chuàng)建持久化的訂閱者。

Destination:Destination的意思是消息生產(chǎn)者的消息發(fā)送目標(biāo)或者說消息消費者的消息來源。對于消息生產(chǎn)者來說,它的Destination是某個隊列(Queue)或某個主題(Topic);對于消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)。

MessageListener: 消息監(jiān)聽器。如果注冊了消息監(jiān)聽器,一旦消息到達(dá),將自動調(diào)用監(jiān)聽器的onMessage方法。

參考:https://blog.csdn.net/shaobin...

消息隊列ActiveMQ 1.簡介
ActiveMQ 是Apache出品,最流行的,能力強(qiáng)勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實現(xiàn),盡管JMS規(guī)范出臺已經(jīng)是很久的事情了,但是JMS在當(dāng)今的J2EE應(yīng)用中間仍然扮演著特殊的地位。
2.簡單使用

安裝過程很簡單這里就不貼安裝過程了,可以自行g(shù)oogle.

添加Maven依賴

    
        org.apache.activemq
        activemq-all
        5.15.3
    
2.1.測試點對點模型通信

生產(chǎn)者發(fā)送消息測試方法:

    @Test
    public void testQueueProducer() throws Exception {

        // 1、創(chuàng)建一個連接工廠對象,需要指定服務(wù)的ip及端口。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616");
        // 2、使用工廠對象創(chuàng)建一個Connection對象。
        Connection connection = connectionFactory.createConnection();
        // 3、開啟連接,調(diào)用Connection對象的start方法。
        connection.start();
        // 4、創(chuàng)建一個Session對象。
        // 第一個參數(shù):是否開啟事務(wù)。如果true開啟事務(wù),第二個參數(shù)無意義。一般不開啟事務(wù)false。
        // 第二個參數(shù):應(yīng)答模式。自動應(yīng)答或者手動應(yīng)答。一般自動應(yīng)答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、使用Session對象創(chuàng)建一個Destination對象。兩種形式queue、topic,現(xiàn)在應(yīng)該使用queue
        Queue queue = session.createQueue("test-queue");
        // 6、使用Session對象創(chuàng)建一個Producer對象。
        MessageProducer producer = session.createProducer(queue);
        // 7、創(chuàng)建一個Message對象,可以使用TextMessage。
        for (int i = 0; i < 50; i++) {
            TextMessage textMessage = session.createTextMessage("第"+i+ "一個ActiveMQ隊列目的地的消息");
            // 8、發(fā)送消息
            producer.send(textMessage);
        }

        // 9、關(guān)閉資源
        producer.close();
        session.close();
        connection.close();
    }

消費者消費消息測試方法

    @Test
    public void testQueueConsumer() throws Exception {
        // 創(chuàng)建一個ConnectionFactory對象連接MQ服務(wù)器
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616");
        // 創(chuàng)建一個連接對象
        Connection connection = connectionFactory.createConnection();
        // 開啟連接
        connection.start();
        // 使用Connection對象創(chuàng)建一個Session對象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 創(chuàng)建一個Destination對象。queue對象
        Queue queue = session.createQueue("test-queue");
        // 使用Session對象創(chuàng)建一個消費者對象。
        MessageConsumer consumer = session.createConsumer(queue);
        // 接收消息
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                // 打印結(jié)果
                TextMessage textMessage = (TextMessage) message;
                String text;
                try {
                    text = textMessage.getText();
                    System.out.println("這是接收到的消息:" + text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }

            }
        });
        // 等待接收消息
        System.in.read();
        // 關(guān)閉資源
        consumer.close();
        session.close();
        connection.close();
    }

我們開啟兩個消費者進(jìn)程來監(jiān)聽(運行兩次testQueueConsumer()方法)。

然后我們運行運行生產(chǎn)者測試方法發(fā)送消息.先發(fā)送消息還是先監(jiān)聽消息一般不會不影響。

效果如下:

兩個消費者各自消費一半消息,而且還是按照消息發(fā)送到消息隊列的順序,這也驗證了我們上面的說法。
第一個消費者

第二個消費者

2.2.測試發(fā)布/訂閱(Pub/Sub)模型通信

生產(chǎn)者發(fā)送消息測試方法:

    @Test
    public void testTopicProducer() throws Exception {
        // 1、創(chuàng)建一個連接工廠對象,需要指定服務(wù)的ip及端口。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616");
        // 2、使用工廠對象創(chuàng)建一個Connection對象。
        Connection connection = connectionFactory.createConnection();
        // 3、開啟連接,調(diào)用Connection對象的start方法。
        connection.start();
        // 4、創(chuàng)建一個Session對象。
        // 第一個參數(shù):是否開啟事務(wù)。如果true開啟事務(wù),第二個參數(shù)無意義。一般不開啟事務(wù)false。
        // 第二個參數(shù):應(yīng)答模式。自動應(yīng)答或者手動應(yīng)答。一般自動應(yīng)答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、使用Session對象創(chuàng)建一個Destination對象。兩種形式queue、topic,現(xiàn)在應(yīng)該使用topic
        Topic topic = session.createTopic("test-topic");
        // 6、使用Session對象創(chuàng)建一個Producer對象。
        MessageProducer producer = session.createProducer(topic);
        // 7、創(chuàng)建一個Message對象,可以使用TextMessage。
        for (int i = 0; i < 50; i++) {
            TextMessage textMessage = session.createTextMessage("第"+i+ "一個ActiveMQ隊列目的地的消息");
            // 8、發(fā)送消息
            producer.send(textMessage);
        }
        // 9、關(guān)閉資源
        producer.close();
        session.close();
        connection.close();
    }

消費者消費消息測試方法:

    @Test
    public void testTopicConsumer() throws Exception {
        // 創(chuàng)建一個ConnectionFactory對象連接MQ服務(wù)器
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616");
        // 創(chuàng)建一個連接對象
        Connection connection = connectionFactory.createConnection();
        // 開啟連接
        connection.start();
        // 使用Connection對象創(chuàng)建一個Session對象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 創(chuàng)建一個Destination對象。topic對象
        Topic topic = session.createTopic("test-topic");
        // 使用Session對象創(chuàng)建一個消費者對象。
        MessageConsumer consumer = session.createConsumer(topic);
        // 接收消息
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                // 打印結(jié)果
                TextMessage textMessage = (TextMessage) message;
                String text;
                try {
                    text = textMessage.getText();
                    System.out.println("這是接收到的消息:" + text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }

            }
        });
        System.out.println("topic消費者啟動。。。。");
        // 等待接收消息
        System.in.read();
        // 關(guān)閉資源
        consumer.close();
        session.close();
        connection.close();
    }

先運行兩個消費者進(jìn)程(提前訂閱,不然收不到發(fā)送的消息),然后運行生產(chǎn)者測試方法發(fā)送消息。

結(jié)果是:
兩個消費者進(jìn)程都可以接收到生產(chǎn)者發(fā)送過來的所有消息,我這里就不貼圖片了,
這樣驗證了我們上面的說法。

我們從上面代碼就可以看出,點對點通信和發(fā)布訂閱通信模式的區(qū)別就是創(chuàng)建生產(chǎn)者和消費者對象時提供的Destination對象不同,如果是點對點通信創(chuàng)建的Destination對象是Queue,發(fā)布訂閱通信模式通信則是Topic。

3.整合Spring使用

整合spring除了我們上面依賴的Jar包還要依賴

     
        org.springframework
        spring-jms
        4.2.7.RELEASE
    
    
        org.springframework
        spring-context-support
        4.2.7.RELEASE
     

比如我們在我們的系統(tǒng)中現(xiàn)在有兩個服務(wù),第一個服務(wù)發(fā)送消息,第二個服務(wù)接收消息,我們下面看看這是如何實現(xiàn)的。

發(fā)送消息

發(fā)送消息的配置文件:




    
    
        
    
    
    
        
        
    
    
    
    
        
        
    
    
    
        
            spring-queue
        
    
    
    
        
    

發(fā)送消息的測試方法:

@Test
    public void testSpringActiveMq() throws Exception {
        //初始化spring容器
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
        //從spring容器中獲得JmsTemplate對象
        JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
        //從spring容器中取Destination對象
        Destination destination = (Destination) applicationContext.getBean("queueDestination");
        //使用JmsTemplate對象發(fā)送消息。
        jmsTemplate.send(destination, new MessageCreator() {
            
            @Override
            public Message createMessage(Session session) throws JMSException {
                //創(chuàng)建一個消息對象并返回
                TextMessage textMessage = session.createTextMessage("spring activemq queue message");
                return textMessage;
            }
        });
    }

我們上面直接ApplicationContext的getBean方法獲取的對象,實際在項目使用依賴注入即可。

接收消息

創(chuàng)建一個MessageListener的實現(xiàn)類。

public class MyMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        
        try {
            TextMessage textMessage = (TextMessage) message;
            //取消息內(nèi)容
            String text = textMessage.getText();
            System.out.println(text);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

接收消息的配置文件:




    
    
        
    
    
    
        
        
    
    
    
        
            spring-queue
        
    
    
    
        
    
    
    
    
    
    
        
        
        
    

測試接收消息的代碼

@Test
    public void testQueueConsumer() throws Exception {
        //初始化spring容器
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
        //等待
        System.in.read();
    }

歡迎關(guān)注我的微信公眾號:"Java面試通關(guān)手冊"(一個有溫度的微信公眾號,期待與你共同進(jìn)步~~~堅持原創(chuàng),分享美文,分享各種Java學(xué)習(xí)資源):

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/69198.html

相關(guān)文章

  • 后端必備——數(shù)據(jù)通信知識(RPC、消息隊列)一站式總結(jié)

    摘要:具體可以參考消息隊列之具體可以參考實戰(zhàn)之快速入門十分鐘入門阿里中間件團(tuán)隊博客是一個分布式的可分區(qū)的可復(fù)制的基于發(fā)布訂閱的消息系統(tǒng)主要用于大數(shù)據(jù)領(lǐng)域當(dāng)然在分布式系統(tǒng)中也有應(yīng)用。目前市面上流行的消息隊列就是阿里借鑒的原理用開發(fā)而得。 我自己總結(jié)的Java學(xué)習(xí)的系統(tǒng)知識點以及面試問題,目前已經(jīng)開源,會一直完善下去,歡迎建議和指導(dǎo)歡迎Star: https://github.com/Snail...

    Kahn 評論0 收藏0
  • 【備戰(zhàn)春招/秋招系列】美團(tuán)Java面經(jīng)總結(jié)進(jìn)階篇 (附詳解答案)

    摘要:我在前面的文章中也提到了應(yīng)該怎么做自我介紹與項目介紹,詳情可以查看這篇文章備戰(zhàn)春招秋招系列初出茅廬的程序員該如何準(zhǔn)備面試。因此基于事件消息對象驅(qū)動的業(yè)務(wù)架構(gòu)可以是一系列流程。 showImg(https://user-gold-cdn.xitu.io/2018/11/14/16711ac29c2ae52c?w=928&h=531&f=png&s=798562); 一 消息隊列MQ的...

    chengjianhua 評論0 收藏0
  • 慕課網(wǎng)_《Java消息中間件》學(xué)習(xí)總結(jié)

    摘要:時間年月日星期六說明本文部分內(nèi)容均來自慕課網(wǎng)。這個時候,可以啟動多臺積分系統(tǒng),來同時消費這個消息中間件里面的登錄消息,達(dá)到橫向擴(kuò)展的作用。 時間:2017年07月22日星期六說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):http://www.imooc.com教學(xué)源碼:無學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:課程介紹 1-1 課程安排 Java...

    twohappy 評論0 收藏0
  • 深入理解工廠模式

    摘要:工廠模式的分類簡單工廠模式,又稱靜態(tài)工廠方法模式。工廠方法模式,又稱多態(tài)性工廠模式或虛擬構(gòu)造子模式抽象工廠模式,又稱工具箱或模式。具體產(chǎn)品角色抽象工廠模式所創(chuàng)建的任何產(chǎn)品對象都是某一個具體產(chǎn)品類的實例。 Java面試通關(guān)手冊(Java學(xué)習(xí)指南,歡迎Star,會一直完善下去,歡迎建議和指導(dǎo)):https://github.com/Snailclimb/Java_Guide 歷史回顧: 深...

    zhou_you 評論0 收藏0
  • ActiveMQ 嵌入Tomcat

    摘要:嵌入在一些項目中,單獨開啟一個,對于項目實施來說有時略顯繁瑣。待啟動后,選擇所在的進(jìn)程。連接后選擇頁簽紅框的地方分別為已消費和已進(jìn)入中的消息的條數(shù)。 ActiveMQ 嵌入Tomcat 在一些項目中,單獨開啟一個ActiveMQ,對于項目實施來說有時略顯繁瑣。所以我們將ActiveMQ內(nèi)嵌到Tomcat,Tomcat啟動同時就順帶啟動了ActiveMQ。由此我們需要掌握三個個重要的知識...

    curlyCheng 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<