摘要:為了避免與參數(shù)混淆,我們將其稱為綁定鍵。直接交換我們之前教程的日志記錄系統(tǒng)將所有消息廣播給所有消費(fèi)者。在這種設(shè)置中,使用路由鍵發(fā)布到交換機(jī)的消息將被路由到隊(duì)列。所有其他消息將被丟棄。
源碼:https://github.com/ltoddy/rabbitmq-tutorial
路由 本章節(jié)教程重點(diǎn)介紹的內(nèi)容在之前的教程中,我們構(gòu)建了一個(gè)簡(jiǎn)單的日志系統(tǒng) 我們能夠?qū)⑷罩鞠V播給許多接收者。
在本教程中,我們將添加一個(gè)功能 - 我們將只能訂閱一部分消息。例如,我們只能將重要的錯(cuò)誤消息引導(dǎo)到日志文件(以節(jié)省磁盤空間),同時(shí)仍然能夠在控制臺(tái)上打印所有日志消息。
綁定在前面的例子中,我們已經(jīng)創(chuàng)建了綁定。您可能會(huì)回想一下代碼:
channel.queue_bind(exchange=EXCHANGE_NAME, queue=queue_name)
綁定是交換和隊(duì)列之間的關(guān)系。這可以簡(jiǎn)單地理解為: the queue is interested in messages from this exchange.
綁定可以使用額外的routing_key參數(shù)。為了避免與basic_publish參數(shù)混淆,我們將其稱為綁定鍵。這就是我們?nèi)绾问褂靡粋€(gè)鍵創(chuàng)建一個(gè)綁定:
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key="black")
綁定鍵的含義取決于交換類型。我們之前使用的 fanout 交換簡(jiǎn)單地忽略了它的價(jià)值。
直接交換我們之前教程的日志記錄系統(tǒng)將所有消息廣播給所有消費(fèi)者。我們希望將其擴(kuò)展為允許根據(jù)其進(jìn)行嚴(yán)格的過濾消息。
例如,我們可能希望將嚴(yán)重錯(cuò)誤的日志消息寫入磁盤,而不會(huì)寫入警告或信息日志消息。
我們正在使用fanout交換,這不會(huì)給我們太多的靈活性 - 它只能無(wú)意識(shí)地播放。
我們將使用direct交換。direct交換背后的路由算法很簡(jiǎn)單 - 消息進(jìn)入隊(duì)列,其綁定密鑰與消息的路由密鑰完全匹配。
為了說(shuō)明這一點(diǎn),請(qǐng)考慮以下設(shè)置:
在這個(gè)設(shè)置中,我們可以看到有兩個(gè)隊(duì)列綁定的直接交換機(jī)X. 第一個(gè)隊(duì)列用綁定鍵orange綁定,第二個(gè)隊(duì)列有兩個(gè)綁定,一個(gè)綁定鍵為black,另一個(gè)為green。
在這種設(shè)置中,使用路由鍵orange發(fā)布到交換機(jī)的消息 將被路由到隊(duì)列Q1。帶有black或gree路由鍵的消息將進(jìn)入Q2。所有其他消息將被丟棄。
多個(gè)綁定使用相同的綁定密鑰綁定多個(gè)隊(duì)列是完全合法的。在我們的例子中,我們可以使用綁定鍵black添加X和Q1之間的綁定。
在這種情況下,direct交換就像fanout一樣,并將消息廣播到所有匹配的隊(duì)列。帶有路由鍵black的消息將傳送到Q1和Q2。
我們將使用這個(gè)模型用于我們的日志系統(tǒng)。取而代之的fanout,我們將消息發(fā)送到direct交換。我們將提供嚴(yán)格的日志作為路由鍵(routing key)。
這樣接收腳本將能夠選擇想要接收的消息。我們先關(guān)注發(fā)出日志的實(shí)現(xiàn)。
像往常一樣,我們需要首先創(chuàng)建一個(gè)交換:
channel.exchange_declare(exchange="direct_logs", exchange_type="direct")
我們準(zhǔn)備發(fā)送一條消息:
channel.basic_publish(exchange="direct_logs", routing_key="", body=message)
為了簡(jiǎn)化事情,我們將假設(shè)“severity”可以是"info","warning","error"之一。
訂閱接收郵件的方式與上一個(gè)教程中的一樣,只有一個(gè)例外 - 我們將為每個(gè)我們感興趣的嚴(yán)重程度創(chuàng)建一個(gè)新綁定。
result = channel.queue_declare(exclusive=True) queue_name = result.method.queue for severity in severities: channel.queue_bind(exchange="direct_logs", queue=queue_name, routing_key=severity)把它放在一起
emit_log_direct.py的代碼:
#!/usr/bin/env python import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) channel = connection.channel() channel.exchange_declare(exchange="direct_logs", exchange_type="direct") severity = sys.args[1:] if len(sys.argv) > 2 else "info" message = " ".join(sys.argv[2:]) or "Hello World!" channel.basic_publish(exchange="direct_logs", routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
receive_logs_direct.py的代碼:
#!/usr/bin/env python import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) channel = connection.channel() channel.exchange_declare(exchange="direct_logs", exchange_type="direct") result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error] " % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange="direct_logs", queue=queue_name, routing_key=severity) print(" [*] Waiting for logs. To exit press CTRL+C") def callback(cb, method, properities, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
如果只想保存"warning"和"error"(而不是"info")將消息記錄到文件中,只需打開一個(gè)控制臺(tái)并輸入:
python receive_logs_direct.py warning error > logs_from_rabbit.log
如果您希望在屏幕上看到所有日志消息,請(qǐng)打開一個(gè)新終端并執(zhí)行以下操作:
python receive_logs_direct.py info warning error
例如,要輸出error日志消息,只需輸入:
python emit_log_direct.py error "Run. Run. Or it will explode."
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/41441.html
摘要:?jiǎn)卧~可以是任何東西,但通常它們指定了與該消息相關(guān)的一些功能。消息將使用由三個(gè)字兩個(gè)點(diǎn)組成的路由鍵發(fā)送。另一方面,只會(huì)進(jìn)入第一個(gè)隊(duì)列,而只會(huì)進(jìn)入第二個(gè)隊(duì)列。不匹配任何綁定,因此將被丟棄。代碼幾乎與前一個(gè)教程中的代碼相同。 源碼:https://github.com/ltoddy/rabbitmq-tutorial Topics (using the Pika Python client)...
摘要:每當(dāng)我們收到一條消息,這個(gè)回調(diào)函數(shù)就被皮卡庫(kù)調(diào)用。接下來(lái),我們需要告訴這個(gè)特定的回調(diào)函數(shù)應(yīng)該從我們的隊(duì)列接收消息為了讓這個(gè)命令成功,我們必須確保我們想要訂閱的隊(duì)列存在。生產(chǎn)者計(jì)劃將在每次運(yùn)行后停止歡呼我們能夠通過發(fā)送我們的第一條消息。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 介紹 RabbitMQ是一個(gè)消息代理:它接受和轉(zhuǎn)發(fā)消息。你...
摘要:我們將任務(wù)封裝為消息并將其發(fā)送到隊(duì)列。為了確保消息永不丟失,支持消息確認(rèn)。沒有任何消息超時(shí)當(dāng)消費(fèi)者死亡時(shí),將重新傳遞消息。發(fā)生這種情況是因?yàn)橹辉谙⑦M(jìn)入隊(duì)列時(shí)調(diào)度消息。這告訴一次不要向工作人員發(fā)送多個(gè)消息。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 工作隊(duì)列 showImg(https://segmentfault.com/img/r...
摘要:交易所在本教程的前幾部分中,我們發(fā)送消息并從隊(duì)列中接收消息。消費(fèi)者是接收消息的用戶的應(yīng)用程序。中的消息傳遞模型的核心思想是生產(chǎn)者永遠(yuǎn)不會(huì)將任何消息直接發(fā)送到隊(duì)列中。交換和隊(duì)列之間的關(guān)系稱為綁定。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 發(fā)布 / 訂閱 (using the Pika Python client) 本章節(jié)教程重點(diǎn)介紹的...
摘要:通常用于命名回調(diào)隊(duì)列。對(duì)每個(gè)響應(yīng)執(zhí)行的回調(diào)函數(shù)做了一個(gè)非常簡(jiǎn)單的工作,對(duì)于每個(gè)響應(yīng)消息它檢查是否是我們正在尋找的。在這個(gè)方法中,首先我們生成一個(gè)唯一的數(shù)并保存回調(diào)函數(shù)將使用這個(gè)值來(lái)捕獲適當(dāng)?shù)捻憫?yīng)。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 遠(yuǎn)程過程調(diào)用(RPC) (using the Pika Python client) 本章節(jié)教程...
閱讀 2039·2023-04-25 14:50
閱讀 2918·2021-11-17 09:33
閱讀 2622·2019-08-30 13:07
閱讀 2847·2019-08-29 16:57
閱讀 915·2019-08-29 15:26
閱讀 3557·2019-08-29 13:08
閱讀 2001·2019-08-29 12:32
閱讀 3394·2019-08-26 13:57