摘要:為了解決以上問(wèn)題,我們需要使用的生產(chǎn)者確認(rèn)模式。在這樣的機(jī)制下,即使有一個(gè)消費(fèi)者崩潰也不會(huì)丟失任何消息。即使處理一條消息會(huì)花費(fèi)很長(zhǎng)的時(shí)間。一些問(wèn)題這個(gè)庫(kù)提供了心跳檢測(cè)的功能選項(xiàng),但是沒(méi)有做自動(dòng)重連的。參考文章深入學(xué)習(xí)四的模式
數(shù)據(jù)的持久化
對(duì)于非常健壯穩(wěn)定的后臺(tái)系統(tǒng),我們必須得考慮到各種宕機(jī)的情況:物理宕機(jī),應(yīng)用自身出錯(cuò)崩潰等,而這個(gè)時(shí)候我們的應(yīng)用需要做到重啟后數(shù)據(jù)依舊不丟失,這個(gè)問(wèn)題就是數(shù)據(jù)持久化,也就是說(shuō)數(shù)據(jù)持久化到了磁盤(pán)。
在RabbitMQ中,如果要保證消息發(fā)送到broker,我們首先需要做到三點(diǎn)
持久化的exchange(交換器):聲明時(shí)開(kāi)啟durable選項(xiàng)
持久化的queue(隊(duì)列):聲明時(shí)開(kāi)啟durable選項(xiàng)
持久化的message:delivery_mode設(shè)置為2(php,python之類(lèi)的庫(kù),2可以換成更友好的常量),在node的amqp.node庫(kù)中是設(shè)置persistent為true
需要注意的一點(diǎn)是,持久化會(huì)造成性能損耗(寫(xiě)磁盤(pán)操作),但為了保證生產(chǎn)環(huán)境的數(shù)據(jù)一致性,我們必須這么做。
發(fā)送消息的confirm機(jī)制其實(shí)光光做到以上三點(diǎn),數(shù)據(jù)依舊有丟失的可能,因?yàn)樵诳蛻舳顺晒φ{(diào)用api存入消息之后,RabbitMQ還需要一段時(shí)間(很短,但不可忽略)才能落盤(pán),RabbitMQ并不是為每條消息都做fsync的處理,可能僅僅保存到cache中而不是物理磁盤(pán)上,而在這段時(shí)間內(nèi)RabbitMQ broker發(fā)生crash, 消息保存到cache但是還沒(méi)來(lái)得及落盤(pán),那么這些消息將會(huì)丟失。
為了解決以上問(wèn)題,我們需要使用RabbitMQ的生產(chǎn)者確認(rèn)模式。
為了開(kāi)啟確認(rèn)模式,需要生產(chǎn)者將channel設(shè)置成confirm模式,一旦channel進(jìn)入confirm模式,所有在該信道上面發(fā)布的消息都將會(huì)被指派一個(gè)唯一的ID(從1開(kāi)始),一旦消息被投遞到所有匹配的隊(duì)列之后,broker就會(huì)發(fā)送一個(gè)確認(rèn)給生產(chǎn)者(包含消息的唯一ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)列了,如果消息和隊(duì)列是可持久化的,那么確認(rèn)消息會(huì)在將消息寫(xiě)入磁盤(pán)之后發(fā)出,broker回傳給生產(chǎn)者的確認(rèn)消息中delivery-tag域包含了確認(rèn)消息的序列號(hào)。
confirm模式最大的好處在于他是異步的,一旦發(fā)布一條消息,生產(chǎn)者應(yīng)用程序就可以在等信道返回確認(rèn)的同時(shí)繼續(xù)發(fā)送下一條消息,當(dāng)消息最終得到確認(rèn)之后,生產(chǎn)者應(yīng)用便可以通過(guò)回調(diào)方法來(lái)處理該確認(rèn)消息,如果RabbitMQ因?yàn)樽陨韮?nèi)部錯(cuò)誤導(dǎo)致消息丟失,就會(huì)發(fā)送一條nack消息,生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理該nack消息 (來(lái)自參考1)簡(jiǎn)單confirm示例
示例代碼使用NodeJS實(shí)現(xiàn),RabbitMQ服務(wù)可以使用上一篇RabbitMQ二三事的docker-compose.yml快速啟動(dòng)
const QUEUE_NAME = "test_queue" const config = require("./config") const amqp = require("amqplib") async function getMQConnection() { return await amqp.connect({ protocol: "amqp", hostname: config.MQ.host, port: config.MQ.port, username: config.MQ.user, password: config.MQ.pass, locale: "en_US", frameMax: 0, heartbeat: 5, // 心跳 vhost: config.MQ.vhost, }) } async function run(rmqConn, msgArr) { try { const channel = await rmqConn.createConfirmChannel() // 開(kāi)啟confirm const exchangeName = `${QUEUE_NAME}_exchange` await channel.assertExchange(exchangeName, "direct", { durable: true, autoDelete: false }) // 不存在exchange就新建exchange await channel.assertQueue(QUEUE_NAME, {durable: true, autoDelete: false}) // 不存在queue就新建 await channel.bindQueue(QUEUE_NAME, exchangeName, QUEUE_NAME) // 綁定交換器 // queue name當(dāng)routing key msgArr.forEach(str => { channel.publish(exchangeName, QUEUE_NAME, Buffer.from(str), { persistent: true, mandatory: true }) }) await channel.waitForConfirms() console.log("發(fā)送批量數(shù)據(jù)成功") await channel.close() } catch(err) { // do something with err console.log("發(fā)送批量數(shù)據(jù)失敗:" + err.message) } } async function testSendBatchMsg() { const conn = await getMQConnection() await run(conn, [ "cat", "dog", "pig", "mouse", "mouse", "penguin" ]) await conn.close() } testSendBatchMsg()說(shuō)明
assertExchange和assertQueue是保證交換器和隊(duì)列一定存在,這里的exchange是簡(jiǎn)單的direct交換器
ConfirmChannel#publish方法不返回promise
現(xiàn)在我們需要考慮我們的消費(fèi)者了,消費(fèi)者也會(huì)遇到程序出錯(cuò)或者物理宕機(jī)問(wèn)題,RabbitMQ官方也給出了一套解決方案,和confirm機(jī)制類(lèi)似,就是ack機(jī)制(Message acknowledgment).
在ack機(jī)制中,消費(fèi)者在自己處理完業(yè)務(wù)邏輯后,需要發(fā)送一個(gè)ack消息,然后broker才認(rèn)為這條消息被正確消費(fèi),然后從內(nèi)存和磁盤(pán)中移除掉它,只要沒(méi)收到消費(fèi)者的acknowledgment,broker就會(huì)一直保存著這條消息.如果一個(gè)消費(fèi)者崩潰(斷開(kāi)了連接)卻沒(méi)有發(fā)送ack,broker會(huì)理解為這個(gè)消息沒(méi)有處理完全,然后交給另一個(gè)消費(fèi)者去重新處理。在這樣的機(jī)制下,即使有一個(gè)消費(fèi)者崩潰也不會(huì)丟失任何消息。
const QUEUE_NAME = "test_queue" const config = require("./config") const amqp = require("amqplib") async function getMQConnection() { return await amqp.connect({ protocol: "amqp", hostname: config.MQ.host, port: config.MQ.port, username: config.MQ.user, password: config.MQ.pass, locale: "en_US", frameMax: 0, heartbeat: 5, // 心跳 vhost: config.MQ.vhost, }) } async function sleep(ms) { return new Promise(resolve => setTimeout(resolve, ms)) } async function start() { const mqConn = await getMQConnection() console.log("connecting RabbitMQ successfully!") const channel = await mqConn.createChannel() const exchangeName = `${QUEUE_NAME}_exchange` await channel.assertExchange(exchangeName, "direct", { durable: true, autoDelete: false }) await channel.assertQueue(QUEUE_NAME, {durable: true, autoDelete: false}) await channel.bindQueue(QUEUE_NAME, exchangeName, QUEUE_NAME) channel.consume(QUEUE_NAME, async function(msg) { console.log("Received msg: %s from %s", QUEUE_NAME, msg.content.toString()) console.log("consuming message...") try { await sleep(500) // 模擬消費(fèi)消息 console.log("consuming ends") channel.ack(msg) // 消費(fèi)成功,發(fā)送ack } catch(e) { console.log("consuming failed: " + e.message) channel.nack(msg) // 消費(fèi)失敗,發(fā)送nack } }, {noAck: false}) // ack } start()注意
自動(dòng)ack是默認(rèn)打開(kāi)的,也就是說(shuō)消息發(fā)送到消費(fèi)者的時(shí)候就被自動(dòng)ack了,而很多情況下,我們想要手動(dòng)ack,所以我們需要顯式設(shè)置autoAsk=false關(guān)閉這種機(jī)制(在示例中是noAck: false)
ack沒(méi)有任何超時(shí)限制;只有當(dāng)消費(fèi)者斷開(kāi)時(shí),broker才會(huì)重新投遞。即使處理一條消息會(huì)花費(fèi)很長(zhǎng)的時(shí)間。
一些問(wèn)題amqp.node這個(gè)庫(kù)提供了心跳檢測(cè)的功能(heartbeat選項(xiàng)),但是沒(méi)有做自動(dòng)重連的。
對(duì)于heartbeat的值,RabbitMQ官網(wǎng)有說(shuō)明
Several years worth of feedback from the users and client library
maintainers suggest that values lower than 5 seconds are fairly likely
to cause false positives, and values of 1 second or lower are very
likely to do so. Values within the 5 to 20 seconds range are optimal
for most environments.
所以心跳不宜設(shè)置的太低(因?yàn)槎虝旱木W(wǎng)絡(luò)擁塞或者流控制),太低容易導(dǎo)致誤報(bào),根據(jù)經(jīng)驗(yàn)5s-20s是比較合理的。
參考文章:
深入學(xué)習(xí)RabbitMQ(四):channel的confirm模式
when-publishes-are-confirmed
Channel-oriented API reference
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/31527.html
摘要:前端之前端之前言前言昨天學(xué)習(xí)了標(biāo)記式語(yǔ)言,也就是無(wú)邏輯語(yǔ)言。今天學(xué)習(xí),被稱之為網(wǎng)頁(yè)的化妝師。為前端頁(yè)面的樣式,由選擇器作用域與樣式塊組成。年初,組織負(fù)責(zé)的工作組開(kāi)始討論第一版中沒(méi)有涉及到的問(wèn)題。其討論結(jié)果組成了年月出版的規(guī)范第二版。前端之 CSS 前言 昨天學(xué)習(xí)了標(biāo)記式語(yǔ)言,也就是無(wú)邏輯語(yǔ)言。了解了網(wǎng)頁(yè)的骨架是什么構(gòu)成的,了解了常用標(biāo)簽,兩個(gè)指令以及轉(zhuǎn)義字符;其中標(biāo)簽可以分為兩大類(lèi): 一類(lèi)...
摘要:初始狀態(tài)對(duì)應(yīng)二叉樹(shù)結(jié)構(gòu)將頂點(diǎn)與最后一個(gè)結(jié)點(diǎn)調(diào)換即將頂點(diǎn)與最后一個(gè)結(jié)點(diǎn)交換,然后將索引為止置。 showImg(https://segmentfault.com/img/bVbgOtL?w=1600&h=800); 本文首發(fā)于一世流云專(zhuān)欄:https://segmentfault.com/blog... 一、PriorityBlockingQueue簡(jiǎn)介 PriorityBlockin...
摘要:第一步安裝因?yàn)槭钦Z(yǔ)言編寫(xiě)的,所以我們首先需要安裝第二步安裝官網(wǎng)提供的安裝方式本人安裝成功的方式第三步查看是否已經(jīng)安裝好了,能查到說(shuō)明已經(jīng)安裝完成了。 第一步:安裝Erlang 因?yàn)閞abbitMQ是Erlang語(yǔ)言編寫(xiě)的,所以我們首先需要安裝Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...
閱讀 1603·2021-09-30 09:47
閱讀 3608·2021-09-22 15:05
閱讀 2842·2021-08-30 09:44
閱讀 3626·2019-08-30 15:55
閱讀 1377·2019-08-30 13:08
閱讀 1332·2019-08-29 16:40
閱讀 557·2019-08-29 12:45
閱讀 1393·2019-08-29 11:25