摘要:客戶端對(duì)象字節(jié)輸出流請(qǐng)求對(duì)象響應(yīng)對(duì)象增加協(xié)議頭發(fā)送請(qǐng)求獲得請(qǐng)求后的狀態(tài)碼三該類(lèi)實(shí)現(xiàn)了接口,是創(chuàng)建的工廠類(lèi)。該類(lèi)的實(shí)現(xiàn)跟類(lèi)類(lèi)似,但是是標(biāo)準(zhǔn)的接口調(diào)用會(huì)采用的工廠類(lèi),而是的協(xié)議調(diào)用。
遠(yuǎn)程調(diào)用——hessian協(xié)議
目標(biāo):介紹遠(yuǎn)程調(diào)用中跟hessian協(xié)議相關(guān)的設(shè)計(jì)和實(shí)現(xiàn),介紹dubbo-rpc-hessian的源碼。前言
本文講解多是dubbo集成的第二種協(xié)議,hessian協(xié)議,Hessian 是 Caucho 開(kāi)源的一個(gè) RPC 框架,其通訊效率高于 WebService 和 Java 自帶的序列化。dubbo集成hessian所提供的hessian協(xié)議相關(guān)介紹可以參考官方文檔,我就不再贅述。
文檔地址:http://dubbo.apache.org/zh-cn...源碼分析 (一)DubboHessianURLConnectionFactory
該類(lèi)繼承了HessianURLConnectionFactory類(lèi),是dubbo,用于創(chuàng)建與服務(wù)器的連接的內(nèi)部工廠,重寫(xiě)了父類(lèi)中open方法。
public class DubboHessianURLConnectionFactory extends HessianURLConnectionFactory { /** * 打開(kāi)與HTTP服務(wù)器的新連接或循環(huán)連接 * @param url * @return * @throws IOException */ @Override public HessianConnection open(URL url) throws IOException { // 獲得一個(gè)連接 HessianConnection connection = super.open(url); // 獲得上下文 RpcContext context = RpcContext.getContext(); for (String key : context.getAttachments().keySet()) { // 在http協(xié)議頭里面加入dubbo中附加值,key為 header+key value為附加值的value connection.addHeader(Constants.DEFAULT_EXCHANGER + key, context.getAttachment(key)); } return connection; } }
在hessian上加入dubbo自己所需要的附加值,放到協(xié)議頭里面進(jìn)行發(fā)送。
(二)HttpClientConnection該類(lèi)是基于HttpClient封裝來(lái)實(shí)現(xiàn)HessianConnection接口,其中邏輯比較簡(jiǎn)單。
public class HttpClientConnection implements HessianConnection { /** * http客戶端對(duì)象 */ private final HttpClient httpClient; /** * 字節(jié)輸出流 */ private final ByteArrayOutputStream output; /** * http post請(qǐng)求對(duì)象 */ private final HttpPost request; /** * http 響應(yīng)對(duì)象 */ private volatile HttpResponse response; public HttpClientConnection(HttpClient httpClient, URL url) { this.httpClient = httpClient; this.output = new ByteArrayOutputStream(); this.request = new HttpPost(url.toString()); } /** * 增加協(xié)議頭 * @param key * @param value */ @Override public void addHeader(String key, String value) { request.addHeader(new BasicHeader(key, value)); } @Override public OutputStream getOutputStream() throws IOException { return output; } /** * 發(fā)送請(qǐng)求 * @throws IOException */ @Override public void sendRequest() throws IOException { request.setEntity(new ByteArrayEntity(output.toByteArray())); this.response = httpClient.execute(request); } /** * 獲得請(qǐng)求后的狀態(tài)碼 * @return */ @Override public int getStatusCode() { return response == null || response.getStatusLine() == null ? 0 : response.getStatusLine().getStatusCode(); } @Override public String getStatusMessage() { return response == null || response.getStatusLine() == null ? null : response.getStatusLine().getReasonPhrase(); } @Override public String getContentEncoding() { return (response == null || response.getEntity() == null || response.getEntity().getContentEncoding() == null) ? null : response.getEntity().getContentEncoding().getValue(); } @Override public InputStream getInputStream() throws IOException { return response == null || response.getEntity() == null ? null : response.getEntity().getContent(); } @Override public void close() throws IOException { HttpPost request = this.request; if (request != null) { request.abort(); } } @Override public void destroy() throws IOException { }(三)HttpClientConnectionFactory
該類(lèi)實(shí)現(xiàn)了HessianConnectionFactory接口,是創(chuàng)建HttpClientConnection的工廠類(lèi)。該類(lèi)的實(shí)現(xiàn)跟DubboHessianURLConnectionFactory類(lèi)類(lèi)似,但是DubboHessianURLConnectionFactory是標(biāo)準(zhǔn)的Hessian接口調(diào)用會(huì)采用的工廠類(lèi),而HttpClientConnectionFactory是Dubbo 的 Hessian 協(xié)議調(diào)用。當(dāng)然Dubbo 的 Hessian 協(xié)議也是基于http的。
public class HttpClientConnectionFactory implements HessianConnectionFactory { /** * httpClient對(duì)象 */ private final HttpClient httpClient = new DefaultHttpClient(); @Override public void setHessianProxyFactory(HessianProxyFactory factory) { // 設(shè)置連接超時(shí)時(shí)間 HttpConnectionParams.setConnectionTimeout(httpClient.getParams(), (int) factory.getConnectTimeout()); // 設(shè)置讀取數(shù)據(jù)時(shí)阻塞鏈路的超時(shí)時(shí)間 HttpConnectionParams.setSoTimeout(httpClient.getParams(), (int) factory.getReadTimeout()); } @Override public HessianConnection open(URL url) throws IOException { // 創(chuàng)建一個(gè)HttpClientConnection實(shí)例 HttpClientConnection httpClientConnection = new HttpClientConnection(httpClient, url); // 獲得上下文,用來(lái)獲得附加值 RpcContext context = RpcContext.getContext(); // 遍歷附加值,放入到協(xié)議頭里面 for (String key : context.getAttachments().keySet()) { httpClientConnection.addHeader(Constants.DEFAULT_EXCHANGER + key, context.getAttachment(key)); } return httpClientConnection; } }
實(shí)現(xiàn)了兩個(gè)方法,第一個(gè)方法是給http連接設(shè)置兩個(gè)參數(shù)配置,第二個(gè)方法是創(chuàng)建一個(gè)連接。
(四)HessianProtocol該類(lèi)繼承了AbstractProxyProtocol類(lèi),是hessian協(xié)議的實(shí)現(xiàn)類(lèi)。其中實(shí)現(xiàn)類(lèi)基于hessian協(xié)議的服務(wù)引用、服務(wù)暴露等方法。
1.屬性/** * http服務(wù)器集合 * key為ip:port */ private final Map2.doExportserverMap = new ConcurrentHashMap (); /** * HessianSkeleto 集合 * key為服務(wù)名 */ private final Map skeletonMap = new ConcurrentHashMap (); /** * HttpBinder對(duì)象,默認(rèn)是jetty實(shí)現(xiàn) */ private HttpBinder httpBinder;
@Override protectedRunnable doExport(T impl, Class type, URL url) throws RpcException { // 獲得ip地址 String addr = getAddr(url); // 獲得http服務(wù)器對(duì)象 HttpServer server = serverMap.get(addr); // 如果為空,則重新創(chuàng)建一個(gè)server,然后放入集合 if (server == null) { server = httpBinder.bind(url, new HessianHandler()); serverMap.put(addr, server); } // 獲得服務(wù)path final String path = url.getAbsolutePath(); // 創(chuàng)建Hessian服務(wù)端對(duì)象 final HessianSkeleton skeleton = new HessianSkeleton(impl, type); // 加入集合 skeletonMap.put(path, skeleton); // 獲得通用的path final String genericPath = path + "/" + Constants.GENERIC_KEY; // 加入集合 skeletonMap.put(genericPath, new HessianSkeleton(impl, GenericService.class)); // 返回一個(gè)線程 return new Runnable() { @Override public void run() { skeletonMap.remove(path); skeletonMap.remove(genericPath); } }; }
該方法是服務(wù)暴露的主要邏輯實(shí)現(xiàn)。
3.doRefer@Override @SuppressWarnings("unchecked") protectedT doRefer(Class serviceType, URL url) throws RpcException { // 獲得泛化的參數(shù) String generic = url.getParameter(Constants.GENERIC_KEY); // 是否是泛化調(diào)用 boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class); // 如果是泛化調(diào)用。則設(shè)置泛化的path和附加值 if (isGeneric) { RpcContext.getContext().setAttachment(Constants.GENERIC_KEY, generic); url = url.setPath(url.getPath() + "/" + Constants.GENERIC_KEY); } // 創(chuàng)建代理工廠 HessianProxyFactory hessianProxyFactory = new HessianProxyFactory(); // 是否是Hessian2的請(qǐng)求 默認(rèn)為否 boolean isHessian2Request = url.getParameter(Constants.HESSIAN2_REQUEST_KEY, Constants.DEFAULT_HESSIAN2_REQUEST); // 設(shè)置是否應(yīng)使用Hessian協(xié)議的版本2來(lái)解析請(qǐng)求 hessianProxyFactory.setHessian2Request(isHessian2Request); // 是否應(yīng)為遠(yuǎn)程調(diào)用啟用重載方法,默認(rèn)為否 boolean isOverloadEnabled = url.getParameter(Constants.HESSIAN_OVERLOAD_METHOD_KEY, Constants.DEFAULT_HESSIAN_OVERLOAD_METHOD); // 設(shè)置是否應(yīng)為遠(yuǎn)程調(diào)用啟用重載方法。 hessianProxyFactory.setOverloadEnabled(isOverloadEnabled); // 獲得client實(shí)現(xiàn)方式,默認(rèn)為jdk String client = url.getParameter(Constants.CLIENT_KEY, Constants.DEFAULT_HTTP_CLIENT); if ("httpclient".equals(client)) { // 用http來(lái)創(chuàng)建 hessianProxyFactory.setConnectionFactory(new HttpClientConnectionFactory()); } else if (client != null && client.length() > 0 && !Constants.DEFAULT_HTTP_CLIENT.equals(client)) { // 拋出不支持的協(xié)議異常 throw new IllegalStateException("Unsupported http protocol client="" + client + ""!"); } else { // 創(chuàng)建一個(gè)HessianConnectionFactory對(duì)象 HessianConnectionFactory factory = new DubboHessianURLConnectionFactory(); // 設(shè)置代理工廠 factory.setHessianProxyFactory(hessianProxyFactory); // 設(shè)置工廠 hessianProxyFactory.setConnectionFactory(factory); } // 獲得超時(shí)時(shí)間 int timeout = url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 設(shè)置超時(shí)時(shí)間 hessianProxyFactory.setConnectTimeout(timeout); hessianProxyFactory.setReadTimeout(timeout); // 創(chuàng)建代理 return (T) hessianProxyFactory.create(serviceType, url.setProtocol("http").toJavaURL(), Thread.currentThread().getContextClassLoader()); }
該方法是服務(wù)引用的主要邏輯實(shí)現(xiàn),根據(jù)客戶端配置,來(lái)選擇標(biāo)準(zhǔn) Hessian 接口調(diào)用還是Dubbo 的 Hessian 協(xié)議調(diào)用。
4.getErrorCode@Override protected int getErrorCode(Throwable e) { // 如果屬于HessianConnectionException異常 if (e instanceof HessianConnectionException) { if (e.getCause() != null) { Class> cls = e.getCause().getClass(); // 如果屬于超時(shí)異常,則返回超時(shí)異常 if (SocketTimeoutException.class.equals(cls)) { return RpcException.TIMEOUT_EXCEPTION; } } // 否則返回網(wǎng)絡(luò)異常 return RpcException.NETWORK_EXCEPTION; } else if (e instanceof HessianMethodSerializationException) { // 序列化異常 return RpcException.SERIALIZATION_EXCEPTION; } return super.getErrorCode(e); }
該方法是針對(duì)異常的處理。
5.HessianHandlerprivate class HessianHandler implements HttpHandler { @Override public void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { // 獲得請(qǐng)求的uri String uri = request.getRequestURI(); // 獲得對(duì)應(yīng)的HessianSkeleton對(duì)象 HessianSkeleton skeleton = skeletonMap.get(uri); // 如果如果不是post方法 if (!request.getMethod().equalsIgnoreCase("POST")) { // 返回狀態(tài)設(shè)置為500 response.setStatus(500); } else { // 設(shè)置遠(yuǎn)程地址 RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort()); // 獲得請(qǐng)求頭內(nèi)容 Enumerationenumeration = request.getHeaderNames(); // 遍歷請(qǐng)求頭內(nèi)容 while (enumeration.hasMoreElements()) { String key = enumeration.nextElement(); // 如果key開(kāi)頭是deader,則把附加值取出來(lái)放入上下文 if (key.startsWith(Constants.DEFAULT_EXCHANGER)) { RpcContext.getContext().setAttachment(key.substring(Constants.DEFAULT_EXCHANGER.length()), request.getHeader(key)); } } try { // 執(zhí)行下一個(gè) skeleton.invoke(request.getInputStream(), response.getOutputStream()); } catch (Throwable e) { throw new ServletException(e); } } } }
該內(nèi)部類(lèi)是Hessian的處理器,用來(lái)處理請(qǐng)求中的協(xié)議頭內(nèi)容。
后記該部分相關(guān)的源碼解析地址:https://github.com/CrazyHZM/i...
該文章講解了遠(yuǎn)程調(diào)用中關(guān)于hessian協(xié)議的部分,內(nèi)容比較簡(jiǎn)單,可以參考著官方文檔了解一下。接下來(lái)我將開(kāi)始對(duì)rpc模塊關(guān)于hessian協(xié)議部分進(jìn)行講解。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/73175.html
摘要:大揭秘異步化改造目標(biāo)從源碼的角度分析的新特性中對(duì)于異步化的改造原理??丛创a解析四十六消費(fèi)端發(fā)送請(qǐng)求過(guò)程講到的十四的,在以前的邏輯會(huì)直接在方法中根據(jù)配置區(qū)分同步異步單向調(diào)用。改為關(guān)于可以參考源碼解析十遠(yuǎn)程通信層的六。 2.7大揭秘——異步化改造 目標(biāo):從源碼的角度分析2.7的新特性中對(duì)于異步化的改造原理。 前言 dubbo中提供了很多類(lèi)型的協(xié)議,關(guān)于協(xié)議的系列可以查看下面的文章: du...
摘要:前言基于表單的遠(yuǎn)程調(diào)用協(xié)議,采用的實(shí)現(xiàn),關(guān)于協(xié)議就不用多說(shuō)了吧。后記該部分相關(guān)的源碼解析地址該文章講解了遠(yuǎn)程調(diào)用中關(guān)于協(xié)議的部分,內(nèi)容比較簡(jiǎn)單,可以參考著官方文檔了解一下。 遠(yuǎn)程調(diào)用——http協(xié)議 目標(biāo):介紹遠(yuǎn)程調(diào)用中跟http協(xié)議相關(guān)的設(shè)計(jì)和實(shí)現(xiàn),介紹dubbo-rpc-http的源碼。 前言 基于HTTP表單的遠(yuǎn)程調(diào)用協(xié)議,采用 Spring 的HttpInvoker實(shí)現(xiàn),關(guān)于h...
摘要:可以參考源碼解析二十四遠(yuǎn)程調(diào)用協(xié)議的八。十六的該類(lèi)也是用了適配器模式,該類(lèi)主要的作用就是增加了心跳功能,可以參考源碼解析十遠(yuǎn)程通信層的四。二十的可以參考源碼解析十七遠(yuǎn)程通信的一。 2.7大揭秘——消費(fèi)端發(fā)送請(qǐng)求過(guò)程 目標(biāo):從源碼的角度分析一個(gè)服務(wù)方法調(diào)用經(jīng)歷怎么樣的磨難以后到達(dá)服務(wù)端。 前言 前一篇文章講到的是引用服務(wù)的過(guò)程,引用服務(wù)無(wú)非就是創(chuàng)建出一個(gè)代理。供消費(fèi)者調(diào)用服務(wù)的相關(guān)方法。...
摘要:服務(wù)引用過(guò)程目標(biāo)從源碼的角度分析服務(wù)引用過(guò)程。并保留服務(wù)提供者的部分配置,比如版本,,時(shí)間戳等最后將合并后的配置設(shè)置為查詢字符串中。的可以參考源碼解析二十三遠(yuǎn)程調(diào)用的一的源碼分析。 dubbo服務(wù)引用過(guò)程 目標(biāo):從源碼的角度分析服務(wù)引用過(guò)程。 前言 前面服務(wù)暴露過(guò)程的文章講解到,服務(wù)引用有兩種方式,一種就是直連,也就是直接指定服務(wù)的地址來(lái)進(jìn)行引用,這種方式更多的時(shí)候被用來(lái)做服務(wù)測(cè)試,不...
摘要:而存在的意義就是保證請(qǐng)求或響應(yīng)對(duì)象可在線程池中被解碼,解碼完成后,就會(huì)分發(fā)到的。 2.7大揭秘——服務(wù)端處理請(qǐng)求過(guò)程 目標(biāo):從源碼的角度分析服務(wù)端接收到請(qǐng)求后的一系列操作,最終把客戶端需要的值返回。 前言 上一篇講到了消費(fèi)端發(fā)送請(qǐng)求的過(guò)程,該篇就要將服務(wù)端處理請(qǐng)求的過(guò)程。也就是當(dāng)服務(wù)端收到請(qǐng)求數(shù)據(jù)包后的一系列處理以及如何返回最終結(jié)果。我們也知道消費(fèi)端在發(fā)送請(qǐng)求的時(shí)候已經(jīng)做了編碼,所以我...
閱讀 2950·2021-10-14 09:42
閱讀 3708·2021-08-11 11:19
閱讀 3553·2019-08-30 13:57
閱讀 3138·2019-08-30 13:49
閱讀 1549·2019-08-29 18:38
閱讀 908·2019-08-29 13:16
閱讀 1865·2019-08-26 13:25
閱讀 3237·2019-08-26 13:24