成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

微信開源mars源碼分析5—底層核心mars分析(續(xù)2)

asce1885 / 1714人閱讀

摘要:執(zhí)行并根據每個連接的狀態(tài)決定后續(xù)處理,上篇已經講過,不再累述。上面的三段處理完畢后,應該是數組中不再有連接才對,這里的保險處理是對數組再進行檢查。至此跳出,算是整個連接過程完畢了。這里需要逐句分析,首先是。

最近回顧之前的文章,發(fā)現最后一篇有些著急了,很多地方沒有敘述清楚。這里先做個銜接吧。
我們還是以長連接為例,從longlink.cc看起。首先是那個線程函數__Run:
/mars-master/mars/stn/src/longlink.cc

void LongLink::__Run() {
    ......
    // 執(zhí)行連接
    SOCKET sock = __RunConnect(conn_profile);
    
    // 無效的socket,更新描述文件,記錄失敗的時間節(jié)點,返回
    if (INVALID_SOCKET == sock) {
        conn_profile.disconn_time = ::gettickcount();
        conn_profile.disconn_signal = ::getSignal(::getNetInfo() == kWifi);
        __UpdateProfile(conn_profile);
        return;
    }
    ......
    // 執(zhí)行讀寫
    __RunReadWrite(sock, errtype, errcode, conn_profile);
}

實際上核心的就2個,連接和讀寫,我們分別看下。
/mars-master/mars/stn/src/longlink.cc

SOCKET LongLink::__RunConnect(ConnectProfile& _conn_profile) {
    std::vector ip_items;
    std::vector vecaddr;
    ......
    // 賦值填充ip_items地址端口數組
    netsource_.GetLongLinkItems(ip_items, dns_util_);
    ......
    // 根據ip_items創(chuàng)建socket_address并加入vecaddr中
    for (unsigned int i = 0; i < ip_items.size(); ++i) {
        vecaddr.push_back(socket_address(ip_items[i].str_ip.c_str(), ip_items[i].port).v4tov6_address(isnat64));
    }
    ......
    // 創(chuàng)建觀察者和ComplexConnect連接核心,然后開始執(zhí)行連接
    LongLinkConnectObserver connect_observer(*this, ip_items);
    ComplexConnect com_connect(kLonglinkConnTimeout, kLonglinkConnInteral, kLonglinkConnInteral, kLonglinkConnMax);
    SOCKET sock = com_connect.ConnectImpatient(vecaddr, connectbreak_, &connect_observer);
    
    // 返回socket
    return sock;
}

1.創(chuàng)建2個數組,地址端口item和socket_address;
2.調用netsource_.GetLongLinkItems(ip_items, dns_util_);填充IPPortItem數組;
3.根據填充好的前者數組生成socket_address填充后者數組;
4.創(chuàng)建連接觀察者;
5.開始執(zhí)行連接;
首先看看netsource_.GetLongLinkItems是如何填充的:
/mars-master/mars/stn/src/net_source.cc

bool NetSource::GetLongLinkItems(std::vector& _ipport_items, DnsUtil& _dns_util) {
    
    ScopedLock lock(sg_ip_mutex);

    if (__GetLonglinkDebugIPPort(_ipport_items)) {
        return true;
    }
    
    lock.unlock();

     std::vector longlink_hosts = NetSource::GetLongLinkHosts();
     if (longlink_hosts.empty()) {
         xerror2("longlink host empty.");
         return false;
     }

     __GetIPPortItems(_ipport_items, longlink_hosts, _dns_util, true);

    return !_ipport_items.empty();
}

可以看到debug的優(yōu)先,這里增加了調試的ip。再往下就先不貼代碼了,基本上就是之前通過SetLongLink設置進去的sg_longlink_hosts(長連接主機列表),再往上倒騰就是在MarsServiceNative.java的onCreate中通過描述文件profile設置進去的主機列表。也就是說之前早就設置好的主機列表已經存在了。
下面我們仍然要進入到上一篇提到的ComplexConnect::ConnectImpatient這個核心函數中看看。
/mars-master/mars/comm/socket/complexconnect.cc

SOCKET ComplexConnect::ConnectImpatient(const std::vector& _vecaddr, SocketSelectBreaker& _breaker, MComplexConnect* _observer) {
    ......
    // 根據地址列表,生成ConnectCheckFSM連接列表
    std::vector vecsocketfsm;

    for (unsigned int i = 0; i < _vecaddr.size(); ++i) {
        xinfo2(TSF"complex.conn %_", _vecaddr[i].url());

        ConnectCheckFSM* ic = new ConnectCheckFSM(_vecaddr[i], timeout_, i, _observer);
        vecsocketfsm.push_back(ic);
    }
    // 下面就是對這個連接列表的各種操作了
    do {
        ......
        // 生成socketselect的封裝對象,并執(zhí)行PreSelect前期準備工作
        SocketSelect sel(_breaker);
        sel.PreSelect();
        
        ......
        // 執(zhí)行連接
        for (unsigned int i = 0; i < index; ++i) {
            if (NULL == vecsocketfsm[i]) continue;

            xgroup2_define(group);
            vecsocketfsm[i]->PreSelect(sel, group);
            xgroup2_if(!group.Empty(), TSF"index:%_, @%_, ", i, this) << group;
            timeout = std::min(timeout, vecsocketfsm[i]->Timeout());
        }
        
        ......
        
        for (unsigned int i = 0; i < index; ++i) {
            if (NULL == vecsocketfsm[i]) continue;

            xgroup2_define(group);
            vecsocketfsm[i]->AfterSelect(sel, group);
            xgroup2_if(!group.Empty(), TSF"index:%_, @%_, ", i, this) << group;

            if (TcpClientFSM::EEnd == vecsocketfsm[i]->Status()) {
                if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(),
                                                         vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime));

                vecsocketfsm[i]->Close();
                delete vecsocketfsm[i];
                vecsocketfsm[i] = NULL;
                lasterror = -1;
                continue;
            }

            if (TcpClientFSM::EReadWrite == vecsocketfsm[i]->Status() && ConnectCheckFSM::ECheckFail == vecsocketfsm[i]->CheckStatus()) {
                if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(),
                                                         vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime));

                vecsocketfsm[i]->Close();
                delete vecsocketfsm[i];
                vecsocketfsm[i] = NULL;
                lasterror = -1;
                continue;
            }

            if (TcpClientFSM::EReadWrite == vecsocketfsm[i]->Status() && ConnectCheckFSM::ECheckOK == vecsocketfsm[i]->CheckStatus()) {
                if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(),
                                                         vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime));

                xinfo2(TSF"index:%_, sock:%_, suc ConnectImpatient:%_:%_, RTT:(%_, %_), @%_", i, vecsocketfsm[i]->Socket(),
                       vecsocketfsm[i]->IP(), vecsocketfsm[i]->Port(), vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), this);
                retsocket = vecsocketfsm[i]->Socket();
                index_ = i;
                index_conn_rtt_ = vecsocketfsm[i]->Rtt();
                index_conn_totalcost_ = vecsocketfsm[i]->TotalRtt();
                vecsocketfsm[i]->Socket(INVALID_SOCKET);
                delete vecsocketfsm[i];
                vecsocketfsm[i] = NULL;
                break;
            }
        }
        
    } while (true);
}

1.數組中的每個長連接地址依次執(zhí)行連接;
2.數組中的每個連接分別做后續(xù)處理(一個for循環(huán)中的三段處理);

我們首先看看vecsocketfsm[i]->PreSelect(sel, group);這句話,是由ConnectCheckFSM的父類TcpClientFSM實現的:
/mars-master/mars/comm/socket/tcpclient_fsm.cc

void TcpClientFSM::PreSelect(SocketSelect& _sel, XLogger& _log) {
    
    switch(status_) {
        case EStart: {
            PreConnectSelect(_sel, _log);
            break;
        }
        case EConnecting: {
            _sel.Write_FD_SET(sock_);
            _sel.Exception_FD_SET(sock_);
            break;
        }
        case EReadWrite: {
            PreReadWriteSelect(_sel, _log);
            break;
        }
        default:
            xassert2(false, "preselect status error");
    }
}

這里是根據這個連接的當前狀態(tài)決定前置操作的行為(開始連接、讀寫、連接中)。再往下看就是進行socket的connect。以PreConnectSelect為例,這里生產了socket,并執(zhí)行了connect,最后將成功連接的socket執(zhí)行_sel.Write_FD_SET(sock_);保存在了SocketSelect中。
我們來看下代碼:
/mars-master/mars/comm/socket/tcpclient_fsm.cc

void TcpClientFSM::PreConnectSelect(SocketSelect& _sel, XLogger& _log) {
    xassert2(EStart == status_, "%d", status_);
    // 執(zhí)行虛函數,由子類繼承實現
    _OnCreate();

    xinfo2(TSF"addr:(%_:%_), ", addr_.ip(), addr_.port()) >> _log;

    // 生成socket
    sock_ = socket(addr_.address().sa_family, SOCK_STREAM, IPPROTO_TCP);

    if (sock_ == INVALID_SOCKET) {
        error_ = socket_errno;
        last_status_ = status_;
        status_ = EEnd;
        _OnClose(last_status_, error_, false);
        xerror2(TSF"close socket err:(%_, %_)", error_, socket_strerror(error_)) >> _log;
        return;
    }

    if (::getNetInfo() == kWifi && socket_fix_tcp_mss(sock_) < 0) {
#ifdef ANDROID
        xinfo2(TSF"wifi set tcp mss error:%0", strerror(socket_errno));
#endif
    }
    if (0 != socket_ipv6only(sock_, 0)){
        xwarn2(TSF"set ipv6only failed. error %_",strerror(socket_errno));
    }
    
    if (0 != socket_set_nobio(sock_)) {
        error_ = socket_errno;
        xerror2(TSF"close socket_set_nobio:(%_, %_)", error_, socket_strerror(error_)) >> _log;
    } else {
        xinfo2(TSF"socket:%_, ", sock_) >> _log;
    }

    if (0 != error_) {
        last_status_ = status_;
        status_ = EEnd;
        return;
    }

    start_connecttime_ = gettickcount();

    // 執(zhí)行連接
    int ret = connect(sock_, &(addr_.address()), addr_.address_length());

    if (0 != ret && !IS_NOBLOCK_CONNECT_ERRNO(socket_errno)) {
        end_connecttime_ = ::gettickcount();

        error_ = socket_errno;
        xwarn2(TSF"close connect err:(%_, %_), localip:%_", error_, socket_strerror(error_), socket_address::getsockname(sock_).ip()) >> _log;
    } else {
        xinfo2("connect") >> _log;
        // 記錄socket到SocketSelect中
        _sel.Write_FD_SET(sock_);
        _sel.Exception_FD_SET(sock_);
    }

    last_status_ = status_;

    if (0 != error_)
        status_ = EEnd;
    else
        status_ = EConnecting;

    if (0 == error_) _OnConnect();
}

需要注意的是_OnCreate的調用,實際上是子類實現的,這里也就是ConnectCheckFSM實現的:

virtual void _OnCreate() { if (m_observer) m_observer->OnCreated(m_index, addr_, sock_);}

這里將觀察者與連接對象的生命周期綁在了一起,執(zhí)行了觀察者的OnCreated。那么觀察者是誰呢?往上看,在LongLink::__RunConnect中生成的LongLinkConnectObserver。當然生命周期的回調并不止OnCreated一個。

回到__RunConnect中,看后續(xù)處理(for循環(huán)的三段處理)。執(zhí)行AfterSelect并根據每個連接的狀態(tài)決定后續(xù)處理,上篇已經講過,不再累述。

那么何時終止這個do while循環(huán)呢?當for循環(huán)的三段處理完畢后,所有的連接過程都已經處理完畢了:

        // end of loop
        bool all_invalid = true;

        for (unsigned int i = 0; i < vecsocketfsm.size(); ++i) {
            if (NULL != vecsocketfsm[i]) {
                all_invalid = false;
                break;
            }
        }

        if (all_invalid || INVALID_SOCKET != retsocket) break;

最后枚舉一遍連接數組,每個元素檢查是否非空,如果還有非空的,就將all_invalid置為false,那么會繼續(xù)走一次do while。上面的三段處理完畢后,應該是數組中不再有連接才對,這里的保險處理是對數組再進行檢查。至此跳出do while,算是整個連接過程完畢了。

可以看到,經過了三段處理后,連接數組中只會命中一個檢測成功的連接,其他的失敗和完成的都會置為null。這里從全局看就是一個地址池的淘汰篩選機制。在三段處理的for循環(huán)中清除不合格的連接,挑出第一個找到的合格的連接,然后跳出三段后,立刻檢查整個數組是否已經就剩這一個可用了,如果不是繼續(xù)執(zhí)行do while,又會去執(zhí)行數組中的每個item的連接過程,再回到三段處理。也就是說所有的數組中的item都會連接一次,然后根據返回的狀態(tài)決定是否命中最終的一個socket。這是干嘛呢這么繞?我之前的理解恐怕還不透徹,現在感覺是在找一個穩(wěn)定的可以讀寫狀態(tài)的連接。
第一次進入do while已經連接所有池中的item了,那么在經過了三段處理后淘汰掉不合適的和失敗的,然后再進入do while再次執(zhí)行vecsocketfsm[i]->PreSelect(sel, group);的時候,已經更新了狀態(tài)并執(zhí)行了不同的調用了,再經過三段處理在新的狀態(tài)下再淘汰一批,最后經過整個運轉,留下來的只能是最持久的(穩(wěn)定的)唯一的一個連接,返回這個。
不得不說,這里確實巧妙,如果我寫并不會比這要好。

我們回來到longlink.cc的線程函數__Run中,當連接處理完畢后,下面繼續(xù)執(zhí)行的是__RunReadWrite。我們來看看:

void LongLink::__RunReadWrite(SOCKET _sock, ErrCmdType& _errtype, int& _errcode, ConnectProfile& _profile) {
    // Alarm消息觸發(fā)處理綁定在__OnAlarm上
    Alarm alarmnoopinterval(boost::bind(&LongLink::__OnAlarm, this), false);
    Alarm alarmnooptimeout(boost::bind(&LongLink::__OnAlarm, this), false);
}

首先是2個Alarm,這里要理解就需要看看這個Alarm是個什么東西:
/mars-master/mars/comm/alarm.h

    template
    explicit Alarm(const T& _op, bool _inthread = true)
        : target_(detail::transform(_op))
        , reg_async_(MessageQueue::InstallAsyncHandler(MessageQueue::GetDefMessageQueue()))
        , runthread_(boost::bind(&Alarm::__Run, this), "alarm")
        , inthread_(_inthread)
        , seq_(0), status_(kInit)
        , after_(0) , starttime_(0) , endtime_(0)
        , reg_(MessageQueue::InstallMessageHandler(boost::bind(&Alarm::OnAlarm, this, _1, _2), true))
#ifdef ANDROID
        , wakelock_(NULL)
#endif
    {}

構造函數。這里需要逐句分析,首先是target_(detail::transform(_op))。簡單看是個賦值語句,后面的參數需要看這個:
/mars-master/mars/comm/thread/runnable.h

// base template for no argument functor
template 
struct TransformImplement {
    static Runnable* transform(const T& t) {
        return new RunnableFunctor(t);
    }
};

template 
inline Runnable* transform(const T& t) {
    return TransformImplement::transform(t);
}

1.這里使用的是c++魔板,直接new了一個RunnableFunctor對象,這個對象是個runnable,其實就是將這個傳遞進來的參數t包裝成了一個runnable,在適當的時候調用他的run方法的時候就會調用這個t了。那么帶入到具體的內容中,這個t是_op,就是boost::bind(&LongLink::__OnAlarm, this)。這里又使用了c++的boost庫,做了bind操作,綁定了參數this也就是LongLink與函數體LongLink::__OnAlarm。好了,現在target_是個包裝好的runnable了,在適當的時候可以執(zhí)行LongLink::__OnAlarm。

2.reg_async_(MessageQueue::InstallAsyncHandler(MessageQueue::GetDefMessageQueue()))。首先看MessageQueue::InstallAsyncHandler:
/mars-master/mars/comm/messagequeue/message_queue.cc

MessageHandler_t InstallAsyncHandler(const MessageQueue_t& id) {
    ASSERT(0 != id);
    return InstallMessageHandler(__AsyncInvokeHandler, false, id);
}

MessageHandler_t InstallMessageHandler(const MessageHandler& _handler, bool _recvbroadcast, const MessageQueue_t& _messagequeueid) {
    ASSERT(bool(_handler));

    ScopedLock lock(sg_messagequeue_map_mutex);
    const MessageQueue_t& id = _messagequeueid;

    if (sg_messagequeue_map.end() == sg_messagequeue_map.find(id)) {
        ASSERT2(false, "%" PRIu64, id);
        return KNullHandler;
    }

    HandlerWrapper* handler = new HandlerWrapper(_handler, _recvbroadcast, _messagequeueid, __MakeSeq());
    sg_messagequeue_map[id].lst_handler.push_back(handler);
    return handler->reg;
}

struct HandlerWrapper {
    HandlerWrapper(const MessageHandler& _handler, bool _recvbroadcast, const MessageQueue_t& _messagequeueid, unsigned int _seq)
        : handler(_handler), recvbroadcast(_recvbroadcast) {
        reg.seq = _seq;
        reg.queue = _messagequeueid;
    }

    MessageHandler_t reg;
    MessageHandler handler;
    bool recvbroadcast;
};

生成了一個HandlerWrapper,并將其保留在了一個map中,隨后返回了MessageHandler_t,其中保存了_seq與_messagequeueid。這里我的感覺是這個handler就是個類似句柄的東西,保存MessageHandler的一個關聯關系,即消息隊列與seq碼(這里是個自增的靜態(tài)變量)。實際上調用者只要有這個MessageHandler_t就可以了。最后將這個MessageHandler_t賦值給了reg_async_。這里又有一個對象ScopeRegister是個MessageHandler_t的包裝對象,里面統(tǒng)一封裝了方法來操作MessageHandler_t。

3.runthread_(boost::bind(&Alarm::__Run, this), "alarm")。一個線程對象,線程函數是Alarm::__Run。沒事什么好解釋的。

4.inthread_(_inthread), seq_(0), status_(kInit), after_(0) , starttime_(0) , endtime_(0)。都是簡單賦值,暫時不去管它。

5.reg_(MessageQueue::InstallMessageHandler(boost::bind(&Alarm::OnAlarm, this, _1, _2), true))。類似2。

好了,這個Alarm可以看做是個消息處理,在有消息觸發(fā)的情況下會調用到具體的函數中,一般是__OnAlarm。

回到__RunReadWrite,往下看。首先是個while的死循環(huán),我們多帶帶摘錄如下:

    while (true) {
        ......
        if (!alarmnoopinterval.IsWaiting()) {
            ......
            if (__NoopReq(noop_xlog, alarmnooptimeout, has_late_toomuch)) {
                is_noop = true;
                __NotifySmartHeartbeatHeartReq(_profile, last_noop_interval, last_noop_actual_interval);
            }
            ......
        }
        
        ......
        // socket處理
        SocketSelect sel(readwritebreak_, true);
        sel.PreSelect();
        sel.Read_FD_SET(_sock);
        sel.Exception_FD_SET(_sock);
        
        ScopedLock lock(mutex_);
        
        if (!lstsenddata_.empty()) sel.Write_FD_SET(_sock);
        
        lock.unlock();
        
        int retsel = sel.Select(10 * 60 * 1000);
        ......
        // 處理發(fā)送(寫入)
        if (sel.Write_FD_ISSET(_sock) && !lstsenddata_.empty()) {
            ......
            ssize_t writelen = ::send(_sock, lstsenddata_.begin()->data.PosPtr(), lstsenddata_.begin()->data.PosLength(), 0);
            ......
            while (it != lstsenddata_.end() && 0 < writelen) {
                if (0 == it->data.Pos()) OnSend(it->taskid);
                
                if ((size_t)writelen >= it->data.PosLength()) {
                    xinfo2(TSF"sub send taskid:%_, cmdid:%_, %_, len(S:%_, %_/%_), ", it->taskid, it->cmdid, it->task_info, it->data.PosLength(), it->data.PosLength(), it->data.Length()) >> xlog_group;
                    writelen -= it->data.PosLength();
                    if (!it->task_info.empty()) sent_taskids[it->taskid] = it->task_info;
                    LongLinkNWriteData nwrite(it->taskid, it->data.PosLength(), it->cmdid, it->task_info);
                    nsent_datas.push_back(nwrite);
                    
                    it = lstsenddata_.erase(it);
                } else {
                    xinfo2(TSF"sub send taskid:%_, cmdid:%_, %_, len(S:%_, %_/%_), ", it->taskid, it->cmdid, it->task_info, writelen, it->data.PosLength(), it->data.Length()) >> xlog_group;
                    it->data.Seek(writelen, AutoBuffer::ESeekCur);
                    writelen = 0;
                }
            }
            
        }
        
        ......
        // 處理接收(讀取)
        if (sel.Read_FD_ISSET(_sock)) {
            bufrecv.AllocWrite(64 * 1024, false);
            ssize_t recvlen = recv(_sock, bufrecv.PosPtr(), 64 * 1024, 0);
            ......
            while (0 < bufrecv.Length()) {
                uint32_t cmdid = 0;
                uint32_t taskid = Task::kInvalidTaskID;
                size_t packlen = 0;
                AutoBuffer body;
                
                int unpackret = longlink_unpack(bufrecv, cmdid, taskid, packlen, body);
                
                if (LONGLINK_UNPACK_FALSE == unpackret) {
                    xerror2(TSF"task socket recv sock:%0, unpack error dump:%1", _sock, xdump(bufrecv.Ptr(), bufrecv.Length()));
                    _errtype = kEctNetMsgXP;
                    _errcode = kEctNetMsgXPHandleBufferErr;
                    goto End;
                }
                
                xinfo2(TSF"task socket recv sock:%_, pack recv %_ taskid:%_, cmdid:%_, %_, packlen:(%_/%_)", _sock, LONGLINK_UNPACK_CONTINUE == unpackret ? "continue" : "finish", taskid, cmdid, sent_taskids[taskid], LONGLINK_UNPACK_CONTINUE == unpackret ? bufrecv.Length() : packlen, packlen);
                lastrecvtime_.gettickcount();
                
                if (LONGLINK_UNPACK_CONTINUE == unpackret) {
                    OnRecv(taskid, bufrecv.Length(), packlen);
                    break;
                } else {
                    
                    sent_taskids.erase(taskid);
                    
                    bufrecv.Move(-(int)(packlen));
                    
                    if (__NoopResp(cmdid, taskid, body, alarmnooptimeout, _profile)) {
                        xdebug2(TSF"noopresp span:%0", alarmnooptimeout.ElapseTime());
                        is_noop = false;
                    } else {
                        OnResponse(kEctOK, 0, cmdid, taskid, body, _profile);
                    }
                }
            }
        }
    }

// 收尾,整個looper退出
End:
    

從while中的代碼能夠看出,基本上就是上面摘錄的幾塊,如下所示:
1.__NoopReq調用,無數據狀態(tài)處理;
2.socket的select處理;
3.處理發(fā)送寫入部分;
4.處理接收讀取部分;

這里需要逐個分析了:
1.__NoopReq:
先看代碼,并不長:

bool LongLink::__NoopReq(XLogger& _log, Alarm& _alarm, bool need_active_timeout) {
    AutoBuffer buffer;
    uint32_t req_cmdid = 0;
    bool suc = false;
    
    if (identifychecker_.GetIdentifyBuffer(buffer, req_cmdid)) {
        suc = Send((const unsigned char*)buffer.Ptr(), (int)buffer.Length(), req_cmdid, Task::kLongLinkIdentifyCheckerTaskID);
        identifychecker_.SetSeq(Task::kLongLinkIdentifyCheckerTaskID);
        xinfo2(TSF"start noop synccheck taskid:%0, cmdid:%1, ", Task::kLongLinkIdentifyCheckerTaskID, req_cmdid) >> _log;
    } else {
        AutoBuffer body;
        longlink_noop_req_body(body);
        suc = SendWhenNoData((const unsigned char*) body.Ptr(), body.Length(), longlink_noop_cmdid(), Task::kNoopTaskID);
        xinfo2(TSF"start noop taskid:%0, cmdid:%1, ", Task::kNoopTaskID, longlink_noop_cmdid()) >> _log;
    }
    
    if (suc) {
        _alarm.Cancel();
        _alarm.Start(need_active_timeout ? (2* 1000) : (10 * 1000));
    } else {
        xerror2("send noop fail");
    }
    
    return suc;
}

說實話,這里看的不是很清晰 ,因為之前肯定有忽略的部分,我的猜測是在走了一個發(fā)送信令的校驗后,根據返回的值的不同決定是執(zhí)行send發(fā)送數據(使用校驗填充好的buffer),還是走SendWhenNoData發(fā)送(自行填充請求體)沒有數據的情況。暫時先往下看一步,看看Send:

bool LongLink::__Send(const unsigned char* _pbuf, size_t _len, uint32_t _cmdid, uint32_t _taskid, const std::string& _task_info) {
    lstsenddata_.push_back(LongLinkSendData());

    lstsenddata_.back().cmdid = _cmdid;
    lstsenddata_.back().taskid = _taskid;
    longlink_pack(_cmdid, _taskid, _pbuf, _len, lstsenddata_.back().data);
    lstsenddata_.back().data.Seek(0, AutoBuffer::ESeekStart);
    lstsenddata_.back().task_info = _task_info;

    readwritebreak_.Break();
    return true;
}

這里能夠清晰的看到,在使用lstsenddata_這個隊列,來進行發(fā)送的請求,實際上就是向隊列中增加一項。那么現在的問題就在于這個發(fā)送的數據時怎么來的了。這就需要我們弄懂LongLinkIdentifyChecker這個玩意兒。
/mars-master/mars/stn/src/longlink_identify_checker.cc

bool LongLinkIdentifyChecker::GetIdentifyBuffer(AutoBuffer &_buffer, uint32_t &_cmdid)
{
    if (has_checked_) return false;
    
    hash_code_buffer_.Reset();
    _buffer.Reset();

    IdentifyMode mode = (IdentifyMode)GetLonglinkIdentifyCheckBuffer(_buffer, hash_code_buffer_, (int&)_cmdid);

    switch (mode)
    {
    case kCheckNever:
        {
            has_checked_ = true;
        }
        break;
    case kCheckNext:
        {
            has_checked_ = false;
        }
        break;
    case kCheckNow:
        {
            cmd_id_ = _cmdid;
            return true;
        }
        break;
    default:
        xassert2(false);
    }
    
    return false;
}

調用了GetLonglinkIdentifyCheckBuffer,我們追溯到stn_logic.cc中:

    int  GetLonglinkIdentifyCheckBuffer(AutoBuffer& identify_buffer, AutoBuffer& buffer_hash, int32_t& cmdid) {
        xassert2(sg_callback != NULL);
        return sg_callback->GetLonglinkIdentifyCheckBuffer(identify_buffer, buffer_hash, cmdid);
    }

實際上是對sg_callback這個回調的調用。最終我找到的線索是在MarsServiceNative.java上層的onCreate中設置了回調:

        // set callback
        AppLogic.setCallBack(stub);
        StnLogic.setCallBack(stub);
        SdtLogic.setCallBack(stub);

再接著找到了MarsServiceStub.java中的getLongLinkIdentifyCheckBuffer:

    @Override
    public int getLongLinkIdentifyCheckBuffer(ByteArrayOutputStream identifyReqBuf, ByteArrayOutputStream hashCodeBuffer, int[] reqRespCmdID) {
        // Send identify request buf to server
        // identifyReqBuf.write();

        return ECHECK_NEVER;
    }

返回的是ECHECK_NEVER,沒有填充buffer。也即是說has_checked_ = true,然后返回false。其實看到這一刻我是崩潰的,真心不知道是想干嘛。我們只能這么理解,只要進入__NoopReq其實都是在走SendWhenNoData發(fā)送無數據狀態(tài)。好吧,我們重新回到__RunReadWrite中看一下。每次在while循環(huán)中一上來只要不是alarmnoopinterval正在等待的狀態(tài),那么就走一個發(fā)送無數據狀態(tài)。看看發(fā)送無數據的代碼:

bool LongLink::SendWhenNoData(const unsigned char* _pbuf, size_t _len, uint32_t _cmdid, uint32_t _taskid) {
    ScopedLock lock(mutex_);

    if (kConnected != connectstatus_) return false;
    if (!lstsenddata_.empty()) return false;

    return __Send(_pbuf, _len, _cmdid, _taskid, "");
}

其實是檢查lstsenddata_是否有內容,如果沒有才發(fā)送。那么好吧,這里整體理解就是每次whie循環(huán)開始都會檢查如果發(fā)送隊列中沒有數據的時候,發(fā)送一個特定的無數據狀態(tài)來確認連接。但是這里寫的比較復雜,還需要回調回上層java的代碼中,讓其來控制狀態(tài),從而根據狀態(tài)控制流程,只能說考慮的很周全,任何情況在任何節(jié)點都可以有處理。吐槽下如果我們自己寫來規(guī)劃這部分的時候大多數時候都是最對無數據檢測放在下層,然后直接就發(fā)送了,不會讓上層這里進行什么干涉吧。其實這里還有些點沒有詳細的分析很清楚,原諒文章有限,畢竟不能偏離主線太多。

2.socket的select操作。
這里倒沒什么可說的,前面的設置,為后面的sel.Select(10 60 1000)做準備,內部采用poll來運作。

3.發(fā)送過程。
先是判斷如果發(fā)送隊列里面有內容,執(zhí)行下面的::send(_sock, lstsenddata_.begin()->data.PosPtr(), lstsenddata_.begin()->data.PosLength(), 0)。這里注意,參數給定的是隊列的第一個的data,也就是說這里是取出第一個執(zhí)行發(fā)送。
下面就是一個while循環(huán),將發(fā)送隊列過了一遍。如果剛才發(fā)送的數據大小與待發(fā)送的實際數據長度相等,那么認為是發(fā)送完了這一個,從隊列中移除這一個,然后下一次while會自動取下一個了。如果沒有;認為是沒發(fā)完,位移數據,下次while仍然獲取到這個item,但是數據位移已經變了,因此繼續(xù)發(fā)送下面的數據。經過這個while之后,所有的發(fā)送隊列中的數據都應當發(fā)送完畢了。

4.接收過程。
前面沒什么好說的,無非是開辟buffer空間,然后執(zhí)行recv調用。之后進入一個while循環(huán),條件是讀取的buffer有數據。
首先走一個解包調用,內部走的是__unpack_test,具體內容就不貼了,我簡單看了下,基本上就是解開頭部,頭部的信息標識了本次傳遞的基本信息,包括了版本號等內容,一個結構體,還是比較標準的。這里是嘗試解包,如果本次接收到的大小連頭部都不夠,那肯定返回錯誤,需要繼續(xù)接收了。那么從這個能夠看出,每次傳遞的數據都是帶有一個頭部的__STNetMsgXpHeader。這東西里面塞入的內容可以和客戶端的版本,當前這個信令的id等關聯起來。
再下去看到的就是對解包返回值的判斷了,如果一切順利,就走到sent_taskids.erase(taskid);這里需要注意,這個sent_taskids是個發(fā)送的taskid的map,這里可以推測發(fā)送和接受其實是關聯的,這里接收完畢移除這個保留項。然后走的__NoopResp這個調用。如果返回false表示不是空的信令返回,那么就走OnResponse。這個函數實際上是在LongLinkTaskManager中綁定了longlink_->OnResponse = boost::bind(&LongLinkTaskManager::__OnResponse, this, _1, _2, _3, _4, _5, _6);綁定在了LongLinkTaskManager::__OnResponse這里。

void LongLinkTaskManager::__OnResponse(ErrCmdType _error_type, int _error_code, uint32_t _cmdid, uint32_t _taskid, AutoBuffer& _body, const ConnectProfile& _connect_profile) {
    copy_wrapper body(_body);
    RETURN_LONKLINK_SYNC2ASYNC_FUNC(boost::bind(&LongLinkTaskManager::__OnResponse, this, _error_type, _error_code, _cmdid, _taskid, body, _connect_profile));

    ......
    
    int err_code = 0;
    int handle_type = Buf2Resp(it->task.taskid, it->task.user_context, body, err_code, Task::kChannelLong);
    
    switch(handle_type){
        case kTaskFailHandleNoError:
        {
            dynamic_timeout_.CgiTaskStatistic(it->task.cgi, (unsigned int)it->transfer_profile.send_data_size + (unsigned int)body->Length(), ::gettickcount() - it->transfer_profile.start_send_time);
            __SingleRespHandle(it, kEctOK, err_code, handle_type, _connect_profile);
            xassert2(fun_notify_network_err_);
            fun_notify_network_err_(__LINE__, kEctOK, err_code, _connect_profile.ip, _connect_profile.port);
        }
            break;
        ......
    }

}

其實就2件事,通過Buf2Resp底層回包返回給上層解析。如果沒有錯誤kTaskFailHandleNoError,會執(zhí)行__SingleRespHandle:

bool LongLinkTaskManager::__SingleRespHandle(std::list::iterator _it, ErrCmdType _err_type, int _err_code, int _fail_handle, const ConnectProfile& _connect_profile) {
    ......
    int cgi_retcode = fun_callback_(_err_type, _err_code, _fail_handle, _it->task, (unsigned int)(curtime - _it->start_task_time));
    ......
}

這里的關鍵點就這一個,調用回調,回調的綁定在net_core.cc中的NetCore構造里,longlink_task_manager_->fun_callback_ = boost::bind(&NetCore::__CallBack, this, (int)kCallFromLong, _1, _2, _3, _4, _5);,最終執(zhí)行的是NetCore::__CallBack:

int NetCore::__CallBack(int _from, ErrCmdType _err_type, int _err_code, int _fail_handle, const Task& _task, unsigned int _taskcosttime) {

    if (task_callback_hook_ && 0 == task_callback_hook_(_from, _err_type, _err_code, _fail_handle, _task)) {
        xwarn2(TSF"task_callback_hook let task return. taskid:%_, cgi%_.", _task.taskid, _task.cgi);
        return 0;
    }

    if (kEctOK == _err_type || kTaskFailHandleTaskEnd == _fail_handle)
        return OnTaskEnd(_task.taskid, _task.user_context, _err_type, _err_code);

    if (kCallFromZombie == _from) return OnTaskEnd(_task.taskid, _task.user_context, _err_type, _err_code);

#ifdef USE_LONG_LINK
    if (!zombie_task_manager_->SaveTask(_task, _taskcosttime))
#endif
        return OnTaskEnd(_task.taskid, _task.user_context, _err_type, _err_code);

    return 0;
}

看到了吧,走了OnTaskEnd,任務結束。

此文從中間部分開始粗糙了,前面鋪墊的東西后面沒有講到,心不靜的時候分析東西效果確實不大好。總而言之既然堅持寫完了,這里還是留個記錄吧,日后有機會的時候會回顧把這部分完善好。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://systransis.cn/yun/70720.html

相關文章

  • 微信開源mars源碼分析2—上層samples分析續(xù)

    摘要:本來是想直接深入到的核心層去看的,但是發(fā)現其實上面的部分還有好些沒有分析到,因此回來繼續(xù)分析。另外一個,是專用于統(tǒng)計的,我們暫時不去關注。具體的內容我會在后面的核心層分析的時候指出。準備下一篇進行的核心層分析吧。 本來是想直接深入到mars的核心層去看的,但是發(fā)現其實上面的samples部分還有好些沒有分析到,因此回來繼續(xù)分析。ConversationActivity這個類中實際上還做...

    MyFaith 評論0 收藏0
  • 微信開源mars源碼分析1—上層samples分析

    摘要:微信已經開源了,但是市面上相關的文章較少,即使有也是多在于使用等這些,那么這次我希望能夠從這個直接用于底層通訊的部分進行個分析。首先明確下,微信用了的開源協議庫,來代替和。核心的部分我們先放下,下一篇再深入分析。 微信已經開源了mars,但是市面上相關的文章較少,即使有也是多在于使用xlog等這些,那么這次我希望能夠從stn這個直接用于im底層通訊的部分進行個分析。為了能分析的全面些,...

    caiyongji 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<