摘要:消息持久化控制的屬性就是消息的持久化。當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為時,兩個消費(fèi)者都會收到消息并處理當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為時,只有消費(fèi)者可以接收到消息。八的消息確認(rèn)機(jī)制在中,可以通過持久化數(shù)據(jù)解決服務(wù)器異常的數(shù)據(jù)丟失問題。
一、內(nèi)容大綱&使用場景 1. 消息隊(duì)列解決了什么問題?
異步處理
應(yīng)用解耦
流量削鋒
日志處理
......
2. rabbitMQ安裝與配置 3. Java操作rabbitMQsimple 簡單隊(duì)列
. work queues 工作隊(duì)列 公平分發(fā) 輪詢分發(fā)
. publish/subscribe 發(fā)布訂閱
. routing 路由選擇 通配符模式
. Topics 主題
手動和自動確認(rèn)消息
隊(duì)列的持久化和非持久化
rabbitMQ的延遲隊(duì)列
4. Spring AMQP Spring-Rabbit 5. DEMOMQ實(shí)現(xiàn)搜索引擎DIH增量
未支付訂單30分鐘 取消
類似百度統(tǒng)計(jì) cnzz 架構(gòu) 消息隊(duì)列
二、用戶及vhost配置 2.1 添加用戶 2.2 virtual hosts管理virtual hosts相當(dāng)于mysql的db
一般以/開頭
2.3 用戶授權(quán)需要對用戶進(jìn)行授權(quán)
三、簡單隊(duì)列 3.1 模型P:消息生產(chǎn)者
紅色:隊(duì)列
C:消息消費(fèi)者
包含三個對象:生產(chǎn)者、隊(duì)列、消費(fèi)者
3.2 獲取mq連接import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConnectionUtil { /** * 獲取MQ的連接 * @return */ public static Connection getConnection() throws IOException, TimeoutException { //定義一個連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); //AMQP的端口 factory.setPort(5672); //vhost factory.setVirtualHost("/vhost_mmr"); factory.setUsername("rabbit"); factory.setPassword("123456"); Connection connection = factory.newConnection(); return connection; } }3.3 生產(chǎn)消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String QUEUE_NAME = "test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //從連接中獲取一個通道 Channel channel = connection.createChannel(); //創(chuàng)建隊(duì)列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "hello world!"; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("---send msg :" + msg); channel.close(); connection.close(); } }3.4 消費(fèi)消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Receive { private static final String QUEUE_NAME = "test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //創(chuàng)建channel Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("msg receive : " + msg); } }; channel.basicConsume(QUEUE_NAME, consumer); } }3.5 簡單隊(duì)列的不足
耦合性高,生產(chǎn)者一一對應(yīng)消費(fèi)者,如果需要多個消費(fèi)者消費(fèi)隊(duì)列中的消息,此時簡單隊(duì)列就無能為力了。
隊(duì)列名變更,源碼需要同時變更
四、Work隊(duì)列 4.1 模型一個生產(chǎn)者將消息放入隊(duì)列中,可以有多個消費(fèi)者進(jìn)行消費(fèi)
為什么會出現(xiàn)工作隊(duì)列?
Simple隊(duì)列:是一一對應(yīng)的,實(shí)際開發(fā)中,生產(chǎn)者改善消息是毫不費(fèi)力的,而消費(fèi)者一般需要跟業(yè)務(wù)相結(jié)合,消費(fèi)者接收到消息之后就需要處理,可能需要花費(fèi)時間,此時隊(duì)列就會積壓很多消息。
4.2 輪詢分發(fā)生產(chǎn)消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //從連接中獲取一個通道 Channel channel = connection.createChannel(); //創(chuàng)建隊(duì)列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 50; i++) { String msg = "hello " + i; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("---send msg :" + msg); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } channel.close(); connection.close(); } }
消費(fèi)者1
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv1 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //創(chuàng)建channel Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[1] msg recv1 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }; boolean ack = true; channel.basicConsume(QUEUE_NAME, ack, consumer); } }
消費(fèi)者2
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv2 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //創(chuàng)建channel Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[2] msg recv1 : " + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }; boolean ack = true; channel.basicConsume(QUEUE_NAME, ack, consumer); } }
現(xiàn)象:
消費(fèi)者1和消費(fèi)者2處理的消息是一樣多的,這種分發(fā)方式稱為輪詢分發(fā)(round-robin),不管誰忙或者誰閑,都不會多給或者少給。任務(wù)均分。
4.3 公平分發(fā) fair dispatch保證一次發(fā)送給消費(fèi)者的消息不超過一條
/** * 每個消費(fèi)者發(fā)送確認(rèn)消息之前,消息隊(duì)列不發(fā)送下一個消息給消費(fèi)者,消費(fèi)者一次只處理一個消息 * * 限制發(fā)送給同一個消費(fèi)者不得超過一條消息 */ int preFetchCount = 1; channel.basicQos(preFetchCount);
使用公平分發(fā),必須關(guān)閉自動應(yīng)答ack,改為手動
channel.basicAck(envelope.getDeliveryTag(), false); boolean ack = false;//自動應(yīng)答改為false channel.basicConsume(QUEUE_NAME, ack, consumer);
生產(chǎn)消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //從連接中獲取一個通道 Channel channel = connection.createChannel(); //創(chuàng)建隊(duì)列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); /** * 每個消費(fèi)者發(fā)送確認(rèn)消息之前,消息隊(duì)列不發(fā)送下一個消息給消費(fèi)者,消費(fèi)者一次只處理一個消息 * * 限制發(fā)送給同一個消費(fèi)者不得超過一條消息 */ int preFetchCount = 1; channel.basicQos(preFetchCount); for (int i = 0; i < 50; i++) { String msg = "hello " + i; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("---send msg :" + msg); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } channel.close(); connection.close(); } }
消費(fèi)消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv2 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //創(chuàng)建channel Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[2] msg recv1 : " + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自動應(yīng)答改為false channel.basicConsume(QUEUE_NAME, ack, consumer); } }4.4 消息應(yīng)答與消息持久化 4.4.1 消息應(yīng)答
boolean ack = false;//自動應(yīng)答改為false channel.basicConsume(QUEUE_NAME, ack, consumer);
ack = true時為自動確認(rèn)模式,一旦rabbitMQ將消息分發(fā)給消費(fèi)者,該消息就會在內(nèi)存中刪除;這種情況下,如果殺死正在處理消息的消費(fèi)者,會丟失正在處理的消息;
ack = false時為手動回執(zhí)(消息應(yīng)答)模式,如果有一個消費(fèi)者掛掉,就會將會給其他消費(fèi)者,rabbitMQ支持消息應(yīng)答,消費(fèi)者發(fā)送一個消息應(yīng)答,告訴rabbitMQ這個消息已經(jīng)被處理,然后rabbitMQ就刪除內(nèi)存中的消息;
消息應(yīng)答默認(rèn)打開,即為false;
由于消息在內(nèi)存中存儲,如果rabbitMQ掛掉,消息仍然會丟失。
4.4.2 消息持久化boolean durable = false; channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
durable控制的屬性就是消息的持久化。
已經(jīng)聲明好的隊(duì)列,如果durable已經(jīng)為false了,就無法修改為true,rabbitMQ不允許重新定義(不同參數(shù))一個已存在的隊(duì)列
五、訂閱模式 Publish/Subscribe 5.1 模型解讀:
1、一個生產(chǎn)者,多個消費(fèi)者;
2、每個消費(fèi)者都有自己的隊(duì)列;
3、生產(chǎn)者沒有直接把消息發(fā)送到隊(duì)列,而是發(fā)送至交換機(jī)(eXchange)
4、每個隊(duì)列都要綁定到交換機(jī)上
5、生產(chǎn)者發(fā)送的消息,經(jīng)過交換機(jī),到達(dá)隊(duì)列,就能實(shí)現(xiàn)一個消息被多個消費(fèi)者消費(fèi)
5.2 實(shí)現(xiàn)生產(chǎn)消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //聲明交換機(jī) channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String msg = "hello ps"; channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); System.out.println("Send " + msg); channel.close(); connection.close(); } }
消息哪去了?丟失了!因?yàn)榻粨Q機(jī)沒有存儲能力,在rabbitMQ中,只有隊(duì)列有存儲能力。此時并沒有完成隊(duì)列綁定到交換機(jī),所以數(shù)據(jù)丟失了。
消費(fèi)消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv1 { private static final String QUEUE_NAME = "test_ps_fanout_email"; private static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); //綁定隊(duì)列到交換機(jī) channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[1] msg recv1 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自動應(yīng)答改為false channel.basicConsume(QUEUE_NAME, ack, consumer); } }
不同的隊(duì)列做不同的事情。
5.3 Exchange(交換機(jī)、轉(zhuǎn)發(fā)器)一方面接收生產(chǎn)者的消息,另一方面向隊(duì)列推送消息
rabbitMQ提供了四種Exchange:fanout,direct,topic,header? header模式在實(shí)際使用中較少。
fanout:不處理路由鍵
direct:處理路由鍵
topic
將路由鍵和某模式進(jìn)行匹配
任何發(fā)送到Topic Exchange的消息都會被轉(zhuǎn)發(fā)到所有關(guān)心RouteKey中指定話題的Queue上
六、路由模式 6.1 模型聲明exchange時指定為direct模式
綁定隊(duì)列時,指定路由鍵
6.2 實(shí)現(xiàn)生產(chǎn)者
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String msg = "hello direct"; //指定路由鍵 String routingKey = "warning"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println("send msg:" + msg); channel.close(); connection.close(); } }
消費(fèi)者1
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv1 { private static final String EXCHANGE_NAME = "test_exchange_direct"; private static final String QUEUE_NAME = "test_queue_direct"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); //綁定隊(duì)列與交換機(jī)時,指定路由鍵 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[1] msg recv1 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自動應(yīng)答改為false channel.basicConsume(QUEUE_NAME, ack, consumer); } }
消費(fèi)者2
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv2 { private static final String EXCHANGE_NAME = "test_exchange_direct"; private static final String QUEUE_NAME = "test_queue_direct_2"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); //綁定隊(duì)列與交換機(jī)時,指定路由鍵 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[2] msg recv2 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自動應(yīng)答改為false channel.basicConsume(QUEUE_NAME, ack, consumer); } }七、Topic模式 7.1 模型
# 匹配一個或多個
* 匹配一個
7.2 實(shí)現(xiàn)生產(chǎn)者
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //聲明exchange,指定模式為topic channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String msg = "商品...."; String routingKey = "goods.delete"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println("send msg:" + msg); channel.close(); connection.close(); } }
消費(fèi)者1
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Created by wangbin on 2018/6/26. */ public class Recv1 { private static final String EXCHANGE_NAME = "test_exchange_topic"; private static final String QUEUE_NAME = "test_queue_topic_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[1] msg recv1 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自動應(yīng)答改為false channel.basicConsume(QUEUE_NAME, ack, consumer); } }
消費(fèi)者2
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Created by wangbin on 2018/6/26. */ public class Recv2 { private static final String EXCHANGE_NAME = "test_exchange_topic"; private static final String QUEUE_NAME = "test_queue_topic_2"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[2] msg recv2 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自動應(yīng)答改為false channel.basicConsume(QUEUE_NAME, ack, consumer); } }
其中,消費(fèi)者1綁定路由鍵為goods.#,消費(fèi)者2綁定路由鍵為goods.add。當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為goods.add時,兩個消費(fèi)者都會收到消息并處理;當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為goods.update時,只有消費(fèi)者1可以接收到消息。
八、RabbitMQ的消息確認(rèn)機(jī)制在rabbitMQ中,可以通過持久化數(shù)據(jù)解決rabbitMQ服務(wù)器異常的數(shù)據(jù)丟失問題。
問題:生產(chǎn)者將消息發(fā)送出去之后,消息到底有沒有到達(dá)rabbitMQ服務(wù)器;默認(rèn)情況是不知道消息已到達(dá)的
兩種方式:
AMQP實(shí)現(xiàn)了事務(wù)機(jī)制
confirm模式
8.1 事務(wù)機(jī)制txSelect
用于將當(dāng)前channel設(shè)置成transaction模式
txCommit
用于提交事務(wù)
txRollback
回滾事務(wù)
生產(chǎn)者發(fā)送消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String QUEUE_NAME = "test_queue_tx"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "hello tx msg!"; try { channel.txSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); channel.txCommit(); } catch (IOException e) { channel.txRollback(); System.out.println("發(fā)生異常,事務(wù)已回滾"); } } }
事務(wù)機(jī)制會降低rabbitMQ的吞吐量。
8.2 Confirm模式生產(chǎn)者將信道設(shè)置成confirm模式,一旦信道進(jìn)入confirm模式,所有在該信道上面發(fā)布的消息都將會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊(duì)列之后,broker就會發(fā)送一個確認(rèn)給生產(chǎn)者(包含消息的唯一ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)列了,如果消息和隊(duì)列是可持久化的,那么確認(rèn)消息會在將消息寫入磁盤之后發(fā)出,broker回傳給生產(chǎn)者的確認(rèn)消息中delivery-tag域包含了確認(rèn)消息的序列號,此外broker也可以設(shè)置basic.ack的multiple域,表示到這個序列號之前的所有消息都已經(jīng)得到了處理;
confirm模式最大的好處在于他是異步的,一旦發(fā)布一條消息,生產(chǎn)者應(yīng)用程序就可以在等信道返回確認(rèn)的同時繼續(xù)發(fā)送下一條消息,當(dāng)消息最終得到確認(rèn)之后,生產(chǎn)者應(yīng)用便可以通過回調(diào)方法來處理該確認(rèn)消息,如果RabbitMQ因?yàn)樽陨韮?nèi)部錯誤導(dǎo)致消息丟失,就會發(fā)送一條nack消息,生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理該nack消息。
編程模式:
1、普通,發(fā)一條
2、批量,發(fā)一批
3、異步confirm模式,提供一個回調(diào)方法
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/71406.html
摘要:性能調(diào)優(yōu)筆記避免雷區(qū)要避免流控機(jī)制觸發(fā)服務(wù)端默認(rèn)配置是當(dāng)內(nèi)存使用達(dá)到,磁盤空閑空間小于,即啟動內(nèi)存報警,磁盤報警報警后服務(wù)端觸發(fā)流控機(jī)制。最佳線程生產(chǎn)者使用多線程發(fā)送數(shù)據(jù)到三到五個線程性能發(fā)送最佳,超過它也不能提高生產(chǎn)的發(fā)送速率。 RabbitMq 性能調(diào)優(yōu)筆記 [TOC] 避免雷區(qū) 要避免流控機(jī)制觸發(fā) 服務(wù)端默認(rèn)配置是當(dāng)內(nèi)存使用達(dá)到40%,磁盤空閑空間小于50M,即啟動內(nèi)存報警,磁...
摘要:慕課網(wǎng)消息中間件極速入門與實(shí)戰(zhàn)學(xué)習(xí)總結(jié)時間年月日星期三說明本文部分內(nèi)容均來自慕課網(wǎng)。 慕課網(wǎng)《RabbitMQ消息中間件極速入門與實(shí)戰(zhàn)》學(xué)習(xí)總結(jié) 時間:2018年09月05日星期三 說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):https://www.imooc.com 教學(xué)源碼:無 學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:RabbitM...
本文是公眾號讀者jianfeng投稿的面試經(jīng)驗(yàn)恭喜該同學(xué)成功轉(zhuǎn)型目錄:毅然轉(zhuǎn)型,沒頭蒼蠅制定目標(biāo),系統(tǒng)學(xué)習(xí)面試經(jīng)歷毅然轉(zhuǎn)崗,沒頭蒼蠅首先,介紹一下我的背景。本人坐標(biāo)廣州,2016年畢業(yè)于一個普通二本大學(xué),曾經(jīng)在某機(jī)構(gòu)培訓(xùn)過Android。2018年初的時候已經(jīng)在兩家小公司工作干了兩年的android開發(fā),然后會一些Tomcat、Servlet之類的技術(shù),當(dāng)時的年薪大概也就15萬這樣子。由于個人發(fā)展...
摘要:可以在地址看到如何使用講解下上面命令行表示控制臺端口號,可以在瀏覽器中通過控制臺來執(zhí)行的相關(guān)操作。同時從控制臺可以看到發(fā)送的速率多線程測試性能開了個線程,每個線程發(fā)送條消息。 showImg(http://ww2.sinaimg.cn/large/006tNc79ly1g5jjb62t88j30u00gwdi2.jpg); 前提 上次寫了篇文章,《SpringBoot Kafka 整合...
閱讀 2698·2021-10-22 09:55
閱讀 2030·2021-09-27 13:35
閱讀 1283·2021-08-24 10:02
閱讀 1520·2019-08-30 15:55
閱讀 1213·2019-08-30 14:13
閱讀 3486·2019-08-30 13:57
閱讀 1987·2019-08-30 11:07
閱讀 2465·2019-08-29 17:12