成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

PHP RabbitMQ

anRui / 3387人閱讀

摘要:消息隊(duì)列,用于存儲(chǔ)還未被消費(fèi)者消費(fèi)的消息。由在與時(shí)指定,而由發(fā)送時(shí)指定,兩者的匹配方式由決定。需要為每一個(gè)創(chuàng)建,協(xié)議規(guī)定只有通過才能執(zhí)行的命令。建議客戶端線程之間不要共用,至少要保證共用的線程發(fā)送消息必須是串行的,但是建議盡量共用。

安裝

rabbitmq 在 mac 下可以直接用 brew 安裝
默認(rèn)安裝在 /usr/local/Cellar/下
命令被軟連接加入到了/usr/local/sbin 下,因此可以把此目錄放到環(huán)境變量中,建議加入到~/.bash_profile 中
rabbitmq-server start 開啟服務(wù)
端口5672 默認(rèn)
端口15672 web 端登錄管理端口127.0.0.1:15672
rabbitmq 默認(rèn)提供的用戶 guest 密碼 guest

管理

停止服務(wù)
rabbitmqctl stop
開啟應(yīng)用 [服務(wù)依舊運(yùn)行]
rabbitmqctl start_app
停止應(yīng)用 [服務(wù)依舊運(yùn)行]
rabbitmqctl stop_app

用戶管理

添加用戶
sudo rabbitmqctl add_user username password
刪除用戶
sudo rabbitmqctl delete_user username
修改密碼
sudo rabbitmqctl change_password username newpassword
清除用戶密碼,禁止用戶登錄
sudo rabbitmqctl clear_password
列出所有用戶
sudo rabbitmqctl list_users
設(shè)置用戶角色
rabbitmqctl set_user_tags username tag

vhost虛擬主機(jī)管理

virtual host只是起到一個(gè)命名空間的作用,所以可以多個(gè)user共同使用一個(gè)virtual host,"/"這個(gè)是系統(tǒng)默認(rèn)的vhost,就是說當(dāng)我們創(chuàng)建一個(gè)到rabbitmq的connection時(shí)候,它的命名空間是"/",需要注意的是不同的命名空間之間的資源是不能訪問的,比如 exchang,queue ,bingding等
創(chuàng)建虛擬主機(jī)
sudo rabbitmqctl add_vhost vhostpath
刪除虛擬主機(jī)
sudo rabbitmqctl delete_vhost vhostpath
列出所有虛擬主機(jī)
sudo rabbitmqctl list_vhosts
列出某個(gè) vhost 的所有用戶和權(quán)限
list_permissions [-p vhostpath]
列出某個(gè)用戶的所有權(quán)限。
list_user_permissions {username}
清除用戶對(duì)某個(gè) vhost 的權(quán)限。
clear_permissions [-p vhostpath] {username}
設(shè)置用戶對(duì)某個(gè) virtual host 的權(quán)限,如果不指定 vhost,則默認(rèn)為“/” vhost。
set_permissions [-p vhostpath] {user}
rabbitmqctl set_permissions -p test_host kang “." "." ".*"

添加一個(gè)管理員代替 guest
rabbitmqctl add_user admin 123456
指定用戶的角色
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin "." "." ".*”
分配給用戶指定虛擬主機(jī)的權(quán)限,雖然是administrator角色,但不對(duì)所有虛擬主機(jī)都有權(quán)限,一樣需要對(duì)每個(gè)虛擬主機(jī)都授權(quán)

顯示信息
rabbitmqctl list_queues [-p ] [ ...]
列出某個(gè) vhost 的所有 queue。
rabbitmqctl list_exchanges [-p ] [ ...]
列出某個(gè) vhost 的所有 exchange。
rabbitmqctl list_bindings [-p ] [ ...]
列出某個(gè) vhost 的所有 binding。
rabbitmqctl list_connections [ ...]
列出 RabbitMQ broker 的所有 connection。
rabbitmqctl list_channels [ ...]
列出 RabbitMQ broker 的所有 channel
rabbitmqcrl list_consumers [-p ]
列出某個(gè) vhost 的所有 consumer。

基本概念點(diǎn)

1.Server(broker): 接受客戶端連接,實(shí)現(xiàn)AMQP消息隊(duì)列和路由功能的進(jìn)程。

2.Virtual Host:其實(shí)是一個(gè)虛擬概念,類似于權(quán)限控制組,一個(gè)Virtual Host里面可以有若干個(gè)Exchange和Queue,但是權(quán)限控制的最小粒度是Virtual Host

3.Exchange:接受生產(chǎn)者發(fā)送的消息,并根據(jù)Binding規(guī)則將消息路由給服務(wù)器中的隊(duì)列。ExchangeType決定了Exchange路由消息的行為,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三種,不同類型的Exchange路由的行為是不一樣的。

4.Message Queue:消息隊(duì)列,用于存儲(chǔ)還未被消費(fèi)者消費(fèi)的消息。

5.Message: 由Header和Body組成,Header是由生產(chǎn)者添加的各種屬性的集合,包括Message是否被持久化、由哪個(gè)Message Queue接受、優(yōu)先級(jí)是多少等。而Body是真正需要傳輸?shù)腁PP數(shù)據(jù)。

6.Binding:Binding聯(lián)系了Exchange與Message Queue。Exchange在與多個(gè)Message Queue發(fā)生Binding后會(huì)生成一張路由表,路由表中存儲(chǔ)著Message Queue所需消息的限制條件即Binding Key。當(dāng)Exchange收到Message時(shí)會(huì)解析其Header得到Routing Key,Exchange根據(jù)Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時(shí)指定,而Routing Key由Producer發(fā)送Message時(shí)指定,兩者的匹配方式由Exchange Type決定。

7.Connection:連接,對(duì)于RabbitMQ而言,其實(shí)就是一個(gè)位于客戶端和Broker之間的TCP連接。

8.Channel:信道,僅僅創(chuàng)建了客戶端到Broker之間的連接后,客戶端還是不能發(fā)送消息的。需要為每一個(gè)Connection創(chuàng)建Channel,AMQP協(xié)議規(guī)定只有通過Channel才能執(zhí)行AMQP的命令。一個(gè)Connection可以包含多個(gè)Channel。之所以需要Channel,是因?yàn)門CP連接的建立和釋放都是十分昂貴的,如果一個(gè)客戶端每一個(gè)線程都需要與Broker交互,如果每一個(gè)線程都建立一個(gè)TCP連接,暫且不考慮TCP連接是否浪費(fèi),就算操作系統(tǒng)也無法承受每秒建立如此多的TCP連接。RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發(fā)送消息必須是串行的,但是建議盡量共用Connection。

9.Command:AMQP的命令,客戶端通過Command完成與AMQP服務(wù)器的交互來實(shí)現(xiàn)自身的邏輯。例如在RabbitMQ中,客戶端可以通過publish命令發(fā)送消息,txSelect開啟一個(gè)事務(wù),txCommit提交一個(gè)事務(wù)。

客戶端管理

php端 rabbitmq 客戶端可以使用 composer 下的庫(kù)

{
    "require": {
        "php-amqplib/php-amqplib": "2.5.*"
    }
}

使用時(shí),用到了這兩個(gè)東西

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

開始發(fā)送消息

public function handle()
{
    //連接到 test_host 虛擬主機(jī),每個(gè)虛擬主機(jī)有自己的隊(duì)列,交換機(jī)...
    $connection = new AMQPStreamConnection("127.0.0.1", 5672, "kang", "a943434603", "test_host");
    //創(chuàng)建一個(gè) channel
    $channel = $connection->channel();
    //聲明 hello 隊(duì)列
    $channel->queue_declare("hello", false, false, false, false);
    //創(chuàng)建一個(gè)消息
    $msg = new AMQPMessage(time());
    //把消息推送到默認(rèn)的交換機(jī)中,并且告訴交換機(jī)要把消息交給 hello 隊(duì)列
    $channel->basic_publish($msg, "", "hello");
    echo " [x] Sent ".time()."
";
}

重要概念,消息是保存在交換機(jī)中的,當(dāng)消息存放時(shí)指定的隊(duì)列存在,交換機(jī)會(huì)把消息推送到該隊(duì)列

消息隊(duì)列發(fā)送消息給消費(fèi)者,一個(gè)消息發(fā)給一個(gè)消費(fèi)者

public function handle()
{
    //連接
    $connection = new AMQPStreamConnection("localhost", 5672, "kang", "a943434603", "test_host");
    //創(chuàng)建一個(gè) channel
    $channel = $connection->channel();
    //可以運(yùn)行這個(gè)命令很多次,但是只有一個(gè)隊(duì)列會(huì)被創(chuàng)建, 在程序中重復(fù)將隊(duì)列重復(fù)聲明一下是種值得推薦的做法,保證隊(duì)列存在
    $channel->queue_declare("hello", false, false, false, false);

    echo " [*] Waiting for messages. To exit press CTRL+C", "
";

    $callback = function($msg) {
        echo " [x] Received ", $msg->body, "
";
        sleep($msg->body);
        $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]);
    };

    //默認(rèn)情況下,隊(duì)列會(huì)把消息公平的分配給各個(gè)消費(fèi)者
    //如果某個(gè)消費(fèi)者腳本處理完成分配給他的消息任務(wù)后,會(huì)一直空閑
    //另外一個(gè)消費(fèi)者腳本處理的消息都非常耗時(shí),這就容易導(dǎo)致消費(fèi)者腳本得不到合理利用,
    //加入此句話,是告訴隊(duì)列,取消把消息公平分配到各個(gè)腳本,而是那個(gè)腳本空閑,就交給它一個(gè)消息任務(wù)
    //這樣,合理利用到每一個(gè)空閑的消費(fèi)者腳本
    $channel->basic_qos(null, 1, null);

    /**
     * basic_consume 方法    從隊(duì)列中讀取數(shù)據(jù)
     * @param string $queue 指定隊(duì)列
     * @param string $consumer_tag
     * @param bool $no_local
     * @param bool $no_ack    消費(fèi)者處理完消息后,是否不需要告訴隊(duì)列已經(jīng)處理完成,true 不需要 false 需要,
     * true
        默認(rèn)情況下,隊(duì)列會(huì)把消息公平分配到各個(gè)消費(fèi)者中,然后一次性把消息交給消費(fèi)者,如果消費(fèi)者處理了一半掛了,那么消息就丟失了
     * false
        默認(rèn)情況下,隊(duì)列會(huì)把消息公平的分配給各個(gè)消費(fèi)者,然后一個(gè)一個(gè)的把消息分配到消費(fèi)者腳本中,腳本處理完成后,告訴隊(duì)列,隊(duì)列會(huì)刪除這個(gè)消息,并且接著給下一個(gè)消息,
        當(dāng)腳本掛掉,不會(huì)丟失消息,隊(duì)列會(huì)把未完成的消息分配給其他消費(fèi)者
        在 callback 函數(shù)中需要加入這句話,處理完后通知隊(duì)列可以刪除消息了
        $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]);
        未加入這句話,隊(duì)列不會(huì)刪除已處理完的消息,當(dāng)腳本掛掉時(shí),會(huì)把分配給當(dāng)前隊(duì)列的所有消息再次重新分配給其他隊(duì)列,會(huì)導(dǎo)致消息會(huì)重復(fù)處理
     */
    $channel->basic_consume("hello", "", false, false, false, false, $callback);

    while(count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}
發(fā)布/訂閱

一個(gè)消息發(fā)送給多個(gè)消費(fèi)者

扇形交換機(jī) fanout
發(fā)布訂閱模式,科院實(shí)現(xiàn)一個(gè)消息發(fā)送到多個(gè)隊(duì)列中
在發(fā)布消息腳本中,創(chuàng)建一個(gè)扇形交換機(jī),把消息推送到交換機(jī),不需要推動(dòng)到指定的隊(duì)列中,隊(duì)列在消費(fèi)者腳本中創(chuàng)建
消費(fèi)腳本定義個(gè)臨時(shí)隊(duì)列,并綁定這個(gè)臨時(shí)隊(duì)列到交換機(jī)中,扇形交換機(jī)會(huì)把接收到的消息推動(dòng)到每一個(gè)綁定的隊(duì)列中

生產(chǎn)者腳本

public function handle()
{
    //連接到 test_host 虛擬主機(jī),每個(gè)虛擬主機(jī)有自己的隊(duì)列,交換機(jī)...
    $connection = new AMQPStreamConnection("127.0.0.1", 5672, "kang", "a943434603", "test_host");
    //創(chuàng)建一個(gè) channel
    $channel = $connection->channel();
    //聲明 log 隊(duì)列
    //$channel->queue_declare("log", false, false, false, false);
    //創(chuàng)建一個(gè)fanout類型交換機(jī)
    $channel->exchange_declare("logs","fanout",false,false,false);

    //創(chuàng)建一個(gè)消息
    $msg = new AMQPMessage( time() );
    $channel->basic_publish ( $msg , "logs" );

    echo " [x] Sent ".time()."
";

    $channel->close();
    $connection->close();
}

消費(fèi)者腳本

public function handle()
{
    //連接
    $connection = new AMQPStreamConnection("localhost", 5672, "kang", "a943434603", "test_host");
    //創(chuàng)建一個(gè) channel
    $channel = $connection->channel();
    //可以運(yùn)行這個(gè)命令很多次,但是只有一個(gè)隊(duì)列會(huì)被創(chuàng)建, 在程序中重復(fù)將隊(duì)列重復(fù)聲明一下是種值得推薦的做法,保證隊(duì)列存在
    //$channel->queue_declare("hello", false, false, false, false);
    //創(chuàng)建一個(gè)fanout類型交換機(jī)
    $channel->exchange_declare("logs", "fanout", false, false, false);

    //系統(tǒng)創(chuàng)建一個(gè)臨時(shí)隊(duì)列
    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    //綁定臨時(shí)隊(duì)列到交換機(jī)上
    $channel->queue_bind($queue_name, "logs");

    $callback = function($msg){
        echo " [x] ", $msg->body, "
";
    };
    $channel->basic_consume($queue_name, "", false, true, false, false, $callback);

    while(count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}

直連交換機(jī) direct
交換機(jī)將會(huì)對(duì)綁定鍵(binding key)和路由鍵(routing key)進(jìn)行精確匹配,從而確定消息該分發(fā)到哪個(gè)隊(duì)列

生產(chǎn)者,創(chuàng)建基本上和扇形交換機(jī)一樣,不同的是

$channel->exchange_declare("direct_logs","direct",false,false,false);
//創(chuàng)建一個(gè)消息
$msg = new AMQPMessage( time() );
//把消息推動(dòng)到direct_logs交換機(jī),并給消息加上路由 key,讓消費(fèi)者隊(duì)列來根據(jù) key 接收消息
$channel->basic_publish ( $msg , "direct_logs", "warning" );

消費(fèi)者

$channel->exchange_declare("direct_logs","direct",false,false,false);
//系統(tǒng)創(chuàng)建一個(gè)臨時(shí)隊(duì)列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
//綁定臨時(shí)隊(duì)列到交換機(jī)上,并指定消息的路由 key
$channel->queue_bind($queue_name, "direct_logs", "error");
$channel->queue_bind($queue_name, "direct_logs", "warning");

主題交換機(jī) topic

直連交換機(jī)中路由 key 匹配模式
(星號(hào)*) 用來表示一個(gè)單詞.
(井號(hào)#) 用來表示任意數(shù)量(零個(gè)或多個(gè))單詞。

Q1會(huì)接收到 a.orange.b 等key 值中間為 orange 的消息
Q2會(huì)接收到 a.b.rabbit, lazy.a, lazy.a.b.c 等消息

當(dāng) * (星號(hào)) 和 # (井號(hào)) 這兩個(gè)特殊字符都未在綁定鍵中出現(xiàn)的時(shí)候,此時(shí)主題交換機(jī)就擁有的直連交換機(jī)的行為。

生產(chǎn)者,指定路由 key

$channel->exchange_declare("topic_logs","topic",false,false,false);
//創(chuàng)建一個(gè)消息
$msg = new AMQPMessage( time() );
//把消息推動(dòng)到direct_logs交換機(jī),并給消息加上路由 key,讓消費(fèi)者隊(duì)列來根據(jù) key 接收消息
$channel->basic_publish ( $msg , "topic_logs", "baidu.warning" );

消費(fèi)者1

$channel->exchange_declare("topic_logs","topic",false,false,false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, "topic_logs", "baidu.#");

消費(fèi)者2

$channel->exchange_declare("topic_logs","topic",false,false,false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, "topic_logs", "ali.#");
參考

https://www.rabbitmq.com/tutorials/tutorial-one-php.html
http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/21527.html

相關(guān)文章

  • RabbitMQ】——centos7安裝rabbitmq教程 以及 PHP開啟rabbitmq擴(kuò)展

    摘要:第一步安裝因?yàn)槭钦Z言編寫的,所以我們首先需要安裝第二步安裝官網(wǎng)提供的安裝方式本人安裝成功的方式第三步查看是否已經(jīng)安裝好了,能查到說明已經(jīng)安裝完成了。 第一步:安裝Erlang 因?yàn)閞abbitMQ是Erlang語言編寫的,所以我們首先需要安裝Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...

    lscho 評(píng)論0 收藏0
  • RabbitMQ】——centos7安裝rabbitmq教程 以及 PHP開啟rabbitmq擴(kuò)展

    摘要:第一步安裝因?yàn)槭钦Z言編寫的,所以我們首先需要安裝第二步安裝官網(wǎng)提供的安裝方式本人安裝成功的方式第三步查看是否已經(jīng)安裝好了,能查到說明已經(jīng)安裝完成了。 第一步:安裝Erlang 因?yàn)閞abbitMQ是Erlang語言編寫的,所以我們首先需要安裝Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...

    printempw 評(píng)論0 收藏0
  • RabbitMQ+PHP 消息隊(duì)列環(huán)境配置

    摘要:參考文檔依賴包安裝環(huán)境配置環(huán)境變量增加內(nèi)容保存退出,并刷新變量測(cè)試是否安裝成功安裝完成以后,執(zhí)行看是否能打開,用退出,注意后面的點(diǎn)號(hào),那是的結(jié)束符。 參考文檔:http://www.cnblogs.com/phpinfo/p/4104551...http://blog.csdn.net/historyasamirror/ar... 依賴包安裝 yum install ncurses-d...

    geekidentity 評(píng)論0 收藏0
  • RabbitMQ+PHP 教程一(Hello World)

    摘要:在中間的框是一個(gè)隊(duì)列的消息緩沖區(qū),保持代表的消費(fèi)。本教程介紹,這是一個(gè)開放的通用的協(xié)議消息。我們將在本教程中使用,解決依賴管理。發(fā)送者將連接到,發(fā)送一條消息,然后退出。注意,這與發(fā)送發(fā)布的隊(duì)列匹配。 介紹 RabbitMQ是一個(gè)消息代理器:它接受和轉(zhuǎn)發(fā)消息。你可以把它當(dāng)作一個(gè)郵局:當(dāng)你把郵件放在信箱里時(shí),你可以肯定郵差先生最終會(huì)把郵件送到你的收件人那里。在這個(gè)比喻中,RabbitMQ就...

    silencezwm 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<