摘要:如果有多個注冊中心,多個服務(wù)提供者,這個時候會得到一組實例,此時需要通過集群管理類將多個合并成一個實例。并保留服務(wù)提供者的部分配置,比如版本,,時間戳等最后將合并后的配置設(shè)置為查詢字符串中。
1. 簡介
在上一篇文章中,我詳細(xì)的分析了服務(wù)導(dǎo)出的原理。本篇文章我們趁熱打鐵,繼續(xù)分析服務(wù)引用的原理。在 Dubbo 中,我們可以通過兩種方式引用遠(yuǎn)程服務(wù)。第一種是使用服務(wù)直聯(lián)的方式引用服務(wù),第二種方式是基于注冊中心進(jìn)行引用。服務(wù)直聯(lián)的方式僅適合在調(diào)試或測試服務(wù)的場景下使用,不適合在線上環(huán)境使用。因此,本文我將重點分析通過注冊中心引用服務(wù)的過程。從注冊中心中獲取服務(wù)配置只是服務(wù)引用過程中的一環(huán),除此之外,服務(wù)消費者還需要經(jīng)歷 Invoker 創(chuàng)建、代理類創(chuàng)建等步驟。這些步驟,我將在后續(xù)章節(jié)中一一進(jìn)行分析。
2.服務(wù)引用原理Dubbo 服務(wù)引用的時機(jī)有兩個,第一個是在 Spring 容器調(diào)用 ReferenceBean 的 afterPropertiesSet 方法時引用服務(wù),第二個是在 ReferenceBean 對應(yīng)的服務(wù)被注入到其他類中時引用。這兩個引用服務(wù)的時機(jī)區(qū)別在于,第一個是餓漢式的,第二個是懶漢式的。默認(rèn)情況下,Dubbo 使用懶漢式引用服務(wù)。如果需要使用餓漢式,可通過配置
以上就是 Dubbo 引用服務(wù)的大致原理,下面我們深入到代碼中,詳細(xì)分析服務(wù)引用細(xì)節(jié)。
3.源碼分析服務(wù)引用的入口方法為 ReferenceBean 的 getObject 方法,該方法定義在 Spring 的 FactoryBean 接口中,ReferenceBean 實現(xiàn)了這個方法。實現(xiàn)代碼如下:
public Object getObject() throws Exception { return get(); } public synchronized T get() { if (destroyed) { throw new IllegalStateException("Already destroyed!"); } // 檢測 ref 是否為空,為空則通過 init 方法創(chuàng)建 if (ref == null) { // init 方法主要用于處理配置,以及調(diào)用 createProxy 生成代理類 init(); } return ref; }
這里兩個方法代碼都比較簡短,并不難理解。不過這里需要特別說明一下,如果大家從 getObject 方法進(jìn)行代碼調(diào)試時,會碰到比較詫異的問題。這里假設(shè)你使用 IDEA,且保持了 IDEA 的默認(rèn)配置。當(dāng)你面調(diào)試到 get 方法的if (ref == null)時,你會驚奇的發(fā)現(xiàn) ref 不為空,導(dǎo)致你無法進(jìn)入到 init 方法中繼續(xù)調(diào)試。導(dǎo)致這個現(xiàn)象的原因是 Dubbo 框架本身有點小問題,這個小問題會引發(fā)一些讓人詫異的現(xiàn)象。關(guān)于這個問題,我進(jìn)行了將近兩個小時的排查。查明問題后,我給 Dubbo 提交了一個 pull request (#2754) 修復(fù)了此問題。另外,beiwei30 前輩開了一個 issue (#2757) 介紹這個問題,有興趣的朋友可以去看看。大家如果想規(guī)避這個問題,可以修改一下 IDEA 的配置。在配置面板中搜索 toString,然后取消Enable "toString" object view前的對號。具體如下:
講完需要注意的點,我們繼續(xù)向下分析,接下來將分析配置的處理過程。
3.1 處理配置Dubbo 提供了豐富的配置,用于調(diào)整和優(yōu)化框架行為,性能等。Dubbo 在引用或?qū)С龇?wù)時,首先會對這些配置進(jìn)行檢查和處理,以保證配置到正確性。如果大家不是很熟悉 Dubbo 配置,建議先閱讀以下官方文檔。配置解析的方法為 ReferenceConfig 的 init 方法,下面來看一下方法邏輯。
private void init() { if (initialized) { return; } initialized = true; if (interfaceName == null || interfaceName.length() == 0) { throw new IllegalStateException("interface not allow null!"); } // 檢測 consumer 變量是否為空,為空則創(chuàng)建 checkDefault(); appendProperties(this); if (getGeneric() == null && getConsumer() != null) { // 設(shè)置 generic setGeneric(getConsumer().getGeneric()); } // 檢測是否為泛化接口 if (ProtocolUtils.isGeneric(getGeneric())) { interfaceClass = GenericService.class; } else { try { // 加載類 interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } checkInterfaceAndMethods(interfaceClass, methods); } // -------------------------------? 分割線1 ?------------------------------ // 從系統(tǒng)變量中獲取與接口名對應(yīng)的屬性值 String resolve = System.getProperty(interfaceName); String resolveFile = null; if (resolve == null || resolve.length() == 0) { // 從系統(tǒng)屬性中獲取解析文件路徑 resolveFile = System.getProperty("dubbo.resolve.file"); if (resolveFile == null || resolveFile.length() == 0) { // 從指定位置加載配置文件 File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties"); if (userResolveFile.exists()) { // 獲取文件絕對路徑 resolveFile = userResolveFile.getAbsolutePath(); } } if (resolveFile != null && resolveFile.length() > 0) { Properties properties = new Properties(); FileInputStream fis = null; try { fis = new FileInputStream(new File(resolveFile)); // 從文件中加載配置 properties.load(fis); } catch (IOException e) { throw new IllegalStateException("Unload ..., cause:..."); } finally { try { if (null != fis) fis.close(); } catch (IOException e) { logger.warn(e.getMessage(), e); } } // 獲取與接口名對應(yīng)的配置 resolve = properties.getProperty(interfaceName); } } if (resolve != null && resolve.length() > 0) { // 將 resolve 賦值給 url url = resolve; } // -------------------------------? 分割線2 ?------------------------------ if (consumer != null) { if (application == null) { // 從 consumer 中獲取 Application 實例,下同 application = consumer.getApplication(); } if (module == null) { module = consumer.getModule(); } if (registries == null) { registries = consumer.getRegistries(); } if (monitor == null) { monitor = consumer.getMonitor(); } } if (module != null) { if (registries == null) { registries = module.getRegistries(); } if (monitor == null) { monitor = module.getMonitor(); } } if (application != null) { if (registries == null) { registries = application.getRegistries(); } if (monitor == null) { monitor = application.getMonitor(); } } // 檢測本地 Application 和本地存根配置合法性 checkApplication(); checkStubAndMock(interfaceClass); // -------------------------------? 分割線3 ?------------------------------ Mapmap = new HashMap (); Map
上面的代碼很長,做的事情比較多。這里我根據(jù)代碼邏輯,對代碼進(jìn)行了分塊,下面我們一起來看一下。
首先是方法開始到分割線1之間的代碼。這段代碼主要用于檢測 ConsumerConfig 實例是否存在,如不存在則創(chuàng)建一個新的實例,然后通過系統(tǒng)變量或 dubbo.properties 配置文件填充 ConsumerConfig 的字段。接著是檢測泛化配置,并根據(jù)配置設(shè)置 interfaceClass 的值。本段代碼邏輯大致就是這些,接著來看分割線1到分割線2之間的邏輯。這段邏輯用于從系統(tǒng)屬性或配置文件中加載與接口名相對應(yīng)的配置,并將解析結(jié)果賦值給 url 字段。url 字段的作用一般是用于點對點調(diào)用。繼續(xù)向下看,分割線2和分割線3之間的代碼用于檢測幾個核心配置類是否為空,為空則嘗試從其他配置類中獲取。分割線3與分割線4之間的代碼主要是用于收集各種配置,并將配置存儲到 map 中。分割線4和分割線5之間的代碼用于處理 MethodConfig 實例。該實例包含了事件通知配置,比如 onreturn、onthrow、oninvoke 等。分割線5到方法結(jié)尾的代碼主要用于解析服務(wù)消費者 ip,以及調(diào)用 createProxy 創(chuàng)建代理對象。關(guān)于該方法的詳細(xì)分析,將會在接下來的章節(jié)中展開。
到這里,關(guān)于配置的檢查與處理過長就分析完了。這部分邏輯不是很難理解,但比較繁雜,大家需要耐心看一下。好了,本節(jié)先到這,接下來分析服務(wù)引用的過程。
3.2 引用服務(wù)本節(jié)我們要從 createProxy 開始看起。createProxy 這個方法表面上看起來只是用于創(chuàng)建代理對象,但實際上并非如此。該方法還會調(diào)用其他方法構(gòu)建以及合并 Invoker 實例。具體細(xì)節(jié)如下。
private T createProxy(Mapmap) { URL tmpUrl = new URL("temp", "localhost", 0, map); final boolean isJvmRefer; if (isInjvm() == null) { // url 配置被指定,則不做本地引用 if (url != null && url.length() > 0) { isJvmRefer = false; // 根據(jù) url 的協(xié)議、scope 以及 injvm 等參數(shù)檢測是否需要本地引用 // 比如如果用戶顯式配置了 scope=local,此時 isInjvmRefer 返回 true } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { isJvmRefer = true; } else { isJvmRefer = false; } } else { // 獲取 injvm 配置值 isJvmRefer = isInjvm().booleanValue(); } // 本地引用 if (isJvmRefer) { // 生成本地引用 URL,協(xié)議為 injvm URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map); // 調(diào)用 refer 方法構(gòu)建 InjvmInvoker 實例 invoker = refprotocol.refer(interfaceClass, url); // 遠(yuǎn)程引用 } else { // url 不為空,表明用戶可能想進(jìn)行點對點調(diào)用 if (url != null && url.length() > 0) { // 當(dāng)需要配置多個 url 時,可用分號進(jìn)行分割,這里會進(jìn)行切分 String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0) { for (String u : us) { URL url = URL.valueOf(u); if (url.getPath() == null || url.getPath().length() == 0) { // 設(shè)置接口全限定名為 url 路徑 url = url.setPath(interfaceName); } // 檢測 url 協(xié)議是否為 registry,若是,表明用戶想使用指定的注冊中心 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { // 將 map 轉(zhuǎn)換為查詢字符串,并作為 refer 參數(shù)的值添加到 url 中 urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } else { // 合并 url,移除服務(wù)提供者的一些配置(這些配置來源于用戶配置的 url 屬性), // 比如線程池相關(guān)配置。并保留服務(wù)提供者的部分配置,比如版本,group,時間戳等 // 最后將合并后的配置設(shè)置為 url 查詢字符串中。 urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // 加載注冊中心 url List us = loadRegistries(false); if (us != null && !us.isEmpty()) { for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } // 添加 refer 參數(shù)到 url 中,并將 url 添加到 urls 中 urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } // 未配置注冊中心,拋出異常 if (urls.isEmpty()) { throw new IllegalStateException("No such any registry to reference..."); } } // 單個注冊中心或服務(wù)提供者(服務(wù)直聯(lián),下同) if (urls.size() == 1) { // 調(diào)用 RegistryProtocol 的 refer 構(gòu)建 Invoker 實例 invoker = refprotocol.refer(interfaceClass, urls.get(0)); // 多個注冊中心或多個服務(wù)提供者,或者兩者混合 } else { List > invokers = new ArrayList >(); URL registryURL = null; // 獲取所有的 Invoker for (URL url : urls) { // 通過 refprotocol 調(diào)用 refer 構(gòu)建 Invoker,refprotocol 會在運行時 // 根據(jù) url 協(xié)議頭加載指定的 Protocol 實例,并調(diào)用實例的 refer 方法 invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; } } if (registryURL != null) { // 如果注冊中心鏈接不為空,則將使用 AvailableCluster URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); // 創(chuàng)建 StaticDirectory 實例,并由 Cluster 對多個 Invoker 進(jìn)行合并 invoker = cluster.join(new StaticDirectory(u, invokers)); } else { invoker = cluster.join(new StaticDirectory(invokers)); } } } Boolean c = check; if (c == null && consumer != null) { c = consumer.isCheck(); } if (c == null) { c = true; } // invoker 可用性檢查 if (c && !invoker.isAvailable()) { throw new IllegalStateException("No provider available for the service..."); } // 生成代理類 return (T) proxyFactory.getProxy(invoker); }
上面代碼很多,不過邏輯比較清晰。首先根據(jù)配置檢查是否為本地調(diào)用,若是,則調(diào)用 InjvmProtocol 的 refer 方法生成 InjvmInvoker 實例。若不是,則讀取直聯(lián)配置項,或注冊中心 url,并將讀取到的 url 存儲到 urls 中。然后,根據(jù) urls 元素數(shù)量進(jìn)行后續(xù)操作。若 urls 元素數(shù)量為1,則直接通過 Protocol 自適應(yīng)拓展構(gòu)建 Invoker 實例接口。若 urls 元素數(shù)量大于1,即存在多個注冊中心或服務(wù)直聯(lián) url,此時先根據(jù) url 構(gòu)建 Invoker。然后再通過 Cluster 合并多個 Invoker,最后調(diào)用 ProxyFactory 生成代理類。這里,Invoker 的構(gòu)建過程以及代理類的過程比較重要,因此我將分兩小節(jié)對這兩個過程進(jìn)行分析。
3.2.1 創(chuàng)建 InvokerInvoker 是 Dubbo 的核心模型,代表一個可執(zhí)行體。在服務(wù)提供方,Invoker 用于調(diào)用服務(wù)提供類。在服務(wù)消費方,Invoker 用于執(zhí)行遠(yuǎn)程調(diào)用。Invoker 在 Dubbo 中的位置十分重要,因此我們有必要去搞懂它。從前面的代碼中可知,Invoker 是由 Protocol 實現(xiàn)類構(gòu)建的。Protocol 實現(xiàn)類有很多,這里我會分析最常用的兩個,分別是 RegistryProtocol 和 DubboProtocol,其他的大家自行分析。下面先來分析 DubboProtocol 的 refer 方法源碼。如下:
publicInvoker refer(Class serviceType, URL url) throws RpcException { optimizeSerialization(url); // 創(chuàng)建 DubboInvoker DubboInvoker invoker = new DubboInvoker (serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
上面方法看起來比較簡單,不過這里有一個調(diào)用需要我們注意一下,即 getClients。這個方法用于獲取客戶端實例,實例類型為 ExchangeClient。ExchangeClient 實際上并不具備通信能力,因此它需要更底層的客戶端實例進(jìn)行通信。比如 NettyClient、MinaClient 等,默認(rèn)情況下,Dubbo 使用 NettyClient 進(jìn)行通信。接下來,我們簡單看一下 getClients 方法的邏輯。
private ExchangeClient[] getClients(URL url) { // 是否共享連接 boolean service_share_connect = false; // 獲取連接數(shù),默認(rèn)為0,表示未配置 int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); // 如果未配置 connections,則共享連接 if (connections == 0) { service_share_connect = true; connections = 1; } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (service_share_connect) { // 獲取共享客戶端 clients[i] = getSharedClient(url); } else { // 初始化新的客戶端 clients[i] = initClient(url); } } return clients; }
這里根據(jù) connections 數(shù)量決定是獲取共享客戶端還是創(chuàng)建新的客戶端實例,默認(rèn)情況下,使用共享客戶端實例。不過 getSharedClient 方法中也會調(diào)用 initClient 方法,因此下面我們一起看一下這兩個方法。
private ExchangeClient getSharedClient(URL url) { String key = url.getAddress(); // 獲取帶有“引用計數(shù)”功能的 ExchangeClient ReferenceCountExchangeClient client = referenceClientMap.get(key); if (client != null) { if (!client.isClosed()) { // 增加引用計數(shù) client.incrementAndGetCount(); return client; } else { referenceClientMap.remove(key); } } locks.putIfAbsent(key, new Object()); synchronized (locks.get(key)) { if (referenceClientMap.containsKey(key)) { return referenceClientMap.get(key); } // 創(chuàng)建 ExchangeClient 客戶端 ExchangeClient exchangeClient = initClient(url); // 將 ExchangeClient 實例傳給 ReferenceCountExchangeClient,這里明顯用了裝飾模式 client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap); referenceClientMap.put(key, client); ghostClientMap.remove(key); locks.remove(key); return client; } }
上面方法先訪問緩存,若緩存未命中,則通過 initClient 方法創(chuàng)建新的 ExchangeClient 實例,并將該實例傳給 ReferenceCountExchangeClient 構(gòu)造方法創(chuàng)建一個帶有引用技術(shù)功能的 ExchangeClient 實例。ReferenceCountExchangeClient 內(nèi)部實現(xiàn)比較簡單,就不分析了。下面我們再來看一下 initClient 方法的代碼。
private ExchangeClient initClient(URL url) { // 獲取客戶端類型,默認(rèn)為 netty String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); // 添加編解碼和心跳包參數(shù)到 url 中 url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // 檢測客戶端類型是否存在,不存在則拋出異常 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported client type: ..."); } ExchangeClient client; try { // 獲取 lazy 配置,并根據(jù)配置值決定創(chuàng)建的客戶端類型 if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { // 創(chuàng)建懶加載 ExchangeClient 實例 client = new LazyConnectExchangeClient(url, requestHandler); } else { // 創(chuàng)建普通 ExchangeClient 實例 client = Exchangers.connect(url, requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service..."); } return client; }
initClient 方法首先獲取用戶配置的客戶端類型,默認(rèn)為 netty。然后檢測用戶配置的客戶端類型是否存在,不存在則拋出異常。最后根據(jù) lazy 配置決定創(chuàng)建什么類型的客戶端。這里的 LazyConnectExchangeClient 代碼并不是很復(fù)雜,該類會在 request 方法被調(diào)用時通過 Exchangers 的 connect 方法創(chuàng)建 ExchangeClient 客戶端,這里就不分析 LazyConnectExchangeClient 的代碼了。下面我們分析一下 Exchangers 的 connect 方法。
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); // 獲取 Exchanger 實例,默認(rèn)為 HeaderExchangeClient return getExchanger(url).connect(url, handler); }
如上,getExchanger 會通過 SPI 加載 HeaderExchangeClient 實例,這個方法比較簡單,大家自己看一下吧。接下來分析 HeaderExchangeClient 的實現(xiàn)。
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { // 這里包含了多個調(diào)用,分別如下: // 1. 創(chuàng)建 HeaderExchangeHandler 對象 // 2. 創(chuàng)建 DecodeHandler 對象 // 3. 通過 Transporters 構(gòu)建 Client 實例 // 4. 創(chuàng)建 HeaderExchangeClient 對象 return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); }
這里的調(diào)用比較多,我們這里重點看一下 Transporters 的 connect 方法。如下:
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } ChannelHandler handler; if (handlers == null || handlers.length == 0) { handler = new ChannelHandlerAdapter(); } else if (handlers.length == 1) { handler = handlers[0]; } else { // 如果 handler 數(shù)量大于1,則創(chuàng)建一個 ChannelHandler 分發(fā)器 handler = new ChannelHandlerDispatcher(handlers); } // 獲取 Transporter 自適應(yīng)拓展類,并調(diào)用 connect 方法生成 Client 實例 return getTransporter().connect(url, handler); }
這里,getTransporter 方法返回的是自適應(yīng)拓展類,該類會在運行時根據(jù)客戶端類型加載指定的 Transporter 實現(xiàn)類。若用戶未顯示配置客戶端類型,則默認(rèn)加載 NettyTransporter,并調(diào)用該類的 connect 方法。如下:
public Client connect(URL url, ChannelHandler listener) throws RemotingException { // 創(chuàng)建 NettyClient 對象 return new NettyClient(url, listener); }
到這里就不繼續(xù)跟下去了,在往下就是通過 Netty 提供的接口構(gòu)建 Netty 客戶端了,大家有興趣自己看看。到這里,關(guān)于 DubboProtocol 的 refer 方法就分析完了。接下來,繼續(xù)分析 RegistryProtocol 的 refer 方法邏輯。
publicInvoker refer(Class type, URL url) throws RpcException { // 取 registry 參數(shù)值,并將其設(shè)置為協(xié)議頭 url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); // 獲取注冊中心實例 Registry registry = registryFactory.getRegistry(url); // 這個判斷暫時不知道有什么意圖,為什么要給 RegistryService 類型生成 Invoker ? if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // 將 url 查詢字符串轉(zhuǎn)為 Map Map qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); // 獲取 group 配置 String group = qs.get(Constants.GROUP_KEY); if (group != null && group.length() > 0) { if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { // 通過 SPI 加載 MergeableCluster 實例,并調(diào)用 doRefer 繼續(xù)執(zhí)行引用服務(wù)邏輯 return doRefer(getMergeableCluster(), registry, type, url); } } // 調(diào)用 doRefer 繼續(xù)執(zhí)行引用服務(wù)邏輯 return doRefer(cluster, registry, type, url); }
上面代碼首先為 url 設(shè)置協(xié)議頭,然后根據(jù) url 參數(shù)加載注冊中心實例。接下來對 RegistryService 繼續(xù)針對性處理,這個處理邏輯我不是很明白,不知道為什么要為 RegistryService 類型生成 Invoker,有知道同學(xué)麻煩告知一下。然后就是獲取 group 配置,根據(jù) group 配置決定 doRefer 第一個參數(shù)的類型。這里的重點是 doRefer 方法,如下:
privateInvoker doRefer(Cluster cluster, Registry registry, Class type, URL url) { // 創(chuàng)建 RegistryDirectory 實例 RegistryDirectory directory = new RegistryDirectory (type, url); // 設(shè)置注冊中心和協(xié)議 directory.setRegistry(registry); directory.setProtocol(protocol); Map parameters = new HashMap (directory.getUrl().getParameters()); // 生成服務(wù)消費者鏈接 URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); // 注冊服務(wù)消費者,在 consumers 目錄下新節(jié)點 if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } // 訂閱 providers、configurators、routers 等節(jié)點數(shù)據(jù) directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); // 一個注冊中心可能有多個服務(wù)提供者,因此這里需要將多個服務(wù)提供者合并為一個 Invoker invoker = cluster.join(directory); ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); return invoker; }
如上,doRefer 方法創(chuàng)建一個 RegistryDirectory 實例,然后生成服務(wù)者消費者鏈接,并向注冊中心進(jìn)行注冊。注冊完畢后,緊接著訂閱 providers、configurators、routers 等節(jié)點下的數(shù)據(jù)。完成訂閱后,RegistryDirectory 會收到這幾個節(jié)點下的子節(jié)點信息,比如可以獲取到服務(wù)提供者的配置信息。由于一個服務(wù)可能部署在多臺服務(wù)器上,這樣就會在 providers 產(chǎn)生多個節(jié)點,這個時候就需要 Cluster 將多個服務(wù)節(jié)點合并為一個,并生成一個 Invoker。關(guān)于 RegistryDirectory 和 Cluster,本文不打算進(jìn)行分析,相關(guān)分析將會在隨后的文章中展開。
好了,關(guān)于 Invoker 的創(chuàng)建的邏輯就先分析到這。邏輯比較多,大家耐心看一下。
3.2.2 創(chuàng)建代理Invoker 創(chuàng)建完畢后,接下來要做的事情是為服務(wù)接口生成代理對象。有了代理對象,我們就可以通過代理對象進(jìn)行遠(yuǎn)程調(diào)用。代理對象生成的入口方法為在 ProxyFactory 的 getProxy,接下來進(jìn)行分析。
publicT getProxy(Invoker invoker) throws RpcException { // 調(diào)用重載方法 return getProxy(invoker, false); } public T getProxy(Invoker invoker, boolean generic) throws RpcException { Class>[] interfaces = null; // 獲取接口列表 String config = invoker.getUrl().getParameter("interfaces"); if (config != null && config.length() > 0) { // 切分接口列表 String[] types = Constants.COMMA_SPLIT_PATTERN.split(config); if (types != null && types.length > 0) { interfaces = new Class>[types.length + 2]; // 設(shè)置服務(wù)接口類和 EchoService.class 到 interfaces 中 interfaces[0] = invoker.getInterface(); interfaces[1] = EchoService.class; for (int i = 0; i < types.length; i++) { // 加載接口類 interfaces[i + 1] = ReflectUtils.forName(types[i]); } } } if (interfaces == null) { interfaces = new Class>[]{invoker.getInterface(), EchoService.class}; } // 為 http 和 hessian 協(xié)議提供泛化調(diào)用支持,參考 pull request #1827 if (!invoker.getInterface().equals(GenericService.class) && generic) { int len = interfaces.length; Class>[] temp = interfaces; // 創(chuàng)建新的 interfaces 數(shù)組 interfaces = new Class>[len + 1]; System.arraycopy(temp, 0, interfaces, 0, len); // 設(shè)置 GenericService.class 到數(shù)組中 interfaces[len] = GenericService.class; } // 調(diào)用重載方法 return getProxy(invoker, interfaces); } public abstract T getProxy(Invoker invoker, Class>[] types);
如上,上面大段代碼都是用來獲取 interfaces 數(shù)組的,因此我們需要繼續(xù)往下看。getProxy(Invoker, Class>[]) 這個方法是一個抽象方法,下面我們到 JavassistProxyFactory 類中看一下該方法的實現(xiàn)代碼。
publicT getProxy(Invoker invoker, Class>[] interfaces) { // 生成 Proxy 子類(Proxy 是抽象類)。并調(diào)用Proxy 子類的 newInstance 方法生成 Proxy 實例 return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }
上面代碼并不多,首先是通過 Proxy 的 getProxy 方法獲取 Proxy 子類,然后創(chuàng)建 InvokerInvocationHandler 對象,并將該對象傳給 newInstance 生成 Proxy 實例。InvokerInvocationHandler 實現(xiàn)自 JDK 的 InvocationHandler 接口,具體的用途是攔截接口類調(diào)用。該類邏輯比較簡單,這里就不分析了。下面我們重點關(guān)注一下 Proxy 的 getProxy 方法,如下。
public static Proxy getProxy(Class>... ics) { // 調(diào)用重載方法 return getProxy(ClassHelper.getClassLoader(Proxy.class), ics); } public static Proxy getProxy(ClassLoader cl, Class>... ics) { if (ics.length > 65535) throw new IllegalArgumentException("interface limit exceeded"); StringBuilder sb = new StringBuilder(); // 遍歷接口列表 for (int i = 0; i < ics.length; i++) { String itf = ics[i].getName(); // 檢測類型是否為接口 if (!ics[i].isInterface()) throw new RuntimeException(itf + " is not a interface."); Class> tmp = null; try { // 重新加載接口類 tmp = Class.forName(itf, false, cl); } catch (ClassNotFoundException e) { } // 檢測接口是否相同,這里 tmp 有可能為空 if (tmp != ics[i]) throw new IllegalArgumentException(ics[i] + " is not visible from class loader"); // 拼接接口全限定名,分隔符為 ; sb.append(itf).append(";"); } // 使用拼接后接口名作為 key String key = sb.toString(); Mapcache; synchronized (ProxyCacheMap) { cache = ProxyCacheMap.get(cl); if (cache == null) { cache = new HashMap (); ProxyCacheMap.put(cl, cache); } } Proxy proxy = null; synchronized (cache) { do { // 從緩存中獲取 Reference 實例 Object value = cache.get(key); if (value instanceof Reference>) { proxy = (Proxy) ((Reference>) value).get(); if (proxy != null) { return proxy; } } // 多線程控制,保證只有一個線程可以進(jìn)行后續(xù)操作 if (value == PendingGenerationMarker) { try { // 其他線程在此處進(jìn)行等待 cache.wait(); } catch (InterruptedException e) { } } else { // 放置標(biāo)志位到緩存中,并跳出 while 循環(huán)進(jìn)行后續(xù)操作 cache.put(key, PendingGenerationMarker); break; } } while (true); } long id = PROXY_CLASS_COUNTER.getAndIncrement(); String pkg = null; ClassGenerator ccp = null, ccm = null; try { // 創(chuàng)建 ClassGenerator 對象 ccp = ClassGenerator.newInstance(cl); Set worked = new HashSet (); List methods = new ArrayList (); for (int i = 0; i < ics.length; i++) { // 檢測接口訪問級別是否為 protected 或 privete if (!Modifier.isPublic(ics[i].getModifiers())) { // 獲取接口包名 String npkg = ics[i].getPackage().getName(); if (pkg == null) { pkg = npkg; } else { if (!pkg.equals(npkg)) // 非 public 級別的接口必須在同一個包下,否者拋出異常 throw new IllegalArgumentException("non-public interfaces from different packages"); } } // 添加接口到 ClassGenerator 中 ccp.addInterface(ics[i]); // 遍歷接口方法 for (Method method : ics[i].getMethods()) { // 獲取方法描述,可理解為方法簽名 String desc = ReflectUtils.getDesc(method); // 如果已包含在 worked 中,則忽略??紤]這種情況, // A 接口和 B 接口中包含一個完全相同的方法 if (worked.contains(desc)) continue; worked.add(desc); int ix = methods.size(); // 獲取方法返回值類型 Class> rt = method.getReturnType(); // 獲取參數(shù)列表 Class>[] pts = method.getParameterTypes(); // 生成 Object[] args = new Object[1...N] StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];"); for (int j = 0; j < pts.length; j++) // 生成 args[1...N] = ($w)$1...N; code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";"); // 生成 InvokerHandler 接口的 invoker 方法調(diào)用語句,如下: // Object ret = handler.invoke(this, methods[1...N], args); code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);"); // 返回值不為 void if (!Void.TYPE.equals(rt)) // 生成返回語句,形如 return (java.lang.String) ret; code.append(" return ").append(asArgument(rt, "ret")).append(";"); methods.add(method); // 添加方法名、訪問控制符、參數(shù)列表、方法代碼等信息到 ClassGenerator 中 ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString()); } } if (pkg == null) pkg = PACKAGE_NAME; // 構(gòu)建接口代理類名稱:pkg + ".proxy" + id,比如 com.tianxiaobo.proxy0 String pcn = pkg + ".proxy" + id; ccp.setClassName(pcn); ccp.addField("public static java.lang.reflect.Method[] methods;"); // 生成 private java.lang.reflect.InvocationHandler handler; ccp.addField("private " + InvocationHandler.class.getName() + " handler;"); // 為接口代理類添加帶有 InvocationHandler 參數(shù)的構(gòu)造方法,比如: // porxy0(java.lang.reflect.InvocationHandler arg0) { // handler=$1; // } ccp.addConstructor(Modifier.PUBLIC, new Class>[]{InvocationHandler.class}, new Class>[0], "handler=$1;"); // 為接口代理類添加默認(rèn)構(gòu)造方法 ccp.addDefaultConstructor(); // 生成接口代理類 Class> clazz = ccp.toClass(); clazz.getField("methods").set(null, methods.toArray(new Method[0])); // 構(gòu)建 Proxy 子類名稱,比如 Proxy1,Proxy2 等 String fcn = Proxy.class.getName() + id; ccm = ClassGenerator.newInstance(cl); ccm.setClassName(fcn); ccm.addDefaultConstructor(); ccm.setSuperClass(Proxy.class); // 為 Proxy 的抽象方法 newInstance 生成實現(xiàn)代碼,形如: // public Object newInstance(java.lang.reflect.InvocationHandler h) { // return new com.tianxiaobo.proxy0($1); // } ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }"); // 生成 Proxy 實現(xiàn)類 Class> pc = ccm.toClass(); // 通過反射創(chuàng)建 Proxy 實例 proxy = (Proxy) pc.newInstance(); } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } finally { if (ccp != null) // 釋放資源 ccp.release(); if (ccm != null) ccm.release(); synchronized (cache) { if (proxy == null) cache.remove(key); else // 寫緩存 cache.put(key, new WeakReference (proxy)); // 喚醒其他等待線程 cache.notifyAll(); } } return proxy; }
上面代碼比較復(fù)雜,我也寫了很多注釋。大家在閱讀這段代碼時,要搞清楚 ccp 和 ccm 的用途,不然會被搞暈。ccp 用于為服務(wù)接口生成代理類,比如我們有一個 DemoService 接口,這個接口代理類就是由 ccp 生成的。ccm 則是用于為 org.apache.dubbo.common.bytecode.Proxy 抽象類生成子類,主要是實現(xiàn) Proxy 的抽象方法。下面以 org.apache.dubbo.demo.DemoService 這個接口為例,來看一下該接口代理類代碼大致是怎樣的(忽略 EchoService 接口)。
package org.apache.dubbo.common.bytecode; public class proxy0 implements org.apache.dubbo.demo.DemoService { public static java.lang.reflect.Method[] methods; private java.lang.reflect.InvocationHandler handler; public proxy0() { } public proxy0(java.lang.reflect.InvocationHandler arg0) { handler = $1; } public java.lang.String sayHello(java.lang.String arg0) { Object[] args = new Object[1]; args[0] = ($w) $1; Object ret = handler.invoke(this, methods[0], args); return (java.lang.String) ret; } }
好了,到這里代理類生成邏輯就分析完了。整個過程比較復(fù)雜,大家需要耐心看一下,本節(jié)點到這里。
4.總結(jié)本篇文章對服務(wù)引用的過程進(jìn)行了較為詳盡的分析,之所以說是較為詳盡,是因為還有一些地方?jīng)]有分析到。比如 Directory、Cluster 等實現(xiàn)類的代碼并未進(jìn)行詳細(xì)分析,由于這些類功能比較獨立,因此我打算后續(xù)多帶帶成文進(jìn)行分析。暫時我們可以先把這些類看成黑盒,只要知道這些類的用途即可。引用服務(wù)過程涉及到的調(diào)用也非常多,大家在閱讀相關(guān)代碼的中耐心些,并多進(jìn)行調(diào)試。
好了,本篇文章就先到這里了。謝謝閱讀。
本文在知識共享許可協(xié)議 4.0 下發(fā)布,轉(zhuǎn)載需在明顯位置處注明出處
作者:田小波
本文同步發(fā)布在我的個人博客:http://www.tianxiaobo.com
本作品采用知識共享署名-非商業(yè)性使用-禁止演繹 4.0 國際許可協(xié)議進(jìn)行許可。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/72127.html
摘要:服務(wù)引用過程目標(biāo)從源碼的角度分析服務(wù)引用過程。并保留服務(wù)提供者的部分配置,比如版本,,時間戳等最后將合并后的配置設(shè)置為查詢字符串中。的可以參考源碼解析二十三遠(yuǎn)程調(diào)用的一的源碼分析。 dubbo服務(wù)引用過程 目標(biāo):從源碼的角度分析服務(wù)引用過程。 前言 前面服務(wù)暴露過程的文章講解到,服務(wù)引用有兩種方式,一種就是直連,也就是直接指定服務(wù)的地址來進(jìn)行引用,這種方式更多的時候被用來做服務(wù)測試,不...
摘要:服務(wù)提供者代碼上面這個類會被封裝成為一個實例,并新生成一個實例。這樣當(dāng)網(wǎng)絡(luò)通訊層收到一個請求后,會找到對應(yīng)的實例,并調(diào)用它所對應(yīng)的實例,從而真正調(diào)用了服務(wù)提供者的代碼。 這次源碼解析借鑒《肥朝》前輩的dubbo源碼解析,進(jìn)行源碼學(xué)習(xí)??偨Y(jié)起來就是先總體,后局部.也就是先把需要注意的概念先拋出來,把整體架構(gòu)圖先畫出來.讓讀者拿著地圖跟著我的腳步,并且每一步我都提醒,現(xiàn)在我們在哪,我們下一...
摘要:將標(biāo)簽的各種子標(biāo)簽如,存到一個叫的中。內(nèi)部中一定存在一個方法從中拿到解析器。生成來實現(xiàn)的以方法為例。該方法的一大部分都是在拆解。 DUBBO 加載 spring加載bean的時候,遇到dubbo的命名空間時,會調(diào)用DubboNamespaceHandler類。執(zhí)行init方法。將dubbo標(biāo)簽的各種子標(biāo)簽如service,reference存到一個叫parsers的HashMap中。 ...
摘要:遠(yuǎn)程調(diào)用開篇目標(biāo)介紹之后解讀遠(yuǎn)程調(diào)用模塊的內(nèi)容如何編排介紹中的包結(jié)構(gòu)設(shè)計以及最外層的的源碼解析。十該類就是遠(yuǎn)程調(diào)用的上下文,貫穿著整個調(diào)用,例如調(diào)用,然后調(diào)用。十五該類是系統(tǒng)上下文,僅供內(nèi)部使用。 遠(yuǎn)程調(diào)用——開篇 目標(biāo):介紹之后解讀遠(yuǎn)程調(diào)用模塊的內(nèi)容如何編排、介紹dubbo-rpc-api中的包結(jié)構(gòu)設(shè)計以及最外層的的源碼解析。 前言 最近我面臨著一個選擇,因為dubbo 2.7.0-...
摘要:源碼分析一該類實現(xiàn)了,是服務(wù)引用監(jiān)聽器的包裝類。取消暴露遍歷監(jiān)聽集合監(jiān)聽取消暴露該方法是對每個取消服務(wù)暴露的監(jiān)聽。五暴露服務(wù)取消暴露服務(wù)該類是服務(wù)暴露監(jiān)聽器的適配類,沒有做實際的操作。 遠(yuǎn)程調(diào)用——Listener 目標(biāo):介紹dubbo-rpc-api中的各種listener監(jiān)聽器的實現(xiàn)邏輯,內(nèi)容略少,隨便撇兩眼,不是重點。 前言 本文介紹監(jiān)聽器的相關(guān)邏輯。在服務(wù)引用和服務(wù)發(fā)現(xiàn)中監(jiān)聽器...
閱讀 783·2021-10-09 09:58
閱讀 644·2021-08-27 16:24
閱讀 1729·2019-08-30 14:15
閱讀 2389·2019-08-30 11:04
閱讀 2076·2019-08-29 18:43
閱讀 2171·2019-08-29 15:20
閱讀 2722·2019-08-26 12:20
閱讀 1620·2019-08-26 11:44