成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

RabbitMQ基礎(chǔ)

Airmusic / 543人閱讀

摘要:一簡(jiǎn)介是一個(gè)有開(kāi)發(fā)的的開(kāi)源實(shí)現(xiàn)的官網(wǎng)是一款消息組件,其中一定包含生產(chǎn)者,消費(fèi)者,消息組件。

一. RabbitMQ簡(jiǎn)介

1 . RabbitMQ是一個(gè)有Erlang開(kāi)發(fā)的AMQP(Advanced Message Queue)的開(kāi)源實(shí)現(xiàn)

2 . RabbitMQ的官網(wǎng):http://www.rabbitmq.com

3 . RabbitMQ是一款消息組件,其中一定包含生產(chǎn)者,消費(fèi)者,消息組件。RabbitMQ中有三個(gè)重要組成部分

a . Exchange:交換空間

b . Queue:數(shù)據(jù)隊(duì)列

c . RoutingKey:隊(duì)列路由(如果所有的隊(duì)列的RoutingKey都一樣,則屬于廣播小,如果不一樣,則屬于點(diǎn)對(duì)點(diǎn)消息)

4 . RabbitMQ中的幾個(gè)核心概念

a . Broker:消息隊(duì)列的服務(wù)主機(jī)

b . Exchange:消息交換機(jī),用于分發(fā)消息到隊(duì)列

c . Queue:消息隊(duì)列的載體,每個(gè)消息都會(huì)被投入到一個(gè)或多個(gè)隊(duì)列

e . Binding:將Exchange與Queue按照RoutingKey規(guī)則進(jìn)行綁定

f . RoutingKey:路由Key,Exchange根據(jù)RoutingKey進(jìn)行消息分發(fā)

g . Vhost:虛擬主機(jī),一個(gè)Broker可以有多個(gè)Vhost,用于實(shí)現(xiàn)用戶(權(quán)限)的分離

h . Producer:消息生產(chǎn)者

i . Consumer:消息消費(fèi)者

j . Channel:消息通道,每個(gè)Channel代表一個(gè)會(huì)話任務(wù)

二. 環(huán)境搭建

1 . 安裝Erlang開(kāi)發(fā)環(huán)境

a . 在這里安裝Erlang時(shí)遇到的坑較多,個(gè)人不推薦下載erlang源碼進(jìn)行解壓縮編譯安裝,因?yàn)橐蕾?lài)的庫(kù)較多(gcc,libncurses5-dev,.eg):


建立erlang目錄mkdir -p /usr/local/erlang

進(jìn)入源碼目錄 cd /user/local/src/otp_src_19.3

編譯配置 ./configure --prefix=/usr/local/erlang

編譯安裝 make && make install

配置環(huán)境變量

vim /etc/profile
export ERLANG_HOME=/usr/local/erlang
export PATH=$PATH:$ERLANG_HOME/bin:
source /etc/profile

b . 本人使用apt-get安裝erlang語(yǔ)言環(huán)境

apt-get install erlang 或者apt-get install erlang-nox

c . 測(cè)試erlang

輸入erl 表示進(jìn)入erlang環(huán)境

輸入halt().退出

2 . 安裝RabbitMQ

a . 根據(jù)官網(wǎng)介紹進(jìn)行安裝

相關(guān)命令

echo "deb http://www.rabbitmq.com/debian/ testing main" |
     sudo tee /etc/apt/sources.list.d/rabbitmq.list
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc |
     sudo apt-key add -
sudo apt-get update
sudo apt-get install rabbitmq-server

b . 后臺(tái)啟動(dòng)RabbitMQrabbitmq-server start > /dev/null 2>&1 &

c . 開(kāi)啟管理頁(yè)面插件rabbitmq-plugins enable rabbitmq_management

d . 添加新用戶rabbitmqctl add_user evans 123123(創(chuàng)建一個(gè)用戶名為evans,密碼為123123的用戶)

e . 將新用戶設(shè)為管理員rabbitmqctl set_user_tags evans administrator

f . 打開(kāi)瀏覽器輸入訪問(wèn)地址http://192.168.1.1:15672訪問(wèn)RabbitMQ管理頁(yè)面

g . 查看RabbitMQ狀態(tài)rabbitmqctl status,關(guān)閉RabbitMQrabbitmqctl stop

h . 設(shè)置用戶虛擬主機(jī),否則程序無(wú)法連接Queue

二. Java基本操作

1 . 在管理界面中增加一個(gè)新的Queue

a . Name:隊(duì)列名稱(chēng)

b . Durability:持久化選項(xiàng):Durable(持久化保存),Transient(即時(shí)保存),持久化保存在RabbitMQ宕機(jī)或者重啟后,未消費(fèi)的消息仍然存在,即時(shí)保存在RabbitMQ宕機(jī)或者重啟后不存在

c . Auto delete:自動(dòng)刪除

2 . 引入RabbitMQ的Repository


    com.rabbitmq
    amqp-client
    4.1.0

3 . 消息生產(chǎn)者M(jìn)essageProducer.java

package com.evans.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Created by Evans 
 */
public class MessageProducer {
    //隊(duì)列名稱(chēng)
    private static final String QUEUE_NAME = "first";
    //主機(jī)IP
    private static final String HOST="127.0.0.1";
    //端口
    private static final Integer PORT=5672;
    //用戶名
    private static final String USERNAME="evans";
    //密碼
    private static final String PASSWORD="evans";

    public static void main(String[] args) throws Exception {
        //創(chuàng)建工廠類(lèi)
        ConnectionFactory factory = new ConnectionFactory();
        //設(shè)置參數(shù)
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        //創(chuàng)建連接
        Connection connection =factory.newConnection();
        //創(chuàng)建Channel
        Channel channel=connection.createChannel();
        //聲明Queue
        /*
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map arguments)
         * 隊(duì)列名稱(chēng),是否持久保存,是否為專(zhuān)用的隊(duì)列,是否允許自動(dòng)刪除,配置參數(shù)
         * 此處的配置與RabbitMQ管理界面的配置一致
         */
        channel.queueDeclare(QUEUE_NAME,true,false,true,null);
        Long start = System.currentTimeMillis();
        for (int i=0;i<100;i++){
            //發(fā)布消息
            /*
             * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
             * exchange名稱(chēng),RoutingKey,消息參數(shù)(消息頭等)(持久化時(shí)需要設(shè)置),消息體
             * MessageProperties有4中針對(duì)不同場(chǎng)景可以進(jìn)行選擇
             */
            channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,("Message:"+i).getBytes());
        }
        Long end = System.currentTimeMillis();
        System.out.println("System cost :"+(end-start));
        channel.close();
        connection.close();
    }
}

4 . 運(yùn)行MessageProduce的Main方法,在管理界面會(huì)出現(xiàn)詳細(xì)的監(jiān)控?cái)?shù)據(jù),此時(shí)消息已經(jīng)成功發(fā)送至RabbitMQ的隊(duì)列中

5 . 消息消費(fèi)者M(jìn)essageConsumer.java

package com.evans.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Created by Evans on 2017/7/15.
 */
public class MessageConsumer {

    //隊(duì)列名稱(chēng)
    private static final String QUEUE_NAME = "first";
    //主機(jī)IP
    private static final String HOST="10.0.0.37";
    //端口
    private static final Integer PORT=5672;
    //用戶名
    private static final String USERNAME="evans";
    //密碼
    private static final String PASSWORD="evans";

    public static void main(String[] args) throws IOException, TimeoutException {
        //創(chuàng)建工廠類(lèi)
        ConnectionFactory factory = new ConnectionFactory();
        //設(shè)置參數(shù)
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        //創(chuàng)建連接
        Connection connection =factory.newConnection();
        //創(chuàng)建Channel
        Channel channel=connection.createChannel();
        //聲明Queue
        /*
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map arguments)
         * 隊(duì)列名稱(chēng),是否持久保存,是否為專(zhuān)用的隊(duì)列,是否允許自動(dòng)刪除,配置參數(shù)
         * 此處的配置與RabbitMQ管理界面的配置一致
         */
        channel.queueDeclare(QUEUE_NAME,true,false,true,null);
        //這里需要復(fù)寫(xiě)handleDelivery方法進(jìn)行消息自定義處理
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("Consume Get Message : "+message);
            }
        };
        channel.basicConsume(QUEUE_NAME,consumer);
    }
}

6 . 運(yùn)行MessageConsumer的Main方法,會(huì)進(jìn)行消息消費(fèi)處理,此時(shí)控制臺(tái)會(huì)輸出消費(fèi)的消息,此時(shí)完成了消息的生產(chǎn)與消費(fèi)的基本操作,當(dāng)存在多個(gè)消費(fèi)者的處理同一個(gè)隊(duì)列時(shí),RabbitMQ會(huì)自動(dòng)進(jìn)行均衡負(fù)載處理,多個(gè)消費(fèi)者共同來(lái)處理消息

Consume Get Message : Message:0
Consume Get Message : Message:1
Consume Get Message : Message:2
...
Consume Get Message : Message:99

7 . RabbitMQ虛擬主機(jī)

a . 可以在管理界面的admin-vhost下設(shè)置多個(gè)虛擬主機(jī)

b . 在程序中通過(guò)設(shè)置factory參數(shù)進(jìn)行虛擬主機(jī)的指定factory.setVirtualHost("firstHost")

8 . Exchange工作模式:topic、direct、fanout

a . 廣播模式(fanout):一條消息被所有的消費(fèi)者進(jìn)行處理

① .將消費(fèi)者與生產(chǎn)者中的`channel.queueDeclare()`方法替換為`channel.exchangeDeclare(EXCHANGE_NAME, "fanout")`方法進(jìn)行Exchange的指定,channel.basicPublish()方法需要指定exchange
② .此時(shí)再次運(yùn)行生產(chǎn)者和多個(gè)消費(fèi)者,則一個(gè)消息會(huì)被多個(gè)消費(fèi)者進(jìn)行消費(fèi)處理

b . 直連模式(direct):一跳消息根據(jù)RoutingKey進(jìn)行生產(chǎn)者與消費(fèi)者的匹配,從而達(dá)到指定生產(chǎn)者的消息被指定消費(fèi)者進(jìn)行處理

① .將生產(chǎn)者中的`channel.queueDeclare()`方法替換為`channel.exchangeDeclare(EXCHANGE_NAME, "direct")`方法進(jìn)行Exchange的指定,channel.basicPublish()方法需要指定exchange和RoutingKey("mykey")
② .將消費(fèi)者中的`channel.queueDeclare()`方法替換為
// 定義EXCHANGE的聲明String
channel.exchangeDeclare(EXCHANGE_NAME, "direct") ;
// 通過(guò)通道獲取一個(gè)隊(duì)列名稱(chēng)                         
String queueName= channel.queueDeclare().getQueue() ;
// 進(jìn)行綁定處理
channel.queueBind(queueName, EXCHANGE_NAME, "mykey") ;
③ .此時(shí)RoutingKey作為唯一標(biāo)記,這樣就可以將消息推送到指定的消費(fèi)者進(jìn)行處理

c . 主題模式(topic):一條消息被所有的消費(fèi)者進(jìn)行處理

① .將生產(chǎn)者中的`channel.queueDeclare()`方法替換為`channel.exchangeDeclare(EXCHANGE_NAME, "topic") `方法進(jìn)行Exchange的指定,channel.basicPublish()方法需要指定exchange和RoutingKey("mykey-01")
② .將消費(fèi)者中的`channel.queueDeclare()`方法替換為
// 定義EXCHANGE的聲明String
channel.exchangeDeclare(EXCHANGE_NAME, "topic") ;
// 通過(guò)通道獲取一個(gè)隊(duì)列名稱(chēng)                         
String queueName= channel.queueDeclare().getQueue() ;
// 進(jìn)行綁定處理
channel.queueBind(queueName, EXCHANGE_NAME, "mykey-01");
③ .此時(shí)主題模式即為廣播模式與直連模式的混合使用。

三. RabbitMQ整合Spring

1 . 引入srping-rabbit的Repository


    org.springframework.amqp
    spring-rabbit
    1.7.3.RELEASE

2 . 建立rabbitmq.properties,對(duì)RabbitMQ的屬性參數(shù)進(jìn)行設(shè)置

# RabbitMQ的主機(jī)IP
mq.rabbit.host=192.168.68.211
# RabbitMQ的端口
mq.rabbit.port=5672
# RabbitMQ的VHost
mq.rabbit.vhost=hello
# RabbitMQ的exchange名稱(chēng)
mq.rabbit.exchange=spring.rabbit
# 用戶名
mq.rabbit.username=evans
# 密碼
mq.rabbit.password=evans

3 . 生產(chǎn)者XML(需增加xmlns:rabbit命名空間)



  
  
  
  
  
  
  
  
  
  
  
    
      
      
    
  
  
  

4 . 消費(fèi)者XML(需增加xmlns:rabbit命名空間)



  
  
  
  
  
  
  
  
  
  
    
      
      
    
  
  
  
  
  
    
  

5 . 生產(chǎn)者

a . 定義消息Service

package com.evans.rabbitmq;

/**
 * Created by Evans 
 */
public interface MessageService {
    /**
     * 發(fā)送消息
     * @param message
     */
    public void sendMessage(String message);
}

b . 定義MessageService的實(shí)現(xiàn)類(lèi)

package com.evans.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;

import javax.annotation.Resource;

/**
 * Created by Evans
 */
public class MessageServiceImpl implements MessageService {
    
    @Resource
    private AmqpTemplate template;
    
    @Override
    public void sendMessage(String message) {
        template.convertAndSend("key01",message);
    }
}

5 . 消費(fèi)者

a .消費(fèi)者需要實(shí)現(xiàn)MessageListener接口

b .消息處理類(lèi)

package com.evans.rabbitmq;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

/**
 * Created by Evans 
 */
public class MessageConsumer implements MessageListener {
    
    @Override
    public void onMessage(Message message) {
        System.out.println("Consumer Message: "+ message);    
    }
}

四. RabbitMQ整合SpringBoot

1 . 引入SpringBoot的RabbitMQ腳手架


    org.springframework.boot
    spring-boot-starter-amqp

2 . 配置Application.yml

spring:
  rabbitmq:
    host: 10.0.0.37
    port: 5672
    username: evans
    password: evans

3 . 配置類(lèi)

package com.evans.rabbitmq;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**
 * Created by Evans 
 */
@Configuration
public class RabbitConfigure {
    @Bean
    public Queue firstQueue(){
        return new Queue("firstQueue");
    }
}

4 . 生產(chǎn)者

package com.evans.rabbitmq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.time.LocalDateTime;

/**
 * Created by Evans
 */
@Component
public class MessageProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;
    
    public void send(){
        LocalDateTime current =LocalDateTime.now();
        System.out.println("Send Message : "+current);
        rabbitTemplate.convertAndSend("firstQueue","Send Message"+ current);
    }
}

5 . 消費(fèi)者

package com.evans.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Created by Evans 
 */
@Component
@RabbitListener(queues = "firstQueue")
public class MessageConsumer {

    @RabbitHandler
    public void consumer(String message){
        System.out.println("Consumer Message : "+message);
    }
}

6 . FanoutExchange配置

@Configuration
public class FanoutConfiguration {

    @Bean
    public Queue fanoutFirstQueue() {
        return new Queue("fanout.first");
    }

    @Bean
    public Queue fanoutSecondQueue() {
        return new Queue("fanout.second");
    }

    @Bean
    public Queue fanoutThirdQueue() {
        return new Queue("fanout.third");
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    public Binding bindingExchangeFanoutFirst(Queue fanoutFirstQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutFirstQueue).to(fanoutExchange);
    }

    @Bean
    public Binding bindingExchangeFanoutSecond(Queue fanoutSecondQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutSecondQueue).to(fanoutExchange);
    }

    @Bean
    public Binding bindingExchangeFanoutThird(Queue fanoutThirdQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutThirdQueue).to(fanoutExchange);
    }

}

7 . TopicExchange配置

@Configuration
public class TopicConfiguration {

    @Bean
    public Queue topicFirstQueue() {
        return new Queue("topic.first");
    }

    @Bean
    public Queue topicAnyQueue() {
        return new Queue("topic.any");
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }

    @Bean
    public Binding bindingExchangeTopicFirst(Queue topicFirstQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicFirstQueue).to(topicExchange).with("topic.first");
    }

    @Bean
    public Binding bindingExchangeTopicAny(Queue topicAnyQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicAnyQueue).to(topicExchange).with("topic.#");
    }

}

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/67408.html

相關(guān)文章

  • RabbitMQ 基礎(chǔ)教程(1) - Hello World

    摘要:基礎(chǔ)教程注本文是對(duì)眾多博客的學(xué)習(xí)和總結(jié),可能存在理解錯(cuò)誤。請(qǐng)帶著懷疑的眼光,同時(shí)如果有錯(cuò)誤希望能指出。安裝庫(kù)這里我們首先將消息推入隊(duì)列,然后消費(fèi)者從隊(duì)列中去除消息進(jìn)行消費(fèi)。 RabbitMQ 基礎(chǔ)教程(1) - Hello World 注:本文是對(duì)眾多博客的學(xué)習(xí)和總結(jié),可能存在理解錯(cuò)誤。請(qǐng)帶著懷疑的眼光,同時(shí)如果有錯(cuò)誤希望能指出。 如果你喜歡我的文章,可以關(guān)注我的私人博客:http:...

    wushuiyong 評(píng)論0 收藏0
  • Spring Cloud構(gòu)建微服務(wù)架構(gòu):消息驅(qū)動(dòng)的微服務(wù)(入門(mén))【Dalston版】

    摘要:它通過(guò)使用來(lái)連接消息代理中間件以實(shí)現(xiàn)消息事件驅(qū)動(dòng)的微服務(wù)應(yīng)用。該示例主要目標(biāo)是構(gòu)建一個(gè)基于的微服務(wù)應(yīng)用,這個(gè)微服務(wù)應(yīng)用將通過(guò)使用消息中間件來(lái)接收消息并將消息打印到日志中。下面我們通過(guò)編寫(xiě)生產(chǎn)消息的單元測(cè)試用例來(lái)完善我們的入門(mén)內(nèi)容。 之前在寫(xiě)Spring Boot基礎(chǔ)教程的時(shí)候?qū)戇^(guò)一篇《Spring Boot中使用RabbitMQ》。在該文中,我們通過(guò)簡(jiǎn)單的配置和注解就能實(shí)現(xiàn)向Rabbi...

    smallStone 評(píng)論0 收藏0
  • Rabbitmq基礎(chǔ)組件架構(gòu)設(shè)計(jì)

    摘要:基礎(chǔ)組件架構(gòu)設(shè)計(jì)基礎(chǔ)組件封裝設(shè)計(jì)迅速消息發(fā)送支持迅速消息發(fā)送模式,在一些日志收集統(tǒng)計(jì)分析等需求下可以保證高性能,高吞吐量。基礎(chǔ)組件封裝設(shè)計(jì)事務(wù)消息發(fā)送支持事務(wù)消息,且保障可靠性投遞,在金融行業(yè)單筆大金額操作時(shí)會(huì)有此類(lèi)需求。 Rabbitmq基礎(chǔ)組件架構(gòu)設(shè)計(jì) 基礎(chǔ)組件封裝設(shè)計(jì) - 迅速消息發(fā)送支持迅速消息發(fā)送模式,在一些日志收集、統(tǒng)計(jì)分析等需求下可以保證高性能,高吞吐量。 基礎(chǔ)組件封...

    Steve_Wang_ 評(píng)論0 收藏0
  • PHP-RabbitMQ學(xué)習(xí)日記(一)

    摘要:通道,建立一個(gè)訪問(wèn)通道。隊(duì)列,每個(gè)消息都會(huì)被投入到一個(gè)或多個(gè)隊(duì)列。路由,根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞。消息消費(fèi)者,就是接受消息的程序。 給自己做一個(gè)記錄 本文主要介紹有一下 1.RabbitMQ是概念 2.RabbitMQ在windows上安裝,啟動(dòng),關(guān)閉 3.RabbitMQ其他小介紹 下面一步一步走起來(lái) 1.RabbitMQ是概念 RabbitMQ是一個(gè)建立在AMQP(高級(jí)消息隊(duì)列協(xié)...

    SolomonXie 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<