摘要:官方提供的教程,是基于回調(diào)的。下面將給出基于式的寫法。并且實(shí)現(xiàn)動(dòng)態(tài)的隊(duì)列綁定初始化配置地址交換機(jī)名稱讀取在跑多實(shí)例時(shí),例如在中,可以獲取當(dāng)前的名稱多實(shí)例時(shí),寫日志,或者建立連接時(shí),最好帶上名稱,如果出現(xiàn)問題,也比較好定位哪個(gè)出現(xiàn)的問題。
RabbitMQ官方提供的教程https://www.rabbitmq.com/tuto...,是基于回調(diào)的。
下面將給出基于Promise式的寫法。并且實(shí)現(xiàn)動(dòng)態(tài)的隊(duì)列綁定
初始化配置const amqp = require("amqplib") // rabbitMQ地址 const {amqpAddrHost} = require("../config/index.js") // 交換機(jī)名稱 const ex = "amq.topic" const amqpAddr = `amqp://${amqpAddrHost}` // 讀取HOSTNAME, 在跑多實(shí)例時(shí),例如在k8s中,HOSTNAME可以獲取當(dāng)前pod的名稱 // 多實(shí)例時(shí),寫日志,或者建立連接時(shí),最好帶上pod名稱,如果出現(xiàn)問題,也比較好定位哪個(gè)pod出現(xiàn)的問題。 const hostName = process.env.HOSTNAME // 隊(duì)列的屬性設(shè)置 // 一般來說,最好設(shè)置隊(duì)列自動(dòng)刪除autoDelete,當(dāng)鏈接斷開時(shí),隊(duì)列也會(huì)刪除,這樣不會(huì)產(chǎn)生非常多的無用隊(duì)列 // durable是用來的持久化的,最好也可以設(shè)置成不持久化 const queueAttr = {autoDelete: true, durable: false} // 定義channel的引用,當(dāng)鏈接建立時(shí),所有方法都可以通過引用CH來獲取channel方法 let CH = null向隊(duì)列發(fā)送消息的函數(shù)
// 向隊(duì)列發(fā)送消息的函數(shù) function publishMessage (msg) { if (!CH) { return "" } msg = JSON.stringify(msg) // 指定交換機(jī)ex, routing key, 以及消息的內(nèi)容 CH.publish(ex, eventBusTopic, Buffer.from(msg)) }當(dāng)鏈接rabbitMQ斷開時(shí),要主動(dòng)去重連
function reconnectRabbitMq () { log.info("reconnect_rabbit_mq") connectRabbitMq() }連接rabbitMQ的主要函數(shù)
function connectRabbitMq () { amqp.connect(amqpAddr, { // 設(shè)置connection_name的屬性,可以在rabbitMQ的控制臺(tái)的UI上,看到連接是來自哪個(gè)實(shí)例 clientProperties: { connection_name: hostName } }) .then((conn) => { log.info("rabbitmq_connect_successd") // 一定要加上鏈接的報(bào)錯(cuò)事件處理,否則一旦報(bào)error錯(cuò),如果不處理這個(gè)錯(cuò)誤,程序就會(huì)崩潰 // error是個(gè)特別的事件,務(wù)必要處理的 // 報(bào)錯(cuò)就直接去重連 conn.on("error", (err) => { log.error("connect_error " + err.message, err) reconnectRabbitMq() }) // 創(chuàng)建channel return conn.createChannel() }) .then((ch) => { CH = ch // 初始化交換機(jī) ch.assertExchange(ex, "topic", {durable: true}) // 初始化一個(gè)隊(duì)列,隊(duì)列名就用hostName, 比較容易從對(duì)列名上知道是哪個(gè)實(shí)例創(chuàng)建的隊(duì)列 return ch.assertQueue(hostName, queueAttr) }) .then((q) => { // 可以在隊(duì)列初始化完畢就立即綁定routing key, 也可以暫時(shí)不綁定,后續(xù)動(dòng)態(tài)的綁定 // CH.bindQueue(q.queue, ex, "some.topic.aaa") // 消費(fèi)者,獲取消息 CH.consume(q.queue, (msg) => { var _msg = msg.content.toString() var MSG = JSON.parse(_msg) log.info(_msg, MSG) }, {noAck: true}) }) .catch((err) => { console.log(err) }) }動(dòng)態(tài)給隊(duì)列綁定或者解綁routing key
function toggleBindQueue (routingKey, bind) { return new Promise((resolve, reject) => { if (!CH) { log.error("channel not established") reject(new Error("channel not established")) return "" } // 初始化隊(duì)列,如果隊(duì)列已經(jīng)存在,就會(huì)直接使用 CH.assertQueue(`${hostName}`, queueAttr) .then((q) => { // 如果bind是true,就綁定。否則就解綁 if (bind) { log.info(`bindQueue ${hostName} ${topic}`) return CH.bindQueue(q.queue, ex, topic) } else { return CH.unbindQueue(q.queue, ex, topic) } }) .then((res) => { resolve() }) .catch((err) => { reject(err) log.error(err) }) }) } module.exports = { connectRabbitMq, toggleBindQueue, publishMessage }使用方法
加入你的服務(wù)端用的是Express, 那么在app.js中可以
... const {connectRabbitMq} = require("./connect-mq.js") connectRabbitMq() ...完整代碼
// onnect-mq.js const amqp = require("amqplib") // rabbitMQ地址 const {amqpAddrHost} = require("../config/index.js") // 交換機(jī)名稱 const ex = "amq.topic" const amqpAddr = `amqp://${amqpAddrHost}` // 讀取HOSTNAME, 在跑多實(shí)例時(shí),例如在k8s中,HOSTNAME可以獲取當(dāng)前pod的名稱 // 多實(shí)例時(shí),寫日志,或者建立連接時(shí),最好帶上pod名稱,如果出現(xiàn)問題,也比較好定位哪個(gè)pod出現(xiàn)的問題。 const hostName = process.env.HOSTNAME // 隊(duì)列的屬性設(shè)置 // 一般來說,最好設(shè)置隊(duì)列自動(dòng)刪除autoDelete,當(dāng)鏈接斷開時(shí),隊(duì)列也會(huì)刪除,這樣不會(huì)產(chǎn)生非常多的無用隊(duì)列 // durable是用來的持久化的,最好也可以設(shè)置成不持久化 const queueAttr = {autoDelete: true, durable: false} // 定義channel的引用,當(dāng)鏈接建立時(shí),所有方法都可以通過引用CH來獲取channel方法 let CH = null // 向隊(duì)列發(fā)送消息的函數(shù) function publishMessage (msg) { if (!CH) { return "" } msg = JSON.stringify(msg) // 指定交換機(jī)ex, routing key, 以及消息的內(nèi)容 CH.publish(ex, eventBusTopic, Buffer.from(msg)) } // 當(dāng)鏈接rabbitMQ斷開時(shí),要主動(dòng)去重連 function reconnectRabbitMq () { log.info("reconnect_rabbit_mq") connectRabbitMq() } // 鏈接rabbitMQ的主要函數(shù) function connectRabbitMq () { amqp.connect(amqpAddr, { // 設(shè)置connection_name的屬性,可以在rabbitMQ的控制臺(tái)的UI上,看到鏈接是來自哪個(gè)實(shí)例 clientProperties: { connection_name: hostName } }) .then((conn) => { log.info("rabbitmq_connect_successd") // 一定要加上鏈接的報(bào)錯(cuò)事件處理,否則一旦報(bào)error錯(cuò),如果不處理這個(gè)錯(cuò)誤,程序就會(huì)崩潰 // error是個(gè)特別的事件,務(wù)必要處理的 // 報(bào)錯(cuò)就直接去重連 conn.on("error", (err) => { log.error("connect_error " + err.message, err) reconnectRabbitMq() }) // 創(chuàng)建channel return conn.createChannel() }) .then((ch) => { CH = ch // 初始化交換機(jī) ch.assertExchange(ex, "topic", {durable: true}) // 初始化一個(gè)隊(duì)列,隊(duì)列名就用hostName, 比較容易從對(duì)列名上知道是哪個(gè)實(shí)例創(chuàng)建的隊(duì)列 return ch.assertQueue(hostName, queueAttr) }) .then((q) => { // 可以在隊(duì)列初始化完畢就立即綁定routing key, 也可以暫時(shí)不綁定,后續(xù)動(dòng)態(tài)的綁定 // CH.bindQueue(q.queue, ex, "some.topic.aaa") // 消費(fèi)者,獲取消息 CH.consume(q.queue, (msg) => { var _msg = msg.content.toString() var MSG = JSON.parse(_msg) log.info(_msg, MSG) }, {noAck: true}) }) .catch((err) => { console.log(err) }) } // 動(dòng)態(tài)給隊(duì)列綁定或者解綁routing key function toggleBindQueue (routingKey, bind) { return new Promise((resolve, reject) => { if (!CH) { log.error("channel not established") reject(new Error("channel not established")) return "" } // 初始化隊(duì)列,如果隊(duì)列已經(jīng)存在,就會(huì)直接使用 CH.assertQueue(`${hostName}`, queueAttr) .then((q) => { // 如果bind是true,就綁定。否則就解綁 if (bind) { log.info(`bindQueue ${hostName} ${topic}`) return CH.bindQueue(q.queue, ex, topic) } else { return CH.unbindQueue(q.queue, ex, topic) } }) .then((res) => { resolve() }) .catch((err) => { reject(err) log.error(err) }) }) } module.exports = { connectRabbitMq, toggleBindQueue, publishMessage }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/32755.html
摘要:官方提供的教程,是基于回調(diào)的。下面將給出基于式的寫法。并且實(shí)現(xiàn)動(dòng)態(tài)的隊(duì)列綁定初始化配置地址交換機(jī)名稱讀取在跑多實(shí)例時(shí),例如在中,可以獲取當(dāng)前的名稱多實(shí)例時(shí),寫日志,或者建立連接時(shí),最好帶上名稱,如果出現(xiàn)問題,也比較好定位哪個(gè)出現(xiàn)的問題。 RabbitMQ官方提供的教程https://www.rabbitmq.com/tuto...,是基于回調(diào)的。 下面將給出基于Promise式的寫法。...
摘要:官方提供的教程,是基于回調(diào)的。下面將給出基于式的寫法。并且實(shí)現(xiàn)動(dòng)態(tài)的隊(duì)列綁定初始化配置地址交換機(jī)名稱讀取在跑多實(shí)例時(shí),例如在中,可以獲取當(dāng)前的名稱多實(shí)例時(shí),寫日志,或者建立連接時(shí),最好帶上名稱,如果出現(xiàn)問題,也比較好定位哪個(gè)出現(xiàn)的問題。 RabbitMQ官方提供的教程https://www.rabbitmq.com/tuto...,是基于回調(diào)的。 下面將給出基于Promise式的寫法。...
摘要:在如下幾個(gè)屬性,表示當(dāng)前的真實(shí)時(shí)間,用于和服務(wù)器時(shí)間同步,表示創(chuàng)建時(shí)間,主要用于分頁,以及重連時(shí)的判斷,表示是否斷線重連。初始化連接時(shí),將賦值為當(dāng)前本地時(shí)間,連接成功后,將賦值為服務(wù)器返回的當(dāng)前時(shí)間,再設(shè)置一個(gè)定時(shí)器,保持時(shí)間與服務(wù)器一致。 vue項(xiàng)目前端知識(shí)點(diǎn)整理 微信授權(quán)后還能通過瀏覽器返回鍵回到授權(quán)頁 在導(dǎo)航守衛(wèi)中可以在next({})中設(shè)置replace: true來重定向到改...
摘要:用函數(shù)式編程對(duì)進(jìn)行斷舍離當(dāng)從業(yè)的老司機(jī)學(xué)會(huì)函數(shù)式編程時(shí),他扔掉了的特性,也不用面向?qū)ο罅?,最后發(fā)現(xiàn)了真愛啊作用域和閉包作用域和閉包在里非常重要。旨在幫助非函數(shù)式編程的同學(xué),能快速切入到函數(shù)式編程的理念。 1、用函數(shù)式編程對(duì)JavaScript進(jìn)行斷舍離 當(dāng)從業(yè)20的JavaScript老司機(jī)學(xué)會(huì)函數(shù)式編程時(shí),他扔掉了90%的特性,也不用面向?qū)ο罅?,最后發(fā)現(xiàn)了真愛?。。?! https:/...
摘要:用函數(shù)式編程對(duì)進(jìn)行斷舍離當(dāng)從業(yè)的老司機(jī)學(xué)會(huì)函數(shù)式編程時(shí),他扔掉了的特性,也不用面向?qū)ο罅?,最后發(fā)現(xiàn)了真愛啊作用域和閉包作用域和閉包在里非常重要。旨在幫助非函數(shù)式編程的同學(xué),能快速切入到函數(shù)式編程的理念。 1、用函數(shù)式編程對(duì)JavaScript進(jìn)行斷舍離 當(dāng)從業(yè)20的JavaScript老司機(jī)學(xué)會(huì)函數(shù)式編程時(shí),他扔掉了90%的特性,也不用面向?qū)ο罅?,最后發(fā)現(xiàn)了真愛啊?。?! https:/...
閱讀 1745·2021-10-18 13:30
閱讀 2637·2021-10-09 10:02
閱讀 2972·2021-09-28 09:35
閱讀 2099·2019-08-26 13:39
閱讀 3532·2019-08-26 13:36
閱讀 1960·2019-08-26 11:46
閱讀 1144·2019-08-23 14:56
閱讀 1703·2019-08-23 10:38