成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

PHP+RabbitMQ實(shí)現(xiàn)消息隊(duì)列(代碼全篇)

weakish / 3301人閱讀

摘要:前言先安裝對(duì)應(yīng)的這里用的是不同的擴(kuò)展實(shí)現(xiàn)方式會(huì)有細(xì)微的差異擴(kuò)展地址具體以官網(wǎng)為準(zhǔn)介紹配置信息基類(lèi)生產(chǎn)者類(lèi)消費(fèi)者類(lèi)消費(fèi)者可有多個(gè)配置交換機(jī)路由生產(chǎn)者路由只控制發(fā)送成功不接受消費(fèi)者是否收到頻道

前言

先安裝PHP對(duì)應(yīng)的RabbitMQ,這里用的是 php_amqp 不同的擴(kuò)展實(shí)現(xiàn)方式會(huì)有細(xì)微的差異.
php擴(kuò)展地址: http://pecl.php.net/package/amqp
具體以官網(wǎng)為準(zhǔn)  http://www.rabbitmq.com/getstarted.html 

介紹

config.php 配置信息
BaseMQ.php MQ基類(lèi)
ProductMQ.php 生產(chǎn)者類(lèi)
ConsumerMQ.php 消費(fèi)者類(lèi)
Consumer2MQ.php 消費(fèi)者2(可有多個(gè))

config.php

     [
            "host" => "127.0.0.1",
            "port" => "5672",
            "login" => "guest",
            "password" => "guest",
            "vhost"=>"/",
        ],
        //交換機(jī)
        "exchange"=>"word",
        //路由
        "routes" => [],
    ];

BaseMQ.php

    conf     = $conf["host"] ;
            $this->exchange = $conf["exchange"] ;
            $this->AMQPConnection = new AMQPConnection($this->conf);
            if (!$this->AMQPConnection->connect())
                throw new AMQPConnectionException("Cannot connect to the broker!
");
        }
    
        /**
         * close link
         */
        public function close()
        {
            $this->AMQPConnection->disconnect();
        }
    
        /** Channel
         * @return AMQPChannel
         * @throws AMQPConnectionException
         */
        public function channel()
        {
            if(!$this->AMQPChannel) {
                $this->AMQPChannel =  new AMQPChannel($this->AMQPConnection);
            }
            return $this->AMQPChannel;
        }
    
        /** Exchange
         * @return AMQPExchange
         * @throws AMQPConnectionException
         * @throws AMQPExchangeException
         */
        public function exchange()
        {
            if(!$this->AMQPExchange) {
                $this->AMQPExchange = new AMQPExchange($this->channel());
                $this->AMQPExchange->setName($this->exchange);
            }
            return $this->AMQPExchange ;
        }
    
        /** queue
         * @return AMQPQueue
         * @throws AMQPConnectionException
         * @throws AMQPQueueException
         */
        public function queue()
        {
            if(!$this->AMQPQueue) {
                $this->AMQPQueue = new AMQPQueue($this->channel());
            }
            return $this->AMQPQueue ;
        }
    
        /** Envelope
         * @return AMQPEnvelope
         */
        public function envelope()
        {
            if(!$this->AMQPEnvelope) {
                $this->AMQPEnvelope = new AMQPEnvelope();
            }
            return $this->AMQPEnvelope;
        }
    }

ProductMQ.php

    channel();
            //創(chuàng)建交換機(jī)對(duì)象
            $ex = $this->exchange();
            //消息內(nèi)容
            $message = "product message ".rand(1,99999);
            //開(kāi)始事務(wù)
            $channel->startTransaction();
            $sendEd = true ;
            foreach ($this->routes as $route) {
                $sendEd = $ex->publish($message, $route) ;
                echo "Send Message:".$sendEd."
";
            }
            if(!$sendEd) {
                $channel->rollbackTransaction();
            }
            $channel->commitTransaction(); //提交事務(wù)
            $this->close();
            die ;
        }
    }
    try{
        (new ProductMQ())->run();
    }catch (Exception $exception){
        var_dump($exception->getMessage()) ;
    }

ConsumerMQ.php

    exchange();
            $ex->setType(AMQP_EX_TYPE_DIRECT); //direct類(lèi)型
            $ex->setFlags(AMQP_DURABLE); //持久化
            //echo "Exchange Status:".$ex->declare()."
";
    
            //創(chuàng)建隊(duì)列
            $q = $this->queue();
            //var_dump($q->declare());exit();
            $q->setName($this->q_name);
            $q->setFlags(AMQP_DURABLE); //持久化
            //echo "Message Total:".$q->declareQueue()."
";
    
            //綁定交換機(jī)與隊(duì)列,并指定路由鍵
            echo "Queue Bind: ".$q->bind($this->exchange, $this->route)."
";
    
            //阻塞模式接收消息
            echo "Message:
";
            while(True){
                $q->consume(function ($envelope,$queue){
                    $msg = $envelope->getBody();
                    echo $msg."
"; //處理消息
                    $queue->ack($envelope->getDeliveryTag()); //手動(dòng)發(fā)送ACK應(yīng)答
                });
                //$q->consume("processMessage", AMQP_AUTOACK); //自動(dòng)ACK應(yīng)答
            }
            $this->close();
        }
    }
    try{
        (new ConsumerMQ)->run();
    }catch (Exception $exception){
        var_dump($exception->getMessage()) ;
    }

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/31023.html

相關(guān)文章

  • RabbitMQ使用

    摘要:的定義是使用語(yǔ)言開(kāi)發(fā)的開(kāi)源消息隊(duì)列系統(tǒng),完整的實(shí)現(xiàn)了高級(jí)抽象層消息通信協(xié)議。交換機(jī)接受發(fā)送的消息,并根據(jù)綁定規(guī)則轉(zhuǎn)發(fā)到對(duì)應(yīng)的隊(duì)列。默認(rèn)是無(wú)名交換使用空字符串標(biāo)識(shí)。消息隊(duì)列是內(nèi)部對(duì)象,用于存儲(chǔ)未被消費(fèi)的消息。 RabbitMQ的定義 RabbitMQ是使用erlang語(yǔ)言開(kāi)發(fā)的開(kāi)源消息隊(duì)列系統(tǒng),完整的實(shí)現(xiàn)了AMPQ(高級(jí)抽象層消息通信協(xié)議)。 Mac下RabbitMQ安裝 使用Hom...

    codeKK 評(píng)論0 收藏0
  • RabbitMQ發(fā)布訂閱實(shí)戰(zhàn)-實(shí)現(xiàn)延時(shí)重試隊(duì)列

    摘要:本文將會(huì)講解如何使用實(shí)現(xiàn)延時(shí)重試和失敗消息隊(duì)列,實(shí)現(xiàn)可靠的消息消費(fèi),消費(fèi)失敗后,自動(dòng)延時(shí)將消息重新投遞,當(dāng)達(dá)到一定的重試次數(shù)后,將消息投遞到失敗消息隊(duì)列,等待人工介入處理。 RabbitMQ是一款使用Erlang開(kāi)發(fā)的開(kāi)源消息隊(duì)列。本文假設(shè)讀者對(duì)RabbitMQ是什么已經(jīng)有了基本的了解,如果你還不知道它是什么以及可以用來(lái)做什么,建議先從官網(wǎng)的 RabbitMQ Tutorials 入門(mén)...

    Heier 評(píng)論0 收藏0
  • RabbitMQ發(fā)布訂閱實(shí)戰(zhàn)-實(shí)現(xiàn)延時(shí)重試隊(duì)列

    摘要:本文將會(huì)講解如何使用實(shí)現(xiàn)延時(shí)重試和失敗消息隊(duì)列,實(shí)現(xiàn)可靠的消息消費(fèi),消費(fèi)失敗后,自動(dòng)延時(shí)將消息重新投遞,當(dāng)達(dá)到一定的重試次數(shù)后,將消息投遞到失敗消息隊(duì)列,等待人工介入處理。 RabbitMQ是一款使用Erlang開(kāi)發(fā)的開(kāi)源消息隊(duì)列。本文假設(shè)讀者對(duì)RabbitMQ是什么已經(jīng)有了基本的了解,如果你還不知道它是什么以及可以用來(lái)做什么,建議先從官網(wǎng)的 RabbitMQ Tutorials 入門(mén)...

    vslam 評(píng)論0 收藏0
  • 轉(zhuǎn): RabbitMQPHP(一)

    摘要:需要特別明確的概念交換機(jī)的持久化,并不等于消息的持久化。消息的處理,是有兩種方式,一次性。在上述示例中,使用的,意味著接收全部的消息。注意與是兩個(gè)不同的隊(duì)列。后端處理,可以針對(duì)每一個(gè)啟動(dòng)一個(gè)或多個(gè),以提高消息處理的實(shí)時(shí)性。 RabbitMQ與PHP(一) 項(xiàng)目中使用RabbitMQ作為隊(duì)列處理用戶(hù)消息通知,消息由前端PHP代碼產(chǎn)生,處理消息使用Python,這就導(dǎo)致代碼一致性問(wèn)題,調(diào)...

    wpw 評(píng)論0 收藏0
  • RabbitMQ+PHP 教程一(Hello World)

    摘要:在中間的框是一個(gè)隊(duì)列的消息緩沖區(qū),保持代表的消費(fèi)。本教程介紹,這是一個(gè)開(kāi)放的通用的協(xié)議消息。我們將在本教程中使用,解決依賴(lài)管理。發(fā)送者將連接到,發(fā)送一條消息,然后退出。注意,這與發(fā)送發(fā)布的隊(duì)列匹配。 介紹 RabbitMQ是一個(gè)消息代理器:它接受和轉(zhuǎn)發(fā)消息。你可以把它當(dāng)作一個(gè)郵局:當(dāng)你把郵件放在信箱里時(shí),你可以肯定郵差先生最終會(huì)把郵件送到你的收件人那里。在這個(gè)比喻中,RabbitMQ就...

    silencezwm 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<