摘要:實(shí)現(xiàn)發(fā)布發(fā)送消息到主題是啥我的博客有寫這個(gè)東西傳送門想要實(shí)現(xiàn)需要使用到中的函數(shù)函數(shù)是什么此次使用的是網(wǎng)上開源案例其中使用的是系列函數(shù)什么是系列函數(shù)大概意思是正如你所指出的,是核心內(nèi)置的,始終可用,而套接字是很少包含的擴(kuò)展的一部分。
php實(shí)現(xiàn)mqtt發(fā)布/發(fā)送 消息到主題
mqtt是啥?我的博客有寫這個(gè)東西:傳送門
php想要實(shí)現(xiàn)mqtt需要使用到php中的socket函數(shù);
socket函數(shù)是什么?
此次使用的是網(wǎng)上開源mqtt案例:其中使用的是 stream_socket_xxxx 系列函數(shù)
什么是stream_socket_xxxx系列函數(shù)
大概意思是:
正如你所指出的,"stream"是PHP核心(內(nèi)置的,始終可用),而"套接字"是很少包含的擴(kuò)展的一部分。除此之外,它們幾乎完全相同。您可以同時(shí)使用TCP和UDP兩種流,也可以使用阻塞和非阻塞模式,這些模式涵蓋了所有用例的99%。MQTT類代碼:我能想到的唯一常見的例外是ICMP。例如,"ping"。但是,看起來目前還沒有一種安全的方式來從PHP執(zhí)行ICMP。這種調(diào)用需要通過套接字?jǐn)U展來實(shí)現(xiàn)SOCK_RAW,這需要執(zhí)行“root”權(quán)限。此外,并非所有路由器都會(huì)在TCP,UDP和ICMP之外路由其他數(shù)據(jù)包類型。這限制了套接字?jǐn)U展的實(shí)用性。
/* phpMQTT */ class Mqtt { private $socket; /* holds the socket */ private $msgid = 1; /* counter for message id */ public $keepalive = 10; /* default keepalive timmer */ public $timesinceping; /* host unix time, used to detect disconects */ public $topics = array(); /* used to store currently subscribed topics */ public $debug = false; /* should output debug messages */ public $address; /* broker address */ public $port; /* broker port */ public $clientid; /* client id sent to brocker */ public $will; /* stores the will of the client */ private $username; /* stores username */ private $password; /* stores password */ public $cafile; function __construct($address, $port, $clientid, $cafile = NULL){ $this->broker($address, $port, $clientid, $cafile); } /* sets the broker details */ function broker($address, $port, $clientid, $cafile = NULL){ $this->address = $address; $this->port = $port; $this->clientid = $clientid; $this->cafile = $cafile; } function connect_auto($clean = true, $will = NULL, $username = NULL, $password = NULL){ while($this->connect($clean, $will, $username, $password)==false){ sleep(10); } return true; } /* connects to the broker inputs: $clean: should the client send a clean session flag */ function connect($clean = true, $will = NULL, $username = NULL, $password = NULL){ if($will) $this->will = $will; if($username) $this->username = $username; if($password) $this->password = $password; if ($this->cafile) { $socketContext = stream_context_create(["ssl" => [ "verify_peer_name" => true, "cafile" => $this->cafile ]]); $this->socket = stream_socket_client("tls://" . $this->address . ":" . $this->port, $errno, $errstr, 60, STREAM_CLIENT_CONNECT, $socketContext); } else { $this->socket = stream_socket_client("tcp://" . $this->address . ":" . $this->port, $errno, $errstr, 60, STREAM_CLIENT_CONNECT); } if (!$this->socket ) { if($this->debug) error_log("stream_socket_create() $errno, $errstr "); return false; } stream_set_timeout($this->socket, 5); stream_set_blocking($this->socket, 0); $i = 0; $buffer = ""; $buffer .= chr(0x00); $i++; $buffer .= chr(0x06); $i++; $buffer .= chr(0x4d); $i++; $buffer .= chr(0x51); $i++; $buffer .= chr(0x49); $i++; $buffer .= chr(0x73); $i++; $buffer .= chr(0x64); $i++; $buffer .= chr(0x70); $i++; $buffer .= chr(0x03); $i++; //No Will $var = 0; if($clean) $var+=2; //Add will info to header if($this->will != NULL){ $var += 4; // Set will flag $var += ($this->will["qos"] << 3); //Set will qos if($this->will["retain"]) $var += 32; //Set will retain } if($this->username != NULL) $var += 128; //Add username to header if($this->password != NULL) $var += 64; //Add password to header $buffer .= chr($var); $i++; //Keep alive $buffer .= chr($this->keepalive >> 8); $i++; $buffer .= chr($this->keepalive & 0xff); $i++; $buffer .= $this->strwritestring($this->clientid,$i); //Adding will to payload if($this->will != NULL){ $buffer .= $this->strwritestring($this->will["topic"],$i); $buffer .= $this->strwritestring($this->will["content"],$i); } if($this->username) $buffer .= $this->strwritestring($this->username,$i); if($this->password) $buffer .= $this->strwritestring($this->password,$i); $head = " "; $head{0} = chr(0x10); $head{1} = chr($i); fwrite($this->socket, $head, 2); fwrite($this->socket, $buffer); $string = $this->read(4); if(ord($string{0})>>4 == 2 && $string{3} == chr(0)){ if($this->debug) echo "Connected to Broker "; }else{ error_log(sprintf("Connection failed! (Error: 0x%02x 0x%02x) ", ord($string{0}),ord($string{3}))); return false; } $this->timesinceping = time(); return true; } /* read: reads in so many bytes */ function read($int = 8192, $nb = false){ // print_r(socket_get_status($this->socket)); $string=""; $togo = $int; if($nb){ return fread($this->socket, $togo); } while (!feof($this->socket) && $togo>0) { $fread = fread($this->socket, $togo); $string .= $fread; $togo = $int - strlen($string); } return $string; } /* subscribe: subscribes to topics */ function subscribe($topics, $qos = 0){ $i = 0; $buffer = ""; $id = $this->msgid; $buffer .= chr($id >> 8); $i++; $buffer .= chr($id % 256); $i++; foreach($topics as $key => $topic){ $buffer .= $this->strwritestring($key,$i); $buffer .= chr($topic["qos"]); $i++; $this->topics[$key] = $topic; } $cmd = 0x80; //$qos $cmd += ($qos << 1); $head = chr($cmd); $head .= chr($i); fwrite($this->socket, $head, 2); fwrite($this->socket, $buffer, $i); $string = $this->read(2); $bytes = ord(substr($string,1,1)); $string = $this->read($bytes); } /* ping: sends a keep alive ping */ function ping(){ $head = " "; $head = chr(0xc0); $head .= chr(0x00); fwrite($this->socket, $head, 2); if($this->debug) echo "ping sent "; } /* disconnect: sends a proper disconect cmd */ function disconnect(){ $head = " "; $head{0} = chr(0xe0); $head{1} = chr(0x00); fwrite($this->socket, $head, 2); } /* close: sends a proper disconect, then closes the socket */ function close(){ $this->disconnect(); stream_socket_shutdown($this->socket, STREAM_SHUT_WR); } /* publish: publishes $content on a $topic */ function publish($topic, $content, $qos = 0, $retain = 0){ $i = 0; $buffer = ""; $buffer .= $this->strwritestring($topic,$i); //$buffer .= $this->strwritestring($content,$i); if($qos){ $id = $this->msgid++; $buffer .= chr($id >> 8); $i++; $buffer .= chr($id % 256); $i++; } $buffer .= $content; $i+=strlen($content); $head = " "; $cmd = 0x30; if($qos) $cmd += $qos << 1; if($retain) $cmd += 1; $head{0} = chr($cmd); $head .= $this->setmsglength($i); fwrite($this->socket, $head, strlen($head)); fwrite($this->socket, $buffer, $i); } /* message: processes a recieved topic */ function message($msg){ $tlen = (ord($msg{0})<<8) + ord($msg{1}); $topic = substr($msg,2,$tlen); $msg = substr($msg,($tlen+2)); $found = 0; foreach($this->topics as $key=>$top){ if( preg_match("/^".str_replace("#",".*", str_replace("+","[^/]*", str_replace("/","/", str_replace("$","$", $key))))."$/",$topic) ){ if(is_callable($top["function"])){ call_user_func($top["function"],$topic,$msg); $found = 1; } } } if($this->debug && !$found) echo "msg recieved but no match in subscriptions "; } /* proc: the processing loop for an "allways on" client set true when you are doing other stuff in the loop good for watching something else at the same time */ function proc( $loop = true){ if(1){ $sockets = array($this->socket); $w = $e = NULL; $cmd = 0; //$byte = fgetc($this->socket); if(feof($this->socket)){ if($this->debug) echo "eof receive going to reconnect for good measure "; fclose($this->socket); $this->connect_auto(false); if(count($this->topics)) $this->subscribe($this->topics); } $byte = $this->read(1, true); if(!strlen($byte)){ if($loop){ usleep(100000); } }else{ $cmd = (int)(ord($byte)/16); if($this->debug) echo "Recevid: $cmd "; $multiplier = 1; $value = 0; do{ $digit = ord($this->read(1)); $value += ($digit & 127) * $multiplier; $multiplier *= 128; }while (($digit & 128) != 0); if($this->debug) echo "Fetching: $value "; if($value) $string = $this->read($value); if($cmd){ switch($cmd){ case 3: $this->message($string); break; } $this->timesinceping = time(); } } if($this->timesinceping < (time() - $this->keepalive )){ if($this->debug) echo "not found something so ping "; $this->ping(); } if($this->timesinceping<(time()-($this->keepalive*2))){ if($this->debug) echo "not seen a package in a while, disconnecting "; fclose($this->socket); $this->connect_auto(false); if(count($this->topics)) $this->subscribe($this->topics); } } return 1; } /* getmsglength: */ function getmsglength(&$msg, &$i){ $multiplier = 1; $value = 0 ; do{ $digit = ord($msg{$i}); $value += ($digit & 127) * $multiplier; $multiplier *= 128; $i++; }while (($digit & 128) != 0); return $value; } /* setmsglength: */ function setmsglength($len){ $string = ""; do{ $digit = $len % 128; $len = $len >> 7; // if there are more digits to encode, set the top bit of this digit if ( $len > 0 ) $digit = ($digit | 0x80); $string .= chr($digit); }while ( $len > 0 ); return $string; } /* strwritestring: writes a string to a buffer */ function strwritestring($str, &$i){ $ret = " "; $len = strlen($str); $msb = $len >> 8; $lsb = $len % 256; $ret = chr($msb); $ret .= chr($lsb); $ret .= $str; $i += ($len+2); return $ret; } function printstr($string){ $strlen = strlen($string); for($j=0;$j<$strlen;$j++){ $num = ord($string{$j}); if($num > 31) $chr = $string{$j}; else $chr = " "; printf("%4d: %08b : 0x%02x : %s ",$j,$num,$num,$chr); } } }實(shí)現(xiàn)部分 發(fā)送到主題
// 發(fā)送給訂閱號信息,創(chuàng)建socket,無sam隊(duì)列 $server = "127.0.0.1"; // 服務(wù)代理地址(mqtt服務(wù)端地址) $port = 1883; // 通信端口 $username = ""; // 用戶名(如果需要) $password = ""; // 密碼(如果需要 $client_id = "clientx9293670xxctr"; // 設(shè)置你的連接客戶端id $mqtt = new Mqtt($server, $port, $client_id); //實(shí)例化MQTT類 if ($mqtt->connect(true, NULL, $username, $password)) { //如果創(chuàng)建鏈接成功 $mqtt->publish("xxx3809293670ctr", "setr=3xxxxxxxxx", 0); // 發(fā)送到 xxx3809293670ctr 的主題 一個(gè)信息 內(nèi)容為 setr=3xxxxxxxxx Qos 為 0 $mqtt->close(); //發(fā)送后關(guān)閉鏈接 } else { echo "Time out! "; }訂閱主題
/*// 訂閱信息,接收一個(gè)信息后退出 $server = "127.0.0.1"; // 服務(wù)代理地址(mqtt服務(wù)端地址) $port = 1883; // 通信端口 $username = ""; // 用戶名(如果需要) $password = ""; // 密碼(如果需要 $client_id = "clientx9293670xxctr"; // 設(shè)置你的連接客戶端id $mqtt = new Mqtt($server, $port, $client_id); if(!$mqtt->connect(true, NULL, $username, $password)) { //鏈接不成功再重復(fù)執(zhí)行監(jiān)聽連接 exit(1); } $topics["SN69143809293670state"] = array("qos" => 0, "function" => "procmsg"); // 訂閱主題為 SN69143809293670state qos為0 $mqtt->subscribe($topics, 0); while($mqtt->proc()){ } //死循環(huán)監(jiān)聽 $mqtt->close(); function procmsg($topic, $msg){ //信息回調(diào)函數(shù) 打印信息 echo "Msg Recieved: " . date("r") . " "; echo "Topic: {$topic} "; echo " $msg "; $xxx = json_decode($msg); var_dump($xxxxxx->aa); die; }
這是php實(shí)現(xiàn)方法,如果用php做發(fā)送端還是不錯(cuò)的.但是
我被這個(gè)圖片打擊了,區(qū)塊鏈應(yīng)用還真提莫的是js寫起來跟簡單;
我最終寫出的mqtt api 使用的是node;為什么?
node.js實(shí)現(xiàn)mqtt
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/28481.html
摘要:,消息隊(duì)列遙測傳輸是開發(fā)的一個(gè)即時(shí)通訊協(xié)議,有可能成為物聯(lián)網(wǎng)的重要組成部分。會(huì)發(fā)生消息丟失或重復(fù)。只有一次,確保消息到達(dá)一次。此外,國內(nèi)很多企業(yè)都廣泛使用作為手機(jī)客戶端與服務(wù)器端推送消息的協(xié)議。 前幾天寫了一下MQTT協(xié)議實(shí)現(xiàn)推送數(shù)據(jù)傳輸,所以我會(huì)不定期的更新一下關(guān)注MQTT的知識。 MQTT: MQTT(Message Queuing Telemetry Transport,消息隊(duì)列...
摘要:協(xié)議版本介紹互聯(lián)網(wǎng)的基礎(chǔ)網(wǎng)絡(luò)協(xié)議是協(xié)議消息隊(duì)列遙測傳輸是基于協(xié)議棧而構(gòu)建的已成為通信的標(biāo)準(zhǔn)為什么選擇有多好多好多么牛逼我就不說了說的再多不如一個(gè)一個(gè)試試完了做比對剩下的那個(gè)就是要選擇的實(shí)在不想這樣搞技術(shù)就跟著一線走發(fā)布和訂閱模型協(xié)議在網(wǎng)絡(luò)中 mqtt 協(xié)議版本: 3.1.1 MQTT 介紹 互聯(lián)網(wǎng)的基礎(chǔ)網(wǎng)絡(luò)協(xié)議是 TCP/IP協(xié)議. MQTT(消息隊(duì)列遙測傳輸)是基于 TCP/IP 協(xié)...
摘要:協(xié)議簡介,消息隊(duì)列遙測傳輸是一個(gè)輕量的發(fā)布訂閱模式消息傳輸協(xié)議,是專門針對低帶寬和不穩(wěn)定網(wǎng)絡(luò)環(huán)境的物聯(lián)網(wǎng)應(yīng)用設(shè)計(jì)的。它是等級協(xié)議交換的第二個(gè)報(bào)文。 1.MQTT協(xié)議簡介 MQTT(Message Queuing Telemetry Transport,消息隊(duì)列遙測傳輸)是一個(gè)輕量的發(fā)布/訂...
摘要:時(shí)間就是金錢,效率就是生命本教程助力開發(fā)者使用協(xié)議快速產(chǎn)品化。摘要借助具備及聯(lián)網(wǎng)功能的,快速部署到客戶產(chǎn)品上,助力開發(fā),縮短開發(fā)周期,快速實(shí)現(xiàn)產(chǎn)品商業(yè)化。 時(shí)間就是金錢,效率就是生命 本教程助力開發(fā)者使用MQTT協(xié)議快速產(chǎn)品化。 摘要 借助具備MQTT及聯(lián)網(wǎng)功能的DTU,快速部署到客戶產(chǎn)品...
閱讀 1628·2021-11-16 11:45
閱讀 2559·2021-09-29 09:48
閱讀 3328·2021-09-07 10:26
閱讀 1850·2021-08-16 10:50
閱讀 1883·2019-08-30 15:44
閱讀 2709·2019-08-28 18:03
閱讀 1909·2019-08-27 10:54
閱讀 1833·2019-08-26 14:01