成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

RabbitMQ三四事

Wildcard / 3254人閱讀

摘要:為了解決以上問(wèn)題,我們需要使用的生產(chǎn)者確認(rèn)模式。在這樣的機(jī)制下,即使有一個(gè)消費(fèi)者崩潰也不會(huì)丟失任何消息。即使處理一條消息會(huì)花費(fèi)很長(zhǎng)的時(shí)間。一些問(wèn)題這個(gè)庫(kù)提供了心跳檢測(cè)的功能選項(xiàng),但是沒(méi)有做自動(dòng)重連的。參考文章深入學(xué)習(xí)四的模式

數(shù)據(jù)的持久化

對(duì)于非常健壯穩(wěn)定的后臺(tái)系統(tǒng),我們必須得考慮到各種宕機(jī)的情況:物理宕機(jī),應(yīng)用自身出錯(cuò)崩潰等,而這個(gè)時(shí)候我們的應(yīng)用需要做到重啟后數(shù)據(jù)依舊不丟失,這個(gè)問(wèn)題就是數(shù)據(jù)持久化,也就是說(shuō)數(shù)據(jù)持久化到了磁盤(pán)。
在RabbitMQ中,如果要保證消息發(fā)送到broker,我們首先需要做到三點(diǎn)

持久化的exchange(交換器):聲明時(shí)開(kāi)啟durable選項(xiàng)

持久化的queue(隊(duì)列):聲明時(shí)開(kāi)啟durable選項(xiàng)

持久化的messagedelivery_mode設(shè)置為2(php,python之類(lèi)的庫(kù),2可以換成更友好的常量),在node的amqp.node庫(kù)中是設(shè)置persistenttrue

需要注意的一點(diǎn)是,持久化會(huì)造成性能損耗(寫(xiě)磁盤(pán)操作),但為了保證生產(chǎn)環(huán)境的數(shù)據(jù)一致性,我們必須這么做。

發(fā)送消息的confirm機(jī)制

其實(shí)光光做到以上三點(diǎn),數(shù)據(jù)依舊有丟失的可能,因?yàn)樵诳蛻舳顺晒φ{(diào)用api存入消息之后,RabbitMQ還需要一段時(shí)間(很短,但不可忽略)才能落盤(pán),RabbitMQ并不是為每條消息都做fsync的處理,可能僅僅保存到cache中而不是物理磁盤(pán)上,而在這段時(shí)間內(nèi)RabbitMQ broker發(fā)生crash, 消息保存到cache但是還沒(méi)來(lái)得及落盤(pán),那么這些消息將會(huì)丟失。
為了解決以上問(wèn)題,我們需要使用RabbitMQ的生產(chǎn)者確認(rèn)模式
為了開(kāi)啟確認(rèn)模式,需要生產(chǎn)者將channel設(shè)置成confirm模式,一旦channel進(jìn)入confirm模式,所有在該信道上面發(fā)布的消息都將會(huì)被指派一個(gè)唯一的ID(從1開(kāi)始),一旦消息被投遞到所有匹配的隊(duì)列之后,broker就會(huì)發(fā)送一個(gè)確認(rèn)給生產(chǎn)者(包含消息的唯一ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)列了,如果消息和隊(duì)列是可持久化的,那么確認(rèn)消息會(huì)在將消息寫(xiě)入磁盤(pán)之后發(fā)出,broker回傳給生產(chǎn)者的確認(rèn)消息中delivery-tag域包含了確認(rèn)消息的序列號(hào)。

confirm模式最大的好處在于他是異步的,一旦發(fā)布一條消息,生產(chǎn)者應(yīng)用程序就可以在等信道返回確認(rèn)的同時(shí)繼續(xù)發(fā)送下一條消息,當(dāng)消息最終得到確認(rèn)之后,生產(chǎn)者應(yīng)用便可以通過(guò)回調(diào)方法來(lái)處理該確認(rèn)消息,如果RabbitMQ因?yàn)樽陨韮?nèi)部錯(cuò)誤導(dǎo)致消息丟失,就會(huì)發(fā)送一條nack消息,生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理該nack消息  (來(lái)自參考1)
簡(jiǎn)單confirm示例

示例代碼使用NodeJS實(shí)現(xiàn),RabbitMQ服務(wù)可以使用上一篇RabbitMQ二三事的docker-compose.yml快速啟動(dòng)

const QUEUE_NAME = "test_queue"
const config = require("./config")
const amqp = require("amqplib")

async function getMQConnection() {
    return await amqp.connect({
        protocol: "amqp",
        hostname: config.MQ.host,
        port: config.MQ.port,
        username: config.MQ.user,
        password: config.MQ.pass,
        locale: "en_US",
        frameMax: 0,
        heartbeat: 5, // 心跳
        vhost: config.MQ.vhost,
    })
}

async function run(rmqConn, msgArr) {
    try {
        const channel = await rmqConn.createConfirmChannel() // 開(kāi)啟confirm
        const exchangeName = `${QUEUE_NAME}_exchange`
        await channel.assertExchange(exchangeName, "direct", { durable: true, autoDelete: false }) // 不存在exchange就新建exchange
        await channel.assertQueue(QUEUE_NAME, {durable: true, autoDelete: false}) // 不存在queue就新建
        await channel.bindQueue(QUEUE_NAME, exchangeName, QUEUE_NAME) // 綁定交換器

        // queue name當(dāng)routing key
        msgArr.forEach(str => {
            channel.publish(exchangeName, QUEUE_NAME, Buffer.from(str), { persistent: true, mandatory: true })
        })
        await channel.waitForConfirms()
        console.log("發(fā)送批量數(shù)據(jù)成功")
        await channel.close()
    } catch(err) {
        // do something with err
        console.log("發(fā)送批量數(shù)據(jù)失敗:" + err.message)
    }
}

async function testSendBatchMsg() {
    const conn = await getMQConnection()
    await run(conn, [
        "cat",
        "dog",
        "pig",
        "mouse",
        "mouse",
        "penguin"
    ])
    await conn.close()
}
testSendBatchMsg()
說(shuō)明

assertExchangeassertQueue是保證交換器和隊(duì)列一定存在,這里的exchange是簡(jiǎn)單的direct交換器
ConfirmChannel#publish方法不返回promise

消費(fèi)消息的ack機(jī)制

現(xiàn)在我們需要考慮我們的消費(fèi)者了,消費(fèi)者也會(huì)遇到程序出錯(cuò)或者物理宕機(jī)問(wèn)題,RabbitMQ官方也給出了一套解決方案,和confirm機(jī)制類(lèi)似,就是ack機(jī)制(Message acknowledgment).
在ack機(jī)制中,消費(fèi)者在自己處理完業(yè)務(wù)邏輯后,需要發(fā)送一個(gè)ack消息,然后broker才認(rèn)為這條消息被正確消費(fèi),然后從內(nèi)存和磁盤(pán)中移除掉它,只要沒(méi)收到消費(fèi)者的acknowledgment,broker就會(huì)一直保存著這條消息.如果一個(gè)消費(fèi)者崩潰(斷開(kāi)了連接)卻沒(méi)有發(fā)送ack,broker會(huì)理解為這個(gè)消息沒(méi)有處理完全,然后交給另一個(gè)消費(fèi)者去重新處理。在這樣的機(jī)制下,即使有一個(gè)消費(fèi)者崩潰也不會(huì)丟失任何消息。

簡(jiǎn)單ack示例
const QUEUE_NAME = "test_queue"
const config = require("./config")
const amqp = require("amqplib")

async function getMQConnection() {
    return await amqp.connect({
        protocol: "amqp",
        hostname: config.MQ.host,
        port: config.MQ.port,
        username: config.MQ.user,
        password: config.MQ.pass,
        locale: "en_US",
        frameMax: 0,
        heartbeat: 5, // 心跳
        vhost: config.MQ.vhost,
    })
}

async function sleep(ms) {
    return new Promise(resolve => 
        setTimeout(resolve, ms))
}

async function start() {
    const mqConn = await getMQConnection()
    console.log("connecting RabbitMQ successfully!")
    const channel = await mqConn.createChannel()
    const exchangeName = `${QUEUE_NAME}_exchange`
    await channel.assertExchange(exchangeName, "direct", { durable: true, autoDelete: false })
    await channel.assertQueue(QUEUE_NAME, {durable: true, autoDelete: false})
    await channel.bindQueue(QUEUE_NAME, exchangeName, QUEUE_NAME)

    channel.consume(QUEUE_NAME, async function(msg) {
        console.log("Received msg: %s from %s", QUEUE_NAME, msg.content.toString())
        console.log("consuming message...")
        try {
            await sleep(500) // 模擬消費(fèi)消息
            console.log("consuming ends")
            channel.ack(msg) // 消費(fèi)成功,發(fā)送ack
        } catch(e) {
            console.log("consuming failed: " + e.message)
            channel.nack(msg) // 消費(fèi)失敗,發(fā)送nack
        }
    }, {noAck: false}) // ack
}

start()
注意

自動(dòng)ack是默認(rèn)打開(kāi)的,也就是說(shuō)消息發(fā)送到消費(fèi)者的時(shí)候就被自動(dòng)ack了,而很多情況下,我們想要手動(dòng)ack,所以我們需要顯式設(shè)置autoAsk=false關(guān)閉這種機(jī)制(在示例中是noAck: false)

ack沒(méi)有任何超時(shí)限制;只有當(dāng)消費(fèi)者斷開(kāi)時(shí),broker才會(huì)重新投遞。即使處理一條消息會(huì)花費(fèi)很長(zhǎng)的時(shí)間。

一些問(wèn)題

amqp.node這個(gè)庫(kù)提供了心跳檢測(cè)的功能(heartbeat選項(xiàng)),但是沒(méi)有做自動(dòng)重連的。
對(duì)于heartbeat的值,RabbitMQ官網(wǎng)有說(shuō)明

Several years worth of feedback from the users and client library
maintainers suggest that values lower than 5 seconds are fairly likely
to cause false positives, and values of 1 second or lower are very
likely to do so. Values within the 5 to 20 seconds range are optimal
for most environments.

所以心跳不宜設(shè)置的太低(因?yàn)槎虝旱木W(wǎng)絡(luò)擁塞或者流控制),太低容易導(dǎo)致誤報(bào),根據(jù)經(jīng)驗(yàn)5s-20s是比較合理的。

參考文章:

深入學(xué)習(xí)RabbitMQ(四):channel的confirm模式

when-publishes-are-confirmed

Channel-oriented API reference

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/31527.html

相關(guān)文章

  • 前端(二)之 CSS

    摘要:前端之前端之前言前言昨天學(xué)習(xí)了標(biāo)記式語(yǔ)言,也就是無(wú)邏輯語(yǔ)言。今天學(xué)習(xí),被稱之為網(wǎng)頁(yè)的化妝師。為前端頁(yè)面的樣式,由選擇器作用域與樣式塊組成。年初,組織負(fù)責(zé)的工作組開(kāi)始討論第一版中沒(méi)有涉及到的問(wèn)題。其討論結(jié)果組成了年月出版的規(guī)范第二版。前端之 CSS 前言 昨天學(xué)習(xí)了標(biāo)記式語(yǔ)言,也就是無(wú)邏輯語(yǔ)言。了解了網(wǎng)頁(yè)的骨架是什么構(gòu)成的,了解了常用標(biāo)簽,兩個(gè)指令以及轉(zhuǎn)義字符;其中標(biāo)簽可以分為兩大類(lèi): 一類(lèi)...

    張率功 評(píng)論0 收藏0
  • Java多線程進(jìn)階(三四)—— J.U.C之collections框架:PriorityBlocki

    摘要:初始狀態(tài)對(duì)應(yīng)二叉樹(shù)結(jié)構(gòu)將頂點(diǎn)與最后一個(gè)結(jié)點(diǎn)調(diào)換即將頂點(diǎn)與最后一個(gè)結(jié)點(diǎn)交換,然后將索引為止置。 showImg(https://segmentfault.com/img/bVbgOtL?w=1600&h=800); 本文首發(fā)于一世流云專(zhuān)欄:https://segmentfault.com/blog... 一、PriorityBlockingQueue簡(jiǎn)介 PriorityBlockin...

    levius 評(píng)論0 收藏0
  • RabbitMQ】——centos7安裝rabbitmq教程 以及 PHP開(kāi)啟rabbitmq擴(kuò)展

    摘要:第一步安裝因?yàn)槭钦Z(yǔ)言編寫(xiě)的,所以我們首先需要安裝第二步安裝官網(wǎng)提供的安裝方式本人安裝成功的方式第三步查看是否已經(jīng)安裝好了,能查到說(shuō)明已經(jīng)安裝完成了。 第一步:安裝Erlang 因?yàn)閞abbitMQ是Erlang語(yǔ)言編寫(xiě)的,所以我們首先需要安裝Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...

    lscho 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<