成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

rabbitmq中文教程python版 - 發(fā)布 / 訂閱

alphahans / 973人閱讀

摘要:交易所在本教程的前幾部分中,我們發(fā)送消息并從隊(duì)列中接收消息。消費(fèi)者是接收消息的用戶的應(yīng)用程序。中的消息傳遞模型的核心思想是生產(chǎn)者永遠(yuǎn)不會(huì)將任何消息直接發(fā)送到隊(duì)列中。交換和隊(duì)列之間的關(guān)系稱為綁定。

源碼:https://github.com/ltoddy/rabbitmq-tutorial

發(fā)布 / 訂閱

(using the Pika Python client)

本章節(jié)教程重點(diǎn)介紹的內(nèi)容

在上一篇教程中,我們創(chuàng)建了工作隊(duì)列。工作隊(duì)列背后的假設(shè)是每個(gè)任務(wù)只能傳遞給一個(gè)工作人員。
在這一部分,我們將做一些完全不同的事情 - 我們會(huì)向多個(gè)消費(fèi)者傳遞信息。這種模式被稱為“發(fā)布/訂閱”。

為了說明這種模式,我們將建立一個(gè)簡(jiǎn)單的日志系統(tǒng)。它將包含兩個(gè)程序 - 第一個(gè)將發(fā)送日志消息,第二個(gè)將接收并打印它們。

在我們的日志系統(tǒng)中,接收程序的每個(gè)運(yùn)行副本都會(huì)收到消息。這樣我們就可以運(yùn)行一個(gè)接收器并將日志指向磁盤; 同時(shí)我們將能夠運(yùn)行另一個(gè)接收器并在屏幕上查看日志。

一般來說,發(fā)布的日志消息將以廣播的形式發(fā)給所有的接收者。

交易所

在本教程的前幾部分中,我們發(fā)送消息并從隊(duì)列中接收消息?,F(xiàn)在是時(shí)候在rabbitmq中引入完整的消息傳遞模型。

讓我們快速回顧一下前面教程中的內(nèi)容:

生產(chǎn)者是發(fā)送消息的用戶的應(yīng)用程序。

隊(duì)列是存儲(chǔ)消息的緩沖器。

消費(fèi)者是接收消息的用戶的應(yīng)用程序。

RabbitMQ中的消息傳遞模型的核心思想是生產(chǎn)者永遠(yuǎn)不會(huì)將任何消息直接發(fā)送到隊(duì)列中。實(shí)際上,生產(chǎn)者通常甚至不知道郵件是否會(huì)被傳送到任何隊(duì)列中。

相反,生產(chǎn)者只能發(fā)送消息給交易所。交換是一件非常簡(jiǎn)單的事情。一方面它接收來自生產(chǎn)者的消息,另一方則推動(dòng)他們排隊(duì)。
交易所必須知道如何處理收到的消息。是否應(yīng)該附加到特定隊(duì)列?它應(yīng)該附加到許多隊(duì)列中嗎?或者它應(yīng)該被丟棄。這些規(guī)則由交換類型定義 (exchange type)。

有幾種可用的交換類型: direct, topic, header 和 fanout。我們將關(guān)注最后一個(gè) - fanout。讓我們創(chuàng)建該類型的交換,并將其稱為logs:

channel.exchange_declare(exchange="logs",
                         exchange_type="fanout")

fanout交換非常簡(jiǎn)單。正如你可能從名字中猜出的那樣,它只是將收到的所有消息廣播到它所知道的所有隊(duì)列中。這正是我們logger所需要的。

現(xiàn)在,我們可以發(fā)布到我們的指定交易所:

channel.basic_publish(exchange="logs",
                      routing_key="",
                      body=message)
臨時(shí)隊(duì)列

正如你以前可能記得我們正在使用具有指定名稱的隊(duì)列(還記得hellotask_queue嗎?)。能夠命名隊(duì)列對(duì)我們至關(guān)重要 - 我們需要將工作人員指向同一隊(duì)列。
當(dāng)你想在生產(chǎn)者和消費(fèi)者之間分享隊(duì)列時(shí),給隊(duì)列一個(gè)名字是很重要的。

但是,我們的記錄器并非如此。我們希望聽到所有日志消息,而不僅僅是其中的一部分。我們也只對(duì)目前流動(dòng)的消息感興趣,而不是舊消息。要解決這個(gè)問題,我們需要做兩件事。

首先,每當(dāng)我們連接到rabbitmq,我們需要一個(gè)新的,空的隊(duì)列。要做到這一點(diǎn),我們可以創(chuàng)建一個(gè)隨機(jī)名稱的隊(duì)列,或者甚至更好 - 讓服務(wù)器為我們選擇一個(gè)隨機(jī)隊(duì)列名稱。
我們可以通過不將隊(duì)列參數(shù)提供給queue_declare來實(shí)現(xiàn)這一點(diǎn):

result = channel.queue_declare()

此時(shí),result.method.queue包含一個(gè)隨機(jī)隊(duì)列名稱。例如,它可能看起來像amq.gen-i94oCE_tj3LyWsy-94KXHg。

其次,一旦消費(fèi)者連接關(guān)閉,隊(duì)列應(yīng)該被刪除。這是一個(gè)專有標(biāo)志:

result = channel.queue_declare(exclusive=True)
綁定

我們已經(jīng)創(chuàng)建了一個(gè)fanout交換和一個(gè)隊(duì)列?,F(xiàn)在我們需要告訴交換所將消息發(fā)送到我們的隊(duì)列。交換和隊(duì)列之間的關(guān)系稱為綁定。

channel.queue_bind(exchange="logs",
                   queue=result.method.queue)

從現(xiàn)在起,logs 交易所會(huì)將消息附加到我們的隊(duì)列中。

把它放在一起

發(fā)出日志消息的生產(chǎn)者程序與之前的教程沒有多大區(qū)別。最重要的變化是我們現(xiàn)在想發(fā)布消息到我們的logs交易所,而不是無名字的消息。發(fā)送時(shí)我們需要提供一個(gè)routing_key,但是對(duì)于fanout交換,它的值將被忽略。這里是emit_log.py腳本的代碼 :

#!/usr/bin/env python
import sys
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

channel.exchange_declare(exchange="logs",
                         exchange_type="fanout")

message = " ".join(sys.argv[1:]) or "info: Hello world!"
channel.basic_publish(exchange="logs",
                      routing_key="",
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

如你所見,建立連接后,我們宣布交易所。這一步是必要的,因?yàn)榘l(fā)布到不存在的交易所是被禁止的。

如果沒有隊(duì)列綁定到交換機(jī)上,這些消息將會(huì)丟失,但這對(duì)我們來說沒問題; 如果沒有消費(fèi)者正在收聽,我們可以放心地丟棄消息。

receive_logs.py的代碼:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

channel.exchange_declare(exchange="logs",
                         exchange_type="fanout")

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange="logs",
                   queue=queue_name)

print(" [*] Waiting for logs. To exit press CTRL+C")


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

我們完成了。如果您想將日志保存到文件中,只需打開一個(gè)控制臺(tái)并輸入:

python receive_logs.py > logs_from_rabbit.log

如果你想在屏幕上看到日志,打開一個(gè)新的終端并運(yùn)行:

python receive_logs.py

當(dāng)然,

python emit_log.py

使用rabbitmqctl list_bindings,你可以驗(yàn)證代碼是否真正創(chuàng)建了綁定和隊(duì)列。當(dāng)有兩個(gè)receive_logs.py程序正在運(yùn)行,你應(yīng)該看到如下所示:

root@921edcb46341:/# rabbitmqctl list_bindings
Listing bindings for vhost /...
    exchange    amq.gen-6YXn7BycIwtI7kFuUrTbaA    queue    amq.gen-6YXn7BycIwtI7kFuUrTbaA    []
    exchange    amq.gen-JhFL-rbMAoricMu5Dyo-hA    queue    amq.gen-JhFL-rbMAoricMu5Dyo-hA    []
logs    exchange    amq.gen-6YXn7BycIwtI7kFuUrTbaA    queue    amq.gen-6YXn7BycIwtI7kFuUrTbaA    []
logs    exchange    amq.gen-JhFL-rbMAoricMu5Dyo-hA    queue    amq.gen-JhFL-rbMAoricMu5Dyo-hA    []

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/41446.html

相關(guān)文章

  • rabbitmq中文教程python - 介紹

    摘要:每當(dāng)我們收到一條消息,這個(gè)回調(diào)函數(shù)就被皮卡庫調(diào)用。接下來,我們需要告訴這個(gè)特定的回調(diào)函數(shù)應(yīng)該從我們的隊(duì)列接收消息為了讓這個(gè)命令成功,我們必須確保我們想要訂閱的隊(duì)列存在。生產(chǎn)者計(jì)劃將在每次運(yùn)行后停止歡呼我們能夠通過發(fā)送我們的第一條消息。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 介紹 RabbitMQ是一個(gè)消息代理:它接受和轉(zhuǎn)發(fā)消息。你...

    yimo 評(píng)論0 收藏0
  • rabbitmq中文教程python - Topics

    摘要:?jiǎn)卧~可以是任何東西,但通常它們指定了與該消息相關(guān)的一些功能。消息將使用由三個(gè)字兩個(gè)點(diǎn)組成的路由鍵發(fā)送。另一方面,只會(huì)進(jìn)入第一個(gè)隊(duì)列,而只會(huì)進(jìn)入第二個(gè)隊(duì)列。不匹配任何綁定,因此將被丟棄。代碼幾乎與前一個(gè)教程中的代碼相同。 源碼:https://github.com/ltoddy/rabbitmq-tutorial Topics (using the Pika Python client)...

    ernest.wang 評(píng)論0 收藏0
  • rabbitmq中文教程python - 路由

    摘要:為了避免與參數(shù)混淆,我們將其稱為綁定鍵。直接交換我們之前教程的日志記錄系統(tǒng)將所有消息廣播給所有消費(fèi)者。在這種設(shè)置中,使用路由鍵發(fā)布到交換機(jī)的消息將被路由到隊(duì)列。所有其他消息將被丟棄。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 路由 本章節(jié)教程重點(diǎn)介紹的內(nèi)容 在之前的教程中,我們構(gòu)建了一個(gè)簡(jiǎn)單的日志系統(tǒng) 我們能夠?qū)⑷罩鞠V播給許多接收...

    Hwg 評(píng)論0 收藏0
  • rabbitmq中文教程python - 遠(yuǎn)程過程調(diào)用

    摘要:通常用于命名回調(diào)隊(duì)列。對(duì)每個(gè)響應(yīng)執(zhí)行的回調(diào)函數(shù)做了一個(gè)非常簡(jiǎn)單的工作,對(duì)于每個(gè)響應(yīng)消息它檢查是否是我們正在尋找的。在這個(gè)方法中,首先我們生成一個(gè)唯一的數(shù)并保存回調(diào)函數(shù)將使用這個(gè)值來捕獲適當(dāng)?shù)捻憫?yīng)。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 遠(yuǎn)程過程調(diào)用(RPC) (using the Pika Python client) 本章節(jié)教程...

    chuyao 評(píng)論0 收藏0
  • rabbitmq中文教程python - 工作隊(duì)列

    摘要:我們將任務(wù)封裝為消息并將其發(fā)送到隊(duì)列。為了確保消息永不丟失,支持消息確認(rèn)。沒有任何消息超時(shí)當(dāng)消費(fèi)者死亡時(shí),將重新傳遞消息。發(fā)生這種情況是因?yàn)橹辉谙⑦M(jìn)入隊(duì)列時(shí)調(diào)度消息。這告訴一次不要向工作人員發(fā)送多個(gè)消息。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 工作隊(duì)列 showImg(https://segmentfault.com/img/r...

    tabalt 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<