成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

php實(shí)現(xiàn)mqtt發(fā)布/發(fā)送 消息到主題

eechen / 2090人閱讀

摘要:實(shí)現(xiàn)發(fā)布發(fā)送消息到主題是啥我的博客有寫這個(gè)東西傳送門想要實(shí)現(xiàn)需要使用到中的函數(shù)函數(shù)是什么此次使用的是網(wǎng)上開源案例其中使用的是系列函數(shù)什么是系列函數(shù)大概意思是正如你所指出的,是核心內(nèi)置的,始終可用,而套接字是很少包含的擴(kuò)展的一部分。

php實(shí)現(xiàn)mqtt發(fā)布/發(fā)送 消息到主題

mqtt是啥?我的博客有寫這個(gè)東西:傳送門

php想要實(shí)現(xiàn)mqtt需要使用到php中的socket函數(shù);

socket函數(shù)是什么?

此次使用的是網(wǎng)上開源mqtt案例:其中使用的是 stream_socket_xxxx 系列函數(shù)

什么是stream_socket_xxxx系列函數(shù)

大概意思是:

正如你所指出的,"stream"是PHP核心(內(nèi)置的,始終可用),而"套接字"是很少包含的擴(kuò)展的一部分。除此之外,它們幾乎完全相同。您可以同時(shí)使用TCP和UDP兩種流,也可以使用阻塞和非阻塞模式,這些模式涵蓋了所有用例的99%。

我能想到的唯一常見的例外是ICMP。例如,"ping"。但是,看起來目前還沒有一種安全的方式來從PHP執(zhí)行ICMP。這種調(diào)用需要通過套接字?jǐn)U展來實(shí)現(xiàn)SOCK_RAW,這需要執(zhí)行“root”權(quán)限。此外,并非所有路由器都會(huì)在TCP,UDP和ICMP之外路由其他數(shù)據(jù)包類型。這限制了套接字?jǐn)U展的實(shí)用性。

MQTT類代碼:
/* phpMQTT */
class Mqtt {

    private $socket;             /* holds the socket    */
    private $msgid = 1;            /* counter for message id */
    public $keepalive = 10;        /* default keepalive timmer */
    public $timesinceping;        /* host unix time, used to detect disconects */
    public $topics = array();     /* used to store currently subscribed topics */
    public $debug = false;        /* should output debug messages */
    public $address;            /* broker address */
    public $port;                /* broker port */
    public $clientid;            /* client id sent to brocker */
    public $will;                /* stores the will of the client */
    private $username;            /* stores username */
    private $password;            /* stores password */

    public $cafile;

    function __construct($address, $port, $clientid, $cafile = NULL){
        $this->broker($address, $port, $clientid, $cafile);
    }

    /* sets the broker details */
    function broker($address, $port, $clientid, $cafile = NULL){
        $this->address = $address;
        $this->port = $port;
        $this->clientid = $clientid;
        $this->cafile = $cafile;
    }

    function connect_auto($clean = true, $will = NULL, $username = NULL, $password = NULL){
        while($this->connect($clean, $will, $username, $password)==false){
            sleep(10);
        }
        return true;
    }

    /* connects to the broker
        inputs: $clean: should the client send a clean session flag */
    function connect($clean = true, $will = NULL, $username = NULL, $password = NULL){

        if($will) $this->will = $will;
        if($username) $this->username = $username;
        if($password) $this->password = $password;


        if ($this->cafile) {
            $socketContext = stream_context_create(["ssl" => [
                "verify_peer_name" => true,
                "cafile" => $this->cafile
            ]]);
            $this->socket = stream_socket_client("tls://" . $this->address . ":" . $this->port, $errno, $errstr, 60, STREAM_CLIENT_CONNECT, $socketContext);
        } else {
            $this->socket = stream_socket_client("tcp://" . $this->address . ":" . $this->port, $errno, $errstr, 60, STREAM_CLIENT_CONNECT);
        }

        if (!$this->socket ) {
            if($this->debug) error_log("stream_socket_create() $errno, $errstr 
");
            return false;
        }

        stream_set_timeout($this->socket, 5);
        stream_set_blocking($this->socket, 0);

        $i = 0;
        $buffer = "";

        $buffer .= chr(0x00); $i++;
        $buffer .= chr(0x06); $i++;
        $buffer .= chr(0x4d); $i++;
        $buffer .= chr(0x51); $i++;
        $buffer .= chr(0x49); $i++;
        $buffer .= chr(0x73); $i++;
        $buffer .= chr(0x64); $i++;
        $buffer .= chr(0x70); $i++;
        $buffer .= chr(0x03); $i++;

        //No Will
        $var = 0;
        if($clean) $var+=2;

        //Add will info to header
        if($this->will != NULL){
            $var += 4; // Set will flag
            $var += ($this->will["qos"] << 3); //Set will qos
            if($this->will["retain"])    $var += 32; //Set will retain
        }

        if($this->username != NULL) $var += 128;    //Add username to header
        if($this->password != NULL) $var += 64;    //Add password to header

        $buffer .= chr($var); $i++;

        //Keep alive
        $buffer .= chr($this->keepalive >> 8); $i++;
        $buffer .= chr($this->keepalive & 0xff); $i++;

        $buffer .= $this->strwritestring($this->clientid,$i);

        //Adding will to payload
        if($this->will != NULL){
            $buffer .= $this->strwritestring($this->will["topic"],$i);
            $buffer .= $this->strwritestring($this->will["content"],$i);
        }

        if($this->username) $buffer .= $this->strwritestring($this->username,$i);
        if($this->password) $buffer .= $this->strwritestring($this->password,$i);

        $head = "  ";
        $head{0} = chr(0x10);
        $head{1} = chr($i);

        fwrite($this->socket, $head, 2);
        fwrite($this->socket,  $buffer);

        $string = $this->read(4);

        if(ord($string{0})>>4 == 2 && $string{3} == chr(0)){
            if($this->debug) echo "Connected to Broker
";
        }else{
            error_log(sprintf("Connection failed! (Error: 0x%02x 0x%02x)
",
                ord($string{0}),ord($string{3})));
            return false;
        }

        $this->timesinceping = time();

        return true;
    }

    /* read: reads in so many bytes */
    function read($int = 8192, $nb = false){

        //    print_r(socket_get_status($this->socket));

        $string="";
        $togo = $int;

        if($nb){
            return fread($this->socket, $togo);
        }

        while (!feof($this->socket) && $togo>0) {
            $fread = fread($this->socket, $togo);
            $string .= $fread;
            $togo = $int - strlen($string);
        }




        return $string;
    }

    /* subscribe: subscribes to topics */
    function subscribe($topics, $qos = 0){
        $i = 0;
        $buffer = "";
        $id = $this->msgid;
        $buffer .= chr($id >> 8);  $i++;
        $buffer .= chr($id % 256);  $i++;

        foreach($topics as $key => $topic){
            $buffer .= $this->strwritestring($key,$i);
            $buffer .= chr($topic["qos"]);  $i++;
            $this->topics[$key] = $topic;
        }

        $cmd = 0x80;
        //$qos
        $cmd +=    ($qos << 1);


        $head = chr($cmd);
        $head .= chr($i);

        fwrite($this->socket, $head, 2);
        fwrite($this->socket, $buffer, $i);
        $string = $this->read(2);

        $bytes = ord(substr($string,1,1));
        $string = $this->read($bytes);
    }

    /* ping: sends a keep alive ping */
    function ping(){
        $head = " ";
        $head = chr(0xc0);
        $head .= chr(0x00);
        fwrite($this->socket, $head, 2);
        if($this->debug) echo "ping sent
";
    }

    /* disconnect: sends a proper disconect cmd */
    function disconnect(){
        $head = " ";
        $head{0} = chr(0xe0);
        $head{1} = chr(0x00);
        fwrite($this->socket, $head, 2);
    }

    /* close: sends a proper disconect, then closes the socket */
    function close(){
        $this->disconnect();
        stream_socket_shutdown($this->socket, STREAM_SHUT_WR);
    }

    /* publish: publishes $content on a $topic */
    function publish($topic, $content, $qos = 0, $retain = 0){

        $i = 0;
        $buffer = "";

        $buffer .= $this->strwritestring($topic,$i);

        //$buffer .= $this->strwritestring($content,$i);

        if($qos){
            $id = $this->msgid++;
            $buffer .= chr($id >> 8);  $i++;
            $buffer .= chr($id % 256);  $i++;
        }

        $buffer .= $content;
        $i+=strlen($content);


        $head = " ";
        $cmd = 0x30;
        if($qos) $cmd += $qos << 1;
        if($retain) $cmd += 1;

        $head{0} = chr($cmd);
        $head .= $this->setmsglength($i);

        fwrite($this->socket, $head, strlen($head));
        fwrite($this->socket, $buffer, $i);

    }

    /* message: processes a recieved topic */
    function message($msg){
        $tlen = (ord($msg{0})<<8) + ord($msg{1});
        $topic = substr($msg,2,$tlen);
        $msg = substr($msg,($tlen+2));
        $found = 0;
        foreach($this->topics as $key=>$top){
            if( preg_match("/^".str_replace("#",".*",
                    str_replace("+","[^/]*",
                        str_replace("/","/",
                            str_replace("$","$",
                                $key))))."$/",$topic) ){
                if(is_callable($top["function"])){
                    call_user_func($top["function"],$topic,$msg);
                    $found = 1;
                }
            }
        }

        if($this->debug && !$found) echo "msg recieved but no match in subscriptions
";
    }

    /* proc: the processing loop for an "allways on" client
        set true when you are doing other stuff in the loop good for watching something else at the same time */
    function proc( $loop = true){

        if(1){
            $sockets = array($this->socket);
            $w = $e = NULL;
            $cmd = 0;

            //$byte = fgetc($this->socket);
            if(feof($this->socket)){
                if($this->debug) echo "eof receive going to reconnect for good measure
";
                fclose($this->socket);
                $this->connect_auto(false);
                if(count($this->topics))
                    $this->subscribe($this->topics);
            }

            $byte = $this->read(1, true);

            if(!strlen($byte)){
                if($loop){
                    usleep(100000);
                }

            }else{

                $cmd = (int)(ord($byte)/16);
                if($this->debug) echo "Recevid: $cmd
";

                $multiplier = 1;
                $value = 0;
                do{
                    $digit = ord($this->read(1));
                    $value += ($digit & 127) * $multiplier;
                    $multiplier *= 128;
                }while (($digit & 128) != 0);

                if($this->debug) echo "Fetching: $value
";

                if($value)
                    $string = $this->read($value);

                if($cmd){
                    switch($cmd){
                        case 3:
                            $this->message($string);
                            break;
                    }

                    $this->timesinceping = time();
                }
            }

            if($this->timesinceping < (time() - $this->keepalive )){
                if($this->debug) echo "not found something so ping
";
                $this->ping();
            }


            if($this->timesinceping<(time()-($this->keepalive*2))){
                if($this->debug) echo "not seen a package in a while, disconnecting
";
                fclose($this->socket);
                $this->connect_auto(false);
                if(count($this->topics))
                    $this->subscribe($this->topics);
            }

        }
        return 1;
    }

    /* getmsglength: */
    function getmsglength(&$msg, &$i){

        $multiplier = 1;
        $value = 0 ;
        do{
            $digit = ord($msg{$i});
            $value += ($digit & 127) * $multiplier;
            $multiplier *= 128;
            $i++;
        }while (($digit & 128) != 0);

        return $value;
    }


    /* setmsglength: */
    function setmsglength($len){
        $string = "";
        do{
            $digit = $len % 128;
            $len = $len >> 7;
            // if there are more digits to encode, set the top bit of this digit
            if ( $len > 0 )
                $digit = ($digit | 0x80);
            $string .= chr($digit);
        }while ( $len > 0 );
        return $string;
    }

    /* strwritestring: writes a string to a buffer */
    function strwritestring($str, &$i){
        $ret = " ";
        $len = strlen($str);
        $msb = $len >> 8;
        $lsb = $len % 256;
        $ret = chr($msb);
        $ret .= chr($lsb);
        $ret .= $str;
        $i += ($len+2);
        return $ret;
    }

    function printstr($string){
        $strlen = strlen($string);
        for($j=0;$j<$strlen;$j++){
            $num = ord($string{$j});
            if($num > 31)
                $chr = $string{$j}; else $chr = " ";
            printf("%4d: %08b : 0x%02x : %s 
",$j,$num,$num,$chr);
        }
    }
}
實(shí)現(xiàn)部分 發(fā)送到主題
 // 發(fā)送給訂閱號信息,創(chuàng)建socket,無sam隊(duì)列
 $server = "127.0.0.1";     // 服務(wù)代理地址(mqtt服務(wù)端地址)
 $port = 1883;                     // 通信端口
 $username = "";                   // 用戶名(如果需要)
 $password = "";                   // 密碼(如果需要
 $client_id = "clientx9293670xxctr"; // 設(shè)置你的連接客戶端id
 $mqtt = new Mqtt($server, $port, $client_id); //實(shí)例化MQTT類
 if ($mqtt->connect(true, NULL, $username, $password)) { 
    //如果創(chuàng)建鏈接成功
     $mqtt->publish("xxx3809293670ctr", "setr=3xxxxxxxxx", 0); 
     // 發(fā)送到 xxx3809293670ctr 的主題 一個(gè)信息 內(nèi)容為 setr=3xxxxxxxxx Qos 為 0 
     $mqtt->close();    //發(fā)送后關(guān)閉鏈接
 } else {
     echo "Time out!
"; 
 }
訂閱主題
/*// 訂閱信息,接收一個(gè)信息后退出
$server = "127.0.0.1";     // 服務(wù)代理地址(mqtt服務(wù)端地址)
$port = 1883;                     // 通信端口
$username = "";                   // 用戶名(如果需要)
$password = "";                   // 密碼(如果需要
$client_id = "clientx9293670xxctr"; // 設(shè)置你的連接客戶端id

$mqtt = new Mqtt($server, $port, $client_id);

if(!$mqtt->connect(true, NULL, $username, $password)) { //鏈接不成功再重復(fù)執(zhí)行監(jiān)聽連接
    exit(1);
}

$topics["SN69143809293670state"] = array("qos" => 0, "function" => "procmsg");
// 訂閱主題為 SN69143809293670state qos為0
$mqtt->subscribe($topics, 0);

while($mqtt->proc()){

}
//死循環(huán)監(jiān)聽


$mqtt->close();

function procmsg($topic, $msg){ //信息回調(diào)函數(shù) 打印信息
        echo "Msg Recieved: " . date("r") . "
";
        echo "Topic: {$topic}

";
        echo "	$msg

";
        $xxx = json_decode($msg);
        var_dump($xxxxxx->aa);
        die;
}

這是php實(shí)現(xiàn)方法,如果用php做發(fā)送端還是不錯(cuò)的.但是

我被這個(gè)圖片打擊了,區(qū)塊鏈應(yīng)用還真提莫的是js寫起來跟簡單;

我最終寫出的mqtt api 使用的是node;為什么?

node.js實(shí)現(xiàn)mqtt

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/28481.html

相關(guān)文章

  • MQTT協(xié)議(1)-簡介

    摘要:,消息隊(duì)列遙測傳輸是開發(fā)的一個(gè)即時(shí)通訊協(xié)議,有可能成為物聯(lián)網(wǎng)的重要組成部分。會(huì)發(fā)生消息丟失或重復(fù)。只有一次,確保消息到達(dá)一次。此外,國內(nèi)很多企業(yè)都廣泛使用作為手機(jī)客戶端與服務(wù)器端推送消息的協(xié)議。 前幾天寫了一下MQTT協(xié)議實(shí)現(xiàn)推送數(shù)據(jù)傳輸,所以我會(huì)不定期的更新一下關(guān)注MQTT的知識。 MQTT: MQTT(Message Queuing Telemetry Transport,消息隊(duì)列...

    objc94 評論0 收藏0
  • MQTT

    摘要:協(xié)議版本介紹互聯(lián)網(wǎng)的基礎(chǔ)網(wǎng)絡(luò)協(xié)議是協(xié)議消息隊(duì)列遙測傳輸是基于協(xié)議棧而構(gòu)建的已成為通信的標(biāo)準(zhǔn)為什么選擇有多好多好多么牛逼我就不說了說的再多不如一個(gè)一個(gè)試試完了做比對剩下的那個(gè)就是要選擇的實(shí)在不想這樣搞技術(shù)就跟著一線走發(fā)布和訂閱模型協(xié)議在網(wǎng)絡(luò)中 mqtt 協(xié)議版本: 3.1.1 MQTT 介紹 互聯(lián)網(wǎng)的基礎(chǔ)網(wǎng)絡(luò)協(xié)議是 TCP/IP協(xié)議. MQTT(消息隊(duì)列遙測傳輸)是基于 TCP/IP 協(xié)...

    lastSeries 評論0 收藏0
  • MQTT協(xié)議介紹

    摘要:協(xié)議簡介,消息隊(duì)列遙測傳輸是一個(gè)輕量的發(fā)布訂閱模式消息傳輸協(xié)議,是專門針對低帶寬和不穩(wěn)定網(wǎng)絡(luò)環(huán)境的物聯(lián)網(wǎng)應(yīng)用設(shè)計(jì)的。它是等級協(xié)議交換的第二個(gè)報(bào)文。 1.MQTT協(xié)議簡介 MQTT(Message Queuing Telemetry Transport,消息隊(duì)列遙測傳輸)是一個(gè)輕量的發(fā)布/訂...

    lewinlee 評論0 收藏0
  • MQTT如何快速助你產(chǎn)品化

    摘要:時(shí)間就是金錢,效率就是生命本教程助力開發(fā)者使用協(xié)議快速產(chǎn)品化。摘要借助具備及聯(lián)網(wǎng)功能的,快速部署到客戶產(chǎn)品上,助力開發(fā),縮短開發(fā)周期,快速實(shí)現(xiàn)產(chǎn)品商業(yè)化。 時(shí)間就是金錢,效率就是生命 本教程助力開發(fā)者使用MQTT協(xié)議快速產(chǎn)品化。 摘要 借助具備MQTT及聯(lián)網(wǎng)功能的DTU,快速部署到客戶產(chǎn)品...

    sutaking 評論0 收藏0

發(fā)表評論

0條評論

最新活動(dòng)
閱讀需要支付1元查看
<