成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

rabbitmq中文教程python版 - 工作隊(duì)列

tabalt / 2338人閱讀

摘要:我們將任務(wù)封裝為消息并將其發(fā)送到隊(duì)列。為了確保消息永不丟失,支持消息確認(rèn)。沒有任何消息超時(shí)當(dāng)消費(fèi)者死亡時(shí),將重新傳遞消息。發(fā)生這種情況是因?yàn)橹辉谙⑦M(jìn)入隊(duì)列時(shí)調(diào)度消息。這告訴一次不要向工作人員發(fā)送多個(gè)消息。

源碼:https://github.com/ltoddy/rabbitmq-tutorial

工作隊(duì)列

(using the Pika Python client)

本章節(jié)教程重點(diǎn)介紹的內(nèi)容

在第一篇教程中,我們編寫了用于從命名隊(duì)列發(fā)送和接收消息的程序。在這一個(gè)中,我們將創(chuàng)建一個(gè)工作隊(duì)列,用于在多個(gè)工作人員之間分配耗時(shí)的任務(wù)。

工作隊(duì)列(又名:任務(wù)隊(duì)列)背后的主要思想是避免立即執(zhí)行資源密集型任務(wù),并且必須等待它完成。相反,我們安排稍后完成任務(wù)。我們將任務(wù)封裝 為消息并將其發(fā)送到隊(duì)列。
在后臺(tái)運(yùn)行的工作進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè)。當(dāng)你運(yùn)行許多工人時(shí),任務(wù)將在他們之間共享。

這個(gè)概念在Web應(yīng)用程序中特別有用,因?yàn)樵诙痰腍TTP請(qǐng)求窗口中無法處理復(fù)雜的任務(wù)。

在本教程的前一部分中,我們發(fā)送了一條包含“Hello World!”的消息?,F(xiàn)在我們將發(fā)送代表復(fù)雜任務(wù)的字符串。
我們沒有真實(shí)世界的任務(wù),比如要調(diào)整大小的圖像或要渲染的PDF文件,所以讓我們假裝我們很忙 - 使用 time.sleep() 函數(shù)來偽裝它。
我們將把字符串中的點(diǎn)(".")數(shù)作為復(fù)雜度; 每一個(gè)點(diǎn)都會(huì)占用一秒的“工作”。例如,Hello ... 描述的假任務(wù)將需要三秒鐘。

我們稍微修改前面例子中的send.py代碼,以允許從命令行發(fā)送任意消息。這個(gè)程序?qū)讶蝿?wù)安排到我們的工作隊(duì)列中,所以讓我們把它命名為new_task.py

import sys

message = " ".join(sys.argv[1:]) or "Hello World"

channel.basic_publish(exchange="",
                      routing_key="hello",
                      body=message)
print(" [x] Sent %r" % message)

我們的舊版receive.py腳本也需要進(jìn)行一些更改:它需要為郵件正文中的每個(gè)點(diǎn)偽造第二個(gè)工作。它會(huì)從隊(duì)列中彈出消息并執(zhí)行任務(wù),所以我們稱之為worker.py

import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b"."))
    print(" [x] Done")
循環(huán)調(diào)度

使用任務(wù)隊(duì)列的優(yōu)點(diǎn)之一是可以輕松地平行工作。如果我們正在積累積壓的工作,我們可以增加更多的工作人員,并且這種方式很容易擴(kuò)展。

首先,我們?cè)囍瑫r(shí)運(yùn)行兩個(gè)worker.py腳本。他們都會(huì)從隊(duì)列中獲取消息,但具體到底是什么?讓我們來看看。

您需要打開三個(gè)控制臺(tái)。兩個(gè)將運(yùn)行worker.py腳本。這些控制臺(tái)將成為我們的兩個(gè)消費(fèi)者 - C1和C2。

默認(rèn)情況下,RabbitMQ將按順序?qū)⒚織l消息發(fā)送給下一個(gè)使用者。平均而言,每個(gè)消費(fèi)者將獲得相同數(shù)量的消息。這種分配消息的方式稱為循環(huán)法。請(qǐng)嘗試與三名或更多的工人。

消息確認(rèn)

做任務(wù)可能需要幾秒鐘的時(shí)間。你可能想知道如果其中一個(gè)消費(fèi)者開始一項(xiàng)長(zhǎng)期任務(wù)并且只是部分完成而死亡會(huì)發(fā)生什么。
用我們目前的代碼,一旦RabbitMQ將消息傳遞給客戶,它立即將其標(biāo)記為刪除。在這種情況下,如果你殺了一個(gè)工人,我們將失去剛剛處理的信息。
我們也會(huì)失去所有派發(fā)給這個(gè)特定工作人員但尚未處理的消息。

但我們不想失去任何任務(wù)。如果一名工人死亡,我們希望將任務(wù)交付給另一名工人。

為了確保消息永不丟失,RabbitMQ支持消息確認(rèn)。消費(fèi)者發(fā)回ack(請(qǐng)求)告訴RabbitMQ已經(jīng)收到,處理了特定的消息,并且RabbitMQ可以自由刪除它。

如果消費(fèi)者死亡(其通道關(guān)閉,連接關(guān)閉或TCP連接丟失),RabbitMQ將理解消息未被完全處理,并將重新排隊(duì)。如果有其他消費(fèi)者同時(shí)在線,它會(huì)迅速將其重新發(fā)送給另一位消費(fèi)者。
這樣,即使工作人員偶爾死亡,也可以確保沒有任何信息丟失。

沒有任何消息超時(shí); 當(dāng)消費(fèi)者死亡時(shí),RabbitMQ將重新傳遞消息。即使處理消息需要非常很長(zhǎng)的時(shí)間也沒關(guān)系。

消息確認(rèn)默認(rèn)是被打開的。在前面的例子中,我們通過 no_ack = True 標(biāo)志明確地將它們關(guān)閉。一旦我們完成了一項(xiàng)任務(wù),現(xiàn)在是時(shí)候清除這個(gè)標(biāo)志并且發(fā)送工人的正確確認(rèn)。

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b"."))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)


channel.basic_consume(callback,
                      queue="hello")

使用這段代碼,我們可以確定,即使在處理消息時(shí)使用CTRL + C來殺死一個(gè)工作者,也不會(huì)丟失任何東西。工人死后不久,所有未確認(rèn)的消息將被重新發(fā)送。

消息持久性

我們已經(jīng)學(xué)會(huì)了如何確保即使消費(fèi)者死亡,任務(wù)也不會(huì)丟失。但是如果RabbitMQ服務(wù)器停止,我們的任務(wù)仍然會(huì)丟失。

當(dāng)RabbitMQ退出或崩潰時(shí),它會(huì)忘記隊(duì)列和消息,除非您告訴它不要。需要做兩件事來確保消息不會(huì)丟失:我們需要將隊(duì)列和消息標(biāo)記為持久。

首先,我們需要確保RabbitMQ永遠(yuǎn)不會(huì)失去我們的隊(duì)列。為了做到這一點(diǎn),我們需要宣布它是持久的:

channel.queue_declare(queue="hello", durable=True)

雖然這個(gè)命令本身是正確的,但它在我們的設(shè)置中不起作用。那是因?yàn)槲覀円呀?jīng)定義了一個(gè)名為hello的隊(duì)列 ,這個(gè)隊(duì)列并不"耐用"。
RabbitMQ不允許您使用不同的參數(shù)重新定義現(xiàn)有的隊(duì)列,并會(huì)向任何試圖執(zhí)行該操作的程序返回錯(cuò)誤。
但是有一個(gè)快速的解決方法 - 讓我們聲明一個(gè)具有不同名稱的隊(duì)列,例如task_queue

channel.queue_declare(queue="task_queue", durable=True)

queue_declare更改需要應(yīng)用于生產(chǎn)者和消費(fèi)者代碼。

此時(shí)我們確信,即使RabbitMQ重新啟動(dòng),task_queue隊(duì)列也不會(huì)丟失。現(xiàn)在我們需要將消息標(biāo)記為持久 - 通過提供值為2的delivery_mode屬性。

channel.basic_publish(exchange="",
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 確保消息是持久的
                      ))
公平派遣

您可能已經(jīng)注意到調(diào)度仍然無法完全按照我們的要求工作。例如,在有兩名工人的情況下,當(dāng)所有奇怪的信息都很重,甚至信息很少時(shí),一名工作人員會(huì)一直很忙,
另一名工作人員幾乎不會(huì)做任何工作。那么,RabbitMQ不知道任何有關(guān)這一點(diǎn),并仍將均勻地發(fā)送消息。

發(fā)生這種情況是因?yàn)镽abbitMQ只在消息進(jìn)入隊(duì)列時(shí)調(diào)度消息。它沒有考慮消費(fèi)者未確認(rèn)消息的數(shù)量。它只是盲目地將第n條消息分發(fā)給第n位消費(fèi)者。

為了解決這個(gè)問題,我們可以使用basic.qos方法和設(shè)置prefetch_count = 1。這告訴RabbitMQ一次不要向工作人員發(fā)送多個(gè)消息。
或者換句話說,不要向工作人員發(fā)送新消息,直到它處理并確認(rèn)了前一個(gè)消息。相反,它會(huì)將其分派給不是仍然忙碌的下一個(gè)工作人員。

channel.basic_qos(prefetch_count=1)
把它放在一起

我們的new_task.py腳本的最終代碼:

#!/usr/bin/env python
import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

channel.queue_declare(queue="task_queue", durable=True)

message = " ".join(sys.argv[1:]) or "Hello World"

channel.basic_publish(exchange="",
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 確保消息是持久的
                      ))
print(" [x] Sent %r" % message)
connection.close()

而我們的工人 worker.py

#!/usr/bin/env python
import time
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

channel.queue_declare(queue="task_queue", durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b"."))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(callback,
                      queue="hello")
channel.basic_qos(prefetch_count=1)

print(" [*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()

使用消息確認(rèn)和prefetch_count,您可以設(shè)置一個(gè)工作隊(duì)列。即使RabbitMQ重新啟動(dòng),持久性選項(xiàng)也可讓任務(wù)繼續(xù)存在。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/44704.html

相關(guān)文章

  • rabbitmq中文教程python - 發(fā)布 / 訂閱

    摘要:交易所在本教程的前幾部分中,我們發(fā)送消息并從隊(duì)列中接收消息。消費(fèi)者是接收消息的用戶的應(yīng)用程序。中的消息傳遞模型的核心思想是生產(chǎn)者永遠(yuǎn)不會(huì)將任何消息直接發(fā)送到隊(duì)列中。交換和隊(duì)列之間的關(guān)系稱為綁定。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 發(fā)布 / 訂閱 (using the Pika Python client) 本章節(jié)教程重點(diǎn)介紹的...

    alphahans 評(píng)論0 收藏0
  • rabbitmq中文教程python - 介紹

    摘要:每當(dāng)我們收到一條消息,這個(gè)回調(diào)函數(shù)就被皮卡庫(kù)調(diào)用。接下來,我們需要告訴這個(gè)特定的回調(diào)函數(shù)應(yīng)該從我們的隊(duì)列接收消息為了讓這個(gè)命令成功,我們必須確保我們想要訂閱的隊(duì)列存在。生產(chǎn)者計(jì)劃將在每次運(yùn)行后停止歡呼我們能夠通過發(fā)送我們的第一條消息。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 介紹 RabbitMQ是一個(gè)消息代理:它接受和轉(zhuǎn)發(fā)消息。你...

    yimo 評(píng)論0 收藏0
  • rabbitmq中文教程python - 遠(yuǎn)程過程調(diào)用

    摘要:通常用于命名回調(diào)隊(duì)列。對(duì)每個(gè)響應(yīng)執(zhí)行的回調(diào)函數(shù)做了一個(gè)非常簡(jiǎn)單的工作,對(duì)于每個(gè)響應(yīng)消息它檢查是否是我們正在尋找的。在這個(gè)方法中,首先我們生成一個(gè)唯一的數(shù)并保存回調(diào)函數(shù)將使用這個(gè)值來捕獲適當(dāng)?shù)捻憫?yīng)。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 遠(yuǎn)程過程調(diào)用(RPC) (using the Pika Python client) 本章節(jié)教程...

    chuyao 評(píng)論0 收藏0
  • rabbitmq中文教程python - 路由

    摘要:為了避免與參數(shù)混淆,我們將其稱為綁定鍵。直接交換我們之前教程的日志記錄系統(tǒng)將所有消息廣播給所有消費(fèi)者。在這種設(shè)置中,使用路由鍵發(fā)布到交換機(jī)的消息將被路由到隊(duì)列。所有其他消息將被丟棄。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 路由 本章節(jié)教程重點(diǎn)介紹的內(nèi)容 在之前的教程中,我們構(gòu)建了一個(gè)簡(jiǎn)單的日志系統(tǒng) 我們能夠?qū)⑷罩鞠V播給許多接收...

    Hwg 評(píng)論0 收藏0
  • rabbitmq中文教程python - Topics

    摘要:?jiǎn)卧~可以是任何東西,但通常它們指定了與該消息相關(guān)的一些功能。消息將使用由三個(gè)字兩個(gè)點(diǎn)組成的路由鍵發(fā)送。另一方面,只會(huì)進(jìn)入第一個(gè)隊(duì)列,而只會(huì)進(jìn)入第二個(gè)隊(duì)列。不匹配任何綁定,因此將被丟棄。代碼幾乎與前一個(gè)教程中的代碼相同。 源碼:https://github.com/ltoddy/rabbitmq-tutorial Topics (using the Pika Python client)...

    ernest.wang 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<