成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Swoft 源碼剖析 - RPC 功能實(shí)現(xiàn)

marser / 3133人閱讀

摘要:值得一提的是目前的服務(wù)即服務(wù),暫沒(méi)有其他的服務(wù)功能,所以基本上相關(guān)的配置指代的就是。會(huì)將請(qǐng)求傳遞給各個(gè)中間件,最終最終傳遞給處理。源碼剖析系列目錄

作者:bromine
鏈接:https://www.jianshu.com/p/411...
來(lái)源:簡(jiǎn)書(shū)
著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對(duì)原文進(jìn)行了重新的排版。
Swoft Github: https://github.com/swoft-clou...

前言

Swoft提供了一個(gè)自建RPC(遠(yuǎn)程方法調(diào)用)實(shí)現(xiàn),讓你可以方便的調(diào)用其他Swoft上的服務(wù)。

RPC服務(wù)端的初始化

RPC有兩種啟動(dòng)方式Http伴隨啟動(dòng)和RPC多帶帶啟動(dòng)。值得一提的是目前swoole的tcp服務(wù)即RPC服務(wù),暫沒(méi)有其他的tcp服務(wù)功能,所以基本上tcp相關(guān)的配置指代的就是RPC。

Http伴隨啟動(dòng)

Swoft 的 RPC 服務(wù)在Http服務(wù)啟動(dòng)時(shí)候伴隨啟動(dòng)

//SwoftHttpServerHttpHttpServer.php
/**
 * Http Server
 */
class HttpServer extends AbstractServer
    /**
     * Start Server
     *
     * @throws SwoftExceptionRuntimeException
     */
    public function start()
    {
        //code ...

        //根據(jù).env配置文件Server區(qū)段的TCPABLE字段決定是否啟動(dòng)RPC服務(wù)
        if ((int)$this->serverSetting["tcpable"] === 1) {
            $this->registerRpcEvent();
        }
        //code ....
    }
}
Swoole監(jiān)聽(tīng)

初始化流程即根據(jù)相關(guān)注解注冊(cè)一個(gè)swoole監(jiān)聽(tīng)

//SwoftHttpServerHttpHttpServer.php
/**
 * Register rpc event, swoft/rpc-server required
 *
 * @throws SwoftExceptionRuntimeException
 */
protected function registerRpcEvent()
{
    //含有@SwooleListener且type為SwooleEvent::TYPE_PORT的Bean,即RpcEventListener
    $swooleListeners = SwooleListenerCollector::getCollector();
    if (!isset($swooleListeners[SwooleEvent::TYPE_PORT][0]) || empty($swooleListeners[SwooleEvent::TYPE_PORT][0])) {
        throw new RuntimeException("Please use swoft/rpc-server, run "composer require swoft/rpc-server"");
    }

    //添加swoole RPC相關(guān)的tcp監(jiān)聽(tīng)端口,使用的是.env文件中的TCP區(qū)段配置
    $this->listen = $this->server->listen($this->tcpSetting["host"], $this->tcpSetting["port"], $this->tcpSetting["type"]);
    $tcpSetting = $this->getListenTcpSetting();
    $this->listen->set($tcpSetting);

    //根據(jù)RpcEventListener的相關(guān)注解添加監(jiān)聽(tīng)處理句柄
    $swooleRpcPortEvents = $swooleListeners[SwooleEvent::TYPE_PORT][0];
    $this->registerSwooleEvents($this->listen, $swooleRpcPortEvents);
}

由于是初版,根據(jù)@SwooleListener獲取RPC監(jiān)聽(tīng)Bean的相關(guān)處理暫時(shí)還有點(diǎn)生硬。
目前swoft中type為SwooleEvent::TYPE_PORT@SwooleListener只有RpcEventListener一個(gè),如果添加了同類Bean容易出問(wèn)題,穩(wěn)定版出的時(shí)候應(yīng)該會(huì)有相關(guān)優(yōu)化。

RPC多帶帶啟動(dòng)

入口從SwoftHttpServerCommandServerCommand換成SwoftRpcServerCommandRpcCommand,流程和Http大同小異,就是swoole的設(shè)定監(jiān)聽(tīng),僅僅是少了HTTP相關(guān)的監(jiān)聽(tīng)接口和事件而已,此處不再贅述。

RPC請(qǐng)求處理

RPC服務(wù)器和HTTP服務(wù)器的區(qū)別僅僅在于與客戶端交互報(bào)文格式和報(bào)文所在的網(wǎng)絡(luò)層(Swoft的RPC基于TCP層次),運(yùn)行原理基本相通,都是路由,中間件,RPC Service(對(duì)應(yīng)Http的Controller),你完全可以以Http服務(wù)的思路去理解他。

Swoole的RPC-TCP監(jiān)聽(tīng)設(shè)置好后,RPC服務(wù)端就可以開(kāi)始接受請(qǐng)求了。RpcEventListener的負(fù)責(zé)的工作僅僅是把收到的數(shù)據(jù)轉(zhuǎn)發(fā)給SwoftRpcServerServiceDispatcher分發(fā)。Dispatcher會(huì)將請(qǐng)求傳遞給各個(gè)Middleware中間件,最終最終傳遞給HandlerAdapterMiddleware處理。

PackerMiddleware

PackerMiddleware 是RPC中比較重要的一個(gè)中間件,負(fù)責(zé)將TCP請(qǐng)求中數(shù)據(jù)流解包和數(shù)據(jù)流封包。

getAttribute(self::ATTRIBUTE_DATA);
        $data   = $packer->unpack($data);

        // 觸發(fā)一個(gè)RpcServerEvent::BEFORE_RECEIVE事件,默認(rèn)只有一個(gè)用于添加請(qǐng)求上下文信息的BeforeReceiveListener
        // 利用中間件觸發(fā)流程關(guān)鍵事件的做法耦合有點(diǎn)高,猜測(cè)以后會(huì)調(diào)整
        App::trigger(RpcServerEvent::BEFORE_RECEIVE, null, $data);
        //替換解包后的解包到Request中,提供給后續(xù)中間件和Handler使用
        $request = $request->withAttribute(self::ATTRIBUTE_DATA, $data);

        /* @var SwoftRpcServerRpcResponse $response */
        $response      = $handler->handle($request);

       //為Response封包返回給RPC客戶端
        $serviceResult = $response->getAttribute(HandlerAdapter::ATTRIBUTE);
        $serviceResult = $packer->pack($serviceResult);
        return $response->withAttribute(HandlerAdapter::ATTRIBUTE, $serviceResult);
    }
}
RouterMiddleware

RouterMiddleware負(fù)責(zé)根據(jù)RPC請(qǐng)求的method,version,interface 獲取處理的RPC服務(wù)類,充當(dāng)了路由的作用

getAttribute(PackerMiddleware::ATTRIBUTE_DATA);

        $method    = $data["method"]??"";
        $version   = $data["version"]??"";
        $interface = $data["interface"]??"";

        /* @var SwoftRpcServerRouterHandlerMapping $serviceRouter */
        $serviceRouter  = App::getBean("serviceRouter");
        //路由匹配,即向SwoftRpcServerRouterHandlerMapping->$routes獲取RPC服務(wù)信息
        $serviceHandler = $serviceRouter->getHandler($interface, $version, $method);

        // deliver service data
        $request = $request->withAttribute(self::ATTRIBUTE, $serviceHandler);

        return $handler->handle($request);
    }
}

Swoft啟動(dòng)階段會(huì)掃描并初始化注解信息(參考注解章節(jié)),注解初始化完畢后會(huì)觸發(fā)一個(gè)AppEvent::APPLICATION_LOADER事件,此時(shí)會(huì)將來(lái)自@Service的所有RPC的路由信息注冊(cè)到SwoftRpcServerRouterHandlerMapping->$routes中,用于serviceRouter Bean的路由匹配。

HandlerAdapterMiddleware

HandlerAdapterMiddleware最終轉(zhuǎn)發(fā)請(qǐng)求給HandlerAdapter處理,HandlerAdapter會(huì)使用剛剛RouterMiddleware匹配到的服務(wù)類信息轉(zhuǎn)發(fā)請(qǐng)求并封裝Response最終返回給ServiceDispatcher,ServiceDispatcher會(huì)返回TCP流給客戶端然后結(jié)束本次請(qǐng)求。

getAttribute(PackerMiddleware::ATTRIBUTE_DATA);
        $params = $data["params"] ?? [];
        
        //路由解析出來(lái)的,處理該請(qǐng)求的服務(wù)Bean和方法
        list($serviceClass, $method) = $handler;
        $service = App::getBean($serviceClass);

        // execute handler with params
        $response = PhpHelper::call([$service, $method], $params);
        $response = ResponseHelper::formatData($response);

        // 構(gòu)造Response返回客戶端
        if (! $response instanceof Response) {
            $response = (new Response())->withAttribute(self::ATTRIBUTE, $response);
        }

        return $response;
    }
}
RPC客戶端的實(shí)現(xiàn)

在Bean的屬性中聲明@Reference,Swoft即會(huì)根據(jù)@var聲明的類型注入相應(yīng)的RPC客戶端實(shí)例。

/**
 * @Reference(name="useraaaaaa")
 *
 * @var DemoInterface
 */
private $demoService;

依賴注入的實(shí)現(xiàn)會(huì)專門(mén)另外用一篇文章多帶帶解釋,這里先看看RPC客戶端的相關(guān)代碼。

遠(yuǎn)程代理
namespace SwoftRpcClientService;

/**
 * The proxy of service
 */
class ServiceProxy
{
    /**
     * @param string $className
     * @param string $interfaceClass
     */
    public static function loadProxyClass(string $className, string $interfaceClass)
    {
        $reflectionClass   = new ReflectionClass($interfaceClass);
        $reflectionMethods = $reflectionClass->getMethods(ReflectionMethod::IS_PUBLIC);

        $template = "class $className extends SwoftRpcClientService implements {$interfaceClass} {";

        //SwoftRpcClientService::class
        // the template of methods
        $template .= self::getMethodsTemplate($reflectionMethods);
        $template .= "}";
            
        eval($template);
    }
    //code ...
}

和AOP一樣,原理一樣是使用了動(dòng)態(tài)代理,更具體的說(shuō)法是動(dòng)態(tài)遠(yuǎn)程代理
RPC動(dòng)態(tài)客戶端類實(shí)現(xiàn)了客戶端聲明的Interface類型(如DemoInterface)并繼承了SwoftRpcClientService類。
動(dòng)態(tài)類的實(shí)現(xiàn)很簡(jiǎn)單,對(duì)于接口顯式聲明的方法,實(shí)際上都是調(diào)用SwoftRpcClientService->call()方法。

interface DemoInterface
{
    /**
     * @param array $ids
     * @return array
     */
    public function getUsers(array $ids);
}
class 動(dòng)態(tài)生成RPC客戶端類 extends SwoftRpcClientService implements AppLibDemoInterface { 
    public function getUsers ( array  $ids  ) {
        $params = func_get_args();
        return $this->call("getUsers", $params);
    }
    //code ...
}

對(duì)于自動(dòng)生成的defer方法,則是通過(guò)魔術(shù)方法__call(),調(diào)用SwoftRpcClientService->deferCall()

/**
 * @param string $name
 * @param array  $arguments
 *
 * @return ResultInterface
 * @throws RpcClientException
 */
function __call(string $name, array $arguments)
{
    $method = $name;
    $prefix = self::DEFER_PREFIX;//"defer"
    if (strpos($name, $prefix) !== 0) {
        throw new RpcClientException(sprintf("the method of %s is not exist! ", $name));
    }

    if ($name == $prefix) {
        $method = array_shift($arguments);
    } elseif (strpos($name, $prefix) === 0) {
        $method = lcfirst(ltrim($name, $prefix));
    }

    return $this->deferCall($method, $arguments);
}

我們這里只看具有代表性的call()方法,deferCall()大致相同。
RPC客戶端動(dòng)態(tài)類的本質(zhì)是將客戶端的參數(shù)和接口信息根據(jù)Swoft自己的格式傳遞給RPC服務(wù)端,然后將服務(wù)器返回的數(shù)據(jù)解包取出返回值返回給RPC的調(diào)用者,對(duì)外偽裝成一個(gè)普通的對(duì)象,屏蔽遠(yuǎn)程調(diào)用操作。

// SwoftRpcClientService.php
/**
 * Do call service
 *
 * @param string $func
 * @param array  $params
 *
 * @throws Throwable
 * @return mixed
 */
public function call(string $func, array $params)
{
    $profileKey = $this->interface . "->" . $func;
    //根據(jù)@reference的fallback屬性獲取降級(jí)處理句柄,在RPC服務(wù)調(diào)用失敗的時(shí)候可以會(huì)使用fallback句柄代替
    $fallback   = $this->getFallbackHandler($func);
    try {
        $connectPool    = $this->getPool();
        $circuitBreaker = $this->getBreaker();

        /* @var $client AbstractServiceConnection */
        $client = $connectPool->getConnection();
        //數(shù)據(jù)封包,和RPC服務(wù)端一致
        $packer   = service_packer();
        $type     = $this->getPackerName();
        $data     = $packer->formatData($this->interface, $this->version, $func, $params);
        $packData = $packer->pack($data, $type);

        //通過(guò)熔斷器調(diào)用接口
        $result = $circuitBreaker->call([$client, "send"], [$packData], $fallback);
        if ($result === null || $result === false) {
            return null;
        }

        //和defercall不一致這里直接收包,解包
        App::profileStart($profileKey);
        $result = $client->recv();
        App::profileEnd($profileKey);
        $connectPool->release($client);

        App::debug(sprintf("%s call %s success, data=%", $this->interface, $func, json_encode($data, JSON_UNESCAPED_UNICODE)));
        $result = $packer->unpack($result);
        $data   = $packer->checkData($result);
    } catch (Throwable $throwable) {
        if (empty($fallback)) {
            throw $throwable;
        }
        //RPC調(diào)用失敗則調(diào)用降級(jí)句柄,代替實(shí)際RPC服務(wù)直接返回
        $data = PhpHelper::call($fallback, $params);
    }

    return $data;
}
熔斷器

熔斷器的swoft-RPC的另一重要概念,RPC的所有請(qǐng)求都通過(guò)熔斷器發(fā)送。
熔斷器使用狀態(tài)模式實(shí)現(xiàn),熔斷器有開(kāi)啟,半開(kāi),關(guān)閉 3種狀態(tài),不同狀態(tài)下熔斷器會(huì)持有不同的狀態(tài)實(shí)例,狀態(tài)根據(jù)RPC調(diào)用情況切換,熔斷器根據(jù)持有狀態(tài)實(shí)例的不同,行為也有所不同。

熔斷器關(guān)閉狀態(tài)策略
circuitBreaker->serviceName . "服務(wù),連接建立失敗(null)");
            }

            if ($class instanceof Client && $class->isConnected() == false) {
                throw new Exception($this->circuitBreaker->serviceName . "服務(wù),當(dāng)前連接已斷開(kāi)");
            }
            //調(diào)用swoole協(xié)程客戶端的send()方法發(fā)送數(shù)據(jù)
            $data = $class->$method(...$params);
        } catch (Exception $e) {
            //遞增失敗計(jì)數(shù)
            if ($this->circuitBreaker->isClose()) {
                $this->circuitBreaker->incFailCount();
            }

            App::error($this->circuitBreaker->serviceName . "服務(wù),當(dāng)前[關(guān)閉狀態(tài)],服務(wù)端調(diào)用失敗,開(kāi)始服務(wù)降級(jí)容錯(cuò)處理,error=" . $e->getMessage());
            //RPC調(diào)用失敗則使用降級(jí)接口 
            $data = $this->circuitBreaker->fallback($fallback);
        }
        
        //失敗次數(shù)過(guò)線則切換狀態(tài)
        $failCount = $this->circuitBreaker->getFailCounter();
        $switchToFailCount = $this->circuitBreaker->getSwitchToFailCount();
        if ($failCount >= $switchToFailCount && $this->circuitBreaker->isClose()) {
            App::trace($this->circuitBreaker->serviceName . "服務(wù),當(dāng)前[關(guān)閉狀態(tài)],服務(wù)失敗次數(shù)達(dá)到上限,開(kāi)始切換為開(kāi)啟狀態(tài),failCount=" . $failCount);
            $this->circuitBreaker->switchToOpenState();
        }

        App::trace($this->circuitBreaker->serviceName . "服務(wù),當(dāng)前[關(guān)閉狀態(tài)],failCount=" . $this->circuitBreaker->getFailCounter());
        return $data;
    }
}
熔斷器開(kāi)啟狀態(tài)策略
circuitBreaker->fallback();

        App::trace($this->getServiceName() . "服務(wù),當(dāng)前[開(kāi)啟狀態(tài)],執(zhí)行服務(wù)fallback服務(wù)降級(jí)容錯(cuò)處理");
        $nowTime = time();

        if ($this->circuitBreaker->isOpen()
            && $nowTime > $this->circuitBreaker->getSwitchOpenToHalfOpenTime()
        ) {
            $delayTime = $this->circuitBreaker->getDelaySwitchTimer();

            // swoole定時(shí)器不是嚴(yán)格的,3s容錯(cuò)時(shí)間 ,定時(shí)切換狀態(tài)的半開(kāi)
            $switchToHalfStateTime = $nowTime + ($delayTime / 1000) + 3;
            App::getTimer()->addAfterTimer("openState", $delayTime, [$this, "delayCallback"]);
            $this->circuitBreaker->setSwitchOpenToHalfOpenTime($switchToHalfStateTime);

            App::trace($this->getServiceName() . "服務(wù),當(dāng)前[開(kāi)啟狀態(tài)],創(chuàng)建延遲觸發(fā)器,一段時(shí)間后狀態(tài)切換為半開(kāi)狀態(tài)");
        }

        return $data;
    }

}
熔斷器半開(kāi)狀態(tài)策略

半開(kāi)熔斷器是熔斷器關(guān)閉狀態(tài)和熔斷器開(kāi)啟狀態(tài)的過(guò)度狀態(tài),半開(kāi)熔斷器的所有RPC調(diào)用都是加鎖的,連續(xù)成功或者連續(xù)失敗到閾值后會(huì)切換到關(guān)閉狀態(tài)或者開(kāi)啟狀態(tài),代碼類似,此處不再累述,有興趣的讀者可以自行研究。

Swoft源碼剖析系列目錄:https://segmentfault.com/a/11...

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/28694.html

相關(guān)文章

  • Swoft 源碼剖析 - Swoole和Swoft的那些事 (Http/Rpc服務(wù)篇)

    摘要:和服務(wù)關(guān)系最密切的進(jìn)程是中的進(jìn)程組,絕大部分業(yè)務(wù)處理都在該進(jìn)程中進(jìn)行。隨后觸發(fā)一個(gè)事件各組件通過(guò)該事件進(jìn)行配置文件加載路由注冊(cè)。事件每個(gè)請(qǐng)求到來(lái)時(shí)僅僅會(huì)觸發(fā)事件。服務(wù)器生命周期和服務(wù)基本一致,詳情參考源碼剖析功能實(shí)現(xiàn) 作者:bromine鏈接:https://www.jianshu.com/p/4c0...來(lái)源:簡(jiǎn)書(shū)著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對(duì)原文進(jìn)行了重新的排版。S...

    張漢慶 評(píng)論0 收藏0
  • Swoft 源碼剖析 - 目錄

    摘要:作者鏈接來(lái)源簡(jiǎn)書(shū)著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對(duì)原文進(jìn)行了重新的排版。同時(shí)順手整理個(gè)人對(duì)源碼的相關(guān)理解,希望能夠稍微填補(bǔ)學(xué)習(xí)領(lǐng)域的空白。系列文章只會(huì)節(jié)選關(guān)鍵代碼輔以思路講解,請(qǐng)自行配合源碼閱讀。 作者:bromine鏈接:https://www.jianshu.com/p/2f6...來(lái)源:簡(jiǎn)書(shū)著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對(duì)原文進(jìn)行了重新的排版。Swoft...

    qpwoeiru96 評(píng)論0 收藏0
  • Swoft 源碼剖析 - Swoft 中 IOC 容器的實(shí)現(xiàn)原理

    摘要:作者鏈接來(lái)源簡(jiǎn)書(shū)著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對(duì)原文進(jìn)行了重新的排版。前言為應(yīng)用提供一個(gè)完整的容器作為依賴管理方案,是功能,模塊等功能的實(shí)現(xiàn)基礎(chǔ)。的依賴注入管理方案基于服務(wù)定位器。源碼剖析系列目錄 作者:bromine鏈接:https://www.jianshu.com/p/a23...來(lái)源:簡(jiǎn)書(shū)著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對(duì)原文進(jìn)行了重新的排版。Swof...

    Astrian 評(píng)論0 收藏0
  • Swoft 源碼剖析 - Swoole和Swoft的那些事(Task投遞/定時(shí)任務(wù)篇)

    摘要:作為定時(shí)任務(wù)的執(zhí)行者,通過(guò)每喚醒自身一次,然后把執(zhí)行表遍歷一次,挑選當(dāng)下需要執(zhí)行的任務(wù),通過(guò)投遞出去并更新該任務(wù)執(zhí)行表中的狀態(tài)。 作者:bromine鏈接:https://www.jianshu.com/p/b44...來(lái)源:簡(jiǎn)書(shū)著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對(duì)原文進(jìn)行了重新的排版。Swoft Github: https://github.com/swoft-clou.....

    vvpvvp 評(píng)論0 收藏0
  • Swoft 源碼剖析 - 連接池

    摘要:基于擴(kuò)展實(shí)現(xiàn)真正的數(shù)據(jù)庫(kù)連接池這種方案中,項(xiàng)目占用的連接數(shù)僅僅為。一種是連接暫時(shí)不再使用,其占用狀態(tài)解除,可以從使用者手中交回到空閑隊(duì)列中這種我們稱為連接的歸隊(duì)。源碼剖析系列目錄 作者:bromine鏈接:https://www.jianshu.com/p/1a7...來(lái)源:簡(jiǎn)書(shū)著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對(duì)原文進(jìn)行了重新的排版。Swoft Github: https:...

    rozbo 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<