摘要:七該類也實現(xiàn)了,也是裝飾了接口,但是它是在服務(wù)引用和暴露過程中加上了監(jiān)聽器的功能。如果是注冊中心,則暴露該創(chuàng)建一個暴露者監(jiān)聽器包裝類對象該方法是在服務(wù)暴露上做了監(jiān)聽器功能的增強,也就是加上了監(jiān)聽器。
遠程調(diào)用——Protocol
目標:介紹遠程調(diào)用中協(xié)議的設(shè)計和實現(xiàn),介紹dubbo-rpc-api中的各種protocol包的源碼,是重點內(nèi)容。前言
在遠程調(diào)用中協(xié)議是非常重要的一層,看下面這張圖:
該層是在信息交換層之上,分為了并且夾雜在服務(wù)暴露和服務(wù)引用中間,為了有一個約定的方式進行調(diào)用。
dubbo支持不同協(xié)議的擴展,比如http、thrift等等,具體的可以參照官方文檔。本文講解的源碼大部分是對于公共方法的實現(xiàn),而具體的服務(wù)暴露和服務(wù)引用會在各個協(xié)議實現(xiàn)中講到。
下面是該包下面的類圖:
源碼分析 (一)AbstractProtocol該類是協(xié)議的抽象類,實現(xiàn)了Protocol接口,其中實現(xiàn)了一些公共的方法,抽象方法在它的子類AbstractProxyProtocol中定義。
1.屬性/** * 服務(wù)暴露者集合 */ protected final Map2.serviceKey> exporterMap = new ConcurrentHashMap >(); /** * 服務(wù)引用者集合 */ //TODO SOFEREFENCE protected final Set > invokers = new ConcurrentHashSet >();
protected static String serviceKey(URL url) { // 獲得綁定的端口號 int port = url.getParameter(Constants.BIND_PORT_KEY, url.getPort()); return serviceKey(port, url.getPath(), url.getParameter(Constants.VERSION_KEY), url.getParameter(Constants.GROUP_KEY)); } protected static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) { return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup); }
該方法是為了得到服務(wù)key group+"/"+serviceName+":"+serviceVersion+":"+port
3.destroy@Override public void destroy() { // 遍歷服務(wù)引用實體 for (Invoker> invoker : invokers) { if (invoker != null) { // 從集合中移除 invokers.remove(invoker); try { if (logger.isInfoEnabled()) { logger.info("Destroy reference: " + invoker.getUrl()); } // 銷毀 invoker.destroy(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } // 遍歷服務(wù)暴露者 for (String key : new ArrayList(exporterMap.keySet())) { // 從集合中移除 Exporter> exporter = exporterMap.remove(key); if (exporter != null) { try { if (logger.isInfoEnabled()) { logger.info("Unexport service: " + exporter.getInvoker().getUrl()); } // 取消暴露 exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } }
該方法是對invoker和exporter的銷毀。
(二)AbstractProxyProtocol該類繼承了AbstractProtocol類,其中利用了代理工廠對AbstractProtocol中的兩個集合進行了填充,并且對異常做了處理。
1.屬性/** * rpc的異常類集合 */ private final List2.export> rpcExceptions = new CopyOnWriteArrayList >(); /** * 代理工廠 */ private ProxyFactory proxyFactory;
@Override @SuppressWarnings("unchecked") publicExporter export(final Invoker invoker) throws RpcException { // 獲得uri final String uri = serviceKey(invoker.getUrl()); // 獲得服務(wù)暴露者 Exporter exporter = (Exporter ) exporterMap.get(uri); if (exporter != null) { return exporter; } // 新建一個線程 final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl()); exporter = new AbstractExporter (invoker) { /** * 取消暴露 */ @Override public void unexport() { super.unexport(); // 移除該key對應(yīng)的服務(wù)暴露者 exporterMap.remove(uri); if (runnable != null) { try { // 啟動線程 runnable.run(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } }; // 加入集合 exporterMap.put(uri, exporter); return exporter; }
其中分為兩個步驟,創(chuàng)建一個exporter,放入到集合匯中。在創(chuàng)建exporter時對unexport方法進行了重寫。
3.refer@Override publicInvoker refer(final Class type, final URL url) throws RpcException { // 通過代理獲得實體域 final Invoker target = proxyFactory.getInvoker(doRefer(type, url), type, url); Invoker invoker = new AbstractInvoker (type, url) { @Override protected Result doInvoke(Invocation invocation) throws Throwable { try { // 獲得調(diào)用結(jié)果 Result result = target.invoke(invocation); Throwable e = result.getException(); // 如果拋出異常,則拋出相應(yīng)異常 if (e != null) { for (Class> rpcException : rpcExceptions) { if (rpcException.isAssignableFrom(e.getClass())) { throw getRpcException(type, url, invocation, e); } } } return result; } catch (RpcException e) { // 拋出未知異常 if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) { e.setCode(getErrorCode(e.getCause())); } throw e; } catch (Throwable e) { throw getRpcException(type, url, invocation, e); } } }; // 加入集合 invokers.add(invoker); return invoker; }
該方法是服務(wù)引用,先從代理工廠中獲得Invoker對象target,然后創(chuàng)建了真實的invoker在重寫方法中調(diào)用代理的方法,最后加入到集合。
protected abstractRunnable doExport(T impl, Class type, URL url) throws RpcException; protected abstract T doRefer(Class type, URL url) throws RpcException;
可以看到其中抽象了服務(wù)引用和暴露的方法,讓各類協(xié)議各自實現(xiàn)。
(三)AbstractInvoker該類是invoker的抽象方法,因為協(xié)議被夾在服務(wù)引用和服務(wù)暴露中間,無論什么協(xié)議都有一些通用的Invoker和exporter的方法實現(xiàn),而該類就是實現(xiàn)了Invoker的公共方法,而把doInvoke抽象出來,讓子類只關(guān)注這個方法。
1.屬性/** * 服務(wù)類型 */ private final Class2.convertAttachmenttype; /** * url對象 */ private final URL url; /** * 附加值 */ private final Map attachment; /** * 是否可用 */ private volatile boolean available = true; /** * 是否銷毀 */ private AtomicBoolean destroyed = new AtomicBoolean(false);
private static MapconvertAttachment(URL url, String[] keys) { if (keys == null || keys.length == 0) { return null; } Map attachment = new HashMap (); // 遍歷key,把值放入附加值集合中 for (String key : keys) { String value = url.getParameter(key); if (value != null && value.length() > 0) { attachment.put(key, value); } } return attachment; }
該方法是轉(zhuǎn)化為附加值,把url中的值轉(zhuǎn)化為服務(wù)調(diào)用invoker的附加值。
3.invoke@Override public Result invoke(Invocation inv) throws RpcException { // if invoker is destroyed due to address refresh from registry, let"s allow the current invoke to proceed // 如果服務(wù)引用銷毀,則打印告警日志,但是通過 if (destroyed.get()) { logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, " + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer"); } RpcInvocation invocation = (RpcInvocation) inv; // 會話域中加入該調(diào)用鏈 invocation.setInvoker(this); // 把附加值放入會話域 if (attachment != null && attachment.size() > 0) { invocation.addAttachmentsIfAbsent(attachment); } // 把上下文的附加值放入會話域 MapcontextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { /** * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here, * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information). */ invocation.addAttachments(contextAttachments); } // 如果開啟的是異步調(diào)用,則把該設(shè)置也放入附加值 if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) { invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString()); } // 加入編號 RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); try { // 執(zhí)行調(diào)用鏈 return doInvoke(invocation); } catch (InvocationTargetException e) { // biz exception Throwable te = e.getTargetException(); if (te == null) { return new RpcResult(e); } else { if (te instanceof RpcException) { ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION); } return new RpcResult(te); } } catch (RpcException e) { if (e.isBiz()) { return new RpcResult(e); } else { throw e; } } catch (Throwable e) { return new RpcResult(e); } }
該方法做了一些公共的操作,比如服務(wù)引用銷毀的檢測,加入附加值,加入調(diào)用鏈實體域到會話域中等。然后執(zhí)行了doInvoke抽象方法。各協(xié)議自己去實現(xiàn)。
(四)AbstractExporter該類和AbstractInvoker類似,也是在服務(wù)暴露中實現(xiàn)了一些公共方法。
1.屬性/** * 實體域 */ private final Invoker2.unexportinvoker; /** * 是否取消暴露服務(wù) */ private volatile boolean unexported = false;
@Override public void unexport() { // 如果已經(jīng)消取消暴露,則之間返回 if (unexported) { return; } // 設(shè)置為true unexported = true; // 銷毀該實體域 getInvoker().destroy(); }(五)InvokerWrapper
該類是Invoker的包裝類,其中用到類裝飾模式,不過并沒有實現(xiàn)實際的功能增強。
public class InvokerWrapper(六)ProtocolFilterWrapperimplements Invoker { /** * invoker對象 */ private final Invoker invoker; private final URL url; public InvokerWrapper(Invoker invoker, URL url) { this.invoker = invoker; this.url = url; } @Override public Class getInterface() { return invoker.getInterface(); } @Override public URL getUrl() { return url; } @Override public boolean isAvailable() { return invoker.isAvailable(); } @Override public Result invoke(Invocation invocation) throws RpcException { return invoker.invoke(invocation); } @Override public void destroy() { invoker.destroy(); } }
該類實現(xiàn)了Protocol接口,其中也用到了裝飾模式,是對Protocol的裝飾,是在服務(wù)引用和暴露的方法上加上了過濾器功能。
1.buildInvokerChainprivate staticInvoker buildInvokerChain(final Invoker invoker, String key, String group) { Invoker last = invoker; // 獲得過濾器的所有擴展實現(xiàn)類實例集合 List filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) { // 從最后一個過濾器開始循環(huán),創(chuàng)建一個帶有過濾器鏈的invoker對象 for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); // 記錄last的invoker final Invoker next = last; // 新建last last = new Invoker () { @Override public Class getInterface() { return invoker.getInterface(); } @Override public URL getUrl() { return invoker.getUrl(); } @Override public boolean isAvailable() { return invoker.isAvailable(); } /** * 關(guān)鍵在這里,調(diào)用下一個filter代表的invoker,把每一個過濾器串起來 * @param invocation * @return * @throws RpcException */ @Override public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } @Override public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; }
該方法就是創(chuàng)建帶 Filter 鏈的 Invoker 對象。倒序的把每一個過濾器串連起來,形成一個invoker。
2.export@Override publicExporter export(Invoker invoker) throws RpcException { // 如果是注冊中心,則直接暴露服務(wù) if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } // 服務(wù)提供側(cè)暴露服務(wù) return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); }
該方法是在服務(wù)暴露上做了過濾器鏈的增強,也就是加上了過濾器。
3.refer@Override publicInvoker refer(Class type, URL url) throws RpcException { // 如果是注冊中心,則直接引用 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } // 消費者側(cè)引用服務(wù) return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER); }
該方法是在服務(wù)引用上做了過濾器鏈的增強,也就是加上了過濾器。
(七)ProtocolListenerWrapper該類也實現(xiàn)了Protocol,也是裝飾了Protocol接口,但是它是在服務(wù)引用和暴露過程中加上了監(jiān)聽器的功能。
1.export@Override publicExporter export(Invoker invoker) throws RpcException { // 如果是注冊中心,則暴露該invoker if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } // 創(chuàng)建一個暴露者監(jiān)聽器包裝類對象 return new ListenerExporterWrapper (protocol.export(invoker), Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY))); }
該方法是在服務(wù)暴露上做了監(jiān)聽器功能的增強,也就是加上了監(jiān)聽器。
2.refer@Override publicInvoker refer(Class type, URL url) throws RpcException { // 如果是注冊中心。則直接引用服務(wù) if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } // 創(chuàng)建引用服務(wù)監(jiān)聽器包裝類對象 return new ListenerInvokerWrapper (protocol.refer(type, url), Collections.unmodifiableList( ExtensionLoader.getExtensionLoader(InvokerListener.class) .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY))); }
該方法是在服務(wù)引用上做了監(jiān)聽器功能的增強,也就是加上了監(jiān)聽器。
后記該部分相關(guān)的源碼解析地址:https://github.com/CrazyHZM/i...
該文章講解了遠程調(diào)用中關(guān)于協(xié)議的部分,其實就是講了一些公共的方法,并且把關(guān)鍵方法抽象出來讓子類實現(xiàn),具體的方法實現(xiàn)都在各個協(xié)議中自己實現(xiàn)。接下來我將開始對rpc模塊的代理進行講解。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/72957.html
摘要:可以參考源碼解析二十四遠程調(diào)用協(xié)議的八。十六的該類也是用了適配器模式,該類主要的作用就是增加了心跳功能,可以參考源碼解析十遠程通信層的四。二十的可以參考源碼解析十七遠程通信的一。 2.7大揭秘——消費端發(fā)送請求過程 目標:從源碼的角度分析一個服務(wù)方法調(diào)用經(jīng)歷怎么樣的磨難以后到達服務(wù)端。 前言 前一篇文章講到的是引用服務(wù)的過程,引用服務(wù)無非就是創(chuàng)建出一個代理。供消費者調(diào)用服務(wù)的相關(guān)方法。...
摘要:而存在的意義就是保證請求或響應(yīng)對象可在線程池中被解碼,解碼完成后,就會分發(fā)到的。 2.7大揭秘——服務(wù)端處理請求過程 目標:從源碼的角度分析服務(wù)端接收到請求后的一系列操作,最終把客戶端需要的值返回。 前言 上一篇講到了消費端發(fā)送請求的過程,該篇就要將服務(wù)端處理請求的過程。也就是當服務(wù)端收到請求數(shù)據(jù)包后的一系列處理以及如何返回最終結(jié)果。我們也知道消費端在發(fā)送請求的時候已經(jīng)做了編碼,所以我...
摘要:大揭秘異步化改造目標從源碼的角度分析的新特性中對于異步化的改造原理??丛创a解析四十六消費端發(fā)送請求過程講到的十四的,在以前的邏輯會直接在方法中根據(jù)配置區(qū)分同步異步單向調(diào)用。改為關(guān)于可以參考源碼解析十遠程通信層的六。 2.7大揭秘——異步化改造 目標:從源碼的角度分析2.7的新特性中對于異步化的改造原理。 前言 dubbo中提供了很多類型的協(xié)議,關(guān)于協(xié)議的系列可以查看下面的文章: du...
摘要:遠程調(diào)用本地調(diào)用目標介紹本地調(diào)用的設(shè)計和實現(xiàn),介紹的源碼。前言是一個遠程調(diào)用的框架,但是它沒有理由不支持本地調(diào)用,本文就要講解關(guān)于本地調(diào)用的實現(xiàn)。服務(wù)暴露者集合取消暴露調(diào)用父類的取消暴露方法從集合中移除二該類繼承了類,是本地調(diào)用的實現(xiàn)。 遠程調(diào)用——injvm本地調(diào)用 目標:介紹injvm本地調(diào)用的設(shè)計和實現(xiàn),介紹dubbo-rpc-injvm的源碼。 前言 dubbo是一個遠程調(diào)用的...
摘要:前言基于表單的遠程調(diào)用協(xié)議,采用的實現(xiàn),關(guān)于協(xié)議就不用多說了吧。后記該部分相關(guān)的源碼解析地址該文章講解了遠程調(diào)用中關(guān)于協(xié)議的部分,內(nèi)容比較簡單,可以參考著官方文檔了解一下。 遠程調(diào)用——http協(xié)議 目標:介紹遠程調(diào)用中跟http協(xié)議相關(guān)的設(shè)計和實現(xiàn),介紹dubbo-rpc-http的源碼。 前言 基于HTTP表單的遠程調(diào)用協(xié)議,采用 Spring 的HttpInvoker實現(xiàn),關(guān)于h...
閱讀 1995·2021-11-22 19:20
閱讀 2641·2021-11-22 13:54
閱讀 1970·2021-09-04 16:40
閱讀 1827·2021-08-13 11:54
閱讀 2670·2019-08-30 15:55
閱讀 3468·2019-08-29 13:51
閱讀 531·2019-08-29 11:09
閱讀 3010·2019-08-26 14:06