成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

RabbitMQ 在分布式系統(tǒng)的應(yīng)用

lily_wang / 756人閱讀

摘要:可設(shè)置為模式,所有發(fā)送的消息都會被確認(rèn)一次,用戶可以自行根據(jù)發(fā)回的確認(rèn)消息查看狀態(tài)。指定路由規(guī)則生產(chǎn)者消費(fèi)者同上。傳輸層主要傳輸二進(jìn)制數(shù)據(jù)流,提供幀的處理信道復(fù)用錯誤檢測和數(shù)據(jù)表示。

由于之前做的項(xiàng)目中需要在多個節(jié)點(diǎn)之間可靠地通信,所以廢棄了之前使用的Redis pub/sub(因?yàn)榧河袉吸c(diǎn)問題,且有諸多限制),改用了RabbitMQ。
使用期間得到不少收獲,也踩了不少坑,所以在此分享下心得。(簡單了解下RabbitMQ? 詳見下文:簡介)

怎么保證可靠性的?

RabbitMQ提供了幾種特性,犧牲了一點(diǎn)性能代價(jià),提供了可靠性的保證。

1. 持久化

當(dāng)RabbitMQ退出時(shí),默認(rèn)會將消息和隊(duì)列都清除,所以需要在第一次聲明隊(duì)列和發(fā)送消息時(shí)指定其持久化屬性為true,這樣RabbitMQ會將隊(duì)列、消息和狀態(tài)存到RabbitMQ本地的數(shù)據(jù)庫,重啟后會恢復(fù)。

java:

durable=true   
channel.queueDeclare("task_queue", durable, false, false, null); // 隊(duì)列  
channel.basicPublish("", "task_queue",
    MessageProperties.PERSISTENT_TEXT_PLAIN,
    message.getBytes()); // 消息

注:當(dāng)聲明的隊(duì)列已經(jīng)存在時(shí),嘗試重新定義它的durable是不生效的。

2. 接收應(yīng)答

客戶端接收消息的模式默認(rèn)是自動應(yīng)答,但是通過設(shè)置autoAck為false可以讓客戶端主動應(yīng)答消息。當(dāng)客戶端拒絕此消息或者未應(yīng)答便斷開連接時(shí),就會使得此消息重新入隊(duì)(在版本2.7.0以前是到重新加入到隊(duì)尾,2.7.0及以后是保留消息在隊(duì)列中的原來位置)。

java:

autoAck = false;
requeue = true;
channel.basicConsume(queue, autoAck, callback);
channel.basicAck();//應(yīng)答
channel.basicReject(deliveryTag, requeue); // 拒絕
channel.basicRecover(requeue); // 恢復(fù)

3. 發(fā)送確認(rèn)

默認(rèn)情況下,發(fā)送端不關(guān)注發(fā)出去的消息是否被消費(fèi)掉了??稍O(shè)置channel為confirm模式,所有發(fā)送的消息都會被確認(rèn)一次,用戶可以自行根據(jù)server發(fā)回的確認(rèn)消息查看狀態(tài)。詳細(xì)介紹見:confirms

java:

channel.confirmSelect(); // 進(jìn)入confirm模式
// do publish messages... 每條消息都會被編號,從1開始
channel.getNextPublishSeqNo() // 查看下一條要發(fā)送的消息的序號
channel.waitForConfirms(); // 等待所有消息發(fā)送并確認(rèn) 

4. 事務(wù)

和confirm模式不能同時(shí)使用,而且會帶來大量的多余開銷,導(dǎo)致吞吐量下降很多,故而不推薦。

java:

channel.txSelect();
try {
    // do something...
    channel.txCommit();
} catch (e){
    channel.txRollback();
}

5. 消息隊(duì)列的高可用(主備模式)

相比于路由和綁定,可以視為是共享于所有的節(jié)點(diǎn)的,消息隊(duì)列默認(rèn)只存在于第一次聲明它的節(jié)點(diǎn)上,這樣一旦這個節(jié)點(diǎn)掛了,這個隊(duì)列中未處理的消息就沒有了。

幸好,RabbitMQ提供了將它備份到其他節(jié)點(diǎn)的機(jī)制,任何時(shí)候都有一個master負(fù)責(zé)處理請求,其他slaves負(fù)責(zé)備份,當(dāng)master掛掉,會將最早創(chuàng)建的那個slave提升為master。

命令:

rabbitmqctl set_policy ha-all "^ha." "{"ha-mode":"all"}"

設(shè)置所有以"ha"開頭的queue在所有節(jié)點(diǎn)上擁有備份。詳細(xì)語法點(diǎn)這里,也可以在界面上配置。

注:由于exclusive類型的隊(duì)列會在client和server連接斷開時(shí)被刪掉,所以對它設(shè)置持久化屬性和備份都是沒有意義的。

6. 順序保證

直接上圖好了:

一些需要注意的地方

1. 集群配置

一個集群中多個節(jié)點(diǎn)共享一份.erlang.cookie文件;若是沒有啟用RABBITMQ_USE_LONGNAME,需要在每個節(jié)點(diǎn)的hosts文件中指定其他節(jié)點(diǎn)的地址,不然會找不到其他集群中的節(jié)點(diǎn)。

2. 腦裂(網(wǎng)絡(luò)分區(qū))

RabbitMQ集群對于網(wǎng)絡(luò)分區(qū)的處理和忍受能力不太好,推薦使用federation或者shovel插件去解決。federation詳見高級->Federation

但是,情況已經(jīng)發(fā)生了,怎么去解決呢?放心,還是有辦法恢復(fù)的。

當(dāng)網(wǎng)絡(luò)斷斷續(xù)續(xù)時(shí),會使得節(jié)點(diǎn)之間的通信斷掉,進(jìn)而造成集群被分隔開的情況。

這樣,每個小集群之后便只處理各自本地的連接和消息,從而導(dǎo)致數(shù)據(jù)不同步。當(dāng)重新恢復(fù)網(wǎng)絡(luò)連接時(shí),它們彼此都認(rèn)為是對方掛了-_-||,便可以判斷出有網(wǎng)絡(luò)分區(qū)出現(xiàn)了。但是RabbitMQ默認(rèn)是忽略掉不處理的,造成兩個節(jié)點(diǎn)繼續(xù)各自為政(路由,綁定關(guān)系,隊(duì)列等可以獨(dú)立地創(chuàng)建刪除,甚至主備隊(duì)列也會每一方擁有自己的master)。

可以更改配置使得連接恢復(fù)時(shí),會根據(jù)配置自動恢復(fù):

ignore:默認(rèn),不做任何處理

pause-minority:斷開連接時(shí),判斷當(dāng)前節(jié)點(diǎn)是否屬于少數(shù)派(節(jié)點(diǎn)數(shù)少于或者等于一半),如果是,則暫停直到恢復(fù)連接。

{pause_if_all_down, [nodes], ignore | autoheal}:斷開連接時(shí),判斷當(dāng)前集群中節(jié)點(diǎn)是否有節(jié)點(diǎn)在nodes中,如果有,則繼續(xù)運(yùn)行,否則暫停直到恢復(fù)連接。這種策略下,當(dāng)恢復(fù)連接時(shí),可能會有多個分區(qū)存活,所以,最后一個參數(shù)決定它們怎么合并。

autoheal:當(dāng)恢復(fù)連接時(shí),選擇客戶端連接數(shù)最多的節(jié)點(diǎn)狀態(tài)為主,重啟其他節(jié)點(diǎn)。

配置:集群配置

3. 多次ack

客戶端多次應(yīng)答同一條消息,會使得該客戶端收不到后續(xù)消息。

結(jié)合Docker使用

集群版本的實(shí)現(xiàn):詳見我自己寫的一個例子rabbitmq-server-cluster

消息隊(duì)列中間件的比較

RabbitMQ

優(yōu)點(diǎn):支持很多協(xié)議如:AMQP,XMPP,STMP,STOMP;靈活的路由;成熟穩(wěn)定的集群方案;負(fù)載均衡;數(shù)據(jù)持久化等。

缺點(diǎn):速度較慢;比較重量級,安裝需要依賴Erlang環(huán)境。

Redis

優(yōu)點(diǎn):比較輕量級,易上手

缺點(diǎn):單點(diǎn)問題,功能單一

Kafka:

優(yōu)點(diǎn):高吞吐;分布式;快速持久化;負(fù)載均衡;輕量級

缺點(diǎn):極端情況下會丟消息

最后附一張網(wǎng)上截取的測試結(jié)果:

更多性能參數(shù)見:http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/

如果有興趣簡單了解下RabbitMQ的簡單介紹,可以繼續(xù)往下看~

簡介

幾個重要的概念:

Virtual Host 包含若干個Exchange和Queue,表示一個節(jié)點(diǎn);

Exchange 接受客戶端發(fā)送的消息,并根據(jù)Binding將消息路由給服務(wù)器中的隊(duì)列,Exchange分為direct, fanout, topic三種。

Binding 連接Exchange和Queue,包含路由規(guī)則。

Queue 消息隊(duì)列,存儲還未被消費(fèi)的消息。

Message Header+Body

Channel 通道,執(zhí)行AMQP的命令;一個連接可創(chuàng)建多個通道以節(jié)省資源。

Client

RabbitMQ官方實(shí)現(xiàn)了很多熱門語言的客戶端,就不一一列舉啦,以java為例,直接開始正題:

1. 建立連接

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

可以加上斷開重試機(jī)制:

factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(10000);

創(chuàng)建連接和通道:

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

2. 一對一:一個生產(chǎn)者,一個消費(fèi)者

生產(chǎn)者:

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

消費(fèi)者:

Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
      throws IOException {
    String message = new String(body, "UTF-8");
    System.out.println(" [x] Received "" + message + """);
  }
};
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

3. 一對多:一個生產(chǎn)者,多個消費(fèi)者

代碼同上,只不過會有多個消費(fèi)者,消息會輪序發(fā)給各個消費(fèi)者。

如果設(shè)置了autoAck=false,那么可以實(shí)現(xiàn)公平分發(fā)(即對于某個特定的消費(fèi)者,每次最多只發(fā)送指定條數(shù)的消息,直到其中一條消息應(yīng)答后,再發(fā)送下一條)。需要在消費(fèi)者中加上:

int prefetchCount = 1;
channel.basicQos(prefetchCount);

其他同上。

4. 廣播

生產(chǎn)者:

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

消費(fèi)者同上。

5. Routing:指定路由規(guī)則

生產(chǎn)者:

String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());

消費(fèi)者同上。

6. Topics:支持通配符的Routing


* 可以表示一個單詞
# 可以表示一個或多個單詞

生產(chǎn)者:

channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

消費(fèi)者同上。

7. RPC

其實(shí)就是一對一模式的一種用法:

首先,客戶端發(fā)送一條消息到服務(wù)端聲明的隊(duì)列,消息屬性中包含reply_to和correlation_id

reply_to 是客戶端創(chuàng)建的消息的隊(duì)列,用來接收遠(yuǎn)程調(diào)用結(jié)果

correlation_id 是消息的標(biāo)識,服務(wù)端回應(yīng)的消息屬性中會帶上以便知道是哪條消息的結(jié)果。

然后,服務(wù)端接收到消息,處理,并返回一條結(jié)果到reply_to隊(duì)列中,

最終,客戶端接收到返回消息,繼續(xù)向下處理。

Server

支持各大主流操作系統(tǒng),這里以Unix為例介紹下常用配置和命令:

安裝:

由于RabbitMQ是依賴于Erlang的,所以得首先安裝最近版本的Erlang。

單點(diǎn)的安裝比較簡單,下載解壓即可。下載地址

配置:(一般的,用默認(rèn)的即可)

$RABBITMQ_HOME/etc/rabbitmq/rabbitmq-env.conf: 環(huán)境變量默認(rèn)配置(也可在啟動腳本中設(shè)置,且以啟動命令中的配置為準(zhǔn))。常用的有:

RABBITMQ_NODENAME:節(jié)點(diǎn)名稱,默認(rèn)是rabbit@$HOSTNAME。

RABBITMQ_NODE_PORT:協(xié)議端口號,默認(rèn)5672。

RABBITMQ_SERVER_START_ARGS:覆蓋rabbitmq.config中的一些配置。

$RABBITMQ_HOME/etc/rabbitmq/rabbitmq.config: 核心組件,插件,erlang服務(wù)等配置,常用的有:

disk_free_limit:隊(duì)列持久化等信息都是存到RabbitMQ本地的數(shù)據(jù)庫中的,默認(rèn)限制50000000(也就是最多只讓它使用50M空間啦,不夠可以上調(diào),也支持空閑空間百分比的配置)。要是超標(biāo)了,它就罷工了……

vm_memory_high_watermark:內(nèi)存使用,默認(rèn)0.4(最多讓它使用40%的內(nèi)存,超標(biāo)罷工)

注:若啟動失敗了,可以在啟動日志中查看到具體的錯誤信息。

命令:

$RABBITMQ_HOME/sbin/rabbitmq-server:啟動腳本,會打印出配置文件,插件,集群等信息;加上-detached為后臺啟動;

/sbin/rabbitmqctl status:查看啟動狀態(tài)

/sbin/rabbitmqctl add_user admin admin:添加新用戶admin,密碼admin;默認(rèn)只有一個guest用戶,但只限本機(jī)訪問。

/sbin/rabbitmqctl set_user_tags admin administrator:將admin設(shè)置為管理員權(quán)限

/sbin/rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" 賦予admin所有權(quán)限

/sbin/rabbitmqctl stop:關(guān)閉

集群

集群節(jié)點(diǎn)共享所有的狀態(tài)和數(shù)據(jù),如:用戶、路由、綁定等信息(隊(duì)列有點(diǎn)特殊,雖然從所有節(jié)點(diǎn)都可達(dá),但是只存在于第一次聲明它的那個節(jié)點(diǎn)上,解決方案:詳見上文:消息隊(duì)列的高可用;每個節(jié)點(diǎn)都可以接收連接,處理數(shù)據(jù)。

集群節(jié)點(diǎn)有兩種,disc:默認(rèn),信息存在本地?cái)?shù)據(jù)庫;ram:加入集群時(shí),添加--ram參數(shù),信息存在內(nèi)存,可提高性能。

配置:(一般的,用默認(rèn)的即可。)

$RABBITMQ_HOME/etc/rabbitmq/rabbitmq-env.conf:

RABBITMQ_USE_LONGNAME:默認(rèn)false,(默認(rèn)的,RABBITMQ_NODENAME中@后面的$HOSTNAME是主機(jī)名,所以需要集群中每個節(jié)點(diǎn)的hosts文件包含其他節(jié)點(diǎn)主機(jī)名到地址的映射。但是如果設(shè)置為true,就可以定義RABBITMQ_NODENAME中的$HOSTNAME為域名了)

RABBITMQ_DIST_PORT:集群端口號,默認(rèn)RABBITMQ_NODE_PORT + 20000

$RABBITMQ_HOME/etc/rabbitmq/rabbitmq.config:

cluster_nodes:設(shè)置后,在啟動時(shí)會嘗試自動連接加入的節(jié)點(diǎn)并組成集群。

cluster_partition_handling:詳見上文:網(wǎng)絡(luò)分區(qū)的處理。

更多詳細(xì)的配置見:配置

命令

rabbitmqctl stop_app

rabbitmqctl join_cluster [--ram] nodename@hostname:將當(dāng)前節(jié)點(diǎn)加入到集群中;默認(rèn)是以disc節(jié)點(diǎn)加入集群,加上--ram為ram節(jié)點(diǎn)。

rabbitmqctl start_app

rabbitmqctl cluster_status:查看集群狀態(tài)

注:如果加入集群失敗,可先查看

每個節(jié)點(diǎn)的$HOME/.erlang.cookie內(nèi)容一致;

如果hostname是主機(jī)名,那么此hostname和地址的映射需要加入hosts文件中;

如果使用的是域名,那么需要設(shè)置RABBITMQ_USE_LONGNAME為true。

注:docker版集群的見:rabbitmq-server-cluster

高級

AMQP協(xié)議簡介

RabbitMQ原生支持AMQP 0-9-1并擴(kuò)展實(shí)現(xiàn)了了一些常用的功能:AMQP 0-9-1

包含三層:

模型層: 最高層,提供了客戶端調(diào)用的命令,如:queue.declare,basic.ack,consume等。

會話層:將命令從客戶端傳遞給服務(wù)器,再將服務(wù)器的應(yīng)答傳遞給客戶端,會話層為這個傳遞過程提供可靠性、同步機(jī)制和錯誤處理。

傳輸層:主要傳輸二進(jìn)制數(shù)據(jù)流,提供幀的處理、信道復(fù)用、錯誤檢測和數(shù)據(jù)表示。

注:其他協(xié)議的支持見:RabbitMQ支持的協(xié)議

常用插件

管理界面(神器)

啟動后,執(zhí)行rabbitmq-plugins enable rabbitmq_management->
訪問http://localhost:15672->查看節(jié)點(diǎn)狀態(tài),隊(duì)列信息等等,甚至可以動態(tài)配置消息隊(duì)列的主備策略,如下圖:

Federation

啟用Federation插件,使得不同集群的節(jié)點(diǎn)之間可以傳遞消息,從而模擬出類似集群的效果。這樣可以有幾點(diǎn)好處:

松耦合:聯(lián)合在一起的不同集群可以有各自的用戶,權(quán)限等信息,無需一致;此外,這些集群的RabbitMQ和Erlang的版本可以不一致。

遠(yuǎn)程網(wǎng)絡(luò)連接友好:由于通信是遵循AMQP協(xié)議的,故而對斷斷續(xù)續(xù)的網(wǎng)絡(luò)連接容忍度高。

自定義:可以自主選擇哪些組件啟用federation。

幾個概念:

Upstreams: 定義上游節(jié)點(diǎn)信息,如:

rabbitmqctl set_parameter federation-upstream my-upstream "{"uri":"amqp://server-name","expires":3600000}"

定義一個my-upstream

uri是其上游節(jié)點(diǎn)的地址,多個upstream的節(jié)點(diǎn)無需在同一集群中。

expires表示斷開連接3600000ms后其上游節(jié)點(diǎn)會緩存消息。

Upstream sets: 多個Upstream的集合;默認(rèn)有個all,會將所有的Upstream加進(jìn)去。

Policies: 定義哪些exchanges,queues關(guān)聯(lián)到哪個Upstream或者Upstream set,如:

rabbitmqctl set_policy --apply-to exchanges federate-me "^amq." "{"federation-upstream-set":"all"}"

將此節(jié)點(diǎn)所有以amq.開頭的exchange聯(lián)合到上游節(jié)點(diǎn)的同名exchange。

注:

由于下游節(jié)點(diǎn)的exchange可以繼續(xù)作為其他節(jié)點(diǎn)的上游,故可設(shè)置成循環(huán),廣播等形式。

通過max_hops參數(shù)控制傳遞層數(shù)。

模擬集群,可以將多個節(jié)點(diǎn)兩兩互連,并設(shè)置max_hops=1。

rabbitmq-plugins enable rabbitmq_federation

如果啟用了管理界面,可以添加:

rabbitmq-plugins enable rabbitmq_federation_management

這樣就可以在界面配置Upstream和Policy了。

注:如果在一個集群中使用federation,需要該集群每個節(jié)點(diǎn)都啟用Federation插件

注:更多插件請見:插件

原文作者來自 MaxLeap 團(tuán)隊(duì)_Service&Infra 成員:呂舜
原文鏈接:https://blog.maxleap.cn/archives/648

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/25175.html

相關(guān)文章

  • RabbitMQ 布式系統(tǒng)應(yīng)用

    摘要:可設(shè)置為模式,所有發(fā)送的消息都會被確認(rèn)一次,用戶可以自行根據(jù)發(fā)回的確認(rèn)消息查看狀態(tài)。指定路由規(guī)則生產(chǎn)者消費(fèi)者同上。傳輸層主要傳輸二進(jìn)制數(shù)據(jù)流,提供幀的處理信道復(fù)用錯誤檢測和數(shù)據(jù)表示。 showImg(http://i.niupic.com/images/2016/04/28/h7FUPX.png); 由于之前做的項(xiàng)目中需要在多個節(jié)點(diǎn)之間可靠地通信,所以廢棄了之前使用的Redis pub...

    lanffy 評論0 收藏0
  • RabbitMQ 布式系統(tǒng)應(yīng)用

    摘要:可設(shè)置為模式,所有發(fā)送的消息都會被確認(rèn)一次,用戶可以自行根據(jù)發(fā)回的確認(rèn)消息查看狀態(tài)。指定路由規(guī)則生產(chǎn)者消費(fèi)者同上。傳輸層主要傳輸二進(jìn)制數(shù)據(jù)流,提供幀的處理信道復(fù)用錯誤檢測和數(shù)據(jù)表示。 showImg(http://i.niupic.com/images/2016/04/28/h7FUPX.png); 由于之前做的項(xiàng)目中需要在多個節(jié)點(diǎn)之間可靠地通信,所以廢棄了之前使用的Redis pub...

    Kross 評論0 收藏0
  • 消息中間件——RabbitMQ(二)各大主流消息中間件綜合對比介紹!

    摘要:主流消息中間件介紹是由出品,是一個完全支持和規(guī)范的實(shí)現(xiàn)。主流消息中間件介紹是阿里開源的消息中間件,目前也已經(jīng)孵化為頂級項(xiàng)目。 showImg(https://img-blog.csdnimg.cn/20190509221741422.gif);showImg(https://img-blog.csdnimg.cn/20190718204938932.png?x-oss-process=...

    hiyang 評論0 收藏0
  • 消息隊(duì)列常見問題解析

    摘要:消息隊(duì)列帶來的問題系統(tǒng)可用性降低系統(tǒng)引入的外部依賴越多,系統(tǒng)越容易出問題。系統(tǒng)復(fù)雜性提高加入消息隊(duì)列后,需要保證消息沒有被重復(fù)消費(fèi),保證消息傳遞的順序性等等。 消息隊(duì)列相關(guān)筆記 消息隊(duì)列的應(yīng)用場景: 消費(fèi)者執(zhí)行過程比較長且生產(chǎn)者不需要消費(fèi)者返回結(jié)果。用于更新索引庫,生成商品詳情頁,發(fā)短信。 為什么要使用消息隊(duì)列: 通過異步處理提高系統(tǒng)性能(削峰、減少響應(yīng)所需時(shí)間); 降低系統(tǒng)耦合性。...

    蘇丹 評論0 收藏0

發(fā)表評論

0條評論

lily_wang

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<