摘要:每當(dāng)我們收到一條消息,這個(gè)回調(diào)函數(shù)就被皮卡庫調(diào)用。接下來,我們需要告訴這個(gè)特定的回調(diào)函數(shù)應(yīng)該從我們的隊(duì)列接收消息為了讓這個(gè)命令成功,我們必須確保我們想要訂閱的隊(duì)列存在。生產(chǎn)者計(jì)劃將在每次運(yùn)行后停止歡呼我們能夠通過發(fā)送我們的第一條消息。
源碼:https://github.com/ltoddy/rabbitmq-tutorial
介紹RabbitMQ是一個(gè)消息代理:它接受和轉(zhuǎn)發(fā)消息。你可以把它想象成一個(gè)郵局:當(dāng)你把你想要發(fā)布的郵件放在郵箱中時(shí),你可以確定郵差先生最終將郵件發(fā)送給你的收件人。在這個(gè)比喻中,RabbitMQ是郵政信箱,郵局和郵遞員。
RabbitMQ和郵局的主要區(qū)別在于它不處理紙張,而是接受,存儲(chǔ)和轉(zhuǎn)發(fā)二進(jìn)制數(shù)據(jù)塊 -- 消息。
請(qǐng)注意,生產(chǎn)者,消費(fèi)者和消息代理不必駐留在同一主機(jī)上; 實(shí)際上在大多數(shù)應(yīng)用程序中它們不是同一主機(jī)上。
Hello World!(using the Pika Python client)
pip3 install pika
在本教程的這一部分,我們將使用Python編寫兩個(gè)小程序; 發(fā)送單個(gè)消息的生產(chǎn)者(發(fā)送者),以及接收消息并將其打印出來的消費(fèi)者(接收者)。這是一個(gè)消息傳遞的“Hello World”。
在下圖中,“P”是我們的生產(chǎn)者,“C”是我們的消費(fèi)者。中間的盒子是一個(gè)隊(duì)列 - RabbitMQ代表消費(fèi)者保存的消息緩沖區(qū)。
我們的整體設(shè)計(jì)將如下所示:
生產(chǎn)者將消息發(fā)送到“hello”隊(duì)列,消費(fèi)者接收來自該隊(duì)列的消息。發(fā)送
我們的第一個(gè)程序 send.py 會(huì)向隊(duì)列發(fā)送一條消息。我們需要做的第一件事是與RabbitMQ服務(wù)器建立連接。
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel()
我們現(xiàn)在連接到本地上的的代理 - 因此是 "localhost"。如果我們想連接到另一臺(tái)機(jī)器上的代理,我們只需在此指定其名稱或IP地址。
接下來,在發(fā)送之前,我們需要確保收件人隊(duì)列存在。如果我們發(fā)送消息到不存在的位置,RabbitMQ將只刪除該消息。我們來創(chuàng)建一個(gè)將傳遞消息的 hello 隊(duì)列:
channel.queue_declare(queue="hello")
此時(shí)我們準(zhǔn)備發(fā)送消息。我們的第一條消息將只包含一個(gè)字符串 "Hello World!"我們想把它發(fā)送給我們的 hello 隊(duì)列。
在RabbitMQ中,消息永遠(yuǎn)不會(huì)直接發(fā)送到隊(duì)列,它總是需要經(jīng)過交換。我們現(xiàn)在需要知道的是如何使用由空字符串標(biāo)識(shí)的默認(rèn)交換。這種交換是特殊的 - 它允許我們準(zhǔn)確地指定消息應(yīng)該到達(dá)哪個(gè)隊(duì)列。隊(duì)列名稱需要在routing_key參數(shù)中指定:
channel.basic_publish(exchange="", routing_key="hello", body="Hello World!") print(" [x] Sent "Hello World!"")
在退出程序之前,我們需要確保網(wǎng)絡(luò)緩沖區(qū)被刷新,并且我們的消息被實(shí)際傳送到RabbitMQ。我們可以通過輕輕關(guān)閉連接來完成。
connection.close()接收
我們的第二個(gè)程序 receive.py 將接收隊(duì)列中的消息并將它們打印在屏幕上。
再次,我們首先需要連接到RabbitMQ服務(wù)器。負(fù)責(zé)連接到Rabbit的代碼與以前相同。
下一步,就像以前一樣,要確保隊(duì)列存在。使用queue_declare創(chuàng)建一個(gè)隊(duì)列是冪等的 - 我們可以根據(jù)需要多次運(yùn)行該命令,并且只會(huì)創(chuàng)建一個(gè)。
channel.queue_declare()
您可能會(huì)問為什么我們?cè)俅温暶麝?duì)列 - 我們已經(jīng)在之前的代碼中聲明了它。如果我們確信隊(duì)列已經(jīng)存在,我們可以避免這種情況。例如,如果 send.py 程序之前運(yùn)行過。但我們還不確定首先運(yùn)行哪個(gè)程序。在這種情況下,重復(fù)在兩個(gè)程序中重復(fù)聲明隊(duì)列是一種很好的做法。
列出隊(duì)列 您可能希望看到RabbitMQ有什么隊(duì)列以及它們中有多少條消息。您可以使用rabbitmqctl工具(作為特權(quán)用戶)執(zhí)行此操作: > sudo rabbitmqctl list_queues 在Windows上,省略sudo: > rabbitmqctl.bat list_queues
從隊(duì)列接收消息更為復(fù)雜。它通過向隊(duì)列訂閱 回調(diào)函數(shù) 來工作。每當(dāng)我們收到一條消息,這個(gè)回調(diào)函數(shù)就被皮卡庫調(diào)用。在我們的例子中,這個(gè)函數(shù)會(huì)在屏幕上打印消息的內(nèi)容。
def callback(ch, method, propertites, body): print(" [x] Received {}".format(body))
接下來,我們需要告訴RabbitMQ這個(gè)特定的回調(diào)函數(shù)應(yīng)該從我們的hello隊(duì)列接收消息:
channel.basic_consume(callable, queue="hello", no_ack=True)
為了讓這個(gè)命令成功,我們必須確保我們想要訂閱的隊(duì)列存在。幸運(yùn)的是,我們對(duì)此有信心 - 我們已經(jīng)使用queue_declare創(chuàng)建了一個(gè)隊(duì)列。
NO_ACK參數(shù),后面(幾篇之后)會(huì)有解釋。
最后,我們進(jìn)入一個(gè)永無止境的循環(huán),等待數(shù)據(jù)并在必要時(shí)運(yùn)行回調(diào)。
print(" [*] Waiting for messages. To exit press CTRL+C") channel.start_consuming()把它放在一起
send.py的完整代碼:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) channel = connection.channel() channel.queue_declare(queue="hello") channel.basic_publish(exchange="", routing_key="hello", body="Hello World!") print(" [x] Sent "Hello World!"") connection.close()
receive.py的完整代碼:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel() channel.queue_declare(queue="hello") def callback(ch, method, propertites, body): print(" [x] Received {}".format(body)) channel.basic_consume(callable, queue="hello", no_ack=True) print(" [*] Waiting for messages. To exit press CTRL+C") channel.start_consuming()
現(xiàn)在我們可以在終端上試用我們的程序。首先,讓我們開始一個(gè)消費(fèi)者,它將持續(xù)運(yùn)行等待交付:
python receive.py # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received "Hello World!"
現(xiàn)在開始制作。生產(chǎn)者計(jì)劃將在每次運(yùn)行后停止:
python send.py # => [x] Sent "Hello World!"
歡呼!我們能夠通過RabbitMQ發(fā)送我們的第一條消息。正如您可能已經(jīng)注意到的,receive.py 程序不會(huì)退出。它會(huì)隨時(shí)準(zhǔn)備接收更多消息,并可能會(huì)被Ctrl-C中斷。
嘗試在新終端中再次運(yùn)行 send.py。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/44690.html
摘要:我們將任務(wù)封裝為消息并將其發(fā)送到隊(duì)列。為了確保消息永不丟失,支持消息確認(rèn)。沒有任何消息超時(shí)當(dāng)消費(fèi)者死亡時(shí),將重新傳遞消息。發(fā)生這種情況是因?yàn)橹辉谙⑦M(jìn)入隊(duì)列時(shí)調(diào)度消息。這告訴一次不要向工作人員發(fā)送多個(gè)消息。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 工作隊(duì)列 showImg(https://segmentfault.com/img/r...
摘要:交易所在本教程的前幾部分中,我們發(fā)送消息并從隊(duì)列中接收消息。消費(fèi)者是接收消息的用戶的應(yīng)用程序。中的消息傳遞模型的核心思想是生產(chǎn)者永遠(yuǎn)不會(huì)將任何消息直接發(fā)送到隊(duì)列中。交換和隊(duì)列之間的關(guān)系稱為綁定。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 發(fā)布 / 訂閱 (using the Pika Python client) 本章節(jié)教程重點(diǎn)介紹的...
摘要:為了避免與參數(shù)混淆,我們將其稱為綁定鍵。直接交換我們之前教程的日志記錄系統(tǒng)將所有消息廣播給所有消費(fèi)者。在這種設(shè)置中,使用路由鍵發(fā)布到交換機(jī)的消息將被路由到隊(duì)列。所有其他消息將被丟棄。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 路由 本章節(jié)教程重點(diǎn)介紹的內(nèi)容 在之前的教程中,我們構(gòu)建了一個(gè)簡(jiǎn)單的日志系統(tǒng) 我們能夠?qū)⑷罩鞠V播給許多接收...
摘要:?jiǎn)卧~可以是任何東西,但通常它們指定了與該消息相關(guān)的一些功能。消息將使用由三個(gè)字兩個(gè)點(diǎn)組成的路由鍵發(fā)送。另一方面,只會(huì)進(jìn)入第一個(gè)隊(duì)列,而只會(huì)進(jìn)入第二個(gè)隊(duì)列。不匹配任何綁定,因此將被丟棄。代碼幾乎與前一個(gè)教程中的代碼相同。 源碼:https://github.com/ltoddy/rabbitmq-tutorial Topics (using the Pika Python client)...
摘要:通常用于命名回調(diào)隊(duì)列。對(duì)每個(gè)響應(yīng)執(zhí)行的回調(diào)函數(shù)做了一個(gè)非常簡(jiǎn)單的工作,對(duì)于每個(gè)響應(yīng)消息它檢查是否是我們正在尋找的。在這個(gè)方法中,首先我們生成一個(gè)唯一的數(shù)并保存回調(diào)函數(shù)將使用這個(gè)值來捕獲適當(dāng)?shù)捻憫?yīng)。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 遠(yuǎn)程過程調(diào)用(RPC) (using the Pika Python client) 本章節(jié)教程...
閱讀 696·2021-11-18 10:07
閱讀 2890·2021-09-22 16:04
閱讀 890·2021-08-16 10:50
閱讀 3365·2019-08-30 15:56
閱讀 1795·2019-08-29 13:22
閱讀 2704·2019-08-26 17:15
閱讀 1254·2019-08-26 10:57
閱讀 1118·2019-08-23 15:23