成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

rabbitmq中文教程python版 - 遠(yuǎn)程過程調(diào)用

chuyao / 3402人閱讀

摘要:通常用于命名回調(diào)隊列。對每個響應(yīng)執(zhí)行的回調(diào)函數(shù)做了一個非常簡單的工作,對于每個響應(yīng)消息它檢查是否是我們正在尋找的。在這個方法中,首先我們生成一個唯一的數(shù)并保存回調(diào)函數(shù)將使用這個值來捕獲適當(dāng)?shù)捻憫?yīng)。

源碼:https://github.com/ltoddy/rabbitmq-tutorial

遠(yuǎn)程過程調(diào)用(RPC)

(using the Pika Python client)

本章節(jié)教程重點介紹的內(nèi)容

在第二篇教程中,我們學(xué)習(xí)了如何使用工作隊列在多個工作人員之間分配耗時的任務(wù)。

但是如果我們需要在遠(yuǎn)程計算機上運行某個功能并等待結(jié)果呢?那么,這是一個不同的事情。
這種模式通常稱為遠(yuǎn)程過程調(diào)用(RPC)。

在本教程中,我們將使用RabbitMQ構(gòu)建一個RPC系統(tǒng):一個客戶端和一個可擴展的RPC服務(wù)器。
由于我們沒有任何值得分發(fā)的耗時任務(wù),我們將創(chuàng)建一個返回斐波那契數(shù)字的虛擬RPC服務(wù)。

客戶端界面

為了說明如何使用RPC服務(wù),我們將創(chuàng)建一個簡單的客戶端類。它將公開一個名為call的方法 ,
它發(fā)送一個RPC請求并阻塞,直到收到答案:

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)
*有關(guān)RPC的說明*

雖然RPC是計算中很常見的模式,但它經(jīng)常被吹毛求疵。當(dāng)程序員不知道函數(shù)調(diào)用是本地的還是
慢速的RPC時會出現(xiàn)這些問題。像這樣的混亂導(dǎo)致不可預(yù)知的問題,并增加了調(diào)試的不必要的復(fù)雜性,
而不是我們想要的簡化軟件。

銘記這一點,請考慮以下建議:

  * 確保顯而易見哪個函數(shù)調(diào)用是本地的,哪個是遠(yuǎn)程的。
  * 記錄您的系統(tǒng)。清楚組件之間的依賴關(guān)系。
  * 處理錯誤情況。當(dāng)RPC服務(wù)器長時間關(guān)閉時,客戶端應(yīng)該如何反應(yīng)?

有疑問時避免RPC。如果可以的話,你應(yīng)該使用異步管道 - 而不是類似于RPC的阻塞,
其結(jié)果被異步推送到下一個計算階段。
回調(diào)隊列

一般來說,通過RabbitMQ來執(zhí)行RPC是很容易的??蛻舳税l(fā)送請求消息,服務(wù)器回復(fù)響應(yīng)消息。
為了接收響應(yīng),客戶端需要發(fā)送一個“回調(diào)”隊列地址和請求。讓我們試試看:

result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange="",
                      routing_key="rpc_queue",
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)
消息屬性

AMQP 0-9-1協(xié)議預(yù)定義了一組包含14個屬性的消息。大多數(shù)屬性很少使用,但以下情況除外:

delivery_mode:將消息標(biāo)記為持久(值為2)或瞬態(tài)(任何其他值)。你可能會記得第二篇教程中的這個屬性。
content_type:用于描述編碼的MIME類型。例如,對于經(jīng)常使用的JSON編碼,將此屬性設(shè)置為application/json是一種很好的做法。
reply_to:通常用于命名回調(diào)隊列。
correlation_id:用于將RPC響應(yīng)與請求關(guān)聯(lián)起來。
相關(guān)ID

在上面介紹的方法中,我們建議為每個RPC請求創(chuàng)建一個回調(diào)隊列。這是非常低效的,
但幸運的是有一個更好的方法 - 讓我們?yōu)槊總€客戶端創(chuàng)建一個回調(diào)隊列。

這引發(fā)了一個新問題,在該隊列中收到回復(fù)后,不清楚回復(fù)屬于哪個請求。那是什么時候使用correlation_id屬性。
我們將把它設(shè)置為每個請求的唯一值。稍后,當(dāng)我們在回調(diào)隊列中收到消息時,我們將查看此屬性,
并基于此屬性,我們將能夠?qū)㈨憫?yīng)與請求進行匹配。如果我們看到未知的correlation_id值,
我們可以放心地丟棄該消息 - 它不屬于我們的請求。

您可能會問,為什么我們應(yīng)該忽略回調(diào)隊列中的未知消息,而不是拋出錯誤?
這是由于服務(wù)器端可能出現(xiàn)競爭狀況。雖然不太可能,但在發(fā)送給我們答案之后,但在發(fā)送請求的確認(rèn)消息之前,
RPC服務(wù)器可能會死亡。如果發(fā)生這種情況,重新啟動的RPC服務(wù)器將再次處理該請求。
這就是為什么在客戶端,我們必須優(yōu)雅地處理重復(fù)的響應(yīng),理想情況下RPC應(yīng)該是等冪的。

總結(jié)

我們的RPC會像這樣工作:

當(dāng)客戶端啟動時,它創(chuàng)建一個匿名獨占回調(diào)隊列。

對于RPC請求,客戶端將發(fā)送具有兩個屬性的消息:reply_to,該消息設(shè)置為回調(diào)隊列和correlation_id,該值設(shè)置為每個請求的唯一值。

該請求被發(fā)送到rpc_queue隊列。

RPC worker(又名:服務(wù)器)正在等待該隊列上的請求。當(dāng)出現(xiàn)請求時,它執(zhí)行該作業(yè),并使用reply_to字段中的隊列將結(jié)果發(fā)送回客戶端。

客戶端在回調(diào)隊列中等待數(shù)據(jù)。當(dāng)出現(xiàn)消息時,它會檢查correlation_id屬性。如果它匹配來自請求的值,則返回對應(yīng)用程序的響應(yīng)。

把它放在一起

rpc_server.py的代碼:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))

channel = connection.channel()

channel.queue_declare(queue="rpc_queue")


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange="",
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_size=1)
channel.basic_consume(on_request, queue="rpc_queue")

print(" [x] Awaiting RPC requests")
channel.start_consuming()

服務(wù)器代碼非常簡單:

(4)像往常一樣,我們首先建立連接并聲明隊列。

(11)我們聲明我們的斐波那契函數(shù)。它只假定有效的正整數(shù)輸入。(不要指望這個版本適用于大數(shù)字,它可能是最慢的遞歸實現(xiàn))。

(20)我們聲明了basic_consume的回調(diào),它是RPC服務(wù)器的核心。它在收到請求時執(zhí)行。它完成工作并將響應(yīng)發(fā)回。

(34)我們可能想運行多個服務(wù)器進程。為了在多臺服務(wù)器上平均分配負(fù)載,我們需要設(shè)置prefetch_count設(shè)置。

rpc_client.py的代碼:

#!/usr/bin/env python
import pika
import uuid


class FibonacciRpcClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.corrrelation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange="",
                                   routing_key="rpc_queue",
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()

        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

客戶端代碼稍有涉及:

(8)我們建立連接,通道并為回復(fù)聲明獨占的“回調(diào)”隊列。

(17)我們訂閱"回調(diào)"隊列,以便我們可以接收RPC響應(yīng)。

(19)對每個響應(yīng)執(zhí)行的"on_response"回調(diào)函數(shù)做了一個非常簡單的工作,對于每個響應(yīng)消息它檢查correlation_id是否是我們正在尋找的。如果是這樣,它將保存self.response中的響應(yīng)并打破消費循環(huán)。

(23)接下來,我們定義我們的主要調(diào)用方法 - 它執(zhí)行實際的RPC請求。

(25)在這個方法中,首先我們生成一個唯一的correlation_id數(shù)并保存 - "on_response"回調(diào)函數(shù)將使用這個值來捕獲適當(dāng)?shù)捻憫?yīng)。

(29)接下來,我們發(fā)布具有兩個屬性的請求消息:reply_tocorrelation_id

(32)在這一點上,我們可以坐下來等待,直到適當(dāng)?shù)幕貞?yīng)到達。

(41)最后,我們將回復(fù)返回給用戶。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/44712.html

相關(guān)文章

  • rabbitmq中文教程python - 介紹

    摘要:每當(dāng)我們收到一條消息,這個回調(diào)函數(shù)就被皮卡庫調(diào)用。接下來,我們需要告訴這個特定的回調(diào)函數(shù)應(yīng)該從我們的隊列接收消息為了讓這個命令成功,我們必須確保我們想要訂閱的隊列存在。生產(chǎn)者計劃將在每次運行后停止歡呼我們能夠通過發(fā)送我們的第一條消息。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 介紹 RabbitMQ是一個消息代理:它接受和轉(zhuǎn)發(fā)消息。你...

    yimo 評論0 收藏0
  • rabbitmq中文教程python - 工作隊列

    摘要:我們將任務(wù)封裝為消息并將其發(fā)送到隊列。為了確保消息永不丟失,支持消息確認(rèn)。沒有任何消息超時當(dāng)消費者死亡時,將重新傳遞消息。發(fā)生這種情況是因為只在消息進入隊列時調(diào)度消息。這告訴一次不要向工作人員發(fā)送多個消息。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 工作隊列 showImg(https://segmentfault.com/img/r...

    tabalt 評論0 收藏0
  • rabbitmq中文教程python - 發(fā)布 / 訂閱

    摘要:交易所在本教程的前幾部分中,我們發(fā)送消息并從隊列中接收消息。消費者是接收消息的用戶的應(yīng)用程序。中的消息傳遞模型的核心思想是生產(chǎn)者永遠(yuǎn)不會將任何消息直接發(fā)送到隊列中。交換和隊列之間的關(guān)系稱為綁定。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 發(fā)布 / 訂閱 (using the Pika Python client) 本章節(jié)教程重點介紹的...

    alphahans 評論0 收藏0
  • rabbitmq中文教程python - 路由

    摘要:為了避免與參數(shù)混淆,我們將其稱為綁定鍵。直接交換我們之前教程的日志記錄系統(tǒng)將所有消息廣播給所有消費者。在這種設(shè)置中,使用路由鍵發(fā)布到交換機的消息將被路由到隊列。所有其他消息將被丟棄。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 路由 本章節(jié)教程重點介紹的內(nèi)容 在之前的教程中,我們構(gòu)建了一個簡單的日志系統(tǒng) 我們能夠?qū)⑷罩鞠V播給許多接收...

    Hwg 評論0 收藏0
  • rabbitmq中文教程python - Topics

    摘要:單詞可以是任何東西,但通常它們指定了與該消息相關(guān)的一些功能。消息將使用由三個字兩個點組成的路由鍵發(fā)送。另一方面,只會進入第一個隊列,而只會進入第二個隊列。不匹配任何綁定,因此將被丟棄。代碼幾乎與前一個教程中的代碼相同。 源碼:https://github.com/ltoddy/rabbitmq-tutorial Topics (using the Pika Python client)...

    ernest.wang 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<