成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

基于wokerman實(shí)現(xiàn)即時(shí)消息推送

JellyBool / 1682人閱讀

摘要:目前網(wǎng)站有兩個(gè)用到實(shí)時(shí)消息推送的功能,源碼最新動(dòng)態(tài),實(shí)時(shí)顯示用戶的操作行為消息推送,如重要消息通知,任務(wù)指派等等考慮的問題既然要實(shí)現(xiàn)即時(shí),那就少不了。

目前網(wǎng)站有兩個(gè)用到實(shí)時(shí)消息推送的功能,源碼:https://github.com/wuzhc/team

最新動(dòng)態(tài),實(shí)時(shí)顯示用戶的操作行為

消息推送,如重要消息通知,任務(wù)指派等等

考慮的問題

既然要實(shí)現(xiàn)即時(shí),那就少不了socketio。因?yàn)轫?xiàng)目是PHP寫的,所以服務(wù)端直接用phpsocket.io

我們應(yīng)該保存離線消息,否則如果用戶不在線,那就接受不到消息。這里我用mongodb來存儲(chǔ)消息。

一是消息不需要關(guān)聯(lián)表,一條消息一個(gè)文檔

二是mongodb適合做海量數(shù)據(jù)存儲(chǔ),并且分片也很簡單

三是消息不需要永久存儲(chǔ),隨著時(shí)間推移,消息的價(jià)值性越低,可以用固定集合來存儲(chǔ)消息,當(dāng)數(shù)據(jù)量達(dá)到一定值時(shí)覆蓋最久的消息

一個(gè)頁面連接一個(gè)socket,若用戶同時(shí)打開了多個(gè)頁面(即一個(gè)用戶會(huì)對(duì)應(yīng)多個(gè)socketID),那么消息應(yīng)該推送到用戶每一個(gè)打開的頁面。一個(gè)簡單的做法就是把用戶多個(gè)sockID加到group分組(socket.emit("group")),group分組名稱可以是"uid:1",表示userID為1的用戶,然后socket.broadcast.to("uid:1").emit("event_name", data);

就可以實(shí)現(xiàn)對(duì)多個(gè)頁面推送消息

實(shí)時(shí)動(dòng)態(tài)

實(shí)時(shí)消息推送

客戶端
        // 初始化io對(duì)象
        var socket = io("http://" + document.domain + ":2120");
        // 當(dāng)socket連接后發(fā)送登錄請(qǐng)求
        socket.on("connect", function () {
            socket.emit("login", {
                uid: "user->id?>",
                companyID: "user->identity->fdCompanyID?>"
            });
        });
        // 實(shí)時(shí)消息,當(dāng)服務(wù)端推送來消息時(shí)觸發(fā)
        socket.on("new_msg", function (msg) {
            if (msg.typeID == "") {
                var html = "
  • " + "" + "
    " + "" + "
    " + "

    " + msg.title + "

    " + "

    " + msg.content + "

    " + "
    " + "
  • "; $("#msg-handle").prepend(html); var num = $(".msg-handle-num").eq(1).text() * 1; $(".msg-handle-num").text(num + 1); } }); // 實(shí)時(shí)動(dòng)態(tài),當(dāng)服務(wù)端推送來消息時(shí)觸發(fā) socket.on("update_dynamic", function (msg) { var html = "
  • " + "" + "User Image" + "
    " + "" + "" + msg.date + "" + "

    " + "" + msg.operator + "" + "" + msg.title + "" + "

    " + "
    " + msg.content + "
    " + "
    " + "
  • "; $("#dynamic-list").prepend(html); }); // 在線人數(shù) socket.on("update_online_count", function (msg) { $("#online-people").html(msg); });
    服務(wù)端
        /**
         * 消息推送
         */
        public function actionStart()
        {
            // PHPSocketIO服務(wù)
            $io = new SocketIO(2120);
            // 客戶端發(fā)起連接事件時(shí),設(shè)置連接socket的各種事件回調(diào)
            $io->on("connection", function ($socket) use ($io) {
    
                /** @var Connection $redis */
                $redis = Yii::$app->redis;
    
                // 當(dāng)客戶端發(fā)來登錄事件時(shí)觸發(fā)
                $socket->on("login", function ($data) use ($socket, $redis, $io) {
    
                    $uid = isset($data["uid"]) ? $data["uid"] : null;
                    $companyID = isset($data["companyID"]) ? $data["companyID"] : null;
    
                    // uid和companyID應(yīng)該做下加密 by wuzhc 2018-01-28
                    if (empty($uid) || empty($companyID)) {
                        return ;
                    }
                    $this->companyID = $companyID;
    
                    // 已經(jīng)登錄過了
                    if (isset($socket->uid)) {
                        return;
                    }
    
                    $key = "socketio:company:" . $companyID;
                    $field = "uid:" . $uid;
    
                    // 用hash方便統(tǒng)計(jì)用戶打開頁面數(shù)量
                    if (!$redis->hget($key, $field)) {
                        $redis->hset($key, $field, 0);
                    }
    
                    // 同個(gè)用戶打開新頁面時(shí)加1
                    $redis->hincrby($key, $field, 1);
    
                    // 加入uid分組,方便對(duì)同個(gè)用戶的所有打開頁面推送消息
                    $socket->join("uid:". $uid);
    
                    // 加入companyID,方便對(duì)整個(gè)公司的所有用戶推送消息
                    $socket->join("company:".$companyID);
    
                    $socket->uid = $uid;
                    $socket->companyID = $companyID;
    
                    // 整個(gè)公司在線人數(shù)更新
                    $io->to("company:".$companyID)->emit("update_online_count", $redis->hlen($key));
                });
    
                // 當(dāng)客戶端斷開連接是觸發(fā)(一般是關(guān)閉網(wǎng)頁或者跳轉(zhuǎn)刷新導(dǎo)致)
                $socket->on("disconnect", function () use ($socket, $redis, $io) {
                    if (!isset($socket->uid)) {
                        return;
                    }
    
                    $key = "socketio:company:" . $socket->companyID;
                    $field = "uid:" . $socket->uid;
                    $redis->hincrby($key, $field, -1);
    
                    if ($redis->hget($key, $field) <= 0) {
                        $redis->hdel($key, $field);
    
                        // 某某下線了,刷新整個(gè)公司的在線人數(shù)
                        $io->to("company:".$socket->companyID)->emit("update_online_count", $redis->hlen($key));
                    }
                });
            });
    
            // 開始進(jìn)程時(shí),監(jiān)聽2121端口,用戶數(shù)據(jù)推送
            $io->on("workerStart", function () use ($io) {
                /** @var Connection $redis */
                $redis = Yii::$app->redis;
    
                $httpWorker = new Worker("http://0.0.0.0:2121");
                // 當(dāng)http客戶端發(fā)來數(shù)據(jù)時(shí)觸發(fā)
                $httpWorker->onMessage = function ($conn, $data) use ($io, $redis) {
                    $_POST = $_POST ? $_POST : $_GET;
                    switch (@$_POST["action"]) {
                        case "message":
                            $to = "uid:" . @$_POST["receiverID"];
                            $_POST["content"] = htmlspecialchars(@$_POST["content"]);
                            $_POST["title"] = htmlspecialchars(@$_POST["title"]);
    
                            // 有指定uid則向uid所在socket組發(fā)送數(shù)據(jù)
                            if ($to) {
                                $io->to($to)->emit("new_msg", $_POST);
                            }
    
                            $companyID = @$_POST["companyID"];
                            $key = "socketio:company:" . $companyID;
                            $field = "uid:" . $to;
    
                            // http接口返回,如果用戶離線socket返回fail
                            if ($to && $redis->hget($key, $field)) {
                                return $conn->send("offline");
                            } else {
                                return $conn->send("ok");
                            }
                            break;
                        case "dynamic":
                            $to = "company:" . @$_POST["companyID"];
                            $_POST["content"] = htmlspecialchars(@$_POST["content"]);
                            $_POST["title"] = htmlspecialchars(@$_POST["title"]);
    
                            // 有指定uid則向uid所在socket組發(fā)送數(shù)據(jù)
                            if ($to) {
                                $io->to($to)->emit("update_dynamic", $_POST);
                            }
    
                            $companyID = @$_POST["companyID"];
                            $key = "socketio:company:" . $companyID;
    
                            // http接口返回,如果用戶離線socket返回fail
                            if ($to && $redis->hlen($key)) {
                                return $conn->send("offline");
                            } else {
                                return $conn->send("ok");
                            }
                    }
    
                    return $conn->send("fail");
                };
    
                // 執(zhí)行監(jiān)聽
                $httpWorker->listen();
            });
    
            // 運(yùn)行所有的實(shí)例
            global $argv;
            array_shift($argv);
    
            if (isset($argv[2]) && $argv[2] == "daemon") {
                $argv[2] = "-d";
            }
    
            Worker::runAll();
        }
    例子
        /**
         * 新建任務(wù)
         * @param $projectID
         * @param $categoryID
         * @return string
         * @throws ForbiddenHttpException
         * @since 2018-01-26
         */
        public function actionCreate($projectID, $categoryID)
        {
            $userID = Yii::$app->user->id;
    
            if (!Yii::$app->user->can("createTask")) {
                throw new ForbiddenHttpException(ResponseUtil::$msg[1]);
            }
    
            if ($data = Yii::$app->request->post()) {
                if (empty($data["name"])) {
                    ResponseUtil::jsonCORS(["status" => Conf::FAILED, "msg" => "任務(wù)標(biāo)題不能為空"]);
                }
    
                // 保存任務(wù)
                $taskID = TaskService::factory()->save([
                    "name"       => $data["name"],
                    "creatorID"  => $userID,
                    "companyID"  => $this->companyID,
                    "level"      => $data["level"],
                    "categoryID" => $categoryID,
                    "projectID"  => $projectID,
                    "content"    => $data["content"]
                ]);
    
                if ($taskID) {
                    $url = Url::to(["task/view", "taskID" => $taskID]);
                    $portrait = UserService::factory()->getUserPortrait($userID);
                    $username = UserService::factory()->getUserName($userID);
                    $title = "創(chuàng)建了新任務(wù)";
                    $content = $data["name"];
    
                    // 保存操作日志
                    LogService::factory()->saveHandleLog([
                        "objectID"   => $taskID,
                        "companyID"  => $this->companyID,
                        "operatorID" => $userID,
                        "objectType" => Conf::OBJECT_TASK,
                        "content"    => $content,
                        "url"        => $url,
                        "title"      => $title,
                        "portrait"   => $portrait,
                        "operator"   => $username
                    ]);
    
                    // 動(dòng)態(tài)推送
                    MsgService::factory()->push("dynamic", [
                        "companyID"  => $this->companyID,
                        "operatorID" => $userID,
                        "operator"   => $username,
                        "portrait"   => $portrait,
                        "title"      => $title,
                        "content"    => $content,
                        "url"        => $url,
                        "date"       => date("Y-m-d H:i:s"),
                    ]);
    
                    ResponseUtil::jsonCORS(null, Conf::SUCCESS, "創(chuàng)建成功");
                } else {
                    ResponseUtil::jsonCORS(null, Conf::FAILED, "創(chuàng)建失敗");
                }
            } else {
                return $this->render("create", [
                    "projectID" => $projectID,
                    "category"  => TaskCategory::findOne(["id" => $categoryID]),
                ]);
            }
        }
    調(diào)用保存操作日志底層接口
    LogService::factory()->saveHandleLog($args)
    參數(shù)說明
    參數(shù) 說明
    operatorID 操作者ID,用于用戶跳轉(zhuǎn)鏈接
    companyID 公司ID,用于該公司下的動(dòng)態(tài)
    operator 操作者姓名
    portrait 操作者頭像
    receiver 接受者,可選
    title 動(dòng)態(tài)標(biāo)題,例如創(chuàng)建任務(wù)
    content 動(dòng)態(tài)內(nèi)容,例如創(chuàng)建任務(wù)的名稱
    date 動(dòng)態(tài)時(shí)間
    url 動(dòng)態(tài)跳轉(zhuǎn)鏈接
    token 驗(yàn)證
    objectType 對(duì)象類型,例如任務(wù),文檔等等
    objectID 對(duì)象ID,例如任務(wù)ID,文檔ID等等

    companyID 用于查詢?cè)摴鞠碌膭?dòng)態(tài)

    objectType和objectID組合可以查詢某個(gè)對(duì)象的操作日志,例如一個(gè)任務(wù)被操作的日志

    保存消息
    參數(shù) 說明
    senderID 發(fā)送者ID,用于用戶跳轉(zhuǎn)鏈接
    companyID 公司ID,用于該公司下的動(dòng)態(tài)
    sender 發(fā)送者姓名
    portrait 發(fā)送者頭像
    receiverID 接受者ID
    title 動(dòng)態(tài)標(biāo)題,例如創(chuàng)建任務(wù)
    content 動(dòng)態(tài)內(nèi)容,例如創(chuàng)建任務(wù)的名稱
    date 動(dòng)態(tài)時(shí)間
    url 動(dòng)態(tài)跳轉(zhuǎn)鏈接
    typeID 消息類型,1系統(tǒng)通知,2管理員通知,3操作通知

    receiverID和typeID和isRead組合可以查詢用戶未讀消息

    請(qǐng)求接口
    MsgService::factory()->push($action, $args)
    即時(shí)動(dòng)態(tài)
    參數(shù) 說明
    operatorID 操作者ID,用于用戶跳轉(zhuǎn)鏈接
    companyID 公司ID,用于給該公司的所有socket推送
    operator 操作者姓名
    portrait 操作者頭像
    receiver 接受者,可選
    title 動(dòng)態(tài)標(biāo)題,例如創(chuàng)建任務(wù)
    content 動(dòng)態(tài)內(nèi)容,例如創(chuàng)建任務(wù)的名稱
    date 動(dòng)態(tài)時(shí)間
    url 動(dòng)態(tài)跳轉(zhuǎn)鏈接
    即時(shí)消息
    參數(shù) 說明
    senderID 發(fā)送者ID,用于用戶跳轉(zhuǎn)鏈接
    sender 發(fā)送者姓名
    receiverID 接受者ID,用于給接受者的所有socket推送消息
    typeID 消息類型,1系統(tǒng)通知,2管理員通知,3操作通知
    portrait 發(fā)送者頭像
    title 消息標(biāo)題
    content 消息內(nèi)容
    url 消息跳轉(zhuǎn)鏈接
    完整代碼參考:https://github.com/wuzhc/team

    文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

    轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/28194.html

    相關(guān)文章

    • 打造你的Laravel即時(shí)應(yīng)用(二)-消息推送與監(jiān)聽

      摘要:打造你的即時(shí)應(yīng)用二消息推送與監(jiān)聽年月日接于上篇博客打造你的即時(shí)應(yīng)用一項(xiàng)目初始化構(gòu)建在上一篇博客中介紹了項(xiàng)目的基本構(gòu)建現(xiàn)在進(jìn)入實(shí)戰(zhàn)操作一消息推送創(chuàng)建事件類的廣播推送通過來實(shí)現(xiàn)下面通過命令來創(chuàng)建一個(gè)事件類為了配合我們的廣播系統(tǒng)使用需要實(shí)現(xiàn)接 打造你的Laravel即時(shí)應(yīng)用(二)-消息推送與監(jiān)聽 2019年08月04日20:16:21 XXM 接于上篇博客: 打造你的Laravel即時(shí)應(yīng)用(...

      omgdog 評(píng)論0 收藏0
    • 搭建IM服務(wù) so easy

      摘要:現(xiàn)在很多網(wǎng)站都通過服務(wù)來實(shí)現(xiàn)消息推送及數(shù)據(jù)即時(shí)同步功能,即時(shí)通訊組件逐漸成為產(chǎn)品的標(biāo)配。目前國內(nèi)有很多成熟穩(wěn)定的第三方即時(shí)通訊服務(wù)廠家,比如融云。 現(xiàn)在很多網(wǎng)站、APP都通過IM服務(wù)來實(shí)現(xiàn)消息推送及數(shù)據(jù)即時(shí)同步功能,即時(shí)通訊組件逐漸成為產(chǎn)品的標(biāo)配。目前國內(nèi)有很多成熟穩(wěn)定的第三方即時(shí)通訊服務(wù)廠家,比如:融云。使用這些專業(yè)的服務(wù)可以提高開發(fā)效率而且服務(wù)穩(wěn)定有保障。 如果自己DIY或者需要在...

      imccl 評(píng)論0 收藏0

    發(fā)表評(píng)論

    0條評(píng)論

    最新活動(dòng)
    閱讀需要支付1元查看
    <