摘要:前言基于表單的遠(yuǎn)程調(diào)用協(xié)議,采用的實(shí)現(xiàn),關(guān)于協(xié)議就不用多說了吧。后記該部分相關(guān)的源碼解析地址該文章講解了遠(yuǎn)程調(diào)用中關(guān)于協(xié)議的部分,內(nèi)容比較簡單,可以參考著官方文檔了解一下。
遠(yuǎn)程調(diào)用——http協(xié)議
目標(biāo):介紹遠(yuǎn)程調(diào)用中跟http協(xié)議相關(guān)的設(shè)計和實(shí)現(xiàn),介紹dubbo-rpc-http的源碼。前言
基于HTTP表單的遠(yuǎn)程調(diào)用協(xié)議,采用 Spring 的HttpInvoker實(shí)現(xiàn),關(guān)于http協(xié)議就不用多說了吧。
源碼分析 (一)HttpRemoteInvocation該類繼承了RemoteInvocation類,是在RemoteInvocation上增加了泛化調(diào)用的參數(shù)設(shè)置,以及增加了dubbo本身需要的附加值設(shè)置。
public class HttpRemoteInvocation extends RemoteInvocation { private static final long serialVersionUID = 1L; /** * dubbo的附加值名稱 */ private static final String dubboAttachmentsAttrName = "dubbo.attachments"; public HttpRemoteInvocation(MethodInvocation methodInvocation) { super(methodInvocation); // 把附加值加入到會話域的屬性里面 addAttribute(dubboAttachmentsAttrName, new HashMap(二)HttpProtocol(RpcContext.getContext().getAttachments())); } @Override public Object invoke(Object targetObject) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { // 獲得上下文 RpcContext context = RpcContext.getContext(); // 獲得附加值 context.setAttachments((Map ) getAttribute(dubboAttachmentsAttrName)); // 泛化標(biāo)志 String generic = (String) getAttribute(Constants.GENERIC_KEY); // 如果不為空,則設(shè)置泛化標(biāo)志 if (StringUtils.isNotEmpty(generic)) { context.setAttachment(Constants.GENERIC_KEY, generic); } try { // 調(diào)用下一個調(diào)用鏈 return super.invoke(targetObject); } finally { context.setAttachments(null); } } }
該類是http實(shí)現(xiàn)的核心,跟我在《dubbo源碼解析(二十五)遠(yuǎn)程調(diào)用——hessian協(xié)議》中講到的HessianProtocol實(shí)現(xiàn)有很多地方相似。
1.屬性/** * 默認(rèn)的端口號 */ public static final int DEFAULT_PORT = 80; /** * http服務(wù)器集合 */ private final Map2.doExportserverMap = new ConcurrentHashMap (); /** * Spring HttpInvokerServiceExporter 集合 */ private final Map skeletonMap = new ConcurrentHashMap (); /** * HttpBinder對象 */ private HttpBinder httpBinder;
@Override protectedRunnable doExport(final T impl, Class type, URL url) throws RpcException { // 獲得ip地址 String addr = getAddr(url); // 獲得http服務(wù)器 HttpServer server = serverMap.get(addr); // 如果服務(wù)器為空,則重新創(chuàng)建服務(wù)器,并且加入到集合 if (server == null) { server = httpBinder.bind(url, new InternalHandler()); serverMap.put(addr, server); } // 獲得服務(wù)path final String path = url.getAbsolutePath(); // 加入集合 skeletonMap.put(path, createExporter(impl, type)); // 通用path final String genericPath = path + "/" + Constants.GENERIC_KEY; // 添加泛化的服務(wù)調(diào)用 skeletonMap.put(genericPath, createExporter(impl, GenericService.class)); return new Runnable() { @Override public void run() { skeletonMap.remove(path); skeletonMap.remove(genericPath); } }; }
該方法是暴露服務(wù)等邏輯,因為dubbo實(shí)現(xiàn)http協(xié)議采用了Spring 的HttpInvoker實(shí)現(xiàn),所以調(diào)用了createExporter方法來創(chuàng)建創(chuàng)建HttpInvokerServiceExporter。
3.createExporterprivateHttpInvokerServiceExporter createExporter(T impl, Class> type) { // 創(chuàng)建HttpInvokerServiceExporter final HttpInvokerServiceExporter httpServiceExporter = new HttpInvokerServiceExporter(); // 設(shè)置要訪問的服務(wù)的接口 httpServiceExporter.setServiceInterface(type); // 設(shè)置服務(wù)實(shí)現(xiàn) httpServiceExporter.setService(impl); try { // 在BeanFactory設(shè)置了所有提供的bean屬性,初始化bean的時候執(zhí)行,可以針對某個具體的bean進(jìn)行配 httpServiceExporter.afterPropertiesSet(); } catch (Exception e) { throw new RpcException(e.getMessage(), e); } return httpServiceExporter; }
該方法是創(chuàng)建一個spring 的HttpInvokerServiceExporter。
4.doRefer@Override @SuppressWarnings("unchecked") protectedT doRefer(final Class serviceType, final URL url) throws RpcException { // 獲得泛化配置 final String generic = url.getParameter(Constants.GENERIC_KEY); // 是否為泛化調(diào)用 final boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class); // 創(chuàng)建HttpInvokerProxyFactoryBean final HttpInvokerProxyFactoryBean httpProxyFactoryBean = new HttpInvokerProxyFactoryBean(); // 設(shè)置RemoteInvocation的工廠類 httpProxyFactoryBean.setRemoteInvocationFactory(new RemoteInvocationFactory() { /** * 為給定的AOP方法調(diào)用創(chuàng)建一個新的RemoteInvocation對象。 * @param methodInvocation * @return */ @Override public RemoteInvocation createRemoteInvocation(MethodInvocation methodInvocation) { // 新建一個HttpRemoteInvocation RemoteInvocation invocation = new HttpRemoteInvocation(methodInvocation); // 如果是泛化調(diào)用 if (isGeneric) { // 設(shè)置標(biāo)志 invocation.addAttribute(Constants.GENERIC_KEY, generic); } return invocation; } }); // 獲得identity message String key = url.toIdentityString(); // 如果是泛化調(diào)用 if (isGeneric) { key = key + "/" + Constants.GENERIC_KEY; } // 設(shè)置服務(wù)url httpProxyFactoryBean.setServiceUrl(key); // 設(shè)置服務(wù)接口 httpProxyFactoryBean.setServiceInterface(serviceType); // 獲得客戶端參數(shù) String client = url.getParameter(Constants.CLIENT_KEY); if (client == null || client.length() == 0 || "simple".equals(client)) { // 創(chuàng)建SimpleHttpInvokerRequestExecutor連接池 使用的是JDK HttpClient SimpleHttpInvokerRequestExecutor httpInvokerRequestExecutor = new SimpleHttpInvokerRequestExecutor() { @Override protected void prepareConnection(HttpURLConnection con, int contentLength) throws IOException { super.prepareConnection(con, contentLength); // 設(shè)置讀取超時時間 con.setReadTimeout(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)); // 設(shè)置連接超時時間 con.setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT)); } }; httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor); } else if ("commons".equals(client)) { // 創(chuàng)建 HttpComponentsHttpInvokerRequestExecutor連接池 使用的是Apache HttpClient HttpComponentsHttpInvokerRequestExecutor httpInvokerRequestExecutor = new HttpComponentsHttpInvokerRequestExecutor(); // 設(shè)置讀取超時時間 httpInvokerRequestExecutor.setReadTimeout(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)); // 設(shè)置連接超時時間 httpInvokerRequestExecutor.setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT)); httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor); } else { throw new IllegalStateException("Unsupported http protocol client " + client + ", only supported: simple, commons"); } httpProxyFactoryBean.afterPropertiesSet(); // 返回HttpInvokerProxyFactoryBean對象 return (T) httpProxyFactoryBean.getObject(); }
該方法是服務(wù)引用的方法,其中根據(jù)url配置simple還是commons來選擇創(chuàng)建連接池的方式。其中的區(qū)別就是SimpleHttpInvokerRequestExecutor使用的是JDK HttpClient,HttpComponentsHttpInvokerRequestExecutor 使用的是Apache HttpClient。
5.getErrorCode@Override protected int getErrorCode(Throwable e) { if (e instanceof RemoteAccessException) { e = e.getCause(); } if (e != null) { Class> cls = e.getClass(); if (SocketTimeoutException.class.equals(cls)) { // 返回超時異常 return RpcException.TIMEOUT_EXCEPTION; } else if (IOException.class.isAssignableFrom(cls)) { // 返回網(wǎng)絡(luò)異常 return RpcException.NETWORK_EXCEPTION; } else if (ClassNotFoundException.class.isAssignableFrom(cls)) { // 返回序列化異常 return RpcException.SERIALIZATION_EXCEPTION; } } return super.getErrorCode(e); }
該方法是處理異常情況,設(shè)置錯誤碼。
6.InternalHandlerprivate class InternalHandler implements HttpHandler { @Override public void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { // 獲得請求uri String uri = request.getRequestURI(); // 獲得服務(wù)暴露者HttpInvokerServiceExporter對象 HttpInvokerServiceExporter skeleton = skeletonMap.get(uri); // 如果不是post,則返回碼設(shè)置500 if (!request.getMethod().equalsIgnoreCase("POST")) { response.setStatus(500); } else { // 遠(yuǎn)程地址放到上下文 RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort()); try { // 調(diào)用下一個調(diào)用 skeleton.handleRequest(request, response); } catch (Throwable e) { throw new ServletException(e); } } } }
該內(nèi)部類實(shí)現(xiàn)了HttpHandler,做了設(shè)置遠(yuǎn)程地址的邏輯。
后記該部分相關(guān)的源碼解析地址:https://github.com/CrazyHZM/i...
該文章講解了遠(yuǎn)程調(diào)用中關(guān)于http協(xié)議的部分,內(nèi)容比較簡單,可以參考著官方文檔了解一下。接下來我將開始對rpc模塊關(guān)于injvm本地調(diào)用部分進(jìn)行講解。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/73199.html
摘要:大揭秘異步化改造目標(biāo)從源碼的角度分析的新特性中對于異步化的改造原理??丛创a解析四十六消費(fèi)端發(fā)送請求過程講到的十四的,在以前的邏輯會直接在方法中根據(jù)配置區(qū)分同步異步單向調(diào)用。改為關(guān)于可以參考源碼解析十遠(yuǎn)程通信層的六。 2.7大揭秘——異步化改造 目標(biāo):從源碼的角度分析2.7的新特性中對于異步化的改造原理。 前言 dubbo中提供了很多類型的協(xié)議,關(guān)于協(xié)議的系列可以查看下面的文章: du...
摘要:可以參考源碼解析二十四遠(yuǎn)程調(diào)用協(xié)議的八。十六的該類也是用了適配器模式,該類主要的作用就是增加了心跳功能,可以參考源碼解析十遠(yuǎn)程通信層的四。二十的可以參考源碼解析十七遠(yuǎn)程通信的一。 2.7大揭秘——消費(fèi)端發(fā)送請求過程 目標(biāo):從源碼的角度分析一個服務(wù)方法調(diào)用經(jīng)歷怎么樣的磨難以后到達(dá)服務(wù)端。 前言 前一篇文章講到的是引用服務(wù)的過程,引用服務(wù)無非就是創(chuàng)建出一個代理。供消費(fèi)者調(diào)用服務(wù)的相關(guān)方法。...
摘要:而存在的意義就是保證請求或響應(yīng)對象可在線程池中被解碼,解碼完成后,就會分發(fā)到的。 2.7大揭秘——服務(wù)端處理請求過程 目標(biāo):從源碼的角度分析服務(wù)端接收到請求后的一系列操作,最終把客戶端需要的值返回。 前言 上一篇講到了消費(fèi)端發(fā)送請求的過程,該篇就要將服務(wù)端處理請求的過程。也就是當(dāng)服務(wù)端收到請求數(shù)據(jù)包后的一系列處理以及如何返回最終結(jié)果。我們也知道消費(fèi)端在發(fā)送請求的時候已經(jīng)做了編碼,所以我...
摘要:客戶端對象字節(jié)輸出流請求對象響應(yīng)對象增加協(xié)議頭發(fā)送請求獲得請求后的狀態(tài)碼三該類實(shí)現(xiàn)了接口,是創(chuàng)建的工廠類。該類的實(shí)現(xiàn)跟類類似,但是是標(biāo)準(zhǔn)的接口調(diào)用會采用的工廠類,而是的協(xié)議調(diào)用。 遠(yuǎn)程調(diào)用——hessian協(xié)議 目標(biāo):介紹遠(yuǎn)程調(diào)用中跟hessian協(xié)議相關(guān)的設(shè)計和實(shí)現(xiàn),介紹dubbo-rpc-hessian的源碼。 前言 本文講解多是dubbo集成的第二種協(xié)議,hessian協(xié)議,He...
摘要:遠(yuǎn)程調(diào)用協(xié)議目標(biāo)介紹遠(yuǎn)程調(diào)用中跟協(xié)議相關(guān)的設(shè)計和實(shí)現(xiàn),介紹的源碼。二該類繼承了,是協(xié)議中獨(dú)有的服務(wù)暴露者。八該類也是對的裝飾,其中增強(qiáng)了調(diào)用次數(shù)多功能。 遠(yuǎn)程調(diào)用——dubbo協(xié)議 目標(biāo):介紹遠(yuǎn)程調(diào)用中跟dubbo協(xié)議相關(guān)的設(shè)計和實(shí)現(xiàn),介紹dubbo-rpc-dubbo的源碼。 前言 Dubbo 缺省協(xié)議采用單一長連接和 NIO 異步通訊,適合于小數(shù)據(jù)量大并發(fā)的服務(wù)調(diào)用,以及服務(wù)消費(fèi)者...
閱讀 2310·2023-04-25 14:22
閱讀 3748·2021-11-15 18:12
閱讀 1303·2019-08-30 15:44
閱讀 3224·2019-08-29 15:37
閱讀 653·2019-08-29 13:49
閱讀 3466·2019-08-26 12:11
閱讀 887·2019-08-23 18:28
閱讀 1592·2019-08-23 14:55