摘要:交易所在本教程的前幾部分中,我們發(fā)送消息并從隊(duì)列中接收消息。消費(fèi)者是接收消息的用戶的應(yīng)用程序。中的消息傳遞模型的核心思想是生產(chǎn)者永遠(yuǎn)不會(huì)將任何消息直接發(fā)送到隊(duì)列中。交換和隊(duì)列之間的關(guān)系稱為綁定。
源碼:https://github.com/ltoddy/rabbitmq-tutorial
發(fā)布 / 訂閱(using the Pika Python client)
本章節(jié)教程重點(diǎn)介紹的內(nèi)容在上一篇教程中,我們創(chuàng)建了工作隊(duì)列。工作隊(duì)列背后的假設(shè)是每個(gè)任務(wù)只能傳遞給一個(gè)工作人員。
在這一部分,我們將做一些完全不同的事情 - 我們會(huì)向多個(gè)消費(fèi)者傳遞信息。這種模式被稱為“發(fā)布/訂閱”。
為了說明這種模式,我們將建立一個(gè)簡(jiǎn)單的日志系統(tǒng)。它將包含兩個(gè)程序 - 第一個(gè)將發(fā)送日志消息,第二個(gè)將接收并打印它們。
在我們的日志系統(tǒng)中,接收程序的每個(gè)運(yùn)行副本都會(huì)收到消息。這樣我們就可以運(yùn)行一個(gè)接收器并將日志指向磁盤; 同時(shí)我們將能夠運(yùn)行另一個(gè)接收器并在屏幕上查看日志。
一般來說,發(fā)布的日志消息將以廣播的形式發(fā)給所有的接收者。
交易所在本教程的前幾部分中,我們發(fā)送消息并從隊(duì)列中接收消息?,F(xiàn)在是時(shí)候在rabbitmq中引入完整的消息傳遞模型。
讓我們快速回顧一下前面教程中的內(nèi)容:
生產(chǎn)者是發(fā)送消息的用戶的應(yīng)用程序。
隊(duì)列是存儲(chǔ)消息的緩沖器。
消費(fèi)者是接收消息的用戶的應(yīng)用程序。
RabbitMQ中的消息傳遞模型的核心思想是生產(chǎn)者永遠(yuǎn)不會(huì)將任何消息直接發(fā)送到隊(duì)列中。實(shí)際上,生產(chǎn)者通常甚至不知道郵件是否會(huì)被傳送到任何隊(duì)列中。
相反,生產(chǎn)者只能發(fā)送消息給交易所。交換是一件非常簡(jiǎn)單的事情。一方面它接收來自生產(chǎn)者的消息,另一方則推動(dòng)他們排隊(duì)。
交易所必須知道如何處理收到的消息。是否應(yīng)該附加到特定隊(duì)列?它應(yīng)該附加到許多隊(duì)列中嗎?或者它應(yīng)該被丟棄。這些規(guī)則由交換類型定義 (exchange type)。
有幾種可用的交換類型: direct, topic, header 和 fanout。我們將關(guān)注最后一個(gè) - fanout。讓我們創(chuàng)建該類型的交換,并將其稱為logs:
channel.exchange_declare(exchange="logs", exchange_type="fanout")
fanout交換非常簡(jiǎn)單。正如你可能從名字中猜出的那樣,它只是將收到的所有消息廣播到它所知道的所有隊(duì)列中。這正是我們logger所需要的。
現(xiàn)在,我們可以發(fā)布到我們的指定交易所:
channel.basic_publish(exchange="logs", routing_key="", body=message)臨時(shí)隊(duì)列
正如你以前可能記得我們正在使用具有指定名稱的隊(duì)列(還記得hello和task_queue嗎?)。能夠命名隊(duì)列對(duì)我們至關(guān)重要 - 我們需要將工作人員指向同一隊(duì)列。
當(dāng)你想在生產(chǎn)者和消費(fèi)者之間分享隊(duì)列時(shí),給隊(duì)列一個(gè)名字是很重要的。
但是,我們的記錄器并非如此。我們希望聽到所有日志消息,而不僅僅是其中的一部分。我們也只對(duì)目前流動(dòng)的消息感興趣,而不是舊消息。要解決這個(gè)問題,我們需要做兩件事。
首先,每當(dāng)我們連接到rabbitmq,我們需要一個(gè)新的,空的隊(duì)列。要做到這一點(diǎn),我們可以創(chuàng)建一個(gè)隨機(jī)名稱的隊(duì)列,或者甚至更好 - 讓服務(wù)器為我們選擇一個(gè)隨機(jī)隊(duì)列名稱。
我們可以通過不將隊(duì)列參數(shù)提供給queue_declare來實(shí)現(xiàn)這一點(diǎn):
result = channel.queue_declare()
此時(shí),result.method.queue包含一個(gè)隨機(jī)隊(duì)列名稱。例如,它可能看起來像amq.gen-i94oCE_tj3LyWsy-94KXHg。
其次,一旦消費(fèi)者連接關(guān)閉,隊(duì)列應(yīng)該被刪除。這是一個(gè)專有標(biāo)志:
result = channel.queue_declare(exclusive=True)綁定
我們已經(jīng)創(chuàng)建了一個(gè)fanout交換和一個(gè)隊(duì)列?,F(xiàn)在我們需要告訴交換所將消息發(fā)送到我們的隊(duì)列。交換和隊(duì)列之間的關(guān)系稱為綁定。
channel.queue_bind(exchange="logs", queue=result.method.queue)
從現(xiàn)在起,logs 交易所會(huì)將消息附加到我們的隊(duì)列中。
把它放在一起發(fā)出日志消息的生產(chǎn)者程序與之前的教程沒有多大區(qū)別。最重要的變化是我們現(xiàn)在想發(fā)布消息到我們的logs交易所,而不是無名字的消息。發(fā)送時(shí)我們需要提供一個(gè)routing_key,但是對(duì)于fanout交換,它的值將被忽略。這里是emit_log.py腳本的代碼 :
#!/usr/bin/env python import sys import pika connection = pika.BlockingConnection( pika.ConnectionParameters(host="localhost")) channel = connection.channel() channel.exchange_declare(exchange="logs", exchange_type="fanout") message = " ".join(sys.argv[1:]) or "info: Hello world!" channel.basic_publish(exchange="logs", routing_key="", body=message) print(" [x] Sent %r" % message) connection.close()
如你所見,建立連接后,我們宣布交易所。這一步是必要的,因?yàn)榘l(fā)布到不存在的交易所是被禁止的。
如果沒有隊(duì)列綁定到交換機(jī)上,這些消息將會(huì)丟失,但這對(duì)我們來說沒問題; 如果沒有消費(fèi)者正在收聽,我們可以放心地丟棄消息。
receive_logs.py的代碼:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) channel = connection.channel() channel.exchange_declare(exchange="logs", exchange_type="fanout") result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange="logs", queue=queue_name) print(" [*] Waiting for logs. To exit press CTRL+C") def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
我們完成了。如果您想將日志保存到文件中,只需打開一個(gè)控制臺(tái)并輸入:
python receive_logs.py > logs_from_rabbit.log
如果你想在屏幕上看到日志,打開一個(gè)新的終端并運(yùn)行:
python receive_logs.py
當(dāng)然,
python emit_log.py
使用rabbitmqctl list_bindings,你可以驗(yàn)證代碼是否真正創(chuàng)建了綁定和隊(duì)列。當(dāng)有兩個(gè)receive_logs.py程序正在運(yùn)行,你應(yīng)該看到如下所示:
root@921edcb46341:/# rabbitmqctl list_bindings Listing bindings for vhost /... exchange amq.gen-6YXn7BycIwtI7kFuUrTbaA queue amq.gen-6YXn7BycIwtI7kFuUrTbaA [] exchange amq.gen-JhFL-rbMAoricMu5Dyo-hA queue amq.gen-JhFL-rbMAoricMu5Dyo-hA [] logs exchange amq.gen-6YXn7BycIwtI7kFuUrTbaA queue amq.gen-6YXn7BycIwtI7kFuUrTbaA [] logs exchange amq.gen-JhFL-rbMAoricMu5Dyo-hA queue amq.gen-JhFL-rbMAoricMu5Dyo-hA []
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/41446.html
摘要:每當(dāng)我們收到一條消息,這個(gè)回調(diào)函數(shù)就被皮卡庫調(diào)用。接下來,我們需要告訴這個(gè)特定的回調(diào)函數(shù)應(yīng)該從我們的隊(duì)列接收消息為了讓這個(gè)命令成功,我們必須確保我們想要訂閱的隊(duì)列存在。生產(chǎn)者計(jì)劃將在每次運(yùn)行后停止歡呼我們能夠通過發(fā)送我們的第一條消息。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 介紹 RabbitMQ是一個(gè)消息代理:它接受和轉(zhuǎn)發(fā)消息。你...
摘要:?jiǎn)卧~可以是任何東西,但通常它們指定了與該消息相關(guān)的一些功能。消息將使用由三個(gè)字兩個(gè)點(diǎn)組成的路由鍵發(fā)送。另一方面,只會(huì)進(jìn)入第一個(gè)隊(duì)列,而只會(huì)進(jìn)入第二個(gè)隊(duì)列。不匹配任何綁定,因此將被丟棄。代碼幾乎與前一個(gè)教程中的代碼相同。 源碼:https://github.com/ltoddy/rabbitmq-tutorial Topics (using the Pika Python client)...
摘要:為了避免與參數(shù)混淆,我們將其稱為綁定鍵。直接交換我們之前教程的日志記錄系統(tǒng)將所有消息廣播給所有消費(fèi)者。在這種設(shè)置中,使用路由鍵發(fā)布到交換機(jī)的消息將被路由到隊(duì)列。所有其他消息將被丟棄。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 路由 本章節(jié)教程重點(diǎn)介紹的內(nèi)容 在之前的教程中,我們構(gòu)建了一個(gè)簡(jiǎn)單的日志系統(tǒng) 我們能夠?qū)⑷罩鞠V播給許多接收...
摘要:通常用于命名回調(diào)隊(duì)列。對(duì)每個(gè)響應(yīng)執(zhí)行的回調(diào)函數(shù)做了一個(gè)非常簡(jiǎn)單的工作,對(duì)于每個(gè)響應(yīng)消息它檢查是否是我們正在尋找的。在這個(gè)方法中,首先我們生成一個(gè)唯一的數(shù)并保存回調(diào)函數(shù)將使用這個(gè)值來捕獲適當(dāng)?shù)捻憫?yīng)。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 遠(yuǎn)程過程調(diào)用(RPC) (using the Pika Python client) 本章節(jié)教程...
摘要:我們將任務(wù)封裝為消息并將其發(fā)送到隊(duì)列。為了確保消息永不丟失,支持消息確認(rèn)。沒有任何消息超時(shí)當(dāng)消費(fèi)者死亡時(shí),將重新傳遞消息。發(fā)生這種情況是因?yàn)橹辉谙⑦M(jìn)入隊(duì)列時(shí)調(diào)度消息。這告訴一次不要向工作人員發(fā)送多個(gè)消息。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 工作隊(duì)列 showImg(https://segmentfault.com/img/r...
閱讀 3419·2021-09-09 11:39
閱讀 1259·2021-09-09 09:33
閱讀 1164·2019-08-30 15:43
閱讀 581·2019-08-29 14:08
閱讀 1757·2019-08-26 13:49
閱讀 2412·2019-08-26 10:09
閱讀 1576·2019-08-23 17:13
閱讀 2325·2019-08-23 12:57