摘要:近期對(duì)消息隊(duì)列比較感興趣因此特意看了一下相關(guān)的知識(shí)不過(guò)在學(xué)時(shí)對(duì)的消息模型總是理解的不透徹于是在官網(wǎng)上找了一篇介紹消息模型的文章詳細(xì)地看了一下還是要感嘆一下啊官網(wǎng)的文章果然是最權(quán)威的看了以后有了不小的收獲下面是我學(xué)習(xí)消息模型時(shí)的記錄其內(nèi)容大部
近期對(duì)消息隊(duì)列比較感興趣, 因此特意看了一下 RabbitMQ 相關(guān)的知識(shí), 不過(guò)在學(xué) RabbitMQ 時(shí), 對(duì) AMQP 的消息模型總是理解的不透徹, 于是在官網(wǎng)上找了一篇介紹 AMQP 消息模型的文章, 詳細(xì)地看了一下.
還是要感嘆一下啊, 官網(wǎng)的文章果然是最權(quán)威的, 看了以后有了不小的收獲.
下面是我學(xué)習(xí) AMQP 消息模型時(shí)的記錄, 其內(nèi)容大部分是翻譯自官網(wǎng), 部分添加了自己的理解.
https://www.rabbitmq.com/tuto...
AMQP 消息模型簡(jiǎn)介AMQP 的消息模型如下圖所示:
通過(guò)此圖我們可以知道, 一個(gè)消息的發(fā)送流程有如下幾個(gè)步驟:
消息生產(chǎn)者將消息發(fā)布(Public)到 Exchange 中.
Exchange 根據(jù)隊(duì)列的綁定關(guān)系將消息分發(fā)到不同的 Queue 中.
AMQP broker 根據(jù)訂閱規(guī)則將消息發(fā)送給消費(fèi)者 或 消費(fèi)者自行根據(jù)需要從消息隊(duì)列中獲取消息.
Exchange 和 Exchange 類(lèi)型Exchange 的主要任務(wù)是接收消息并將消息路由到0個(gè)或多個(gè) Queue 中, 而路由的算法受 Exchange 類(lèi)型和綁定(binding) 關(guān)系的影響. AMQP 0-9-1 broker 提供如下四個(gè) exchange 類(lèi)型:
類(lèi)型 | 默認(rèn)預(yù)定義的名字 |
---|---|
Direct Exchange | 空字符串和 amq.direct |
Fanout Exchange | amq.fanout |
Topic Exchange | amq.topic |
Headers Exchange | amq.match(在 RabbitMQ 中, 額外提供amq.headers) |
每個(gè) Exchange 都有如下幾個(gè)屬性:
Name, Exchange 的 名字
Durability, 是否是持久的 Exchange, 當(dāng)為真時(shí), broker 重啟后也會(huì)保留此 Exchange
Auto-delete, 當(dāng)為真時(shí), 如果所有綁定的的 Queue 都不再使用時(shí), 此 Exchange 會(huì)自動(dòng)刪除
關(guān)于默認(rèn) Exchange默認(rèn)的 exchange 是一個(gè)由 broker 預(yù)創(chuàng)建的匿名的(即名字是空字符串) direct exchagne. 對(duì)于簡(jiǎn)單的程序來(lái)說(shuō), 默認(rèn)的 exchange 有一個(gè)實(shí)用的屬性: 如果沒(méi)有顯示地綁定 Exchnge, 那么創(chuàng)建的每個(gè) queue 都會(huì)自動(dòng)綁定到這個(gè)默認(rèn)的 exchagne 中, 并且此時(shí)這個(gè) queue 的 route key 就是這個(gè)queue 的名字.
例如當(dāng)我們聲明了一個(gè)名為 "search-indexing-online" 的 queue, 那么 AMQP broker 會(huì)以 "search-indexing-online" 作為 route key 將此 queue 綁定到默認(rèn)的 exchange 中. 因此當(dāng)一個(gè)消息以 route key 為 "search-indexing-online" 投遞到默認(rèn)的 exchange 中時(shí), 此消息就會(huì)被路由到這個(gè) queue 中去. 換句話說(shuō), 由于有默認(rèn)的 exchagne 的存在, 我們就好像可以直接將消息投遞到指定的 queue 中去而不需要經(jīng)過(guò) exchange 一樣.
例如:
Send:
public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent "" + message + """); channel.close(); connection.close(); } }
Recv:
public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + message + """); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
在這個(gè)例子中, 我們并沒(méi)有定義 exchange, 也沒(méi)有顯示地將 queue 綁定到 exchange 中, 因此 queue "hello" 就自動(dòng)綁定到默認(rèn)的 exchange 中了, 并且在默認(rèn)的 exchange 中, 其 route key 和 queue 名一致, 即 "hello".
由于這個(gè)原因, 我們就可以使用:
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
來(lái)發(fā)送消息. 調(diào)用 channel.basicPublish 時(shí), 第一個(gè)參數(shù)是 exchange 名, 為空就是默認(rèn)的 exchange, 第二個(gè)參數(shù)是 route key, 和 queue 名相同.
direct exchangedirect exchange 可以使用如下圖表示:
direct exchange 根據(jù)消息的 route key 來(lái)將消息分發(fā)到不同的 queue 中. direct exchange 適合用于消息的單播發(fā)送. direct exchange 的工作流程如下:
一個(gè) queue 使用 K 作為 route key 綁定到 direct exchange 中.
當(dāng)direct exchange 收到一個(gè) route key 為 R 的消息時(shí), 如果 R == K, 則此 exchange 會(huì)將此消息路由到此 queue 中.
direct exchange 經(jīng)常用于在多個(gè) worker 中分配任務(wù)(即一個(gè) Master 和多個(gè)相同的 Slave). 當(dāng)使用這個(gè)模型時(shí), 需要注意的是:
When doing so, it is important to understand that, in AMQP 0-9-1, messages are load balanced between consumers and not between queues.
即 AMQP 0-9-1 的負(fù)載均衡是以consumer為單位的, 而不是以 queue 為單位.
fanout exchange一個(gè) fanout exchange 會(huì)將消息分發(fā)給所有綁定到此 exchange 的 queue 中, 而不會(huì)考慮 queue 的 route key. 即如果有 N 個(gè) Queue 綁定到一個(gè) fanout exchange 時(shí), 那么當(dāng)此 exchange 收到消息時(shí), 會(huì)將此消息分發(fā)到這 N 個(gè) queue 中. 由于此性質(zhì), fanout exchange 也常用消息的廣播(broadcast).
fanout 可以使用下圖表示:
topic exchange 會(huì)根據(jù) route key 將消息分發(fā)到與此消息的 route key 相匹配的并且綁定到此 exchagne 中的 queue 中(如果有多個(gè) queue 使用了相同的 route key 綁定到此 exchange, 那么這些 queue 都會(huì)收到消息). 根據(jù)此性質(zhì), topic exchange 經(jīng)常用于實(shí)現(xiàn) publish/subscribe 模型, 即消息的多播模型.
header exchangeheader exchange 不使用 route key 作為路由的依據(jù), 而是使用消息頭屬性來(lái)路由消息.
QueueAMQP 中的 隊(duì)列 的概念和其他消息隊(duì)列中 隊(duì)列 的概念類(lèi)似, 它有如下幾個(gè)重要的概念:
Name, 名字
Durable, 是否是持久的. 當(dāng)為真時(shí), 即使 broker 重啟時(shí), 此 queue 也不會(huì)被刪除.
Exclusive, 是否是獨(dú)占的, 當(dāng)為真時(shí), 表示此 queue 只能有一個(gè)消費(fèi)者, 并且當(dāng)此消費(fèi)者的連接斷開(kāi)時(shí), 此 queue 會(huì)被刪除.
Auto-delete, 當(dāng)為真時(shí), 此 隊(duì)列 會(huì)在最后一個(gè)消費(fèi)者取消訂閱時(shí)被刪除.
在使用一個(gè) 隊(duì)列 時(shí), 需要先進(jìn)行聲明. 如果我們聲明的隊(duì)列不存在, 那么 broker 就會(huì)自動(dòng)創(chuàng)建它. 不過(guò)如果此隊(duì)列已經(jīng)存在時(shí), 我們就需要注意了, 若我們聲明的隊(duì)列的屬性和已存在的隊(duì)列的屬性一致, 則不會(huì)有任何的問(wèn)題, 但是如果先后兩次聲明的隊(duì)列的屬性不一致, 則會(huì)有 PRECONDITION_FAILED 錯(cuò)誤(錯(cuò)誤碼為406).
關(guān)于隊(duì)列名AMQP 的隊(duì)列名不能以 "amq." 開(kāi)頭, 因?yàn)檫@樣的隊(duì)列名是 AMQP broker 內(nèi)部所使用的. 當(dāng)我們使用了這樣的隊(duì)列名時(shí), 那么會(huì)有一個(gè) ACCESS_REFUSED 錯(cuò)誤(錯(cuò)誤碼為 403)
關(guān)于持久隊(duì)列持久隊(duì)列會(huì)被持久化到磁盤(pán)中, 因此即使 broker 重啟了, 持久隊(duì)列也依然存在.
不過(guò)需要注意的是, 不要將持久隊(duì)列和消息的持久化混淆. 當(dāng) broker 重啟時(shí), 持久隊(duì)列會(huì)自動(dòng)重新聲明, 然而只有隊(duì)列中的持久化消息(persistent message)才會(huì)被恢復(fù).
隊(duì)列的綁定關(guān)系是 exchagne 用于消息路由的規(guī)則, 即一個(gè) exchange 能夠?qū)⑾⒙酚傻侥硞€(gè)隊(duì)列的前提是此隊(duì)列已經(jīng)綁定到這個(gè) exchange 中了. 當(dāng)隊(duì)列綁定到一個(gè) exchange 中時(shí), 我們還可以設(shè)置一個(gè)額外的參數(shù), 即 route key, 這個(gè) key 會(huì)被 direct exchange 和 topic exchange 作為額外的路由信息而使用, 換句話說(shuō), route key 扮演著過(guò)濾器的角色.
當(dāng)一個(gè)消息沒(méi)有被路由到任意的隊(duì)列時(shí)(例如此 exchange 沒(méi)有任何的 queue 綁定著), 那么此時(shí)會(huì)根據(jù)消息的屬性來(lái)決定是將此消息丟棄還是返回給生產(chǎn)者.
AMQP 0-9-1 支持兩種消息分發(fā)模式:
push 模式, 即 broker 主動(dòng)推送消息給消費(fèi)者
pull 模式, 即消費(fèi)者主動(dòng)從 broker 中拉取消息.
在 push 模式時(shí), 應(yīng)用程序需要告知 broker 它對(duì)哪些消息感興趣, 即也就是我們所說(shuō)的訂閱一個(gè)消息主題. 每個(gè)消費(fèi)者都有一個(gè)惟一的標(biāo)識(shí)符, 即consumer tag, 我們可以用這個(gè) tag 來(lái)取消一個(gè)消費(fèi)者對(duì)某個(gè)主題的訂閱(unsubscribe).
消息的 ACKAMQP 0-9-1 有兩種消息 ACK 模式:
自動(dòng) ACK 模式
手動(dòng) ACK 模式
在自動(dòng) ACK 模式下, 當(dāng) broker 發(fā)送消息成功后, 會(huì)立即將此消息從消息隊(duì)列中刪除, 而不會(huì)等待消費(fèi)者的 ACK 回復(fù). 而在手動(dòng) ACK 模式下, 當(dāng) broker 發(fā)送消息給消費(fèi)者時(shí), 不會(huì)立即將此消息刪除, 而是需要等待消費(fèi)者的 ACK 回復(fù)后才會(huì)刪除消息. 因此在手動(dòng) ACK 模式下, 當(dāng)消費(fèi)者收到消息并處理完成后, 需要向 broker 顯示地發(fā)送 ACK 指令.
在手動(dòng) ACK 模式下, 如果消費(fèi)者因?yàn)橐馔獾?crash 而沒(méi)有發(fā)送 ACK 給 broker, 那么此時(shí) broker 會(huì)將此消息轉(zhuǎn)發(fā)給其他的消費(fèi)者(如果此時(shí)沒(méi)有消費(fèi)者了, 那么 broker 會(huì)緩存此消息, 直到有新的消費(fèi)者注冊(cè)).
當(dāng)一個(gè)消費(fèi)者處理消息失敗或此時(shí)不能處理消息時(shí), 那么可以給 broker 發(fā)送一個(gè)拒絕消息的指令, 并且可以要求 broker 丟棄或重新分發(fā)此消息.
不過(guò)需要注意的是, 如果此時(shí)只有一個(gè)消費(fèi)者, 那么當(dāng)此消費(fèi)者拒收消息并要求 broker 重新分發(fā)此消息時(shí), 那么就會(huì)造成了此消息不斷地分發(fā)和拒收, 形成了死循環(huán).
通過(guò)預(yù)取消息機(jī)制, 消費(fèi)者可以一次性批量取出消息, 然后在處理后對(duì)這些批量消息進(jìn)行統(tǒng)一的 ACK 回復(fù), 這樣可以提高消息的吞吐量.
不過(guò), 需要注意的時(shí), RabbitMQ 僅支持 channel 級(jí)別的預(yù)取消息的數(shù)量配置, 不支持基于連接的預(yù)取消息數(shù)量配置.
AMQP 的連接是長(zhǎng)連接, 它是一個(gè)使用 TCP 作為可靠傳輸?shù)膽?yīng)用層協(xié)議.
通道(Channel)AMQP 不推薦一個(gè)應(yīng)用程序發(fā)起多個(gè)對(duì) broker 的連接, 因?yàn)檫@樣會(huì)消耗系統(tǒng)資源并且也不利于防火墻的配置. 但是如果應(yīng)用程序確實(shí)需要有多個(gè)不互相干擾的連接來(lái)進(jìn)行不同的操作時(shí)該怎么辦呢? 為了解決這個(gè)問(wèn)題, AMQP 引入了 Channel 的 概念. 在 AMQP 0-9-1 中, 一個(gè)與 broker 的連接是被多個(gè) Channel 復(fù)用的, 因此我們可以將 channel 理解為: 一個(gè)共享同一個(gè) TCP 連接的輕量級(jí)的連接.
基于同一個(gè) TCP 連接的兩個(gè)不同的 channel 直接是不會(huì)有任何的干擾的(在邏輯上可以等效地理解為兩個(gè)獨(dú)立的連接), 因此客戶端和 broker 之間交互時(shí), 需要附帶上 channel id.
通常來(lái)說(shuō), 在一個(gè)多線程消費(fèi)消息的模型中, 每個(gè)線程多帶帶打開(kāi)一個(gè) channel 是一個(gè)推薦的做法, 而最好不要在各個(gè)線程中共享一個(gè) channel.
為了在一個(gè) broker 中實(shí)現(xiàn)不同的相互隔離的環(huán)境(例如每個(gè)環(huán)境中有不同的用戶, 不同的 exchange, 不同的隊(duì)列等), AMQP 引入了一個(gè)叫做 virtual host(vhost) 的概念. 在連接 broker 時(shí), 客戶端可以指定需要使用哪個(gè) vhost.
本文由 yongshun 發(fā)表于個(gè)人博客, 采用 署名-相同方式共享 3.0 中國(guó)大陸許可協(xié)議.
Email: [email protected]
本文標(biāo)題為: RabbitMQ AMQP 消息模型攻略
本文鏈接為: https://segmentfault.com/a/1190000007123977
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/66128.html
摘要:后續(xù)介紹交換機(jī),生產(chǎn)者直接將消息投遞到中。消息,服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù),由和組成。也稱(chēng)為消息隊(duì)列,保存消息并將它們轉(zhuǎn)發(fā)給消費(fèi)者。主要是應(yīng)為和有一個(gè)綁定的關(guān)系。 showImg(https://img-blog.csdnimg.cn/20190509221741422.gif); showImg(https://img-blog.csdnimg.cn/20190731191914...
摘要:通過(guò)以上分析我們可以得出消息隊(duì)列具有很好的削峰作用的功能即通過(guò)異步處理,將短時(shí)間高并發(fā)產(chǎn)生的事務(wù)消息存儲(chǔ)在消息隊(duì)列中,從而削平高峰期的并發(fā)事務(wù)。 該文已加入開(kāi)源項(xiàng)目:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識(shí)的文檔類(lèi)項(xiàng)目,Star 數(shù)接近 16k)。地址:https://github.com/Snailclimb... 本文內(nèi)容思維導(dǎo)圖:showImg(ht...
摘要:需要特別明確的概念交換機(jī)的持久化,并不等于消息的持久化。消息的處理,是有兩種方式,一次性。在上述示例中,使用的,意味著接收全部的消息。注意與是兩個(gè)不同的隊(duì)列。后端處理,可以針對(duì)每一個(gè)啟動(dòng)一個(gè)或多個(gè),以提高消息處理的實(shí)時(shí)性。 RabbitMQ與PHP(一) 項(xiàng)目中使用RabbitMQ作為隊(duì)列處理用戶消息通知,消息由前端PHP代碼產(chǎn)生,處理消息使用Python,這就導(dǎo)致代碼一致性問(wèn)題,調(diào)...
摘要:慕課網(wǎng)消息中間件極速入門(mén)與實(shí)戰(zhàn)學(xué)習(xí)總結(jié)時(shí)間年月日星期三說(shuō)明本文部分內(nèi)容均來(lái)自慕課網(wǎng)。 慕課網(wǎng)《RabbitMQ消息中間件極速入門(mén)與實(shí)戰(zhàn)》學(xué)習(xí)總結(jié) 時(shí)間:2018年09月05日星期三 說(shuō)明:本文部分內(nèi)容均來(lái)自慕課網(wǎng)。@慕課網(wǎng):https://www.imooc.com 教學(xué)源碼:無(wú) 學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:RabbitM...
摘要:消息持久化控制的屬性就是消息的持久化。當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為時(shí),兩個(gè)消費(fèi)者都會(huì)收到消息并處理當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為時(shí),只有消費(fèi)者可以接收到消息。八的消息確認(rèn)機(jī)制在中,可以通過(guò)持久化數(shù)據(jù)解決服務(wù)器異常的數(shù)據(jù)丟失問(wèn)題。 一、內(nèi)容大綱&使用場(chǎng)景 1. 消息隊(duì)列解決了什么問(wèn)題? 異步處理 應(yīng)用解耦 流量削鋒 日志處理 ...... 2. rabbitMQ安裝與配置 3. Java操...
閱讀 1628·2021-11-22 14:45
閱讀 1085·2021-11-17 09:33
閱讀 3331·2021-09-02 09:48
閱讀 978·2019-08-30 15:54
閱讀 2775·2019-08-30 15:53
閱讀 2564·2019-08-30 12:54
閱讀 2251·2019-08-29 12:37
閱讀 2430·2019-08-26 13:58