摘要:基礎教程注本文是對眾多博客的學習和總結,可能存在理解錯誤。消息的應答現(xiàn)在存在這樣一種場景,消費者取到消息,然后創(chuàng)建任務開始執(zhí)行。如果處理失敗,也就是沒有收到應答,那么就將這條消息重新發(fā)送給該隊列的其他消費者。造成了負載不均衡。
RabbitMQ 基礎教程(2) - Work Queue
注:本文是對眾多博客的學習和總結,可能存在理解錯誤。請帶著懷疑的眼光,同時如果有錯誤希望能指出。
如果你喜歡我的文章,可以關注我的私人博客:http://blog-qeesung.rhcloud.com/
在上一篇文章 RabbitMQ 基礎教程(1) - Hello World 中,我們已經(jīng)簡單的介紹了RabbitMQ以及如何發(fā)送和接收一個消息。接下來我們將繼續(xù)深入RabbitMQ,研究一下消息隊列(Work Queue)
消息隊列消息的發(fā)布者發(fā)布一個消息到消息隊列中,然后信息的消費者取出消息進行消費。
queue +-------------+ +--+--+--+--+--+--+ +-------------+ | producer |----->|m1|m2| ... | | |---->| consumer | +-------------+ +--+--+--+--+--+--+ +-------------+
但是實際情況往往比這個要復雜,假如我們有多個信息的發(fā)布者和多個信息的消費者,那RabbitMQ又將會是怎么工作呢?
+--------------+ +--------------+ | producer1 +- / | consumer1 | +--------------+ - queue /- +--------------+ +--------------+ - +---+---+---+----+ /- +--------------+ | producer2 +---->X|m1 |m2 |m3 |... |---->| consumer2 | +--------------+ /- +---+---+---+----+ - +--------------+ +--------------+ /- - +--------------+ | ... |/ | ... | +--------------+ +--------------+Round-robin 分發(fā)算法
RabbitMQ中,如果有多個消費者同時消費同一個消息隊列,那么就通過Round-robin算法將消息隊列中的消息均勻的分配給每一個消費者。
這個算法其實很簡單,每收到一個新的消息,就將這個消息分發(fā)給上下一個消費者。比如上一個消費者是consumer-n,那么有新消息來的時候就將這個新的消息發(fā)布到consumer-n+1,以此類推,如果到了最后一個消費者,那么就又從第一個開始。即:consumer-index = (consumer-index + 1) mod consumer-number
為了演示,首先來做幾項準備工作。
定義任務 task.js
/** * 創(chuàng)建一個任務 * @param taskName 任務名字 * @param costTime 任務話費的時間 * @param callback 任務結束以后的回調函數(shù) * @constructor */ function Task(taskName ,costTime , callback){ if(typeof(costTime) !== "number") costTime = 0; // no delay there setTimeout(function () { console.log(taskName+" finished"); if(callback && typeof (callback) === "function") callback(); } , 1000*costTime); };
串行化的消息任務結構
任務發(fā)布者負責將該結構發(fā)布到隊列中,然后消費者取出消息,新建任務開始執(zhí)行。
{ taskName : "taskname", costTime : 1 }
創(chuàng)建任務消息 task-producer.js
var amqp = require("amqplib/callback_api"); // 連接上RabbitMQ服務器 amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "tasks"; // 得到發(fā)送消息的數(shù)目,默認發(fā)送4個 var name; var cost; (function () { if(process.argv.length < 4 ) { console.error("ERROR : usage - node rabbit-producer"); process.exit(-1); } name = process.argv[2]; cost = +process.argv[3]; })(); // 新建隊列,然后將隊列中的消息持久化取消 ch.assertQueue(q, {durable: true}); // 將任務串行化存入Buffer中,并推入隊列 ch.sendToQueue(q, new Buffer(JSON.stringify({taskName :name ,costTime :cost })),{persistent:true}); console.log(" [x] Sent "+name); setTimeout(function () { process.exit(0); },500); }); });
消費任務消息 task-consumer.js
var amqp = require("amqplib/callback_api"); var Task = require("./task.js"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "tasks"; ch.assertQueue(q, {durable: true}); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); // 監(jiān)聽隊列上面的消息 ch.consume(q, function(msg) { var obj = JSON.parse(msg.content.toString("utf8")); console.log("Get the task "+obj.taskName); // 定義新的任務 new Task(obj.taskName,obj.costTime); }, {noAck: true}); }); });
現(xiàn)在開啟兩個消費者進程來等待消費tasks隊列中的消息
# shell1 node task-consumer.js # shell2 node task-consumer.js
然后向隊列中推入三個消息
# shell3 node task-producer.js task1 0 node task-producer.js task2 0 node task-producer.js task3 0
運行結果
# shell1 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task1 task1 finished Get the task task3 task3 finished # shell2 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task2 task2 finished # 已經(jīng)通過Round-robin算法將消息隊列中的消息分配到連接的消費者中了.消息,隊列持久化
細心的讀者可能已經(jīng)發(fā)現(xiàn)了我們在聲明隊列和發(fā)送消息的代碼塊中改動了一小部分的代碼,那就是
// 聲明隊列 ch.assertQueue(q, {durable: true}); // 發(fā)送信息 ch.sendToQueue(q, new Buffer(JSON.stringify({taskName :name ,costTime :cost })),{persistent:true});
通過將隊列的durable配置參數(shù)生命為true可以保證在RabbitMQ服務器退出或者異常終止的情況下不會丟失消息隊列,注意這里只是不會丟失消息隊列,并不是消息隊列中沒有被消費的消息不會丟失。
為了保證消息隊列中的消息不會丟失,就需要在發(fā)送消息時指定persistent選項,這里并不能百分之百的保證消息不會丟失,因為從隊列中有新的消息,到將隊列中消息持久化到磁盤這一段時間之內是無法保證的。
消息的應答現(xiàn)在存在這樣一種場景,消費者取到消息,然后創(chuàng)建任務開始執(zhí)行。但是任務執(zhí)行到一半就拋出異常,那么這個任務算是沒有被成功執(zhí)行的。
在我們之前的代碼實現(xiàn)中,都是消息隊列中有新的消息,馬上就這個消息分配給消費者消費,不管消費者對消息處理結果如何,消息隊列會馬上將已經(jīng)分配的消息從消息隊列中刪除。如果這個任務非常重要,或者一定要執(zhí)行成功,那么一旦任務在執(zhí)行過程中拋出異常,那么這個任務就再也找不回來了,這是非常可怕的事情。
還好在RabbitMQ中我們可以為已經(jīng)分配的消息和消息隊列之間創(chuàng)建一個應答關系:
如果消息處理成功,那么就發(fā)送一個答復給消息隊列,告訴它:我已經(jīng)成功處理消息,不再需要這條消息了,你可以刪除了,于是消息隊列就將已經(jīng)應答的消息從消息隊列中刪除。
如果處理失敗,也就是沒有收到應答,那么就將這條消息重新發(fā)送給該隊列的其他消費者。
要在消費者和消息隊列之間建立這種應答關系我們只需要將channel的consume函數(shù)的noAck參數(shù)設成false就可以了。
ch.consume(q, function(msg) { var obj = JSON.parse(msg.content.toString("utf8")); console.log("Get the task "+obj.taskName); // 定義新的任務 new Task(obj.taskName,obj.costTime); }, {noAck: false}); // 這里設置成false
下面我們就模擬一下消息處理失敗的場景:
var amqp = require("amqplib/callback_api"); var Task = require("./task.js"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "tasks"; ch.assertQueue(q, {durable: true}); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); // 監(jiān)聽隊列上面的消息 ch.consume(q, function(msg) { var obj = JSON.parse(msg.content.toString("utf8")); console.log("Get the task "+obj.taskName); // 定義新的任務 new Task(obj.taskName,obj.costTime,function(){ if(obj.taskName === "task2") throw new Error("Test error"); else ch.ack(msg); }); // 如果是任務二,那么就拋出異常。 }, {noAck: false}); }); });
按照上面的腳本執(zhí)行順序,我們在執(zhí)行一遍腳本: consumer2得到執(zhí)行task2消息,然后馬上拋出異常退出進行,然后消息隊列再將這個消息分配給cosumer1,接著也執(zhí)行失敗了,退出進程,最終消息隊列中將只會有一個task2的消息存在。
啟動消費者等待消息
# shell1 開啟消費者1 node rabbit-consumer.js # shell2 開啟消費者2 node rabbit-consumer.js
創(chuàng)建消息
node rabbit-producer.js task1 0 node rabbit-producer.js task2 10 node rabbit-producer.js task3 0
我們能來看一下結果:
# shell2 消費者2 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task2 task2 finished # 消費者2執(zhí)行任務2的時候拋出異常,task2將會重新發(fā)送給消費者1 ... throw new Error("Error test"); # shell1 消費者1 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task1 task1 finished Get the task task3 task3 finished Get the task task2 # 消費者1接收到任何2 task2 finished ... throw new Error("Error test"); # 也拋出異常了
最終會在消息隊列中剩下一條未消費的信息。
更加均衡的負載這里有一點需要注意,如果你將noAck選項設置成了false,那么如果消息處理成功,一定要進行應答,負責消息隊列中的消息會越來越多,直到撐爆內存。
在上文中我們聽到過消息隊列通過Round-robin算法來將消息分配給消費者,但是這個分配過程是盲目的。比如現(xiàn)在有兩個消費者,consumer1和consumer2,按照Round-robin算法就會將奇數(shù)編號的任務發(fā)配給consumer1,將偶數(shù)編號的任務分配給consumer2,但是這些任務恰好有一個特性,奇數(shù)編號的任務比較繁重,而偶數(shù)編號的任務就比較簡單。
那么這就會造成一個問題,那就是consumer1會被累死,而consumer2會被閑死。造成了負載不均衡。要是每一個消息都被成功消費以后告訴消息隊列,然后消息隊列再將新的消息分配給空閑下來的消費者不就好了。
RabbitMQ中的確有這樣的一個配置選項。那就是ch.prefetch(1);
我們現(xiàn)在就來模擬一下
var amqp = require("amqplib/callback_api"); var Task = require("./task.js"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "tasks"; ch.assertQueue(q, {durable: true}); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); // 監(jiān)聽隊列上面的消息 ch.prefetch(1); // 添加這一行 ch.consume(q, function(msg) { var obj = JSON.parse(msg.content.toString("utf8")); console.log("Get the task "+obj.taskName); new Task(obj.taskName,obj.costTime ,function () { ch.ack(msg); }); }, {noAck: false}); }); });
啟動消費者等待消息
# shell1 開啟消費者1 node rabbit-consumer.js # shell2 開啟消費者2 node rabbit-consumer.js
創(chuàng)建消息
node rabbit-producer.js task1 0 node rabbit-producer.js task2 20 node rabbit-producer.js task3 0 node rabbit-producer.js task4 20
# shell1 開啟消費者1 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task1 # 任務馬上結束 task1 finished Get the task task3 # 任務馬上結束 task3 finished Get the task task4 # 任務四被分配到consumer1中了 task4 finished # shell2 開啟消費者2 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task2 task2 finished
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://systransis.cn/yun/86315.html
摘要:平均每個消費者將得到相同數(shù)量的消息。消息確認完成任務可能需要幾秒鐘。為了確保消息不會丟失,支持消息確認。沒有任何消息超時當這個消費者中止了,將會重新分配消息時。這是因為只是調度消息時,消息進入隊列。 showImg(https://segmentfault.com/img/bVXNuN?w=332&h=111); 介紹 在上一個 Hello World 教程中,我們編寫了從指定隊列發(fā)送...
摘要:每個消費者會得到平均數(shù)量的。為了確保不會丟失,采用確認機制。如果中斷退出了關閉了,關閉了,或是連接丟失了而沒有發(fā)送,會認為該消息沒有完整的執(zhí)行,會將該消息重新入隊。該消息會被發(fā)送給其他的。當消費者中斷退出,會重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我們寫了通過一個...
摘要:在中間的框是一個隊列的消息緩沖區(qū),保持代表的消費。本教程介紹,這是一個開放的通用的協(xié)議消息。我們將在本教程中使用,解決依賴管理。發(fā)送者將連接到,發(fā)送一條消息,然后退出。注意,這與發(fā)送發(fā)布的隊列匹配。 介紹 RabbitMQ是一個消息代理器:它接受和轉發(fā)消息。你可以把它當作一個郵局:當你把郵件放在信箱里時,你可以肯定郵差先生最終會把郵件送到你的收件人那里。在這個比喻中,RabbitMQ就...
摘要:這樣的消息分發(fā)機制稱作輪詢。在進程掛了之后,所有的未被確認的消息會被重新分發(fā)。忘記確認這是一個普遍的錯誤,丟失。為了使消息不會丟失,兩件事情需要確保,我們需要持久化隊列和消息。 工作隊列 showImg(https://segmentfault.com/img/remote/1460000008229494?w=332&h=111); 在第一篇中,我們寫了一個程序從已經(jīng)聲明的隊列中收發(fā)...
摘要:消息持久化控制的屬性就是消息的持久化。當生產者發(fā)送的消息路由鍵為時,兩個消費者都會收到消息并處理當生產者發(fā)送的消息路由鍵為時,只有消費者可以接收到消息。八的消息確認機制在中,可以通過持久化數(shù)據(jù)解決服務器異常的數(shù)據(jù)丟失問題。 一、內容大綱&使用場景 1. 消息隊列解決了什么問題? 異步處理 應用解耦 流量削鋒 日志處理 ...... 2. rabbitMQ安裝與配置 3. Java操...
閱讀 3156·2021-10-08 10:04
閱讀 1098·2021-09-30 09:48
閱讀 3466·2021-09-22 10:53
閱讀 1684·2021-09-10 11:22
閱讀 1698·2021-09-06 15:00
閱讀 2156·2019-08-30 15:56
閱讀 719·2019-08-30 15:53
閱讀 2288·2019-08-30 13:04