摘要:前言先安裝對(duì)應(yīng)的這里用的是不同的擴(kuò)展實(shí)現(xiàn)方式會(huì)有細(xì)微的差異擴(kuò)展地址具體以官網(wǎng)為準(zhǔn)介紹配置信息基類(lèi)生產(chǎn)者類(lèi)消費(fèi)者類(lèi)消費(fèi)者可有多個(gè)配置交換機(jī)路由生產(chǎn)者路由只控制發(fā)送成功不接受消費(fèi)者是否收到頻道
前言
先安裝PHP對(duì)應(yīng)的RabbitMQ,這里用的是 php_amqp 不同的擴(kuò)展實(shí)現(xiàn)方式會(huì)有細(xì)微的差異. php擴(kuò)展地址: http://pecl.php.net/package/amqp 具體以官網(wǎng)為準(zhǔn) http://www.rabbitmq.com/getstarted.html
介紹
config.php 配置信息 BaseMQ.php MQ基類(lèi) ProductMQ.php 生產(chǎn)者類(lèi) ConsumerMQ.php 消費(fèi)者類(lèi) Consumer2MQ.php 消費(fèi)者2(可有多個(gè))
config.php
[ "host" => "127.0.0.1", "port" => "5672", "login" => "guest", "password" => "guest", "vhost"=>"/", ], //交換機(jī) "exchange"=>"word", //路由 "routes" => [], ];
BaseMQ.php
conf = $conf["host"] ; $this->exchange = $conf["exchange"] ; $this->AMQPConnection = new AMQPConnection($this->conf); if (!$this->AMQPConnection->connect()) throw new AMQPConnectionException("Cannot connect to the broker! "); } /** * close link */ public function close() { $this->AMQPConnection->disconnect(); } /** Channel * @return AMQPChannel * @throws AMQPConnectionException */ public function channel() { if(!$this->AMQPChannel) { $this->AMQPChannel = new AMQPChannel($this->AMQPConnection); } return $this->AMQPChannel; } /** Exchange * @return AMQPExchange * @throws AMQPConnectionException * @throws AMQPExchangeException */ public function exchange() { if(!$this->AMQPExchange) { $this->AMQPExchange = new AMQPExchange($this->channel()); $this->AMQPExchange->setName($this->exchange); } return $this->AMQPExchange ; } /** queue * @return AMQPQueue * @throws AMQPConnectionException * @throws AMQPQueueException */ public function queue() { if(!$this->AMQPQueue) { $this->AMQPQueue = new AMQPQueue($this->channel()); } return $this->AMQPQueue ; } /** Envelope * @return AMQPEnvelope */ public function envelope() { if(!$this->AMQPEnvelope) { $this->AMQPEnvelope = new AMQPEnvelope(); } return $this->AMQPEnvelope; } }
ProductMQ.php
channel(); //創(chuàng)建交換機(jī)對(duì)象 $ex = $this->exchange(); //消息內(nèi)容 $message = "product message ".rand(1,99999); //開(kāi)始事務(wù) $channel->startTransaction(); $sendEd = true ; foreach ($this->routes as $route) { $sendEd = $ex->publish($message, $route) ; echo "Send Message:".$sendEd." "; } if(!$sendEd) { $channel->rollbackTransaction(); } $channel->commitTransaction(); //提交事務(wù) $this->close(); die ; } } try{ (new ProductMQ())->run(); }catch (Exception $exception){ var_dump($exception->getMessage()) ; }
ConsumerMQ.php
exchange(); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct類(lèi)型 $ex->setFlags(AMQP_DURABLE); //持久化 //echo "Exchange Status:".$ex->declare()." "; //創(chuàng)建隊(duì)列 $q = $this->queue(); //var_dump($q->declare());exit(); $q->setName($this->q_name); $q->setFlags(AMQP_DURABLE); //持久化 //echo "Message Total:".$q->declareQueue()." "; //綁定交換機(jī)與隊(duì)列,并指定路由鍵 echo "Queue Bind: ".$q->bind($this->exchange, $this->route)." "; //阻塞模式接收消息 echo "Message: "; while(True){ $q->consume(function ($envelope,$queue){ $msg = $envelope->getBody(); echo $msg." "; //處理消息 $queue->ack($envelope->getDeliveryTag()); //手動(dòng)發(fā)送ACK應(yīng)答 }); //$q->consume("processMessage", AMQP_AUTOACK); //自動(dòng)ACK應(yīng)答 } $this->close(); } } try{ (new ConsumerMQ)->run(); }catch (Exception $exception){ var_dump($exception->getMessage()) ; }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/31023.html
摘要:的定義是使用語(yǔ)言開(kāi)發(fā)的開(kāi)源消息隊(duì)列系統(tǒng),完整的實(shí)現(xiàn)了高級(jí)抽象層消息通信協(xié)議。交換機(jī)接受發(fā)送的消息,并根據(jù)綁定規(guī)則轉(zhuǎn)發(fā)到對(duì)應(yīng)的隊(duì)列。默認(rèn)是無(wú)名交換使用空字符串標(biāo)識(shí)。消息隊(duì)列是內(nèi)部對(duì)象,用于存儲(chǔ)未被消費(fèi)的消息。 RabbitMQ的定義 RabbitMQ是使用erlang語(yǔ)言開(kāi)發(fā)的開(kāi)源消息隊(duì)列系統(tǒng),完整的實(shí)現(xiàn)了AMPQ(高級(jí)抽象層消息通信協(xié)議)。 Mac下RabbitMQ安裝 使用Hom...
摘要:本文將會(huì)講解如何使用實(shí)現(xiàn)延時(shí)重試和失敗消息隊(duì)列,實(shí)現(xiàn)可靠的消息消費(fèi),消費(fèi)失敗后,自動(dòng)延時(shí)將消息重新投遞,當(dāng)達(dá)到一定的重試次數(shù)后,將消息投遞到失敗消息隊(duì)列,等待人工介入處理。 RabbitMQ是一款使用Erlang開(kāi)發(fā)的開(kāi)源消息隊(duì)列。本文假設(shè)讀者對(duì)RabbitMQ是什么已經(jīng)有了基本的了解,如果你還不知道它是什么以及可以用來(lái)做什么,建議先從官網(wǎng)的 RabbitMQ Tutorials 入門(mén)...
摘要:本文將會(huì)講解如何使用實(shí)現(xiàn)延時(shí)重試和失敗消息隊(duì)列,實(shí)現(xiàn)可靠的消息消費(fèi),消費(fèi)失敗后,自動(dòng)延時(shí)將消息重新投遞,當(dāng)達(dá)到一定的重試次數(shù)后,將消息投遞到失敗消息隊(duì)列,等待人工介入處理。 RabbitMQ是一款使用Erlang開(kāi)發(fā)的開(kāi)源消息隊(duì)列。本文假設(shè)讀者對(duì)RabbitMQ是什么已經(jīng)有了基本的了解,如果你還不知道它是什么以及可以用來(lái)做什么,建議先從官網(wǎng)的 RabbitMQ Tutorials 入門(mén)...
摘要:需要特別明確的概念交換機(jī)的持久化,并不等于消息的持久化。消息的處理,是有兩種方式,一次性。在上述示例中,使用的,意味著接收全部的消息。注意與是兩個(gè)不同的隊(duì)列。后端處理,可以針對(duì)每一個(gè)啟動(dòng)一個(gè)或多個(gè),以提高消息處理的實(shí)時(shí)性。 RabbitMQ與PHP(一) 項(xiàng)目中使用RabbitMQ作為隊(duì)列處理用戶(hù)消息通知,消息由前端PHP代碼產(chǎn)生,處理消息使用Python,這就導(dǎo)致代碼一致性問(wèn)題,調(diào)...
摘要:在中間的框是一個(gè)隊(duì)列的消息緩沖區(qū),保持代表的消費(fèi)。本教程介紹,這是一個(gè)開(kāi)放的通用的協(xié)議消息。我們將在本教程中使用,解決依賴(lài)管理。發(fā)送者將連接到,發(fā)送一條消息,然后退出。注意,這與發(fā)送發(fā)布的隊(duì)列匹配。 介紹 RabbitMQ是一個(gè)消息代理器:它接受和轉(zhuǎn)發(fā)消息。你可以把它當(dāng)作一個(gè)郵局:當(dāng)你把郵件放在信箱里時(shí),你可以肯定郵差先生最終會(huì)把郵件送到你的收件人那里。在這個(gè)比喻中,RabbitMQ就...
閱讀 3121·2023-04-26 01:58
閱讀 962·2021-11-24 09:38
閱讀 3293·2021-09-03 10:29
閱讀 723·2021-08-21 14:10
閱讀 1498·2019-08-30 15:44
閱讀 3096·2019-08-30 14:10
閱讀 3224·2019-08-29 16:32
閱讀 1486·2019-08-29 12:48