摘要:主題模式又名發(fā)布訂閱者模式。先啟動(dòng)兩個(gè)消費(fèi)者,然后啟動(dòng)生產(chǎn)者發(fā)布條消息,這時(shí)兩個(gè)消費(fèi)者都可以消費(fèi)服務(wù)器中的每一條消息。這就是主題模式的特點(diǎn)每個(gè)訂閱者都可以消費(fèi)主題模式中的每一條消息。
主題模式 又名 發(fā)布訂閱者模式(Pub/Sub)。
一、主題模式特點(diǎn)客戶端包括發(fā)布者和訂閱者
主題中的消息被所有訂閱者消費(fèi)
消費(fèi)者不能消費(fèi)訂閱之前就發(fā)送到主題中的消息
二、創(chuàng)建過程1.創(chuàng)建連接Connection
2.創(chuàng)建會(huì)話Session
3.通過Session來創(chuàng)建其它的(MessageProducer、MessageConsumer、Destination、TextMessage)
4.將生產(chǎn)者 MessageProducer 和消費(fèi)者 MessageConsumer 都會(huì)指向目標(biāo) Destination
5.生產(chǎn)者向目標(biāo)發(fā)送TextMessage消息send()
6.消費(fèi)者設(shè)置監(jiān)聽器,監(jiān)聽消息。
相對(duì)于前一篇文章所講的隊(duì)列模式,主題模式只需要修改其中的createQueue()為createTopic();
1. 創(chuàng)建Maven項(xiàng)目2. 生產(chǎn)者 AppProducer.java4.0.0 com.jms jms-test 1.0-SNAPSHOT org.apache.activemq activemq-all 5.9.0
public class AppProducer { private static final String url = "tcp://127.0.0.1:61616"; private static final String topicName = "topic-test"; public static void main(String[] args) throws JMSException { //1.創(chuàng)建ConnectionFactory ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); //2.創(chuàng)建Connection Connection connection = connectionFactory.createConnection(); //3.啟動(dòng)連接 connection.start(); //4.創(chuàng)建會(huì)話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創(chuàng)建一個(gè)目標(biāo) Destination destination = session.createTopic(topicName); //6.創(chuàng)建一個(gè)生產(chǎn)者 MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 10; i++) { //7.創(chuàng)建消息 TextMessage textMessage = session.createTextMessage("test" + i); //8.發(fā)布消息 producer.send(textMessage); System.out.println("發(fā)送消息"+textMessage.getText()); } //9.關(guān)閉連接 connection.close(); } }3. 消費(fèi)者 AppConsumer.java
public class AppConsumer { private static final String url = "tcp://127.0.0.1:61616"; private static final String topicName = "topic-test"; public static void main(String[] args) throws JMSException { //1. 創(chuàng)建ConnectionFactory ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); //2. 創(chuàng)建Connection Connection connection = connectionFactory.createConnection(); //3. 啟動(dòng)連接 connection.start(); //4. 創(chuàng)建會(huì)話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5. 創(chuàng)建一個(gè)目標(biāo) Destination destination = session.createTopic(topicName); //6. 創(chuàng)建一個(gè)消費(fèi)者 MessageConsumer consumer = session.createConsumer(destination); //7. 創(chuàng)建一個(gè)監(jiān)聽器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { try { System.out.println("接收消息 = [" + ((TextMessage) message).getText() + "]"); } catch (JMSException e) { e.printStackTrace(); } } }); //8.關(guān)閉連接 //connection.close(); } }四、運(yùn)行查看 1. 先運(yùn)行生產(chǎn)者 AppProducer
會(huì)發(fā)現(xiàn)有10條消息被發(fā)布
運(yùn)行AppConsumer.java后會(huì)發(fā)現(xiàn)發(fā)布的10條消息并沒有被消費(fèi)者接收,因?yàn)樵?b>主題模式中: 只有提前進(jìn)行訂閱的消費(fèi)者才能成功消費(fèi)消息。而隊(duì)列模式中消費(fèi)者不需要提前訂閱也可以消費(fèi)消息。如下圖:
3.先開啟兩個(gè)消費(fèi)者,后運(yùn)行生產(chǎn)者會(huì)發(fā)現(xiàn)生產(chǎn)者發(fā)送的10個(gè)消息,兩個(gè)消費(fèi)者都全部接收。
AppConsumer1
接收消息 = [test0] 接收消息 = [test1] 接收消息 = [test2] 接收消息 = [test3] 接收消息 = [test4] 接收消息 = [test5] 接收消息 = [test6] 接收消息 = [test7] 接收消息 = [test8] 接收消息 = [test9]
AppConsumer2
接收消息 = [test0] 接收消息 = [test1] 接收消息 = [test2] 接收消息 = [test3] 接收消息 = [test4] 接收消息 = [test5] 接收消息 = [test6] 接收消息 = [test7] 接收消息 = [test8] 接收消息 = [test9]4. 小結(jié)
先啟動(dòng)生產(chǎn)者,發(fā)布10條消息,然后再啟動(dòng)消費(fèi)者,這時(shí)消費(fèi)者是不能消費(fèi)到消息的,因?yàn)橹黝}模式中: 只有提前進(jìn)行訂閱的消費(fèi)者才能成功消費(fèi)消息。而隊(duì)列模式消費(fèi)者不需要提前訂閱也可以消費(fèi)消息
先啟動(dòng)一個(gè)消費(fèi)者,然后再啟動(dòng)生產(chǎn)者發(fā)布10條消息,這時(shí)消費(fèi)者成功消費(fèi)了ActiveMQ服務(wù)器中的消息。
先啟動(dòng)兩個(gè)消費(fèi)者,然后啟動(dòng)生產(chǎn)者發(fā)布10條消息,這時(shí)兩個(gè)消費(fèi)者都可以消費(fèi)ActiveMQ服務(wù)器中的每一條消息。這就是主題模式的特點(diǎn): 每個(gè)訂閱者都可以消費(fèi)主題模式中的每一條消息。而隊(duì)列模式中,只能平均消費(fèi)消息,被別的消費(fèi)者消費(fèi)的消息不能重復(fù)被其他的消費(fèi)者消費(fèi)
五、隊(duì)列模式和主題模式的區(qū)別是否需要提前訂閱
隊(duì)列模式:消費(fèi)者不需要提前訂閱也可以消費(fèi)消息
主題模式:只有提前進(jìn)行訂閱的消費(fèi)者才能成功消費(fèi)消息
多個(gè)消費(fèi)者如何分配消息
隊(duì)列模式:只能平均消費(fèi)消息,被別的消費(fèi)者消費(fèi)的消息不能重復(fù)被其他的消費(fèi)者消費(fèi)
主題模式:每個(gè)訂閱者都可以消費(fèi)主題模式中的每一條消息
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/69197.html
摘要:時(shí)間年月日星期六說明本文部分內(nèi)容均來自慕課網(wǎng)。這個(gè)時(shí)候,可以啟動(dòng)多臺(tái)積分系統(tǒng),來同時(shí)消費(fèi)這個(gè)消息中間件里面的登錄消息,達(dá)到橫向擴(kuò)展的作用。 時(shí)間:2017年07月22日星期六說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):http://www.imooc.com教學(xué)源碼:無學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:課程介紹 1-1 課程安排 Java...
摘要:學(xué)習(xí)消息隊(duì)列的使用之前,我們先來搞清。是操作消息的接口。消息生產(chǎn)者由創(chuàng)建,并用于將消息發(fā)送到。接收消息打印結(jié)果這是接收到的消息消費(fèi)者啟動(dòng)。。。。 通過上一篇文章 《消息隊(duì)列深入解析》,我們已經(jīng)消息隊(duì)列是什么、使用消息隊(duì)列的好處以及常見消息隊(duì)列的簡(jiǎn)單介紹。 這一篇文章,主要帶大家詳細(xì)了解一下消息隊(duì)列ActiveMQ的使用。 學(xué)習(xí)消息隊(duì)列ActiveMQ的使用之前,我們先來搞清JMS。 J...
摘要:中間件的分類基于遠(yuǎn)程過程調(diào)用的中間件?;趯?duì)象請(qǐng)求代理的中間件。消息傳遞指的是程序之間通過在消息中發(fā)送數(shù)據(jù)進(jìn)行通信,而不是通過直接調(diào)用彼此來通信,直接調(diào)用通常是用于諸如遠(yuǎn)程過程調(diào)用的技術(shù)。 一.中間件 1.1 什么是中間件? 由于業(yè)務(wù)、機(jī)構(gòu)和技術(shù)是不斷變化的,因此為其服務(wù)的軟件系統(tǒng)必須適應(yīng)這樣的變化。在合并、添加服務(wù)或擴(kuò)展可用服務(wù)之后,公司可能無力負(fù)擔(dān)重新創(chuàng)建信息系統(tǒng)所需的成本。正是在...
摘要:本文主要講述消息服務(wù)在中的使用。所以需要一個(gè)監(jiān)聽容器工廠的概念,即接口,它會(huì)引用上面創(chuàng)建好的與的連接工廠,由它來負(fù)責(zé)接收消息以及將消息分發(fā)給指定的監(jiān)聽器。為了消費(fèi)消息,訂閱者必須保持運(yùn)行的狀態(tài)。 JMS 在 SpringBoot 中的使用 摘要:本文屬于原創(chuàng),歡迎轉(zhuǎn)載,轉(zhuǎn)載請(qǐng)保留出處:https://github.com/jasonGeng88/blog> 本文所有服務(wù)均采用doc...
閱讀 2012·2021-11-15 18:09
閱讀 903·2021-09-06 15:13
閱讀 2645·2021-08-23 09:43
閱讀 2026·2019-08-30 15:54
閱讀 2219·2019-08-30 13:56
閱讀 2486·2019-08-26 11:31
閱讀 3081·2019-08-26 10:56
閱讀 705·2019-08-26 10:28