摘要:消息隊(duì)列,用于存儲(chǔ)還未被消費(fèi)者消費(fèi)的消息。由在與時(shí)指定,而由發(fā)送時(shí)指定,兩者的匹配方式由決定。需要為每一個(gè)創(chuàng)建,協(xié)議規(guī)定只有通過才能執(zhí)行的命令。建議客戶端線程之間不要共用,至少要保證共用的線程發(fā)送消息必須是串行的,但是建議盡量共用。
安裝
rabbitmq 在 mac 下可以直接用 brew 安裝
默認(rèn)安裝在 /usr/local/Cellar/下
命令被軟連接加入到了/usr/local/sbin 下,因此可以把此目錄放到環(huán)境變量中,建議加入到~/.bash_profile 中
rabbitmq-server start 開啟服務(wù)
端口5672 默認(rèn)
端口15672 web 端登錄管理端口127.0.0.1:15672
rabbitmq 默認(rèn)提供的用戶 guest 密碼 guest
停止服務(wù)
rabbitmqctl stop
開啟應(yīng)用 [服務(wù)依舊運(yùn)行]
rabbitmqctl start_app
停止應(yīng)用 [服務(wù)依舊運(yùn)行]
rabbitmqctl stop_app
添加用戶
sudo rabbitmqctl add_user username password
刪除用戶
sudo rabbitmqctl delete_user username
修改密碼
sudo rabbitmqctl change_password username newpassword
清除用戶密碼,禁止用戶登錄
sudo rabbitmqctl clear_password
列出所有用戶
sudo rabbitmqctl list_users
設(shè)置用戶角色
rabbitmqctl set_user_tags username tag
virtual host只是起到一個(gè)命名空間的作用,所以可以多個(gè)user共同使用一個(gè)virtual host,"/"這個(gè)是系統(tǒng)默認(rèn)的vhost,就是說當(dāng)我們創(chuàng)建一個(gè)到rabbitmq的connection時(shí)候,它的命名空間是"/",需要注意的是不同的命名空間之間的資源是不能訪問的,比如 exchang,queue ,bingding等
創(chuàng)建虛擬主機(jī)
sudo rabbitmqctl add_vhost vhostpath
刪除虛擬主機(jī)
sudo rabbitmqctl delete_vhost vhostpath
列出所有虛擬主機(jī)
sudo rabbitmqctl list_vhosts
列出某個(gè) vhost 的所有用戶和權(quán)限
list_permissions [-p vhostpath]
列出某個(gè)用戶的所有權(quán)限。
list_user_permissions {username}
清除用戶對(duì)某個(gè) vhost 的權(quán)限。
clear_permissions [-p vhostpath] {username}
設(shè)置用戶對(duì)某個(gè) virtual host 的權(quán)限,如果不指定 vhost,則默認(rèn)為“/” vhost。
set_permissions [-p vhostpath] {user}
rabbitmqctl set_permissions -p test_host kang “." "." ".*"
添加一個(gè)管理員代替 guest
rabbitmqctl add_user admin 123456
指定用戶的角色
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin "." "." ".*”
分配給用戶指定虛擬主機(jī)的權(quán)限,雖然是administrator角色,但不對(duì)所有虛擬主機(jī)都有權(quán)限,一樣需要對(duì)每個(gè)虛擬主機(jī)都授權(quán)
顯示信息
rabbitmqctl list_queues [-p
列出某個(gè) vhost 的所有 queue。
rabbitmqctl list_exchanges [-p
列出某個(gè) vhost 的所有 exchange。
rabbitmqctl list_bindings [-p
列出某個(gè) vhost 的所有 binding。
rabbitmqctl list_connections [
列出 RabbitMQ broker 的所有 connection。
rabbitmqctl list_channels [
列出 RabbitMQ broker 的所有 channel
rabbitmqcrl list_consumers [-p
列出某個(gè) vhost 的所有 consumer。
1.Server(broker): 接受客戶端連接,實(shí)現(xiàn)AMQP消息隊(duì)列和路由功能的進(jìn)程。
2.Virtual Host:其實(shí)是一個(gè)虛擬概念,類似于權(quán)限控制組,一個(gè)Virtual Host里面可以有若干個(gè)Exchange和Queue,但是權(quán)限控制的最小粒度是Virtual Host
3.Exchange:接受生產(chǎn)者發(fā)送的消息,并根據(jù)Binding規(guī)則將消息路由給服務(wù)器中的隊(duì)列。ExchangeType決定了Exchange路由消息的行為,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三種,不同類型的Exchange路由的行為是不一樣的。
4.Message Queue:消息隊(duì)列,用于存儲(chǔ)還未被消費(fèi)者消費(fèi)的消息。
5.Message: 由Header和Body組成,Header是由生產(chǎn)者添加的各種屬性的集合,包括Message是否被持久化、由哪個(gè)Message Queue接受、優(yōu)先級(jí)是多少等。而Body是真正需要傳輸?shù)腁PP數(shù)據(jù)。
6.Binding:Binding聯(lián)系了Exchange與Message Queue。Exchange在與多個(gè)Message Queue發(fā)生Binding后會(huì)生成一張路由表,路由表中存儲(chǔ)著Message Queue所需消息的限制條件即Binding Key。當(dāng)Exchange收到Message時(shí)會(huì)解析其Header得到Routing Key,Exchange根據(jù)Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時(shí)指定,而Routing Key由Producer發(fā)送Message時(shí)指定,兩者的匹配方式由Exchange Type決定。
7.Connection:連接,對(duì)于RabbitMQ而言,其實(shí)就是一個(gè)位于客戶端和Broker之間的TCP連接。
8.Channel:信道,僅僅創(chuàng)建了客戶端到Broker之間的連接后,客戶端還是不能發(fā)送消息的。需要為每一個(gè)Connection創(chuàng)建Channel,AMQP協(xié)議規(guī)定只有通過Channel才能執(zhí)行AMQP的命令。一個(gè)Connection可以包含多個(gè)Channel。之所以需要Channel,是因?yàn)門CP連接的建立和釋放都是十分昂貴的,如果一個(gè)客戶端每一個(gè)線程都需要與Broker交互,如果每一個(gè)線程都建立一個(gè)TCP連接,暫且不考慮TCP連接是否浪費(fèi),就算操作系統(tǒng)也無法承受每秒建立如此多的TCP連接。RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發(fā)送消息必須是串行的,但是建議盡量共用Connection。
9.Command:AMQP的命令,客戶端通過Command完成與AMQP服務(wù)器的交互來實(shí)現(xiàn)自身的邏輯。例如在RabbitMQ中,客戶端可以通過publish命令發(fā)送消息,txSelect開啟一個(gè)事務(wù),txCommit提交一個(gè)事務(wù)。
客戶端管理php端 rabbitmq 客戶端可以使用 composer 下的庫(kù)
{ "require": { "php-amqplib/php-amqplib": "2.5.*" } }
使用時(shí),用到了這兩個(gè)東西
use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage;
開始發(fā)送消息
public function handle() { //連接到 test_host 虛擬主機(jī),每個(gè)虛擬主機(jī)有自己的隊(duì)列,交換機(jī)... $connection = new AMQPStreamConnection("127.0.0.1", 5672, "kang", "a943434603", "test_host"); //創(chuàng)建一個(gè) channel $channel = $connection->channel(); //聲明 hello 隊(duì)列 $channel->queue_declare("hello", false, false, false, false); //創(chuàng)建一個(gè)消息 $msg = new AMQPMessage(time()); //把消息推送到默認(rèn)的交換機(jī)中,并且告訴交換機(jī)要把消息交給 hello 隊(duì)列 $channel->basic_publish($msg, "", "hello"); echo " [x] Sent ".time()." "; }
重要概念,消息是保存在交換機(jī)中的,當(dāng)消息存放時(shí)指定的隊(duì)列存在,交換機(jī)會(huì)把消息推送到該隊(duì)列
消息隊(duì)列發(fā)送消息給消費(fèi)者,一個(gè)消息發(fā)給一個(gè)消費(fèi)者
public function handle() { //連接 $connection = new AMQPStreamConnection("localhost", 5672, "kang", "a943434603", "test_host"); //創(chuàng)建一個(gè) channel $channel = $connection->channel(); //可以運(yùn)行這個(gè)命令很多次,但是只有一個(gè)隊(duì)列會(huì)被創(chuàng)建, 在程序中重復(fù)將隊(duì)列重復(fù)聲明一下是種值得推薦的做法,保證隊(duì)列存在 $channel->queue_declare("hello", false, false, false, false); echo " [*] Waiting for messages. To exit press CTRL+C", " "; $callback = function($msg) { echo " [x] Received ", $msg->body, " "; sleep($msg->body); $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]); }; //默認(rèn)情況下,隊(duì)列會(huì)把消息公平的分配給各個(gè)消費(fèi)者 //如果某個(gè)消費(fèi)者腳本處理完成分配給他的消息任務(wù)后,會(huì)一直空閑 //另外一個(gè)消費(fèi)者腳本處理的消息都非常耗時(shí),這就容易導(dǎo)致消費(fèi)者腳本得不到合理利用, //加入此句話,是告訴隊(duì)列,取消把消息公平分配到各個(gè)腳本,而是那個(gè)腳本空閑,就交給它一個(gè)消息任務(wù) //這樣,合理利用到每一個(gè)空閑的消費(fèi)者腳本 $channel->basic_qos(null, 1, null); /** * basic_consume 方法 從隊(duì)列中讀取數(shù)據(jù) * @param string $queue 指定隊(duì)列 * @param string $consumer_tag * @param bool $no_local * @param bool $no_ack 消費(fèi)者處理完消息后,是否不需要告訴隊(duì)列已經(jīng)處理完成,true 不需要 false 需要, * true 默認(rèn)情況下,隊(duì)列會(huì)把消息公平分配到各個(gè)消費(fèi)者中,然后一次性把消息交給消費(fèi)者,如果消費(fèi)者處理了一半掛了,那么消息就丟失了 * false 默認(rèn)情況下,隊(duì)列會(huì)把消息公平的分配給各個(gè)消費(fèi)者,然后一個(gè)一個(gè)的把消息分配到消費(fèi)者腳本中,腳本處理完成后,告訴隊(duì)列,隊(duì)列會(huì)刪除這個(gè)消息,并且接著給下一個(gè)消息, 當(dāng)腳本掛掉,不會(huì)丟失消息,隊(duì)列會(huì)把未完成的消息分配給其他消費(fèi)者 在 callback 函數(shù)中需要加入這句話,處理完后通知隊(duì)列可以刪除消息了 $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]); 未加入這句話,隊(duì)列不會(huì)刪除已處理完的消息,當(dāng)腳本掛掉時(shí),會(huì)把分配給當(dāng)前隊(duì)列的所有消息再次重新分配給其他隊(duì)列,會(huì)導(dǎo)致消息會(huì)重復(fù)處理 */ $channel->basic_consume("hello", "", false, false, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); }發(fā)布/訂閱
一個(gè)消息發(fā)送給多個(gè)消費(fèi)者
扇形交換機(jī) fanout
發(fā)布訂閱模式,科院實(shí)現(xiàn)一個(gè)消息發(fā)送到多個(gè)隊(duì)列中
在發(fā)布消息腳本中,創(chuàng)建一個(gè)扇形交換機(jī),把消息推送到交換機(jī),不需要推動(dòng)到指定的隊(duì)列中,隊(duì)列在消費(fèi)者腳本中創(chuàng)建
消費(fèi)腳本定義個(gè)臨時(shí)隊(duì)列,并綁定這個(gè)臨時(shí)隊(duì)列到交換機(jī)中,扇形交換機(jī)會(huì)把接收到的消息推動(dòng)到每一個(gè)綁定的隊(duì)列中
生產(chǎn)者腳本
public function handle() { //連接到 test_host 虛擬主機(jī),每個(gè)虛擬主機(jī)有自己的隊(duì)列,交換機(jī)... $connection = new AMQPStreamConnection("127.0.0.1", 5672, "kang", "a943434603", "test_host"); //創(chuàng)建一個(gè) channel $channel = $connection->channel(); //聲明 log 隊(duì)列 //$channel->queue_declare("log", false, false, false, false); //創(chuàng)建一個(gè)fanout類型交換機(jī) $channel->exchange_declare("logs","fanout",false,false,false); //創(chuàng)建一個(gè)消息 $msg = new AMQPMessage( time() ); $channel->basic_publish ( $msg , "logs" ); echo " [x] Sent ".time()." "; $channel->close(); $connection->close(); }
消費(fèi)者腳本
public function handle() { //連接 $connection = new AMQPStreamConnection("localhost", 5672, "kang", "a943434603", "test_host"); //創(chuàng)建一個(gè) channel $channel = $connection->channel(); //可以運(yùn)行這個(gè)命令很多次,但是只有一個(gè)隊(duì)列會(huì)被創(chuàng)建, 在程序中重復(fù)將隊(duì)列重復(fù)聲明一下是種值得推薦的做法,保證隊(duì)列存在 //$channel->queue_declare("hello", false, false, false, false); //創(chuàng)建一個(gè)fanout類型交換機(jī) $channel->exchange_declare("logs", "fanout", false, false, false); //系統(tǒng)創(chuàng)建一個(gè)臨時(shí)隊(duì)列 list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); //綁定臨時(shí)隊(duì)列到交換機(jī)上 $channel->queue_bind($queue_name, "logs"); $callback = function($msg){ echo " [x] ", $msg->body, " "; }; $channel->basic_consume($queue_name, "", false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); }
直連交換機(jī) direct
交換機(jī)將會(huì)對(duì)綁定鍵(binding key)和路由鍵(routing key)進(jìn)行精確匹配,從而確定消息該分發(fā)到哪個(gè)隊(duì)列
生產(chǎn)者,創(chuàng)建基本上和扇形交換機(jī)一樣,不同的是
$channel->exchange_declare("direct_logs","direct",false,false,false); //創(chuàng)建一個(gè)消息 $msg = new AMQPMessage( time() ); //把消息推動(dòng)到direct_logs交換機(jī),并給消息加上路由 key,讓消費(fèi)者隊(duì)列來根據(jù) key 接收消息 $channel->basic_publish ( $msg , "direct_logs", "warning" );
消費(fèi)者
$channel->exchange_declare("direct_logs","direct",false,false,false); //系統(tǒng)創(chuàng)建一個(gè)臨時(shí)隊(duì)列 list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); //綁定臨時(shí)隊(duì)列到交換機(jī)上,并指定消息的路由 key $channel->queue_bind($queue_name, "direct_logs", "error"); $channel->queue_bind($queue_name, "direct_logs", "warning");
主題交換機(jī) topic
直連交換機(jī)中路由 key 匹配模式
(星號(hào)*) 用來表示一個(gè)單詞.
(井號(hào)#) 用來表示任意數(shù)量(零個(gè)或多個(gè))單詞。
Q1會(huì)接收到 a.orange.b 等key 值中間為 orange 的消息
Q2會(huì)接收到 a.b.rabbit, lazy.a, lazy.a.b.c 等消息
當(dāng) * (星號(hào)) 和 # (井號(hào)) 這兩個(gè)特殊字符都未在綁定鍵中出現(xiàn)的時(shí)候,此時(shí)主題交換機(jī)就擁有的直連交換機(jī)的行為。
生產(chǎn)者,指定路由 key
$channel->exchange_declare("topic_logs","topic",false,false,false); //創(chuàng)建一個(gè)消息 $msg = new AMQPMessage( time() ); //把消息推動(dòng)到direct_logs交換機(jī),并給消息加上路由 key,讓消費(fèi)者隊(duì)列來根據(jù) key 接收消息 $channel->basic_publish ( $msg , "topic_logs", "baidu.warning" );
消費(fèi)者1
$channel->exchange_declare("topic_logs","topic",false,false,false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queue_name, "topic_logs", "baidu.#");
消費(fèi)者2
$channel->exchange_declare("topic_logs","topic",false,false,false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queue_name, "topic_logs", "ali.#");參考
https://www.rabbitmq.com/tutorials/tutorial-one-php.html
http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/21527.html
摘要:第一步安裝因?yàn)槭钦Z言編寫的,所以我們首先需要安裝第二步安裝官網(wǎng)提供的安裝方式本人安裝成功的方式第三步查看是否已經(jīng)安裝好了,能查到說明已經(jīng)安裝完成了。 第一步:安裝Erlang 因?yàn)閞abbitMQ是Erlang語言編寫的,所以我們首先需要安裝Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...
摘要:第一步安裝因?yàn)槭钦Z言編寫的,所以我們首先需要安裝第二步安裝官網(wǎng)提供的安裝方式本人安裝成功的方式第三步查看是否已經(jīng)安裝好了,能查到說明已經(jīng)安裝完成了。 第一步:安裝Erlang 因?yàn)閞abbitMQ是Erlang語言編寫的,所以我們首先需要安裝Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...
摘要:參考文檔依賴包安裝環(huán)境配置環(huán)境變量增加內(nèi)容保存退出,并刷新變量測(cè)試是否安裝成功安裝完成以后,執(zhí)行看是否能打開,用退出,注意后面的點(diǎn)號(hào),那是的結(jié)束符。 參考文檔:http://www.cnblogs.com/phpinfo/p/4104551...http://blog.csdn.net/historyasamirror/ar... 依賴包安裝 yum install ncurses-d...
摘要:在中間的框是一個(gè)隊(duì)列的消息緩沖區(qū),保持代表的消費(fèi)。本教程介紹,這是一個(gè)開放的通用的協(xié)議消息。我們將在本教程中使用,解決依賴管理。發(fā)送者將連接到,發(fā)送一條消息,然后退出。注意,這與發(fā)送發(fā)布的隊(duì)列匹配。 介紹 RabbitMQ是一個(gè)消息代理器:它接受和轉(zhuǎn)發(fā)消息。你可以把它當(dāng)作一個(gè)郵局:當(dāng)你把郵件放在信箱里時(shí),你可以肯定郵差先生最終會(huì)把郵件送到你的收件人那里。在這個(gè)比喻中,RabbitMQ就...
閱讀 1401·2019-08-30 12:54
閱讀 1883·2019-08-30 11:16
閱讀 1628·2019-08-30 10:50
閱讀 2462·2019-08-29 16:17
閱讀 1282·2019-08-26 12:17
閱讀 1391·2019-08-26 10:15
閱讀 2399·2019-08-23 18:38
閱讀 797·2019-08-23 17:50