成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

RabbitMQ學(xué)習(xí)筆記

zacklee / 1028人閱讀

摘要:消息持久化控制的屬性就是消息的持久化。當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為時,兩個消費(fèi)者都會收到消息并處理當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為時,只有消費(fèi)者可以接收到消息。八的消息確認(rèn)機(jī)制在中,可以通過持久化數(shù)據(jù)解決服務(wù)器異常的數(shù)據(jù)丟失問題。

一、內(nèi)容大綱&使用場景 1. 消息隊(duì)列解決了什么問題?

異步處理

應(yīng)用解耦

流量削鋒

日志處理

......

2. rabbitMQ安裝與配置 3. Java操作rabbitMQ

simple 簡單隊(duì)列
. work queues 工作隊(duì)列 公平分發(fā) 輪詢分發(fā)
. publish/subscribe 發(fā)布訂閱
. routing 路由選擇 通配符模式
. Topics 主題

手動和自動確認(rèn)消息

隊(duì)列的持久化和非持久化

rabbitMQ的延遲隊(duì)列

4. Spring AMQP Spring-Rabbit 5. DEMO

MQ實(shí)現(xiàn)搜索引擎DIH增量

未支付訂單30分鐘 取消

類似百度統(tǒng)計(jì) cnzz 架構(gòu) 消息隊(duì)列

二、用戶及vhost配置 2.1 添加用戶

2.2 virtual hosts管理

virtual hosts相當(dāng)于mysql的db

一般以/開頭

2.3 用戶授權(quán)

需要對用戶進(jìn)行授權(quán)

三、簡單隊(duì)列 3.1 模型

P:消息生產(chǎn)者

紅色:隊(duì)列

C:消息消費(fèi)者

包含三個對象:生產(chǎn)者、隊(duì)列、消費(fèi)者

3.2 獲取mq連接
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionUtil {
    /**
     * 獲取MQ的連接
     * @return
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        //定義一個連接工廠
        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("127.0.0.1");
        //AMQP的端口
        factory.setPort(5672);
        //vhost
        factory.setVirtualHost("/vhost_mmr");
        factory.setUsername("rabbit");
        factory.setPassword("123456");

        Connection connection = factory.newConnection();
        return connection;
    }
}
3.3 生產(chǎn)消息
import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    private static final String QUEUE_NAME = "test_simple_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        //從連接中獲取一個通道
        Channel channel = connection.createChannel();

        //創(chuàng)建隊(duì)列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String msg = "hello world!";

        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

        System.out.println("---send msg :" + msg);
        channel.close();
        connection.close();
    }
}
3.4 消費(fèi)消息
import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receive {
    private static final String QUEUE_NAME = "test_simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        //創(chuàng)建channel
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("msg receive : " + msg);
            }
        };

        channel.basicConsume(QUEUE_NAME, consumer);
    }
}
3.5 簡單隊(duì)列的不足

耦合性高,生產(chǎn)者一一對應(yīng)消費(fèi)者,如果需要多個消費(fèi)者消費(fèi)隊(duì)列中的消息,此時簡單隊(duì)列就無能為力了。

隊(duì)列名變更,源碼需要同時變更

四、Work隊(duì)列 4.1 模型

一個生產(chǎn)者將消息放入隊(duì)列中,可以有多個消費(fèi)者進(jìn)行消費(fèi)

為什么會出現(xiàn)工作隊(duì)列?

Simple隊(duì)列:是一一對應(yīng)的,實(shí)際開發(fā)中,生產(chǎn)者改善消息是毫不費(fèi)力的,而消費(fèi)者一般需要跟業(yè)務(wù)相結(jié)合,消費(fèi)者接收到消息之后就需要處理,可能需要花費(fèi)時間,此時隊(duì)列就會積壓很多消息。

4.2 輪詢分發(fā)

生產(chǎn)消息

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    private static final String QUEUE_NAME = "test_work_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        //從連接中獲取一個通道
        Channel channel = connection.createChannel();

        //創(chuàng)建隊(duì)列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 50; i++) {
            String msg = "hello " + i;
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

            System.out.println("---send msg :" + msg);

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }


        channel.close();
        connection.close();
    }
}

消費(fèi)者1

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv1 {
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        //創(chuàng)建channel
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[1] msg recv1 : " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        boolean ack = true;
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}

消費(fèi)者2

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv2 {
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        //創(chuàng)建channel
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[2] msg recv1 : " + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        boolean ack = true;
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}

現(xiàn)象

消費(fèi)者1和消費(fèi)者2處理的消息是一樣多的,這種分發(fā)方式稱為輪詢分發(fā)(round-robin),不管誰忙或者誰閑,都不會多給或者少給。任務(wù)均分。

4.3 公平分發(fā) fair dispatch

保證一次發(fā)送給消費(fèi)者的消息不超過一條

        /**
         * 每個消費(fèi)者發(fā)送確認(rèn)消息之前,消息隊(duì)列不發(fā)送下一個消息給消費(fèi)者,消費(fèi)者一次只處理一個消息
         *
         * 限制發(fā)送給同一個消費(fèi)者不得超過一條消息
         */
        int preFetchCount = 1;
        channel.basicQos(preFetchCount);

使用公平分發(fā),必須關(guān)閉自動應(yīng)答ack,改為手動

channel.basicAck(envelope.getDeliveryTag(), false);
boolean ack = false;//自動應(yīng)答改為false
channel.basicConsume(QUEUE_NAME, ack, consumer);

生產(chǎn)消息

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    private static final String QUEUE_NAME = "test_work_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        //從連接中獲取一個通道
        Channel channel = connection.createChannel();

        //創(chuàng)建隊(duì)列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        /**
         * 每個消費(fèi)者發(fā)送確認(rèn)消息之前,消息隊(duì)列不發(fā)送下一個消息給消費(fèi)者,消費(fèi)者一次只處理一個消息
         *
         * 限制發(fā)送給同一個消費(fèi)者不得超過一條消息
         */
        int preFetchCount = 1;
        channel.basicQos(preFetchCount);

        for (int i = 0; i < 50; i++) {
            String msg = "hello " + i;
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

            System.out.println("---send msg :" + msg);

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }


        channel.close();
        connection.close();
    }
}

消費(fèi)消息

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv2 {
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        //創(chuàng)建channel
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[2] msg recv1 : " + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean ack = false;//自動應(yīng)答改為false
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}
4.4 消息應(yīng)答與消息持久化 4.4.1 消息應(yīng)答
boolean ack = false;//自動應(yīng)答改為false
channel.basicConsume(QUEUE_NAME, ack, consumer);

ack = true時為自動確認(rèn)模式,一旦rabbitMQ將消息分發(fā)給消費(fèi)者,該消息就會在內(nèi)存中刪除;這種情況下,如果殺死正在處理消息的消費(fèi)者,會丟失正在處理的消息;

ack = false時為手動回執(zhí)(消息應(yīng)答)模式,如果有一個消費(fèi)者掛掉,就會將會給其他消費(fèi)者,rabbitMQ支持消息應(yīng)答,消費(fèi)者發(fā)送一個消息應(yīng)答,告訴rabbitMQ這個消息已經(jīng)被處理,然后rabbitMQ就刪除內(nèi)存中的消息;

消息應(yīng)答默認(rèn)打開,即為false;

由于消息在內(nèi)存中存儲,如果rabbitMQ掛掉,消息仍然會丟失。

4.4.2 消息持久化
boolean durable = false;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

durable控制的屬性就是消息的持久化。

已經(jīng)聲明好的隊(duì)列,如果durable已經(jīng)為false了,就無法修改為true,rabbitMQ不允許重新定義(不同參數(shù))一個已存在的隊(duì)列

五、訂閱模式 Publish/Subscribe 5.1 模型

解讀:

1、一個生產(chǎn)者,多個消費(fèi)者;

2、每個消費(fèi)者都有自己的隊(duì)列;

3、生產(chǎn)者沒有直接把消息發(fā)送到隊(duì)列,而是發(fā)送至交換機(jī)(eXchange)

4、每個隊(duì)列都要綁定到交換機(jī)上

5、生產(chǎn)者發(fā)送的消息,經(jīng)過交換機(jī),到達(dá)隊(duì)列,就能實(shí)現(xiàn)一個消息被多個消費(fèi)者消費(fèi)

5.2 實(shí)現(xiàn)

生產(chǎn)消息

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    private static final String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        //聲明交換機(jī)
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String msg = "hello ps";

        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
        System.out.println("Send " + msg);

        channel.close();
        connection.close();
    }
}

消息哪去了?丟失了!因?yàn)榻粨Q機(jī)沒有存儲能力,在rabbitMQ中,只有隊(duì)列有存儲能力。此時并沒有完成隊(duì)列綁定到交換機(jī),所以數(shù)據(jù)丟失了。

消費(fèi)消息

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv1 {
    private static final String QUEUE_NAME = "test_ps_fanout_email";
    private static final String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //綁定隊(duì)列到交換機(jī)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[1] msg recv1 : " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean ack = false;//自動應(yīng)答改為false
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}

不同的隊(duì)列做不同的事情。

5.3 Exchange(交換機(jī)、轉(zhuǎn)發(fā)器)

一方面接收生產(chǎn)者的消息,另一方面向隊(duì)列推送消息

rabbitMQ提供了四種Exchange:fanout,direct,topic,header? header模式在實(shí)際使用中較少。

fanout:不處理路由鍵

direct:處理路由鍵

topic

將路由鍵和某模式進(jìn)行匹配

任何發(fā)送到Topic Exchange的消息都會被轉(zhuǎn)發(fā)到所有關(guān)心RouteKey中指定話題的Queue上

六、路由模式 6.1 模型

聲明exchange時指定為direct模式

綁定隊(duì)列時,指定路由鍵

6.2 實(shí)現(xiàn)

生產(chǎn)者

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    private static final String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        //聲明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String msg = "hello direct";

        //指定路由鍵
        String routingKey = "warning";
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());

        System.out.println("send msg:" + msg);
        channel.close();
        connection.close();
    }
}

消費(fèi)者1

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv1 {
    private static final String EXCHANGE_NAME = "test_exchange_direct";
    private static final String QUEUE_NAME = "test_queue_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);

        //綁定隊(duì)列與交換機(jī)時,指定路由鍵
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");


        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[1] msg recv1 : " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean ack = false;//自動應(yīng)答改為false
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}

消費(fèi)者2

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv2 {
    private static final String EXCHANGE_NAME = "test_exchange_direct";
    private static final String QUEUE_NAME = "test_queue_direct_2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);

        //綁定隊(duì)列與交換機(jī)時,指定路由鍵
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[2] msg recv2 : " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean ack = false;//自動應(yīng)答改為false
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}
七、Topic模式 7.1 模型

# 匹配一個或多個

* 匹配一個

7.2 實(shí)現(xiàn)

生產(chǎn)者

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    private static final String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        //聲明exchange,指定模式為topic
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String msg = "商品....";

        String routingKey = "goods.delete";
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());

        System.out.println("send msg:" + msg);
        channel.close();
        connection.close();
    }
}

消費(fèi)者1

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Created by wangbin on 2018/6/26.
 */
public class Recv1 {
    private static final String EXCHANGE_NAME = "test_exchange_topic";
    private static final String QUEUE_NAME = "test_queue_topic_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[1] msg recv1 : " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean ack = false;//自動應(yīng)答改為false
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}

消費(fèi)者2

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Created by wangbin on 2018/6/26.
 */
public class Recv2 {
    private static final String EXCHANGE_NAME = "test_exchange_topic";
    private static final String QUEUE_NAME = "test_queue_topic_2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[2] msg recv2 : " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean ack = false;//自動應(yīng)答改為false
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}

其中,消費(fèi)者1綁定路由鍵為goods.#,消費(fèi)者2綁定路由鍵為goods.add。當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為goods.add時,兩個消費(fèi)者都會收到消息并處理;當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為goods.update時,只有消費(fèi)者1可以接收到消息。

八、RabbitMQ的消息確認(rèn)機(jī)制

在rabbitMQ中,可以通過持久化數(shù)據(jù)解決rabbitMQ服務(wù)器異常的數(shù)據(jù)丟失問題。

問題:生產(chǎn)者將消息發(fā)送出去之后,消息到底有沒有到達(dá)rabbitMQ服務(wù)器;默認(rèn)情況是不知道消息已到達(dá)的

兩種方式:

AMQP實(shí)現(xiàn)了事務(wù)機(jī)制

confirm模式

8.1 事務(wù)機(jī)制

txSelect

用于將當(dāng)前channel設(shè)置成transaction模式

txCommit

用于提交事務(wù)

txRollback

回滾事務(wù)

生產(chǎn)者發(fā)送消息

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    private static final String QUEUE_NAME = "test_queue_tx";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String msg = "hello tx msg!";

        try {
            channel.txSelect();
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            channel.txCommit();
        } catch (IOException e) {
            channel.txRollback();
            System.out.println("發(fā)生異常,事務(wù)已回滾");
        }
    }
}

事務(wù)機(jī)制會降低rabbitMQ的吞吐量。

8.2 Confirm模式

生產(chǎn)者將信道設(shè)置成confirm模式,一旦信道進(jìn)入confirm模式,所有在該信道上面發(fā)布的消息都將會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊(duì)列之后,broker就會發(fā)送一個確認(rèn)給生產(chǎn)者(包含消息的唯一ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)列了,如果消息和隊(duì)列是可持久化的,那么確認(rèn)消息會在將消息寫入磁盤之后發(fā)出,broker回傳給生產(chǎn)者的確認(rèn)消息中delivery-tag域包含了確認(rèn)消息的序列號,此外broker也可以設(shè)置basic.ack的multiple域,表示到這個序列號之前的所有消息都已經(jīng)得到了處理;

confirm模式最大的好處在于他是異步的,一旦發(fā)布一條消息,生產(chǎn)者應(yīng)用程序就可以在等信道返回確認(rèn)的同時繼續(xù)發(fā)送下一條消息,當(dāng)消息最終得到確認(rèn)之后,生產(chǎn)者應(yīng)用便可以通過回調(diào)方法來處理該確認(rèn)消息,如果RabbitMQ因?yàn)樽陨韮?nèi)部錯誤導(dǎo)致消息丟失,就會發(fā)送一條nack消息,生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理該nack消息。

編程模式:

1、普通,發(fā)一條

2、批量,發(fā)一批

3、異步confirm模式,提供一個回調(diào)方法

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/71406.html

相關(guān)文章

  • RabbitMq 最全的性能調(diào)優(yōu)筆記

    摘要:性能調(diào)優(yōu)筆記避免雷區(qū)要避免流控機(jī)制觸發(fā)服務(wù)端默認(rèn)配置是當(dāng)內(nèi)存使用達(dá)到,磁盤空閑空間小于,即啟動內(nèi)存報警,磁盤報警報警后服務(wù)端觸發(fā)流控機(jī)制。最佳線程生產(chǎn)者使用多線程發(fā)送數(shù)據(jù)到三到五個線程性能發(fā)送最佳,超過它也不能提高生產(chǎn)的發(fā)送速率。 RabbitMq 性能調(diào)優(yōu)筆記 [TOC] 避免雷區(qū) 要避免流控機(jī)制觸發(fā) 服務(wù)端默認(rèn)配置是當(dāng)內(nèi)存使用達(dá)到40%,磁盤空閑空間小于50M,即啟動內(nèi)存報警,磁...

    Tony 評論0 收藏0
  • 慕課網(wǎng)_《RabbitMQ消息中間件極速入門與實(shí)戰(zhàn)》學(xué)習(xí)總結(jié)

    摘要:慕課網(wǎng)消息中間件極速入門與實(shí)戰(zhàn)學(xué)習(xí)總結(jié)時間年月日星期三說明本文部分內(nèi)容均來自慕課網(wǎng)。 慕課網(wǎng)《RabbitMQ消息中間件極速入門與實(shí)戰(zhàn)》學(xué)習(xí)總結(jié) 時間:2018年09月05日星期三 說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):https://www.imooc.com 教學(xué)源碼:無 學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:RabbitM...

    mykurisu 評論0 收藏0
  • Android工程師轉(zhuǎn)型Java后端開發(fā)之路,自己選的路,跪著也要走下去!

    本文是公眾號讀者jianfeng投稿的面試經(jīng)驗(yàn)恭喜該同學(xué)成功轉(zhuǎn)型目錄:毅然轉(zhuǎn)型,沒頭蒼蠅制定目標(biāo),系統(tǒng)學(xué)習(xí)面試經(jīng)歷毅然轉(zhuǎn)崗,沒頭蒼蠅首先,介紹一下我的背景。本人坐標(biāo)廣州,2016年畢業(yè)于一個普通二本大學(xué),曾經(jīng)在某機(jī)構(gòu)培訓(xùn)過Android。2018年初的時候已經(jīng)在兩家小公司工作干了兩年的android開發(fā),然后會一些Tomcat、Servlet之類的技術(shù),當(dāng)時的年薪大概也就15萬這樣子。由于個人發(fā)展...

    番茄西紅柿 評論0 收藏0
  • SpringBoot RabbitMQ 整合使用

    摘要:可以在地址看到如何使用講解下上面命令行表示控制臺端口號,可以在瀏覽器中通過控制臺來執(zhí)行的相關(guān)操作。同時從控制臺可以看到發(fā)送的速率多線程測試性能開了個線程,每個線程發(fā)送條消息。 showImg(http://ww2.sinaimg.cn/large/006tNc79ly1g5jjb62t88j30u00gwdi2.jpg); 前提 上次寫了篇文章,《SpringBoot Kafka 整合...

    yuanxin 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<