成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

dubbo源碼解析(四十五)服務(wù)引用過(guò)程

xiaowugui666 / 2060人閱讀

摘要:服務(wù)引用過(guò)程目標(biāo)從源碼的角度分析服務(wù)引用過(guò)程。并保留服務(wù)提供者的部分配置,比如版本,,時(shí)間戳等最后將合并后的配置設(shè)置為查詢(xún)字符串中。的可以參考源碼解析二十三遠(yuǎn)程調(diào)用的一的源碼分析。

dubbo服務(wù)引用過(guò)程
目標(biāo):從源碼的角度分析服務(wù)引用過(guò)程。
前言

前面服務(wù)暴露過(guò)程的文章講解到,服務(wù)引用有兩種方式,一種就是直連,也就是直接指定服務(wù)的地址來(lái)進(jìn)行引用,這種方式更多的時(shí)候被用來(lái)做服務(wù)測(cè)試,不建議在生產(chǎn)環(huán)境使用這樣的方法,因?yàn)橹边B不適合服務(wù)治理,dubbo本身就是一個(gè)服務(wù)治理的框架,提供了很多服務(wù)治理的功能。所以更多的時(shí)候,我們都不會(huì)選擇繞過(guò)注冊(cè)中心,而是通過(guò)注冊(cè)中心的方式來(lái)進(jìn)行服務(wù)引用。

服務(wù)引用過(guò)程

大致可以分為三個(gè)步驟:

配置加載

創(chuàng)建invoker

創(chuàng)建服務(wù)接口代理類(lèi)

引用起點(diǎn)

dubbo服務(wù)的引用起點(diǎn)就類(lèi)似于bean加載。dubbo中有一個(gè)類(lèi)ReferenceBean,它實(shí)現(xiàn)了FactoryBean接口,繼承了ReferenceConfig,所以ReferenceBean作為dubbo中能生產(chǎn)對(duì)象的工廠Bean,而我們要引用服務(wù),也就是要有一個(gè)該服務(wù)的對(duì)象。

服務(wù)引用被觸發(fā)有兩個(gè)時(shí)機(jī):

Spring 容器調(diào)用 ReferenceBean 的 afterPropertiesSet 方法時(shí)引用服務(wù)(餓漢式)

在 ReferenceBean 對(duì)應(yīng)的服務(wù)被注入到其他類(lèi)中時(shí)引用(懶漢式)

默認(rèn)情況下,Dubbo 使用懶漢式引用服務(wù)。如果需要使用餓漢式,可通過(guò)配置 的 init 屬性開(kāi)啟。

因?yàn)镽eferenceBean實(shí)現(xiàn)了FactoryBean接口的getObject()方法,所以在加載bean的時(shí)候,會(huì)調(diào)用ReferenceBean的getObject()方法

ReferenceBean的getObject()
public Object getObject() {
    return get();
}

這個(gè)get方法是ReferenceConfig的get()方法

ReferenceConfig的get()
public synchronized T get() {
    // 檢查并且更新配置
    checkAndUpdateSubConfigs();

    // 如果被銷(xiāo)毀,則拋出異常
    if (destroyed) {
        throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
    }
    // 檢測(cè) 代理對(duì)象ref 是否為空,為空則通過(guò) init 方法創(chuàng)建
    if (ref == null) {
        // 用于處理配置,以及調(diào)用 createProxy 生成代理類(lèi)
        init();
    }
    return ref;
}

關(guān)于checkAndUpdateSubConfigs()方法前一篇文章已經(jīng)講了,我就不再講述。這里關(guān)注init方法。該方法也是處理各類(lèi)配置的開(kāi)始。

配置加載(1)
ReferenceConfig的init()
private void init() {
    // 如果已經(jīng)初始化過(guò),則結(jié)束
    if (initialized) {
        return;
    }
    // 設(shè)置初始化標(biāo)志為true
    initialized = true;
    // 本地存根合法性校驗(yàn)
    checkStubAndLocal(interfaceClass);
    // mock合法性校驗(yàn)
    checkMock(interfaceClass);
    // 用來(lái)存放配置
    Map map = new HashMap();

    // 存放這是消費(fèi)者側(cè)
    map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);

    // 添加 協(xié)議版本、發(fā)布版本,時(shí)間戳 等信息到 map 中
    appendRuntimeParameters(map);
    // 如果是泛化調(diào)用
    if (!isGeneric()) {
        // 獲得版本號(hào)
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            // 設(shè)置版本號(hào)
            map.put(Constants.REVISION_KEY, revision);
        }

        // 獲得所有方法
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0) {
            logger.warn("No method found in service interface " + interfaceClass.getName());
            map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
        } else {
            // 把所有方法簽名拼接起來(lái)放入map
            map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet(Arrays.asList(methods)), Constants.COMMA_SEPARATOR));
        }
    }
    // 加入服務(wù)接口名稱(chēng)
    map.put(Constants.INTERFACE_KEY, interfaceName);
    // 添加metrics、application、module、consumer、protocol的所有信息到map
    appendParameters(map, metrics);
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, consumer, Constants.DEFAULT_KEY);
    appendParameters(map, this);
    Map attributes = null;
    if (CollectionUtils.isNotEmpty(methods)) {
        attributes = new HashMap();
        // 遍歷方法配置
        for (MethodConfig methodConfig : methods) {
            // 把方法配置加入map
            appendParameters(map, methodConfig, methodConfig.getName());
            // 生成重試的配置key
            String retryKey = methodConfig.getName() + ".retry";
            // 如果map中已經(jīng)有該配置,則移除該配置
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                // 如果配置為false,也就是不重試,則設(shè)置重試次數(shù)為0次
                if ("false".equals(retryValue)) {
                    map.put(methodConfig.getName() + ".retries", "0");
                }
            }
            // 設(shè)置異步配置
            attributes.put(methodConfig.getName(), convertMethodConfig2AyncInfo(methodConfig));
        }
    }

    // 獲取服務(wù)消費(fèi)者 ip 地址
    String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
    // 如果為空,則獲取本地ip
    if (StringUtils.isEmpty(hostToRegistry)) {
        hostToRegistry = NetUtils.getLocalHost();
    }
    // 設(shè)置消費(fèi)者ip
    map.put(Constants.REGISTER_IP_KEY, hostToRegistry);

    // 創(chuàng)建代理對(duì)象
    ref = createProxy(map);

    // 生產(chǎn)服務(wù)key
    String serviceKey = URL.buildKey(interfaceName, group, version);
    // 根據(jù)服務(wù)名,ReferenceConfig,代理類(lèi)構(gòu)建 ConsumerModel,
    // 并將 ConsumerModel 存入到 ApplicationModel 中
    ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes));
}

該方法大致分為以下幾個(gè)步驟:

檢測(cè)本地存根和mock合法性。

添加協(xié)議版本、發(fā)布版本,時(shí)間戳、metrics、application、module、consumer、protocol等的所有信息到 map 中

多帶帶處理方法配置,設(shè)置重試次數(shù)配置以及設(shè)置該方法對(duì)異步配置信息。

添加消費(fèi)者ip地址到map

創(chuàng)建代理對(duì)象

生成ConsumerModel存入到 ApplicationModel 中

在這里處理配置到邏輯比較清晰。下面就是看ReferenceConfig的createProxy()方法。

創(chuàng)建invoker
ReferenceConfig的createProxy()
private T createProxy(Map map) {
    // 根據(jù)配置檢查是否為本地調(diào)用
    if (shouldJvmRefer(map)) {
        // 生成url,protocol使用的是injvm
        URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
        // 利用InjvmProtocol 的 refer 方法生成 InjvmInvoker 實(shí)例
        invoker = refprotocol.refer(interfaceClass, url);
        if (logger.isInfoEnabled()) {
            logger.info("Using injvm service " + interfaceClass.getName());
        }
    } else {
        // 如果url不為空,則用戶(hù)可能想進(jìn)行直連來(lái)調(diào)用
        if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center"s address.
            // 當(dāng)需要配置多個(gè) url 時(shí),可用分號(hào)進(jìn)行分割,這里會(huì)進(jìn)行切分
            String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
            // 遍歷所有的url
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    if (StringUtils.isEmpty(url.getPath())) {
                        // 設(shè)置接口全限定名為 url 路徑
                        url = url.setPath(interfaceName);
                    }
                    // 檢測(cè) url 協(xié)議是否為 registry,若是,表明用戶(hù)想使用指定的注冊(cè)中心
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        // 將 map 轉(zhuǎn)換為查詢(xún)字符串,并作為 refer 參數(shù)的值添加到 url 中
                        urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        // 合并 url,移除服務(wù)提供者的一些配置(這些配置來(lái)源于用戶(hù)配置的 url 屬性),
                        // 比如線程池相關(guān)配置。并保留服務(wù)提供者的部分配置,比如版本,group,時(shí)間戳等
                        // 最后將合并后的配置設(shè)置為 url 查詢(xún)字符串中。
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }
        } else { // assemble URL from register center"s configuration
            // 校驗(yàn)注冊(cè)中心
            checkRegistry();
            // 加載注冊(cè)中心的url
            List us = loadRegistries(false);
            if (CollectionUtils.isNotEmpty(us)) {
                // 遍歷所有的注冊(cè)中心
                for (URL u : us) {
                    // 生成監(jiān)控url
                    URL monitorUrl = loadMonitor(u);
                    if (monitorUrl != null) {
                        // 加入監(jiān)控中心url的配置
                        map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                    }
                    // 添加 refer 參數(shù)到 url 中,并將 url 添加到 urls 中
                    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                }
            }
            // 如果urls為空,則拋出異常
            if (urls.isEmpty()) {
                throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config  to your spring config.");
            }
        }

        // 如果只有一個(gè)注冊(cè)中心,則直接調(diào)用refer方法
        if (urls.size() == 1) {
            // 調(diào)用 RegistryProtocol 的 refer 構(gòu)建 Invoker 實(shí)例
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
        } else {
            List> invokers = new ArrayList>();
            URL registryURL = null;
            // 遍歷所有的注冊(cè)中心url
            for (URL url : urls) {
                // 通過(guò) refprotocol 調(diào)用 refer 構(gòu)建 Invoker,
                // refprotocol 會(huì)在運(yùn)行時(shí)根據(jù) url 協(xié)議頭加載指定的 Protocol 實(shí)例,并調(diào)用實(shí)例的 refer 方法
                // 把生成的Invoker加入到集合中
                invokers.add(refprotocol.refer(interfaceClass, url));
                // 如果是注冊(cè)中心的協(xié)議
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    // 則設(shè)置registryURL
                    registryURL = url; // use last registry url
                }
            }
            // 優(yōu)先用注冊(cè)中心的url
            if (registryURL != null) { // registry url is available
                // use RegistryAwareCluster only when register"s cluster is available
                // 只有當(dāng)注冊(cè)中心當(dāng)鏈接可用當(dāng)時(shí)候,采用RegistryAwareCluster
                URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME);
                // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
                // 由集群進(jìn)行多個(gè)invoker合并
                invoker = cluster.join(new StaticDirectory(u, invokers));
            } else { // not a registry url, must be direct invoke.
                // 直接進(jìn)行合并
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
    }

    // 如果需要核對(duì)該服務(wù)是否可用,并且該服務(wù)不可用
    if (shouldCheck() && !invoker.isAvailable()) {
        // make it possible for consumer to retry later if provider is temporarily unavailable
        // 修改初始化標(biāo)志為false
        initialized = false;
        // 拋出異常
        throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
    }
    if (logger.isInfoEnabled()) {
        logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
    }
    /**
     * @since 2.7.0
     * ServiceData Store
     */
    // 元數(shù)據(jù)中心服務(wù)
    MetadataReportService metadataReportService = null;
    // 加載元數(shù)據(jù)服務(wù),如果成功
    if ((metadataReportService = getMetadataReportService()) != null) {
        // 生成url
        URL consumerURL = new URL(Constants.CONSUMER_PROTOCOL, map.remove(Constants.REGISTER_IP_KEY), 0, map.get(Constants.INTERFACE_KEY), map);
        // 把消費(fèi)者配置加入到元數(shù)據(jù)中心中
        metadataReportService.publishConsumer(consumerURL);
    }
    // create service proxy
    // 創(chuàng)建服務(wù)代理
    return (T) proxyFactory.getProxy(invoker);
}

該方法的大致邏輯可用分為以下幾步:

如果是本地調(diào)用,則直接使用InjvmProtocol 的 refer 方法生成 Invoker 實(shí)例。

如果不是本地調(diào)用,但是是選擇直連的方式來(lái)進(jìn)行調(diào)用,則分割配置的多個(gè)url。如果協(xié)議是配置是registry,則表明用戶(hù)想使用指定的注冊(cè)中心,配置url后將url并且保存到urls里面,否則就合并url,并且保存到urls。

如果是通過(guò)注冊(cè)中心來(lái)進(jìn)行調(diào)用,則先校驗(yàn)所有的注冊(cè)中心,然后加載注冊(cè)中心的url,遍歷每個(gè)url,加入監(jiān)控中心url配置,最后把每個(gè)url保存到urls。

針對(duì)urls集合的數(shù)量,如果是單注冊(cè)中心,直接引用RegistryProtocol 的 refer 構(gòu)建 Invoker 實(shí)例,如果是多注冊(cè)中心,則對(duì)每個(gè)url都生成Invoker,利用集群進(jìn)行多個(gè)Invoker合并。

最終輸出一個(gè)invoker。

Invoker 是 Dubbo 的核心模型,代表一個(gè)可執(zhí)行體。在服務(wù)提供方,Invoker 用于調(diào)用服務(wù)提供類(lèi)。在服務(wù)消費(fèi)方,Invoker 用于執(zhí)行遠(yuǎn)程調(diào)用。Invoker 是由 Protocol 實(shí)現(xiàn)類(lèi)構(gòu)建而來(lái)。關(guān)于這幾個(gè)接口的定義介紹可以參考《dubbo源碼解析(十九)遠(yuǎn)程調(diào)用——開(kāi)篇》,Protocol 實(shí)現(xiàn)類(lèi)有很多,下面會(huì)分析 RegistryProtocol 和 DubboProtocol,我們可以看到上面的源碼中講到,當(dāng)只有一個(gè)注冊(cè)中心的時(shí)候,會(huì)直接使用RegistryProtocol。所以先來(lái)看看RegistryProtocol的refer()方法。

RegistryProtocol生成invoker
RegistryProtocol的refer()
public  Invoker refer(Class type, URL url) throws RpcException {
    // 取 registry 參數(shù)值,并將其設(shè)置為協(xié)議頭,默認(rèn)是dubbo
    url = URLBuilder.from(url)
            .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
            .removeParameter(REGISTRY_KEY)
            .build();
    // 獲得注冊(cè)中心實(shí)例
    Registry registry = registryFactory.getRegistry(url);
    // 如果是注冊(cè)中心服務(wù),則返回注冊(cè)中心服務(wù)的invoker
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // group="a,b" or group="*"
    // 將 url 查詢(xún)字符串轉(zhuǎn)為 Map
    Map qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    // 獲得group值
    String group = qs.get(Constants.GROUP_KEY);
    if (group != null && group.length() > 0) {
        // 如果有多個(gè)組,或者組配置為*,則使用MergeableCluster,并調(diào)用 doRefer 繼續(xù)執(zhí)行服務(wù)引用邏輯
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    // 只有一個(gè)組或者沒(méi)有組配置,則直接執(zhí)行doRefer
    return doRefer(cluster, registry, type, url);
}

上面的邏輯比較簡(jiǎn)單,如果是注冊(cè)服務(wù)中心,則直接創(chuàng)建代理。如果不是,先處理組配置,根據(jù)組配置來(lái)決定Cluster的實(shí)現(xiàn)方式,然后調(diào)用doRefer方法。

RegistryProtocol的doRefer()
private  Invoker doRefer(Cluster cluster, Registry registry, Class type, URL url) {
    // 創(chuàng)建 RegistryDirectory 實(shí)例
    RegistryDirectory directory = new RegistryDirectory(type, url);
    // 設(shè)置注冊(cè)中心
    directory.setRegistry(registry);
    // 設(shè)置協(xié)議
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    // 所有屬性放到map中
    Map parameters = new HashMap(directory.getUrl().getParameters());
    // 生成服務(wù)消費(fèi)者鏈接
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    // 注冊(cè)服務(wù)消費(fèi)者,在 consumers 目錄下新節(jié)點(diǎn)
    if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
        directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
        // 注冊(cè)服務(wù)消費(fèi)者
        registry.register(directory.getRegisteredConsumerUrl());
    }
    // 創(chuàng)建路由規(guī)則鏈
    directory.buildRouterChain(subscribeUrl);
    // 訂閱 providers、configurators、routers 等節(jié)點(diǎn)數(shù)據(jù)
    directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
            PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
    // 一個(gè)注冊(cè)中心可能有多個(gè)服務(wù)提供者,因此這里需要將多個(gè)服務(wù)提供者合并為一個(gè),生成一個(gè)invoker
    Invoker invoker = cluster.join(directory);
    // 在服務(wù)提供者處注冊(cè)消費(fèi)者
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}

該方法大致可以分為以下步驟:

創(chuàng)建一個(gè) RegistryDirectory 實(shí)例,然后生成服務(wù)者消費(fèi)者鏈接。

向注冊(cè)中心進(jìn)行注冊(cè)。

緊接著訂閱 providers、configurators、routers 等節(jié)點(diǎn)下的數(shù)據(jù)。完成訂閱后,RegistryDirectory 會(huì)收到這幾個(gè)節(jié)點(diǎn)下的子節(jié)點(diǎn)信息。

由于一個(gè)服務(wù)可能部署在多臺(tái)服務(wù)器上,這樣就會(huì)在 providers 產(chǎn)生多個(gè)節(jié)點(diǎn),這個(gè)時(shí)候就需要 Cluster 將多個(gè)服務(wù)節(jié)點(diǎn)合并為一個(gè),并生成一個(gè) Invoker。關(guān)于 RegistryDirectory 和 Cluster,可以看我前面寫(xiě)的一些文章介紹。

DubboProtocol生成invoker

首先還是從DubboProtocol的refer()開(kāi)始。

DubboProtocol的refer()
public  Invoker refer(Class serviceType, URL url) throws RpcException {
    optimizeSerialization(url);

    // create rpc invoker.
    // 創(chuàng)建一個(gè)DubboInvoker實(shí)例
    DubboInvoker invoker = new DubboInvoker(serviceType, url, getClients(url), invokers);
    // 加入到集合中
    invokers.add(invoker);

    return invoker;
}

創(chuàng)建DubboInvoker比較簡(jiǎn)單,調(diào)用了構(gòu)造方法,這里主要講這么生成ExchangeClient,也就是getClients方法。

DubboProtocol的getClients()

可以參考《dubbo源碼解析(二十四)遠(yuǎn)程調(diào)用——dubbo協(xié)議》的(三)DubboProtocol中的源碼分析。最新版本基本沒(méi)有什么變化,只是因?yàn)榧尤肓伺渲弥行?,配置的?yōu)先級(jí)更加明確了,所以增加了xml配置優(yōu)先級(jí)高于properties配置的代碼邏輯,都比較容易理解。

其中如果是配置的共享,則獲得共享客戶(hù)端對(duì)象,也就是getSharedClient()方法,否則新建客戶(hù)端也就是initClient()方法。

DubboProtocol的getSharedClient()

可以參考《dubbo源碼解析(二十四)遠(yuǎn)程調(diào)用——dubbo協(xié)議》的(三)DubboProtocol中的源碼分析,該方法比較簡(jiǎn)單,先訪問(wèn)緩存,若緩存未命中,則通過(guò) initClient 方法創(chuàng)建新的 ExchangeClient 實(shí)例,并將該實(shí)例傳給 ReferenceCountExchangeClient 構(gòu)造方法創(chuàng)建一個(gè)帶有引用計(jì)數(shù)功能的 ExchangeClient 實(shí)例。

DubboProtocol的initClient()

可以參考《dubbo源碼解析(二十四)遠(yuǎn)程調(diào)用——dubbo協(xié)議》的(三)DubboProtocol中的源碼分析,initClient 方法首先獲取用戶(hù)配置的客戶(hù)端類(lèi)型,最新版本已經(jīng)改為默認(rèn) netty4。然后設(shè)置用戶(hù)心跳配置,然后檢測(cè)用戶(hù)配置的客戶(hù)端類(lèi)型是否存在,不存在則拋出異常。最后根據(jù) lazy 配置決定創(chuàng)建什么類(lèi)型的客戶(hù)端。這里的 LazyConnectExchangeClient 代碼并不是很復(fù)雜,該類(lèi)會(huì)在 request 方法被調(diào)用時(shí)通過(guò) Exchangers 的 connect 方法創(chuàng)建 ExchangeClient 客戶(hù)端。下面我們分析一下 Exchangers 的 connect 方法。

Exchangers的connect()
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    // 獲取 Exchanger 實(shí)例,默認(rèn)為 HeaderExchangeClient
    return getExchanger(url).connect(url, handler);
}

getExchanger 會(huì)通過(guò) SPI 加載 HeaderExchangeClient 實(shí)例,這個(gè)方法比較簡(jiǎn)單。接下來(lái)分析 HeaderExchangeClient 的connect的實(shí)現(xiàn)。

HeaderExchangeClient 的connect()
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    // 創(chuàng)建 HeaderExchangeHandler 對(duì)象
    // 創(chuàng)建 DecodeHandler 對(duì)象
    // 通過(guò) Transporters 構(gòu)建 Client 實(shí)例
    // 創(chuàng)建 HeaderExchangeClient 對(duì)象
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

其中HeaderExchangeHandler、DecodeHandler等可以參考《dubbo源碼解析(九)遠(yuǎn)程通信——Transport層》和《dubbo源碼解析(十)遠(yuǎn)程通信——Exchange層》的分析。這里重點(diǎn)關(guān)注Transporters 構(gòu)建 Client,也就是Transporters的connect方法。

Transporters的connect()

可以參考《dubbo源碼解析(八)遠(yuǎn)程通信——開(kāi)篇》的(十)Transporters中源碼分析。其中獲得自適應(yīng)拓展類(lèi),該類(lèi)會(huì)在運(yùn)行時(shí)根據(jù)客戶(hù)端類(lèi)型加載指定的 Transporter 實(shí)現(xiàn)類(lèi)。若用戶(hù)未配置客戶(hù)端類(lèi)型,則默認(rèn)加載 NettyTransporter,并調(diào)用該類(lèi)的 connect 方法。假設(shè)是netty4的實(shí)現(xiàn),則執(zhí)行以下代碼。

public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyClient(url, listener);
}

到這里為止,DubboProtocol生成invoker過(guò)程也結(jié)束了。再回到createProxy方法的最后一句代碼,根據(jù)invoker創(chuàng)建服務(wù)代理對(duì)象。

創(chuàng)建代理

為服務(wù)接口生成代理對(duì)象。有了代理對(duì)象,即可進(jìn)行遠(yuǎn)程調(diào)用。首先來(lái)看AbstractProxyFactory 的 getProxy()方法。

AbstractProxyFactory 的 getProxy()

可以參考《dubbo源碼解析(二十三)遠(yuǎn)程調(diào)用——Proxy》的(一)AbstractProxyFactory的源碼分析??梢钥吹降诙€(gè)getProxy方法其實(shí)就是獲取 interfaces 數(shù)組,調(diào)用到第三個(gè)getProxy方法時(shí),該getProxy是個(gè)抽象方法,由子類(lèi)來(lái)實(shí)現(xiàn),我們還是默認(rèn)它的代理實(shí)現(xiàn)方式為Javassist。所以可以看JavassistProxyFactory的getProxy方法。

JavassistProxyFactory的getProxy()
public  T getProxy(Invoker invoker, Class[] interfaces) {
    // 生成 Proxy 子類(lèi)(Proxy 是抽象類(lèi))。并調(diào)用 Proxy 子類(lèi)的 newInstance 方法創(chuàng)建 Proxy 實(shí)例
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

我們重點(diǎn)看Proxy的getProxy方法。

/**
 * Get proxy.
 *
 * @param ics interface class array.
 * @return Proxy instance.
 */
public static Proxy getProxy(Class... ics) {
    // 獲得Proxy的類(lèi)加載器來(lái)進(jìn)行生成代理類(lèi)
    return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);
}

/**
 * Get proxy.
 *
 * @param cl  class loader.
 * @param ics interface class array.
 * @return Proxy instance.
 */
public static Proxy getProxy(ClassLoader cl, Class... ics) {
    if (ics.length > Constants.MAX_PROXY_COUNT) {
        throw new IllegalArgumentException("interface limit exceeded");
    }

    StringBuilder sb = new StringBuilder();
    // 遍歷接口列表
    for (int i = 0; i < ics.length; i++) {
        String itf = ics[i].getName();
        // 檢測(cè)是否是接口,如果不是,則拋出異常
        if (!ics[i].isInterface()) {
            throw new RuntimeException(itf + " is not a interface.");
        }

        Class tmp = null;
        try {
            // 重新加載接口類(lèi)
            tmp = Class.forName(itf, false, cl);
        } catch (ClassNotFoundException e) {
        }
        // 檢測(cè)接口是否相同,這里 tmp 有可能為空,也就是該接口無(wú)法被類(lèi)加載器加載的。
        if (tmp != ics[i]) {
            throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
        }

        // 拼接接口全限定名,分隔符為 ;
        sb.append(itf).append(";");
    }

    // use interface class name list as key.
    // 使用拼接后的接口名作為 key
    String key = sb.toString();

    // get cache by class loader.
    Map cache;
    // 把該類(lèi)加載器加到本地緩存
    synchronized (ProxyCacheMap) {
        cache = ProxyCacheMap.computeIfAbsent(cl, k -> new HashMap<>());
    }

    Proxy proxy = null;
    synchronized (cache) {
        do {
            // 從緩存中獲取 Reference 實(shí)例
            Object value = cache.get(key);
            if (value instanceof Reference) {
                proxy = (Proxy) ((Reference) value).get();
                if (proxy != null) {
                    return proxy;
                }
            }
            // 并發(fā)控制,保證只有一個(gè)線程可以進(jìn)行后續(xù)操作
            if (value == PendingGenerationMarker) {
                try {
                    // 其他線程在此處進(jìn)行等待
                    cache.wait();
                } catch (InterruptedException e) {
                }
            } else {
                // 放置標(biāo)志位到緩存中,并跳出 while 循環(huán)進(jìn)行后續(xù)操作
                cache.put(key, PendingGenerationMarker);
                break;
            }
        }
        while (true);
    }

    long id = PROXY_CLASS_COUNTER.getAndIncrement();
    String pkg = null;
    ClassGenerator ccp = null, ccm = null;
    try {
        // 創(chuàng)建 ClassGenerator 對(duì)象
        ccp = ClassGenerator.newInstance(cl);

        Set worked = new HashSet<>();
        List methods = new ArrayList<>();

        for (int i = 0; i < ics.length; i++) {
            // 檢測(cè)接口訪問(wèn)級(jí)別是否為 protected 或 privete
            if (!Modifier.isPublic(ics[i].getModifiers())) {
                // 獲取接口包名
                String npkg = ics[i].getPackage().getName();
                if (pkg == null) {
                    pkg = npkg;
                } else {
                    // 非 public 級(jí)別的接口必須在同一個(gè)包下,否者拋出異常
                    if (!pkg.equals(npkg)) {
                        throw new IllegalArgumentException("non-public interfaces from different packages");
                    }
                }
            }
            // 添加接口到 ClassGenerator 中
            ccp.addInterface(ics[i]);

            // 遍歷接口方法
            for (Method method : ics[i].getMethods()) {
                // 獲取方法描述,可理解為方法簽名
                String desc = ReflectUtils.getDesc(method);
                // 如果方法描述字符串已在 worked 中,則忽略??紤]這種情況,
                // A 接口和 B 接口中包含一個(gè)完全相同的方法
                if (worked.contains(desc)) {
                    continue;
                }
                worked.add(desc);

                int ix = methods.size();
                // 獲取方法返回值類(lèi)型
                Class rt = method.getReturnType();
                // 獲取參數(shù)列表
                Class[] pts = method.getParameterTypes();

                // 生成 Object[] args = new Object[1...N]
                StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
                for (int j = 0; j < pts.length; j++) {
                    // 生成 args[1...N] = ($w)$1...N;
                    code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
                }
                // 生成 InvokerHandler 接口的 invoker 方法調(diào)用語(yǔ)句,如下:
                // Object ret = handler.invoke(this, methods[1...N], args);
                code.append(" Object ret = handler.invoke(this, methods[").append(ix).append("], args);");
                // 返回值不為 void
                if (!Void.TYPE.equals(rt)) {
                    // 生成返回語(yǔ)句,形如 return (java.lang.String) ret;
                    code.append(" return ").append(asArgument(rt, "ret")).append(";");
                }

                methods.add(method);
                // 添加方法名、訪問(wèn)控制符、參數(shù)列表、方法代碼等信息到 ClassGenerator 中
                ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
            }
        }

        if (pkg == null) {
            pkg = PACKAGE_NAME;
        }

        // create ProxyInstance class.
        // 構(gòu)建接口代理類(lèi)名稱(chēng):pkg + ".proxy" + id,比如 org.apache.dubbo.proxy0
        String pcn = pkg + ".proxy" + id;
        ccp.setClassName(pcn);
        ccp.addField("public static java.lang.reflect.Method[] methods;");
        // 生成 private java.lang.reflect.InvocationHandler handler;
        ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
        // 為接口代理類(lèi)添加帶有 InvocationHandler 參數(shù)的構(gòu)造方法,比如:
        // porxy0(java.lang.reflect.InvocationHandler arg0) {
        //     handler=$1;
        // }
        ccp.addConstructor(Modifier.PUBLIC, new Class[]{InvocationHandler.class}, new Class[0], "handler=$1;");
        // 為接口代理類(lèi)添加默認(rèn)構(gòu)造方法
        ccp.addDefaultConstructor();
        // 生成接口代理類(lèi)
        Class clazz = ccp.toClass();
        clazz.getField("methods").set(null, methods.toArray(new Method[0]));

        // create Proxy class.
        // 構(gòu)建 Proxy 子類(lèi)名稱(chēng),比如 Proxy1,Proxy2 等
        String fcn = Proxy.class.getName() + id;
        ccm = ClassGenerator.newInstance(cl);
        ccm.setClassName(fcn);
        ccm.addDefaultConstructor();
        ccm.setSuperClass(Proxy.class);
        // 為 Proxy 的抽象方法 newInstance 生成實(shí)現(xiàn)代碼,形如:
        // public Object newInstance(java.lang.reflect.InvocationHandler h) {
        //     return new org.apache.dubbo.proxy0($1);
        // }
        ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
        Class pc = ccm.toClass();
        // 生成 Proxy 實(shí)現(xiàn)類(lèi)
        proxy = (Proxy) pc.newInstance();
    } catch (RuntimeException e) {
        throw e;
    } catch (Exception e) {
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        // release ClassGenerator
        if (ccp != null) {
            // 釋放資源
            ccp.release();
        }
        if (ccm != null) {
            ccm.release();
        }
        synchronized (cache) {
            if (proxy == null) {
                cache.remove(key);
            } else {
                // 寫(xiě)緩存
                cache.put(key, new WeakReference(proxy));
            }
            // 喚醒其他等待線程
            cache.notifyAll();
        }
    }
    return proxy;
}

代碼比較多,大致可以分為以下幾步:

對(duì)接口進(jìn)行校驗(yàn),檢查是否是一個(gè)接口,是否不能被類(lèi)加載器加載。

做并發(fā)控制,保證只有一個(gè)線程可以進(jìn)行后續(xù)的代理生成操作。

創(chuàng)建cpp,用作為服務(wù)接口生成代理類(lèi)。首先對(duì)接口定義以及包信息進(jìn)行處理。

對(duì)接口的方法進(jìn)行處理,包括返回類(lèi)型,參數(shù)類(lèi)型等。最后添加方法名、訪問(wèn)控制符、參數(shù)列表、方法代碼等信息到 ClassGenerator 中。

創(chuàng)建接口代理類(lèi)的信息,比如名稱(chēng),默認(rèn)構(gòu)造方法等。

生成接口代理類(lèi)。

創(chuàng)建ccm,ccm 則是用于為 org.apache.dubbo.common.bytecode.Proxy 抽象類(lèi)生成子類(lèi),主要是實(shí)現(xiàn) Proxy 類(lèi)的抽象方法。

設(shè)置名稱(chēng)、創(chuàng)建構(gòu)造方法、添加方法

生成 Proxy 實(shí)現(xiàn)類(lèi)。

釋放資源

創(chuàng)建弱引用,寫(xiě)入緩存,喚醒其他線程。

到這里,接口代理類(lèi)生成后,服務(wù)引用也就結(jié)束了。

后記
參考官方文檔:https://dubbo.apache.org/zh-c...

該文章講解了dubbo的服務(wù)引用過(guò)程,下一篇就講解服務(wù)方法調(diào)用過(guò)程。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/74310.html

相關(guān)文章

  • dubbo源碼解析四十八)異步化改造

    摘要:大揭秘異步化改造目標(biāo)從源碼的角度分析的新特性中對(duì)于異步化的改造原理??丛创a解析四十六消費(fèi)端發(fā)送請(qǐng)求過(guò)程講到的十四的,在以前的邏輯會(huì)直接在方法中根據(jù)配置區(qū)分同步異步單向調(diào)用。改為關(guān)于可以參考源碼解析十遠(yuǎn)程通信層的六。 2.7大揭秘——異步化改造 目標(biāo):從源碼的角度分析2.7的新特性中對(duì)于異步化的改造原理。 前言 dubbo中提供了很多類(lèi)型的協(xié)議,關(guān)于協(xié)議的系列可以查看下面的文章: du...

    lijinke666 評(píng)論0 收藏0
  • dubbo源碼解析四十六)消費(fèi)端發(fā)送請(qǐng)求過(guò)程

    摘要:可以參考源碼解析二十四遠(yuǎn)程調(diào)用協(xié)議的八。十六的該類(lèi)也是用了適配器模式,該類(lèi)主要的作用就是增加了心跳功能,可以參考源碼解析十遠(yuǎn)程通信層的四。二十的可以參考源碼解析十七遠(yuǎn)程通信的一。 2.7大揭秘——消費(fèi)端發(fā)送請(qǐng)求過(guò)程 目標(biāo):從源碼的角度分析一個(gè)服務(wù)方法調(diào)用經(jīng)歷怎么樣的磨難以后到達(dá)服務(wù)端。 前言 前一篇文章講到的是引用服務(wù)的過(guò)程,引用服務(wù)無(wú)非就是創(chuàng)建出一個(gè)代理。供消費(fèi)者調(diào)用服務(wù)的相關(guān)方法。...

    fish 評(píng)論0 收藏0
  • dubbo源碼解析四十七)服務(wù)端處理請(qǐng)求過(guò)程

    摘要:而存在的意義就是保證請(qǐng)求或響應(yīng)對(duì)象可在線程池中被解碼,解碼完成后,就會(huì)分發(fā)到的。 2.7大揭秘——服務(wù)端處理請(qǐng)求過(guò)程 目標(biāo):從源碼的角度分析服務(wù)端接收到請(qǐng)求后的一系列操作,最終把客戶(hù)端需要的值返回。 前言 上一篇講到了消費(fèi)端發(fā)送請(qǐng)求的過(guò)程,該篇就要將服務(wù)端處理請(qǐng)求的過(guò)程。也就是當(dāng)服務(wù)端收到請(qǐng)求數(shù)據(jù)包后的一系列處理以及如何返回最終結(jié)果。我們也知道消費(fèi)端在發(fā)送請(qǐng)求的時(shí)候已經(jīng)做了編碼,所以我...

    yzzz 評(píng)論0 收藏0
  • dubbo源碼解析四十三)2.7新特性

    摘要:大揭秘目標(biāo)了解的新特性,以及版本升級(jí)的引導(dǎo)。四元數(shù)據(jù)改造我們知道以前的版本只有注冊(cè)中心,注冊(cè)中心的有數(shù)十個(gè)的鍵值對(duì),包含了一個(gè)服務(wù)所有的元數(shù)據(jù)。 DUBBO——2.7大揭秘 目標(biāo):了解2.7的新特性,以及版本升級(jí)的引導(dǎo)。 前言 我們知道Dubbo在2011年開(kāi)源,停止更新了一段時(shí)間。在2017 年 9 月 7 日,Dubbo 悄悄的在 GitHub 發(fā)布了 2.5.4 版本。隨后,版本...

    qqlcbb 評(píng)論0 收藏0
  • dubbo源碼解析(二十五)遠(yuǎn)程調(diào)用——hessian協(xié)議

    摘要:客戶(hù)端對(duì)象字節(jié)輸出流請(qǐng)求對(duì)象響應(yīng)對(duì)象增加協(xié)議頭發(fā)送請(qǐng)求獲得請(qǐng)求后的狀態(tài)碼三該類(lèi)實(shí)現(xiàn)了接口,是創(chuàng)建的工廠類(lèi)。該類(lèi)的實(shí)現(xiàn)跟類(lèi)類(lèi)似,但是是標(biāo)準(zhǔn)的接口調(diào)用會(huì)采用的工廠類(lèi),而是的協(xié)議調(diào)用。 遠(yuǎn)程調(diào)用——hessian協(xié)議 目標(biāo):介紹遠(yuǎn)程調(diào)用中跟hessian協(xié)議相關(guān)的設(shè)計(jì)和實(shí)現(xiàn),介紹dubbo-rpc-hessian的源碼。 前言 本文講解多是dubbo集成的第二種協(xié)議,hessian協(xié)議,He...

    xzavier 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看
<