成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

RabbitMQ 基礎教程(2) - Work Queue

jindong / 899人閱讀

摘要:基礎教程注本文是對眾多博客的學習和總結,可能存在理解錯誤。消息的應答現(xiàn)在存在這樣一種場景,消費者取到消息,然后創(chuàng)建任務開始執(zhí)行。如果處理失敗,也就是沒有收到應答,那么就將這條消息重新發(fā)送給該隊列的其他消費者。造成了負載不均衡。

RabbitMQ 基礎教程(2) - Work Queue

注:本文是對眾多博客的學習和總結,可能存在理解錯誤。請帶著懷疑的眼光,同時如果有錯誤希望能指出。

如果你喜歡我的文章,可以關注我的私人博客:http://blog-qeesung.rhcloud.com/

在上一篇文章 RabbitMQ 基礎教程(1) - Hello World 中,我們已經(jīng)簡單的介紹了RabbitMQ以及如何發(fā)送和接收一個消息。接下來我們將繼續(xù)深入RabbitMQ,研究一下消息隊列(Work Queue)

消息隊列

消息的發(fā)布者發(fā)布一個消息到消息隊列中,然后信息的消費者取出消息進行消費。

                                 queue
 +-------------+      +--+--+--+--+--+--+     +-------------+
 |   producer  |----->|m1|m2| ... |  |  |---->|   consumer  |
 +-------------+      +--+--+--+--+--+--+     +-------------+

但是實際情況往往比這個要復雜,假如我們有多個信息的發(fā)布者和多個信息的消費者,那RabbitMQ又將會是怎么工作呢?

+--------------+                              +--------------+
|   producer1  +-                           / |  consumer1   |
+--------------+ -          queue         /- +--------------+
+--------------+   - +---+---+---+----+ /-   +--------------+
|   producer2  +---->X|m1 |m2 |m3 |... |---->|  consumer2   |
+--------------+   /- +---+---+---+----+ -   +--------------+
+--------------+ /-                        - +--------------+
|      ...     |/                            |      ...     |
+--------------+                              +--------------+
Round-robin 分發(fā)算法

RabbitMQ中,如果有多個消費者同時消費同一個消息隊列,那么就通過Round-robin算法將消息隊列中的消息均勻的分配給每一個消費者。

這個算法其實很簡單,每收到一個新的消息,就將這個消息分發(fā)給上下一個消費者。比如上一個消費者是consumer-n,那么有新消息來的時候就將這個新的消息發(fā)布到consumer-n+1,以此類推,如果到了最后一個消費者,那么就又從第一個開始。即:consumer-index = (consumer-index + 1) mod consumer-number

為了演示,首先來做幾項準備工作。

定義任務 task.js

/**
 * 創(chuàng)建一個任務 
 * @param taskName 任務名字
 * @param costTime 任務話費的時間
 * @param callback 任務結束以后的回調函數(shù)
 * @constructor
 */
function Task(taskName ,costTime , callback){
    if(typeof(costTime) !== "number")
        costTime = 0; // no delay there 
    setTimeout(function () {
        console.log(taskName+" finished");
        if(callback && typeof (callback) === "function")
            callback();
    } , 1000*costTime);
};

串行化的消息任務結構

任務發(fā)布者負責將該結構發(fā)布到隊列中,然后消費者取出消息,新建任務開始執(zhí)行。

{
    taskName : "taskname",
    costTime : 1
}

創(chuàng)建任務消息 task-producer.js

var amqp = require("amqplib/callback_api");

// 連接上RabbitMQ服務器
amqp.connect("amqp://localhost", function(err, conn) {
    conn.createChannel(function(err, ch) {
        var q = "tasks";

        // 得到發(fā)送消息的數(shù)目,默認發(fā)送4個
        var name;
        var cost;
        
        (function () {
            if(process.argv.length < 4 )
            {
                console.error("ERROR : usage - node rabbit-producer  ");
                process.exit(-1);
            }
            
            name = process.argv[2];
            cost = +process.argv[3];
        })();

        // 新建隊列,然后將隊列中的消息持久化取消
        ch.assertQueue(q, {durable: true});
        // 將任務串行化存入Buffer中,并推入隊列
        ch.sendToQueue(q, new Buffer(JSON.stringify({taskName :name ,costTime :cost })),{persistent:true});
        console.log(" [x] Sent "+name);
        setTimeout(function () {
            process.exit(0);
        },500);
    });
});

消費任務消息 task-consumer.js

var amqp = require("amqplib/callback_api");
var Task = require("./task.js");

amqp.connect("amqp://localhost", function(err, conn) {
    conn.createChannel(function(err, ch) {
        var q = "tasks";

        ch.assertQueue(q, {durable: true});
        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
        // 監(jiān)聽隊列上面的消息
        ch.consume(q, function(msg) {
            var obj = JSON.parse(msg.content.toString("utf8"));
            console.log("Get the task "+obj.taskName);
            // 定義新的任務
            new Task(obj.taskName,obj.costTime);
        }, {noAck: true});
    });
});

現(xiàn)在開啟兩個消費者進程來等待消費tasks隊列中的消息

# shell1
node task-consumer.js

# shell2
node task-consumer.js

然后向隊列中推入三個消息

# shell3
node task-producer.js task1 0
node task-producer.js task2 0
node task-producer.js task3 0

運行結果

# shell1
[*] Waiting for messages in tasks. To exit press CTRL+C
Get the task task1
task1 finished
Get the task task3
task3 finished

# shell2
[*] Waiting for messages in tasks. To exit press CTRL+C
Get the task task2
task2 finished

# 已經(jīng)通過Round-robin算法將消息隊列中的消息分配到連接的消費者中了.
消息,隊列持久化

細心的讀者可能已經(jīng)發(fā)現(xiàn)了我們在聲明隊列發(fā)送消息的代碼塊中改動了一小部分的代碼,那就是

// 聲明隊列
ch.assertQueue(q, {durable: true});

// 發(fā)送信息
ch.sendToQueue(q, new Buffer(JSON.stringify({taskName :name ,costTime :cost })),{persistent:true});

通過將隊列的durable配置參數(shù)生命為true可以保證在RabbitMQ服務器退出或者異常終止的情況下不會丟失消息隊列,注意這里只是不會丟失消息隊列,并不是消息隊列中沒有被消費的消息不會丟失。

為了保證消息隊列中的消息不會丟失,就需要在發(fā)送消息時指定persistent選項,這里并不能百分之百的保證消息不會丟失,因為從隊列中有新的消息,到將隊列中消息持久化到磁盤這一段時間之內是無法保證的。

消息的應答

現(xiàn)在存在這樣一種場景,消費者取到消息,然后創(chuàng)建任務開始執(zhí)行。但是任務執(zhí)行到一半就拋出異常,那么這個任務算是沒有被成功執(zhí)行的。

在我們之前的代碼實現(xiàn)中,都是消息隊列中有新的消息,馬上就這個消息分配給消費者消費,不管消費者對消息處理結果如何,消息隊列會馬上將已經(jīng)分配的消息從消息隊列中刪除。如果這個任務非常重要,或者一定要執(zhí)行成功,那么一旦任務在執(zhí)行過程中拋出異常,那么這個任務就再也找不回來了,這是非常可怕的事情。

還好在RabbitMQ中我們可以為已經(jīng)分配的消息和消息隊列之間創(chuàng)建一個應答關系:

如果消息處理成功,那么就發(fā)送一個答復給消息隊列,告訴它:我已經(jīng)成功處理消息,不再需要這條消息了,你可以刪除了,于是消息隊列就將已經(jīng)應答的消息從消息隊列中刪除。

如果處理失敗,也就是沒有收到應答,那么就將這條消息重新發(fā)送給該隊列的其他消費者。

要在消費者和消息隊列之間建立這種應答關系我們只需要將channelconsume函數(shù)的noAck參數(shù)設成false就可以了。

 ch.consume(q, function(msg) {
            var obj = JSON.parse(msg.content.toString("utf8"));
            console.log("Get the task "+obj.taskName);
            // 定義新的任務
            new Task(obj.taskName,obj.costTime);
        }, {noAck: false}); // 這里設置成false

下面我們就模擬一下消息處理失敗的場景:

var amqp = require("amqplib/callback_api");
var Task = require("./task.js");

amqp.connect("amqp://localhost", function(err, conn) {
    conn.createChannel(function(err, ch) {
        var q = "tasks";

        ch.assertQueue(q, {durable: true});
        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
        // 監(jiān)聽隊列上面的消息
        ch.consume(q, function(msg) {
            var obj = JSON.parse(msg.content.toString("utf8"));
            console.log("Get the task "+obj.taskName);
            // 定義新的任務
            new Task(obj.taskName,obj.costTime,function(){
                if(obj.taskName === "task2")
                    throw new Error("Test error");
                else
                    ch.ack(msg);
            }); // 如果是任務二,那么就拋出異常。
        }, {noAck: false});
    });
});

按照上面的腳本執(zhí)行順序,我們在執(zhí)行一遍腳本: consumer2得到執(zhí)行task2消息,然后馬上拋出異常退出進行,然后消息隊列再將這個消息分配給cosumer1,接著也執(zhí)行失敗了,退出進程,最終消息隊列中將只會有一個task2的消息存在。

啟動消費者等待消息

# shell1 開啟消費者1 
node rabbit-consumer.js

# shell2 開啟消費者2
node rabbit-consumer.js

創(chuàng)建消息

node rabbit-producer.js task1 0
node rabbit-producer.js task2 10
node rabbit-producer.js task3 0

我們能來看一下結果:

# shell2 消費者2
[*] Waiting for messages in tasks. To exit press CTRL+C
Get the task task2
task2 finished # 消費者2執(zhí)行任務2的時候拋出異常,task2將會重新發(fā)送給消費者1
... throw  new Error("Error test");


# shell1 消費者1
[*] Waiting for messages in tasks. To exit press CTRL+C
Get the task task1
task1 finished
Get the task task3
task3 finished
Get the task task2 # 消費者1接收到任何2
task2 finished
... throw  new Error("Error test"); # 也拋出異常了

最終會在消息隊列中剩下一條未消費的信息。

這里有一點需要注意,如果你將noAck選項設置成了false,那么如果消息處理成功,一定要進行應答,負責消息隊列中的消息會越來越多,直到撐爆內存。

更加均衡的負載

在上文中我們聽到過消息隊列通過Round-robin算法來將消息分配給消費者,但是這個分配過程是盲目的。比如現(xiàn)在有兩個消費者,consumer1consumer2,按照Round-robin算法就會將奇數(shù)編號的任務發(fā)配給consumer1,將偶數(shù)編號的任務分配給consumer2,但是這些任務恰好有一個特性,奇數(shù)編號的任務比較繁重,而偶數(shù)編號的任務就比較簡單。

那么這就會造成一個問題,那就是consumer1會被累死,而consumer2會被閑死。造成了負載不均衡。要是每一個消息都被成功消費以后告訴消息隊列,然后消息隊列再將新的消息分配給空閑下來的消費者不就好了。

RabbitMQ中的確有這樣的一個配置選項。那就是ch.prefetch(1);

我們現(xiàn)在就來模擬一下

var amqp = require("amqplib/callback_api");
var Task = require("./task.js");

amqp.connect("amqp://localhost", function(err, conn) {
    conn.createChannel(function(err, ch) {
        var q = "tasks";

        ch.assertQueue(q, {durable: true});
        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
        // 監(jiān)聽隊列上面的消息
        ch.prefetch(1); // 添加這一行
        ch.consume(q, function(msg) {
            var obj = JSON.parse(msg.content.toString("utf8"));
            console.log("Get the task "+obj.taskName);
            new Task(obj.taskName,obj.costTime ,function () {
                ch.ack(msg);
            });
        }, {noAck: false});
    });
});

啟動消費者等待消息

# shell1 開啟消費者1 
node rabbit-consumer.js

# shell2 開啟消費者2
node rabbit-consumer.js

創(chuàng)建消息

node rabbit-producer.js task1 0
node rabbit-producer.js task2 20
node rabbit-producer.js task3 0
node rabbit-producer.js task4 20
# shell1 開啟消費者1
[*] Waiting for messages in tasks. To exit press CTRL+C
Get the task task1 # 任務馬上結束
task1 finished
Get the task task3 # 任務馬上結束
task3 finished
Get the task task4 # 任務四被分配到consumer1中了 
task4 finished

# shell2 開啟消費者2
[*] Waiting for messages in tasks. To exit press CTRL+C
Get the task task2 
task2 finished

文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉載請注明本文地址:http://systransis.cn/yun/86315.html

相關文章

  • RabbitMQ+PHP 教程二(Work Queues)

    摘要:平均每個消費者將得到相同數(shù)量的消息。消息確認完成任務可能需要幾秒鐘。為了確保消息不會丟失,支持消息確認。沒有任何消息超時當這個消費者中止了,將會重新分配消息時。這是因為只是調度消息時,消息進入隊列。 showImg(https://segmentfault.com/img/bVXNuN?w=332&h=111); 介紹 在上一個 Hello World 教程中,我們編寫了從指定隊列發(fā)送...

    iKcamp 評論0 收藏0
  • 【譯】RabbitMQ系列(二)-Work模式

    摘要:每個消費者會得到平均數(shù)量的。為了確保不會丟失,采用確認機制。如果中斷退出了關閉了,關閉了,或是連接丟失了而沒有發(fā)送,會認為該消息沒有完整的執(zhí)行,會將該消息重新入隊。該消息會被發(fā)送給其他的。當消費者中斷退出,會重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我們寫了通過一個...

    lcodecorex 評論0 收藏0
  • RabbitMQ+PHP 教程一(Hello World)

    摘要:在中間的框是一個隊列的消息緩沖區(qū),保持代表的消費。本教程介紹,這是一個開放的通用的協(xié)議消息。我們將在本教程中使用,解決依賴管理。發(fā)送者將連接到,發(fā)送一條消息,然后退出。注意,這與發(fā)送發(fā)布的隊列匹配。 介紹 RabbitMQ是一個消息代理器:它接受和轉發(fā)消息。你可以把它當作一個郵局:當你把郵件放在信箱里時,你可以肯定郵差先生最終會把郵件送到你的收件人那里。在這個比喻中,RabbitMQ就...

    silencezwm 評論0 收藏0
  • [譯] RabbitMQ tutorials (2) ---- 'work queue&#

    摘要:這樣的消息分發(fā)機制稱作輪詢。在進程掛了之后,所有的未被確認的消息會被重新分發(fā)。忘記確認這是一個普遍的錯誤,丟失。為了使消息不會丟失,兩件事情需要確保,我們需要持久化隊列和消息。 工作隊列 showImg(https://segmentfault.com/img/remote/1460000008229494?w=332&h=111); 在第一篇中,我們寫了一個程序從已經(jīng)聲明的隊列中收發(fā)...

    joyvw 評論0 收藏0
  • RabbitMQ學習筆記

    摘要:消息持久化控制的屬性就是消息的持久化。當生產者發(fā)送的消息路由鍵為時,兩個消費者都會收到消息并處理當生產者發(fā)送的消息路由鍵為時,只有消費者可以接收到消息。八的消息確認機制在中,可以通過持久化數(shù)據(jù)解決服務器異常的數(shù)據(jù)丟失問題。 一、內容大綱&使用場景 1. 消息隊列解決了什么問題? 異步處理 應用解耦 流量削鋒 日志處理 ...... 2. rabbitMQ安裝與配置 3. Java操...

    zacklee 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<