摘要:本文將會講解如何使用實現(xiàn)延時重試和失敗消息隊列,實現(xiàn)可靠的消息消費,消費失敗后,自動延時將消息重新投遞,當達到一定的重試次數(shù)后,將消息投遞到失敗消息隊列,等待人工介入處理。
RabbitMQ是一款使用Erlang開發(fā)的開源消息隊列。本文假設(shè)讀者對RabbitMQ是什么已經(jīng)有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網(wǎng)的 RabbitMQ Tutorials 入門教程開始學習。
本文將會講解如何使用RabbitMQ實現(xiàn)延時重試和失敗消息隊列,實現(xiàn)可靠的消息消費,消費失敗后,自動延時將消息重新投遞,當達到一定的重試次數(shù)后,將消息投遞到失敗消息隊列,等待人工介入處理。在這里我會帶領(lǐng)大家一步一步的實現(xiàn)一個帶有失敗重試功能的發(fā)布訂閱組件,使用該組件后可以非常簡單的實現(xiàn)消息的發(fā)布訂閱,在進行業(yè)務(wù)開發(fā)的時候,業(yè)務(wù)開發(fā)人員可以將主要精力放在業(yè)務(wù)邏輯實現(xiàn)上,而不需要花費時間去理解RabbitMQ的一些復雜概念。
本文將會持續(xù)修正和更新,最新內(nèi)容請參考我的 GITHUB 上的 程序猿成長計劃 項目,歡迎 Star,更多精彩內(nèi)容請 follow me。
概要我們將會實現(xiàn)如下功能
結(jié)合RabbitMQ的Topic模式和Work Queue模式實現(xiàn)生產(chǎn)方產(chǎn)生消息,消費方按需訂閱,消息投遞到消費方的隊列之后,多個worker同時對消息進行消費
結(jié)合RabbitMQ的 Message TTL 和 Dead Letter Exchange 實現(xiàn)消息的延時重試功能
消息達到最大重試次數(shù)之后,將其投遞到失敗隊列,等待人工介入處理bug后,重新將其加入隊列消費
具體流程見下圖
生產(chǎn)者發(fā)布消息到主Exchange
主Exchange根據(jù)Routing Key將消息分發(fā)到對應(yīng)的消息隊列
多個消費者的worker進程同時對隊列中的消息進行消費,因此它們之間采用“競爭”的方式來爭取消息的消費
消息消費后,不管成功失敗,都要返回ACK消費確認消息給隊列,避免消息消費確認機制導致重復投遞,同時,如果消息處理成功,則結(jié)束流程,否則進入重試階段
如果重試次數(shù)小于設(shè)定的最大重試次數(shù)(3次),則將消息重新投遞到Retry Exchange的重試隊列
重試隊列不需要消費者直接訂閱,它會等待消息的有效時間過期之后,重新將消息投遞給Dead Letter Exchange,我們在這里將其設(shè)置為主Exchange,實現(xiàn)延時后重新投遞消息,這樣消費者就可以重新消費消息
如果三次以上都是消費失敗,則認為消息無法被處理,直接將消息投遞給Failed Exchange的Failed Queue,這時候應(yīng)用可以觸發(fā)報警機制,以通知相關(guān)責任人處理
等待人工介入處理(解決bug)之后,重新將消息投遞到主Exchange,這樣就可以重新消費了
技術(shù)實現(xiàn)Linus Torvalds 曾經(jīng)說過
Talk is cheap. Show me the code
我分別用Java和PHP實現(xiàn)了本文所講述的方案,讀者可以通過參考代碼以及本文中的基本步驟來更好的理解
rabbitmq-pubsub-php
rabbitmq-pubsub-java
創(chuàng)建Exchange為了實現(xiàn)消息的延時重試和失敗存儲,我們需要創(chuàng)建三個Exchange來處理消息。
master 主Exchange,發(fā)布消息時發(fā)布到該Exchange
master.retry 重試Exchange,消息處理失敗時(3次以內(nèi)),將消息重新投遞給該Exchange
master.failed 失敗Exchange,超過三次重試失敗后,消息投遞到該Exchange
所有的Exchange聲明(declare)必須使用以下參數(shù)
參數(shù) | 值 | 說明 |
---|---|---|
exchange | - | Exchange名稱 |
type | topic | Exchange 類型 |
passive | false | 如果Exchange已經(jīng)存在,則返回成功,不存在則創(chuàng)建 |
durable | true | 持久化存儲Exchange,這里僅僅是Exchange本身持久化,消息和隊列需要多帶帶指定其持久化 |
no-wait | false | 該方法需要應(yīng)答確認 |
Java代碼
// 聲明Exchange:主體,失敗,重試 channel.exchangeDeclare("master", "topic", true); channel.exchangeDeclare("master.retry", "topic", true); channel.exchangeDeclare("master.failed", "topic", true);
PHP代碼
// 普通交換機 $this->channel->exchange_declare("master", "topic", false, true, false); // 重試交換機 $this->channel->exchange_declare("master.retry", "topic", false, true, false); // 失敗交換機 $this->channel->exchange_declare("master.failed", "topic", false, true, false);
在RabbitMQ的管理界面中,我們可以看到創(chuàng)建的三個Exchange
消息發(fā)布消息發(fā)布時,使用basic_publish方法,參數(shù)如下
參數(shù) | 值 | 說明 |
---|---|---|
message | - | 發(fā)布的消息對象 |
exchange | master | 消息發(fā)布到的Exchange |
routing-key | - | 路由KEY,用于標識消息類型 |
mandatory | false | 是否強制路由,指定了該選項后,如果沒有訂閱該消息,則會返回路由不可達錯誤 |
immediate | false | 指定了當消息無法直接路由給消費者時如何處理 |
發(fā)布消息時,對于message對象,其內(nèi)容建議使用json編碼后的字符串,同時消息需要標識以下屬性
"delivery_mode"=> 2 // 1為非持久化,2為持久化
Java代碼
channel.basicPublish( "master", routingKey, MessageProperties.PERSISTENT_BASIC, // delivery_mode message.getBytes() );
PHP代碼
$msg = new AMQPMessage($message->serialize(), [ "delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT, ]); $this->channel->basic_publish($msg, "master", $routingKey);消息訂閱
消息訂閱的實現(xiàn)相對復雜一些,需要完成隊列的聲明以及隊列和Exchange的綁定。
Declare Queue對于每一個訂閱消息的服務(wù),都必須創(chuàng)建一個該服務(wù)對應(yīng)的隊列,將該隊列綁定到關(guān)注的路由規(guī)則,這樣之后,消息生產(chǎn)者將消息投遞給Exchange之后,就會按照路由規(guī)則將消息分發(fā)到對應(yīng)的隊列供消費者消費了。
消費服務(wù)需要declare三個隊列
[queue_name] 隊列名稱,格式符合 [服務(wù)名稱]@訂閱服務(wù)標識
[queue_name]@retry 重試隊列
[queue_name]@failed 失敗隊列
訂閱服務(wù)標識是客戶端自己對訂閱的分類標識符,比如用戶中心服務(wù)(服務(wù)名稱ucenter),包含兩個訂閱:user和enterprise,這里兩個訂閱的隊列名稱就為 ucenter@user和ucenter@enterprise,其對應(yīng)的重試隊列為 ucenter@user@retry和ucenter@enterprise@retry。
Declare隊列時,參數(shù)規(guī)定規(guī)則如下
參數(shù) | 值 | 說明 |
---|---|---|
queue | - | 隊列名稱 |
passive | false | 隊列不存在則創(chuàng)建,存在則直接成功 |
durable | true | 隊列持久化 |
exclusive | false | 排他,指定該選項為true則隊列只對當前連接有效,連接斷開后自動刪除 |
no-wait | false | 該方法需要應(yīng)答確認 |
auto-delete | false | 當不再使用時,是否自動刪除 |
對于@retry重試隊列,需要指定額外參數(shù)
"x-dead-letter-exchange" => "master" "x-dead-letter-routing-key" => [queue_name], "x-message-ttl" => 30 * 1000 // 重試時間設(shè)置為30s
這里的兩個header字段的含義是,在隊列中延遲30s后,將該消息重新投遞到x-dead-letter-exchange對應(yīng)的Exchange中,并且routing key指定為消費隊列的名稱,這樣就可以實現(xiàn)消息只投遞給原始出錯時的隊列,避免消息重新投遞給所有關(guān)注當前routing key的消費者了。
Java代碼
// 聲明監(jiān)聽隊列 channel.queueDeclare( queueName, // 隊列名稱 true, // durable false, // exclusive false, // autoDelete null // arguments ); channel.queueDeclare(queueName + "@failed", true, false, false, null); Maparguments = new HashMap (); arguments.put("x-dead-letter-exchange", exchangeName()); arguments.put("x-message-ttl", 30 * 1000); arguments.put("x-dead-letter-routing-key", queueName); channel.queueDeclare(queueName + "@retry", true, false, false, arguments);
PHP代碼
$this->channel->queue_declare($queueName, false, true, false, false, false); $this->channel->queue_declare($failedQueueName, false, true, false, false, false); $this->channel->queue_declare( $retryQueueName, // 隊列名稱 false, // passive true, // durable false, // exclusive false, // auto_delete false, // nowait new AMQPTable([ "x-dead-letter-exchange" => "master", "x-dead-letter-routing-key" => $queueName, "x-message-ttl" => 30 * 1000, ]) );
在RabbitMQ的管理界面中,Queues部分可以看到我們創(chuàng)建的三個隊列
查看隊列的詳細信息,我們可以看到 queueName@retry 隊列與其它兩個隊列的不同
Bind Exchange & Queue創(chuàng)建完隊列之后,需要將隊列與Exchange綁定(bind),不同隊列需要綁定到之前創(chuàng)建的對應(yīng)的Exchange上面
Queue | Exchange |
---|---|
[queue_name] | master |
[queue_name]@retry | master.retry |
[queue_name]@failed | master.failed |
綁定時,需要提供訂閱的路由KEY,該路由KEY與消息發(fā)布時的路由KEY對應(yīng),區(qū)別是這里可以使用通配符同時訂閱多種類型的消息。
參數(shù) | 值 | 說明 |
---|---|---|
queue | - | 綁定的隊列 |
exchange | - | 綁定的Exchange |
routing-key | - | 訂閱的消息路由規(guī)則 |
no-wait | false | 該方法需要應(yīng)答確認 |
Java代碼
// 綁定監(jiān)聽隊列到Exchange channel.queueBind(queueName, "master", routingKey); channel.queueBind(queueName, exchangeName(), queueName); channel.queueBind(queueName + "@failed", "master.failed", queueName); channel.queueBind(queueName + "@retry", "master.retry", queueName);
PHP代碼
$this->channel->queue_bind($queueName, "master", $routingKey); $this->channel->queue_bind($queueName, "master", $queueName); $this->channel->queue_bind($retryQueueName, "master.retry", $queueName); $this->channel->queue_bind($failedQueueName, "master.failed", $queueName);
在RabbitMQ的管理界面中,我們可以看到該隊列與Exchange和routing-key的綁定關(guān)系
消息消費實現(xiàn)使用 basic_consume 對消息進行消費的時候,需要注意下面參數(shù)
參數(shù) | 值 | 說明 |
---|---|---|
queue | - | 消費的隊列名稱 |
consumer-tag | - | 消費者標識,留空即可 |
no_local | false | 如果設(shè)置了該字段,服務(wù)器將不會發(fā)布消息到 發(fā)布它的客戶端 |
no_ack | false | 需要消費確認應(yīng)答 |
exclusive | false | 排他訪問,設(shè)置后只允許當前消費者訪問該隊列 |
nowait | false | 該方法需要應(yīng)答確認 |
消費端在消費消息時,需要從消息中獲取消息被消費的次數(shù),以此判斷該消息處理失敗時重試還是發(fā)送到失敗隊列。
Java代碼
protected Long getRetryCount(AMQP.BasicProperties properties) { Long retryCount = 0L; try { Mapheaders = properties.getHeaders(); if (headers != null) { if (headers.containsKey("x-death")) { List
PHP代碼
protected function getRetryCount(AMQPMessage $msg): int { $retry = 0; if ($msg->has("application_headers")) { $headers = $msg->get("application_headers")->getNativeData(); if (isset($headers["x-death"][0]["count"])) { $retry = $headers["x-death"][0]["count"]; } } return (int)$retry; }
消息消費完成后,需要發(fā)送消費確認消息給服務(wù)端,使用basic_ack方法
ack(delivery-tag=消息的delivery-tag標識)
Java代碼
// 消息消費處理 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ... // 注意,由于使用了basicConsume的autoAck特性,因此這里就不需要手動執(zhí)行 // channel.basicAck(envelope.getDeliveryTag(), false); } }; // 執(zhí)行消息消費處理 channel.basicConsume( queueName, true, // autoAck consumer );
PHP代碼
$this->channel->basic_consume( $queueName, "", // customer_tag false, // no_local false, // no_ack false, // exclusive false, // nowait function (AMQPMessage $msg) use ($queueName, $routingKey, $callback) { ... $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]); } );
如果消息處理中出現(xiàn)異常,應(yīng)該將該消息重新投遞到重試Exchange,等待下次重試
basic_publish(msg, "master.retry", queueName) ack(delivery-tag) // 不要忘記了應(yīng)答消費成功消息
如果判斷重試次數(shù)大于3次,仍然處理失敗,則應(yīng)該講消息投遞到失敗Exchange,等待人工處理
basic_publish(msg, "master.failed", queueName) ack(delivery-tag) // 不要忘記了應(yīng)答消費成功消息
一定不要忘記ack消息,因為重試、失敗都是通過將消息重新投遞到重試、失敗Exchange來實現(xiàn)的,如果忘記ack,則該消息在超時或者連接斷開后,會重新被重新投遞給消費者,如果消費者依舊無法處理,則會造成死循環(huán)。
Java代碼
try { String message = new String(body, "UTF-8"); // 消息處理函數(shù) handler.handle(message, envelope.getRoutingKey()); } catch (Exception e) { long retryCount = getRetryCount(properties); if (retryCount > 3) { // 重試次數(shù)大于3次,則自動加入到失敗隊列 Mapheaders = new HashMap<>(); headers.put("x-orig-routing-key", getOrigRoutingKey(properties, envelope.getRoutingKey())); channel.basicPublish("master.failed", queueName, createOverrideProperties(properties, headers), body); } else { // 重試次數(shù)小于3,則加入到重試隊列,30s后再重試 Map headers = properties.getHeaders(); if (headers == null) { headers = new HashMap<>(); } headers.put("x-orig-routing-key", getOrigRoutingKey(properties, envelope.getRoutingKey())); channel.basicPublish("master.retry", queueName, createOverrideProperties(properties, headers), body); } }
在消息發(fā)送到重試隊列和失敗隊列時,我們在消息的headers中添加了一個名為x-orig-routing-key的字段,該字段是實現(xiàn)消息重試的關(guān)鍵字段,由于我們的消息需要在不同的Exchange,Queue之間流轉(zhuǎn),為了避免消息在重新投遞到主Exchange時,被所有的消費者隊列重新消費,在重試過程中,我們將消息的routing-key修改為隊列名稱,直接投遞給原始消費消息的隊列。x-orig-routing-key用于在之后能夠重新獲取到最開始的routing-key。
這里的重復消費是指 某個消息被兩個消費方A和B消費了,其中A消費失敗,B成功,這時候,消息由A消費者重新投遞到主Exchange后,B消費隊列也會獲取到該消息,因此就會導致B消費者重復消費已經(jīng)消費國的消息失敗任務(wù)重試
如果任務(wù)重試三次仍未成功,則會被投遞到失敗隊列,這時候需要人工處理程序異常,處理完畢后,需要將消息重新投遞到隊列進行處理,這里唯一需要做的就是從失敗隊列訂閱消息,然后獲取到消息后,清空其application_headers頭信息,然后重新投遞到master這個Exchange即可。
Java代碼
channel.basicPublish( "master", envelope.getRoutingKey(), MessageProperties.PERSISTENT_BASIC, body );
PHP代碼
$msg->set("application_headers", new AMQPTable([])); $this->channel->basic_publish( $msg, "master", $msg->get("routing_key") );怎么使用
隊列和Exchange以及發(fā)布訂閱的關(guān)系我們就說完了,那么使用起來是什么效果呢?這里我們以Java代碼為例
// 發(fā)布消息 Publisher publisher = new Publisher(factory.newConnection(), "master"); publisher.publish("{"id":121, "name":"guanyiyao"}", "user.create"); // 訂閱消息 new Subscriber(factory.newConnection(), Main.EXCHANGE_NAME) .init("user-monitor", "user.*") .subscribe((message, routingKey) -> { // TODO 業(yè)務(wù)邏輯 System.out.printf(" <%s> message consumed: %s ", routingKey, message); } );總結(jié)
使用RabbitMQ時,實現(xiàn)延時重試和失敗隊列的方式并不僅僅局限于本文中描述的方法,如果讀者有更好的實現(xiàn)方案,歡迎拍磚,在這里我也只是拋磚引玉了。本文中講述的方法還有很多優(yōu)化空間,讀者也可以試著去改進其實現(xiàn)方案,比如本文中使用了三個Exchagne,是否只使用一個Exchange也能實現(xiàn)本文中所講述的功能。
本文將會持續(xù)修正和更新,最新內(nèi)容請參考我的 GITHUB 上的 程序猿成長計劃 項目,歡迎 Star,更多精彩內(nèi)容請 follow me。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/69372.html
摘要:本文將會講解如何使用實現(xiàn)延時重試和失敗消息隊列,實現(xiàn)可靠的消息消費,消費失敗后,自動延時將消息重新投遞,當達到一定的重試次數(shù)后,將消息投遞到失敗消息隊列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發(fā)的開源消息隊列。本文假設(shè)讀者對RabbitMQ是什么已經(jīng)有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網(wǎng)的 RabbitMQ Tutorials 入門...
摘要:通過以上分析我們可以得出消息隊列具有很好的削峰作用的功能即通過異步處理,將短時間高并發(fā)產(chǎn)生的事務(wù)消息存儲在消息隊列中,從而削平高峰期的并發(fā)事務(wù)。 該文已加入開源項目:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識的文檔類項目,Star 數(shù)接近 16k)。地址:https://github.com/Snailclimb... 本文內(nèi)容思維導圖:showImg(ht...
摘要:簡介是語言編寫的,開源的分布式消息隊列中間件,其設(shè)計的目的是用來大規(guī)模地處理每天數(shù)以十億計級別的消息。 簡介 NSQ是Go語言編寫的,開源的分布式消息隊列中間件,其設(shè)計的目的是用來大規(guī)模地處理每天數(shù)以十億計級別的消息。NSQ 具有分布式和去中心化拓撲結(jié)構(gòu),該結(jié)構(gòu)具有無單點故障、故障容錯、高可用性以及能夠保證消息的可靠傳遞的特征,是一個成熟的、已在大規(guī)模生成環(huán)境下應(yīng)用的產(chǎn)品。 NSQ在國...
閱讀 1314·2021-11-04 16:09
閱讀 3516·2021-10-19 11:45
閱讀 2408·2021-10-11 10:59
閱讀 1022·2021-09-23 11:21
閱讀 2774·2021-09-22 10:54
閱讀 1149·2019-08-30 15:53
閱讀 2618·2019-08-30 15:53
閱讀 3490·2019-08-30 12:57