成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

RabbitMQ 快速入門 python

wenshi11019 / 1384人閱讀

摘要:為了預(yù)防消息丟失,提供了,即工作進(jìn)程在收到消息并處理后,發(fā)送給,告知這時(shí)候可以把該消息從隊(duì)列中刪除了。如果工作進(jìn)程掛掉了,沒(méi)有收到,那么會(huì)把該消息重新分發(fā)給其他工作進(jìn)程。之前在發(fā)布消息時(shí),的值為即使用。

HelloWorld 簡(jiǎn)介

RabbitMQ:接受消息再傳遞消息,可以視為一個(gè)“郵局”。發(fā)送者和接受者通過(guò)隊(duì)列來(lái)進(jìn)行交互,隊(duì)列的大小可以視為無(wú)限的,多個(gè)發(fā)送者可以發(fā)生給一個(gè)隊(duì)列,多個(gè)接收者也可以從一個(gè)隊(duì)列中接受消息。

code

rabbitmq使用的協(xié)議是amqp,用于python的推薦客戶端是pika

pip install pika -i https://pypi.douban.com/simple/

send.py

# coding: utf8
import pika

# 建立一個(gè)連接
connection = pika.BlockingConnection(pika.ConnectionParameters(
           "localhost"))  # 連接本地的RabbitMQ服務(wù)器
channel = connection.channel()  # 獲得channel

這里鏈接的是本機(jī)的,如果想要連接其他機(jī)器上的服務(wù)器,只要填入地址或主機(jī)名即可。

接下來(lái)我們開(kāi)始發(fā)送消息了,注意要確保接受消息的隊(duì)列是存在的,否則rabbitmq就丟棄掉該消息

channel.queue_declare(queue="hello")  # 在RabbitMQ中創(chuàng)建hello這個(gè)隊(duì)列
channel.basic_publish(exchange="",  # 使用默認(rèn)的exchange來(lái)發(fā)送消息到隊(duì)列
                  routing_key="hello",  # 發(fā)送到該隊(duì)列 hello 中
                  body="Hello World!")  # 消息內(nèi)容

connection.close()  # 關(guān)閉 同時(shí)flush

RabbitMQ默認(rèn)需要1GB的空閑磁盤空間,否則發(fā)送會(huì)失敗。

這時(shí)已在本地隊(duì)列hello中存放了一個(gè)消息,如果使用 rabbitmqctl list_queues 可看到

hello 1

說(shuō)明有一個(gè)hello隊(duì)列 里面存放了一個(gè)消息

receive.py

# coding: utf8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
               "localhost"))
channel = connection.channel()

還是先鏈接到服務(wù)器,和之前發(fā)送時(shí)相同

channel.queue_declare(queue="hello")  # 此處就是聲明了 來(lái)確保該隊(duì)列 hello 存在 可以多次聲明 這里主要是為了防止接受程序先運(yùn)行時(shí)出錯(cuò)

def callback(ch, method, properties, body):  # 用于接收到消息后的回調(diào)
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue="hello",  # 收指定隊(duì)列hello的消息
                      no_ack=True)  #在處理完消息后不發(fā)送ack給服務(wù)器
channel.start_consuming()  # 啟動(dòng)消息接受 這會(huì)進(jìn)入一個(gè)死循環(huán)
工作隊(duì)列(任務(wù)隊(duì)列)

工作隊(duì)列是用于分發(fā)耗時(shí)任務(wù)給多個(gè)工作進(jìn)程的。不立即做那些耗費(fèi)資源的任務(wù)(需要等待這些任務(wù)完成),而是安排這些任務(wù)之后執(zhí)行。例如我們把task作為message發(fā)送到隊(duì)列里,啟動(dòng)工作進(jìn)程來(lái)接受并最終執(zhí)行,且可啟動(dòng)多個(gè)工作進(jìn)程來(lái)工作。這適用于web應(yīng)用,即不應(yīng)在一個(gè)http請(qǐng)求的處理窗口內(nèi)完成復(fù)雜任務(wù)。

channel.basic_publish(exchange="",
                  routing_key="task_queue",
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # 使得消息持久化
                  ))

分配消息的方式為 輪詢 即每個(gè)工作進(jìn)程獲得相同的消息數(shù)。

消息ack

如果消息分配給某個(gè)工作進(jìn)程,但是該工作進(jìn)程未處理完成就崩潰了,可能該消息就丟失了,因?yàn)閞abbitmq一旦把一個(gè)消息分發(fā)給工作進(jìn)程,它就把該消息刪掉了。

為了預(yù)防消息丟失,rabbitmq提供了ack,即工作進(jìn)程在收到消息并處理后,發(fā)送ack給rabbitmq,告知rabbitmq這時(shí)候可以把該消息從隊(duì)列中刪除了。如果工作進(jìn)程掛掉 了,rabbitmq沒(méi)有收到ack,那么會(huì)把該消息 重新分發(fā)給其他工作進(jìn)程。不需要設(shè)置timeout,即使該任務(wù)需要很長(zhǎng)時(shí)間也可以處理。

ack默認(rèn)是開(kāi)啟的,之前我們的工作進(jìn)程顯示指定了no_ack=True

channel.basic_consume(callback, queue="hello")  # 會(huì)啟用ack

帶ack的callback:

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count(".") )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 發(fā)送ack
消息持久化

但是,有時(shí)RabbitMQ重啟了,消息也會(huì)丟失??稍趧?chuàng)建隊(duì)列時(shí)設(shè)置持久化:
(隊(duì)列的性質(zhì)一旦確定無(wú)法改變)

channel.queue_declare(queue="task_queue", durable=True)

同時(shí)在發(fā)送消息時(shí)也得設(shè)置該消息的持久化屬性:

channel.basic_publish(exchange="",

                  routing_key="task_queue",
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # make message persistent
                  ))

但是,如果在RabbitMQ剛接收到消息還沒(méi)來(lái)得及存儲(chǔ),消息還是會(huì)丟失。同時(shí),RabbitMQ也不是在接受到每個(gè)消息都進(jìn)行存盤操作。如果還需要更完善的保證,需要使用publisher confirm。

公平的消息分發(fā)

輪詢模式的消息分發(fā)可能并不公平,例如奇數(shù)的消息都是繁重任務(wù)的話,某些進(jìn)程則會(huì)一直運(yùn)行繁 重任務(wù)。即使某工作進(jìn)程上有積壓的消息未處理,如很多都沒(méi)發(fā)ack,但是RabbitMQ還是會(huì)按照順序發(fā)消息給它??梢栽诮邮苓M(jìn)程中加設(shè)置:

channel.basic_qos(prefetch_count=1)

告知RabbitMQ,這樣在一個(gè)工作進(jìn)程沒(méi)回發(fā)ack情況下是不會(huì)再分配消息給它。

群發(fā)

一般情況下,一條消息是發(fā)送給一個(gè)工作進(jìn)程,然后完成,有時(shí)想把一條消息同時(shí)發(fā)送給多個(gè)進(jìn)程:

exchange

發(fā)送者是不是直接發(fā)送消息到隊(duì)列中的,事實(shí)上發(fā)生者根本不知道消息會(huì)發(fā)送到那個(gè)隊(duì)列,發(fā)送者只能把消息發(fā)送到exchange里。exchange一方面收生產(chǎn)者的消息,另一方面把他們推送到隊(duì)列中。所以作為exchange,它需要知道當(dāng)收到消息時(shí)它需要做什么,是應(yīng)該把它加到一個(gè)特殊的隊(duì)列中還是放到很多的隊(duì)列中,或者丟棄。exchange有direct、topic、headers、fanout等種類,而群發(fā)使用的即fanout。之前在發(fā)布消息時(shí),exchange的值為 "" 即使用default exchange。

channel.exchange_declare(exchange="logs", type="fanout")  # 該exchange會(huì)把消息發(fā)送給所有它知道的隊(duì)列中
臨時(shí)隊(duì)列
result = channel.queue_declare()  # 創(chuàng)建一個(gè)隨機(jī)隊(duì)列
result = channel.queue_declare(exclusive=True)  # 創(chuàng)建一個(gè)隨機(jī)隊(duì)列,同時(shí)在沒(méi)有接收者連接該隊(duì)列后則銷毀它
queue_name = result.method.queue

這樣result.method.queue即是隊(duì)列名稱,在發(fā)送或接受時(shí)即可使用。

綁定exchange 和 隊(duì)列
channel.queue_bind(exchange="logs",
               queue="hello")

logs在發(fā)送消息時(shí)給hello也發(fā)一份。

在發(fā)送消息是使用剛剛創(chuàng)建的 logs exchange
   channel.basic_publish(exchange="logs",
                  routing_key="",
                  body=message)
路由

之前已經(jīng)使用過(guò)bind,即建立exchange和queue的關(guān)系(該隊(duì)列對(duì)來(lái)自該exchange的消息有興趣),bind時(shí)可另外指定routing_key選項(xiàng)。

使用direct exchange

將對(duì)應(yīng)routing key的消息發(fā)送到綁定相同routing key的隊(duì)列中

channel.exchange_declare(exchange="direct_logs",
                     type="direct")

發(fā)送函數(shù),發(fā)布不同severity的消息:

channel.basic_publish(exchange="direct_logs",
                  routing_key=severity,
                  body=message)

接受函數(shù)中綁定對(duì)應(yīng)severity的:

channel.queue_bind(exchange="direct_logs",
                   queue=queue_name,
                   routing_key=severity)
使用topic exchange

之前使用的direct exchange 只能綁定一個(gè)routing key,可以使用這種可以拿.隔開(kāi)routing key的topic exchange,例如:

"stock.usd.nyse" "nyse.vmw"

和direct exchange一樣,在接受者那邊綁定的key與發(fā)送時(shí)指定的routing key相同即可,另外有些特殊的值:

* 代表1個(gè)單詞
# 代表0個(gè)或多個(gè)單詞

如果發(fā)送者發(fā)出的routing key都是3個(gè)部分的,如:celerity.colour.species。

Q1:
*.orange.*  對(duì)應(yīng)的是中間的colour都為orange的

Q2:
*.*.rabbit  對(duì)應(yīng)的是最后部分的species為rabbit的
lazy.#      對(duì)應(yīng)的是第一部分是lazy的

qucik.orange.rabbit Q1 Q2都可接收到,quick.orange.fox 只有Q1能接受到,對(duì)于lazy.pink.rabbit雖然匹配到了Q2兩次,但是只會(huì)發(fā)送一次。如果綁定時(shí)直接綁定#,則會(huì)收到所有的。

 RPC

在遠(yuǎn)程機(jī)器上運(yùn)行一個(gè)函數(shù)然后獲得結(jié)果。

1、客戶端啟動(dòng) 同時(shí)設(shè)置一個(gè)臨時(shí)隊(duì)列用于接受回調(diào),綁定該隊(duì)列

    self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host="localhost"))
    self.channel = self.connection.channel()
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
    self.channel.basic_consume(self.on_response, no_ack=True,
                               queue=self.callback_queue)

2、客戶端發(fā)送rpc請(qǐng)求,同時(shí)附帶reply_to對(duì)應(yīng)回調(diào)隊(duì)列,correlation_id設(shè)置為每個(gè)請(qǐng)求的唯一id(雖然說(shuō)可以為每一次RPC請(qǐng)求都創(chuàng)建一個(gè)回調(diào)隊(duì)列,但是這樣效率不高,如果一個(gè)客戶端只使用一個(gè)隊(duì)列,則需要使用correlation_id來(lái)匹配是哪個(gè)請(qǐng)求),之后阻塞在回調(diào)隊(duì)列直到收到回復(fù)

注意:如果收到了非法的correlation_id直接丟棄即可,因?yàn)橛羞@種情況--服務(wù)器已經(jīng)發(fā)了響應(yīng)但是還沒(méi)發(fā)ack就掛了,等一會(huì)服務(wù)器重啟了又會(huì)重新處理該任務(wù),又發(fā)了一遍相應(yīng),但是這時(shí)那個(gè)請(qǐng)求已經(jīng)被處理掉了

channel.basic_publish(exchange="",
                       routing_key="rpc_queue",
                       properties=pika.BasicProperties(
                             reply_to = self.callback_queue,
                             correlation_id = self.corr_id,
                             ),
                       body=str(n))  # 發(fā)出調(diào)用

while self.response is None:  # 這邊就相當(dāng)于阻塞了
    self.connection.process_data_events()  # 查看回調(diào)隊(duì)列
return int(self.response)

3、請(qǐng)求會(huì)發(fā)送到rpc_queue隊(duì)列
4、RPC服務(wù)器從rpc_queue中取出,執(zhí)行,發(fā)送回復(fù)

channel.basic_consume(on_request, queue="rpc_queue")  # 綁定 等待請(qǐng)求

# 處理之后:
ch.basic_publish(exchange="",
                 routing_key=props.reply_to,
                 properties=pika.BasicProperties(correlation_id = 
                                                     props.correlation_id),
                 body=str(response))  # 發(fā)送回復(fù)到回調(diào)隊(duì)列
ch.basic_ack(delivery_tag = method.delivery_tag)  # 發(fā)送ack

5、客戶端從回調(diào)隊(duì)列中取出數(shù)據(jù),檢查correlation_id,執(zhí)行相應(yīng)操作

if self.corr_id == props.correlation_id:
        self.response = body

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/38486.html

相關(guān)文章

  • 快速入門spring-amqp

    摘要:它還為具有偵聽(tīng)器容器的消息驅(qū)動(dòng)的提供支持。接收消息當(dāng)存在基礎(chǔ)結(jié)構(gòu)時(shí),可以使用任何來(lái)注釋以創(chuàng)建偵聽(tīng)器端點(diǎn)。默認(rèn)情況下,如果禁用重試并且偵聽(tīng)器拋出異常,則會(huì)無(wú)限期地重試傳遞。 Spring-amqp-tutorial Spring AMQP項(xiàng)目將核心Spring概念應(yīng)用于基于AMQP的消息傳遞解決方案的開(kāi)發(fā)。它提供了一個(gè)模板作為發(fā)送和接收消息的高級(jí)抽象。它還為具有偵聽(tīng)器容器的消息驅(qū)動(dòng)的PO...

    鄒強(qiáng) 評(píng)論0 收藏0
  • 寫這么多系列博客,怪不得找不到女朋友

    摘要:前提好幾周沒(méi)更新博客了,對(duì)不斷支持我博客的童鞋們說(shuō)聲抱歉了。熟悉我的人都知道我寫博客的時(shí)間比較早,而且堅(jiān)持的時(shí)間也比較久,一直到現(xiàn)在也是一直保持著更新?tīng)顟B(tài)。 showImg(https://segmentfault.com/img/remote/1460000014076586?w=1920&h=1080); 前提 好幾周沒(méi)更新博客了,對(duì)不斷支持我博客的童鞋們說(shuō)聲:抱歉了!。自己這段時(shí)...

    JerryWangSAP 評(píng)論0 收藏0
  • Spring Cloud構(gòu)建微服務(wù)架構(gòu):消息驅(qū)動(dòng)的微服務(wù)(入門)【Dalston版】

    摘要:它通過(guò)使用來(lái)連接消息代理中間件以實(shí)現(xiàn)消息事件驅(qū)動(dòng)的微服務(wù)應(yīng)用。該示例主要目標(biāo)是構(gòu)建一個(gè)基于的微服務(wù)應(yīng)用,這個(gè)微服務(wù)應(yīng)用將通過(guò)使用消息中間件來(lái)接收消息并將消息打印到日志中。下面我們通過(guò)編寫生產(chǎn)消息的單元測(cè)試用例來(lái)完善我們的入門內(nèi)容。 之前在寫Spring Boot基礎(chǔ)教程的時(shí)候?qū)戇^(guò)一篇《Spring Boot中使用RabbitMQ》。在該文中,我們通過(guò)簡(jiǎn)單的配置和注解就能實(shí)現(xiàn)向Rabbi...

    smallStone 評(píng)論0 收藏0
  • RabbitMQ快速入門

    摘要:就是交換機(jī)生產(chǎn)者發(fā)送消息給交換機(jī),然后由交換機(jī)將消息轉(zhuǎn)發(fā)給隊(duì)列。對(duì)應(yīng)于中則是發(fā)送一個(gè),處理完成之后將其返回給。這樣來(lái)說(shuō)一個(gè)是級(jí)別而不是級(jí)別的了。當(dāng)然這些也都是官網(wǎng)的入門例子,后續(xù)有機(jī)會(huì)的話再深入研究。 一、前言 RabbitMQ其實(shí)是我最早接觸的一個(gè)MQ框架,我記得當(dāng)時(shí)是在大學(xué)的時(shí)候跑到圖書(shū)館一個(gè)人去看,由于RabbitMQ官網(wǎng)的英文還不算太難,因此也是參考官網(wǎng)學(xué)習(xí)的,一共有6章,當(dāng)時(shí)...

    Moxmi 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<