摘要:項目版本源碼在上一博文中跟大家講了的實現(xiàn)思路思路畢竟只是思路那么這篇就帶著源碼給大家講解下實現(xiàn)過程中的各個具體問題讀懂本篇需要的基本知識若尚未清晰請自行了解后再閱讀本文動態(tài)代理框架的基本使用的基本配置最終項目的使用如下調(diào)用端代碼及配置測試類
項目1.0版本源碼
https://github.com/wephone/Me...
在上一博文中 跟大家講了RPC的實現(xiàn)思路 思路畢竟只是思路 那么這篇就帶著源碼給大家講解下實現(xiàn)過程中的各個具體問題
讀懂本篇需要的基本知識 若尚未清晰請自行了解后再閱讀本文java動態(tài)代理
netty框架的基本使用
spring的基本配置
最終項目的使用如下/** *調(diào)用端代碼及spring配置 */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations={"file:src/test/java/rpcTest/ClientContext.xml"}) public class Client { @Test public void start(){ Service service= (Service) RPC.call(Service.class); System.out.println("測試Integer,Double類型傳參與返回String對象:"+service.stringMethodIntegerArgsTest(233,666.66)); //輸出string233666.66 } } /** *Service抽象及其實現(xiàn) *調(diào)用與實現(xiàn)端共同依賴Service */ public interface Service { String stringMethodIntegerArgsTest(Integer a,Double b); } /** * ServiceImpl實現(xiàn)端對接口的具體實現(xiàn) */ public class ServiceImpl implements Service { @Override public String stringMethodIntegerArgsTest(Integer a, Double b) { return "String"+a+b; } }
1.0版本分3個包
Client 調(diào)用端
Server 實現(xiàn)端
Core 核心方法
首先看這句代碼調(diào)用端只需如此調(diào)用
定義接口 傳入接口類類型 后面調(diào)用的接口內(nèi)的方法 全部是由實現(xiàn)端實現(xiàn)
Service service= (Service) RPC.call(Service.class);
這句的作用其實就是生成調(diào)用端的動態(tài)代理
/** * 暴露調(diào)用端使用的靜態(tài)方法 為抽象接口生成動態(tài)代理對象 * TODO 考慮后面優(yōu)化不在使用時仍需強轉(zhuǎn) * @param cls 抽象接口的類類型 * @return 接口生成的動態(tài)代理對象 */ public static Object call(Class cls){ RPCProxyHandler handler=new RPCProxyHandler(); Object proxyObj=Proxy.newProxyInstance(cls.getClassLoader(),new Class>[]{cls},handler); return proxyObj; }
RPCProxyHandler為動態(tài)代理的方法被調(diào)用后的回調(diào)方法 每個方法被調(diào)用時都會執(zhí)行這個invoke
/** * 代理抽象接口調(diào)用的方法 * 發(fā)送方法信息給服務(wù)端 加鎖等待服務(wù)端返回 * @param proxy * @param method * @param args * @return * @throws Throwable */ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RPCRequest request=new RPCRequest(); request.setRequestID(buildRequestID(method.getName())); request.setClassName(method.getDeclaringClass().getName());//返回表示聲明由此 Method 對象表示的方法的類或接口的Class對象 request.setMethodName(method.getName()); // request.setParameterTypes(method.getParameterTypes());//返回形參類型 request.setParameters(args);//輸入的實參 RPCRequestNet.requestLockMap.put(request.getRequestID(),request); RPCRequestNet.connect().send(request); //調(diào)用用結(jié)束后移除對應(yīng)的condition映射關(guān)系 RPCRequestNet.requestLockMap.remove(request.getRequestID()); return request.getResult();//目標(biāo)方法的返回結(jié)果 }
也就是收集對應(yīng)調(diào)用的接口的信息 然后send給實現(xiàn)端
那么這個requestLockMap又是作何作用的呢
由于我們的網(wǎng)絡(luò)調(diào)用都是異步的
但是RPC調(diào)用都要做到同步 等待這個遠(yuǎn)程調(diào)用方法完全返回后再繼續(xù)執(zhí)行
所以將每個請求的request對象作為對象鎖 每個請求發(fā)送后加鎖 等到網(wǎng)絡(luò)異步調(diào)用返回后再釋放所
生成每個請求的ID 這里我用隨機數(shù)加時間戳
將請求ID和請求對象維護(hù)在靜態(tài)全局的一個map中 實現(xiàn)端通過ID來對應(yīng)是哪個請求
異步調(diào)用返回后 通過ID notify喚醒對應(yīng)請求對象的線程
netty異步返回的調(diào)用 釋放對象鎖
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String responseJson= (String) msg; RPCResponse response= (RPCResponse) RPC.responseDecode(responseJson); synchronized (RPCRequestNet.requestLockMap.get(response.getRequestID())) { //喚醒在該對象鎖上wait的線程 RPCRequest request= (RPCRequest) RPCRequestNet.requestLockMap.get(response.getRequestID()); request.setResult(response.getResult()); request.notifyAll(); } }
接下來是RPCRequestNet.connect().send(request);方法
connect方法其實是單例模式返回RPCRequestNet實例
RPCRequestNet構(gòu)造方法是使用netty對實現(xiàn)端進(jìn)行TCP鏈接
send方法如下
try { //判斷連接是否已完成 只在連接啟動時會產(chǎn)生阻塞 if (RPCRequestHandler.channelCtx==null){ connectlock.lock(); //掛起等待連接成功 System.out.println("正在等待連接實現(xiàn)端"); connectCondition.await(); connectlock.unlock(); } //編解碼對象為json 發(fā)送請求 String requestJson= null; try { requestJson = RPC.requestEncode(request); } catch (JsonProcessingException e) { e.printStackTrace(); } ByteBuf requestBuf= Unpooled.copiedBuffer(requestJson.getBytes()); RPCRequestHandler.channelCtx.writeAndFlush(requestBuf); System.out.println("調(diào)用"+request.getRequestID()+"已發(fā)送"); //掛起等待實現(xiàn)端處理完畢返回 TODO 后續(xù)配置超時時間 synchronized (request) { //放棄對象鎖 并阻塞等待notify request.wait(); } System.out.println("調(diào)用"+request.getRequestID()+"接收完畢"); } catch (InterruptedException e) { e.printStackTrace(); }
condition和lock同樣是為了同步等待異步IO返回用的
send方法基本是編解碼json后發(fā)送給實現(xiàn)端
/** *實現(xiàn)端代碼及spring配置 */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations={"file:src/test/java/rpcTest/ServerContext.xml"}) public class Server { @Test public void start(){ //啟動spring后才可啟動 防止容器尚未加載完畢 RPC.start(); } }
出了配置spring之外 實現(xiàn)端就一句 RPC.start()
其實就是啟動netty服務(wù)器
服務(wù)端的處理客戶端信息回調(diào)如下
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException { String requestJson= (String) msg; System.out.println("receive request:"+requestJson); RPCRequest request= RPC.requestDeocde(requestJson); Object result=InvokeServiceUtil.invoke(request); //netty的write方法并沒有直接寫入通道(為避免多次喚醒多路復(fù)用選擇器) //而是把待發(fā)送的消息放到緩沖數(shù)組中,flush方法再全部寫到通道中 // ctx.write(resp); //記得加分隔符 不然客戶端一直不會處理 RPCResponse response=new RPCResponse(); response.setRequestID(request.getRequestID()); response.setResult(result); String respStr=RPC.responseEncode(response); ByteBuf responseBuf= Unpooled.copiedBuffer(respStr.getBytes()); ctx.writeAndFlush(responseBuf); }
主要是編解碼json 反射對應(yīng)的方法 我們看看反射的工具類
/** * 反射調(diào)用相應(yīng)實現(xiàn)類并結(jié)果 * @param request * @return */ public static Object invoke(RPCRequest request){ Object result=null;//內(nèi)部變量必須賦值 全局變量才不用 //實現(xiàn)類名 String implClassName= RPC.getServerConfig().getServerImplMap().get(request.getClassName()); try { Class implClass=Class.forName(implClassName); Object[] parameters=request.getParameters(); int parameterNums=request.getParameters().length; Class[] parameterTypes=new Class[parameterNums]; for (int i = 0; i解析Parameters getClass獲取他們的類類型 反射調(diào)用對應(yīng)的方法
這里需要注意一個點本文最初采用Gson處理json 但gson默認(rèn)會把int類型轉(zhuǎn)為double類型 例如2變?yōu)?.0 不適用本場景 我也不想去專門適配
所以換用了jackson
常見json處理框架 反序列化為對象時 int,long等基本類型都會變成他們的包裝類Integer Long
所以本例程中 遠(yuǎn)程調(diào)度接口方法的形參不可以使用int等基本類型
否則method.invoke(implObj,parameters);會找不到對應(yīng)的方法報錯
因為parameters已經(jīng)是包裝類了 而method還是int這些基本類 所以找不到對應(yīng)方法
最后是借助spring配置基礎(chǔ)配置
我寫了兩個類 ServerConfig ClientConfig 作為調(diào)用端和服務(wù)端的配置
只需在spring中配置這兩個bean 并啟動IOC容器即可調(diào)用端
實現(xiàn)端
最后有個小問題我們的框架是作為一個依賴包引入的 我們不可能在我們的框架中讀取對應(yīng)的spring xml
這樣完全是去了框架的靈活性
那我們怎么在運行過程中獲得我們所處于的IOC容器 已獲得我們的正確配置信息呢
答案是spring提供的ApplicationContextAware接口/** * Created by wephone on 17-12-26. */ public class ClientConfig implements ApplicationContextAware { private String host; private int port; //調(diào)用超時時間 private long overtime; public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } public long getOvertime() { return overtime; } public void setOvertime(long overtime) { this.overtime = overtime; } /** * 加載Spring配置文件時,如果Spring配置文件中所定義的Bean類 * 如果該類實現(xiàn)了ApplicationContextAware接口 * 那么在加載Spring配置文件時,會自動調(diào)用ApplicationContextAware接口中的 * @param applicationContext * @throws BeansException */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { RPC.clientContext=applicationContext; } }這樣我們在RPC類內(nèi)部就維護(hù)了一個靜態(tài)IOC容器的context
只需如此獲取配置
RPC.getServerConfig().getPort()public static ServerConfig getServerConfig(){ return serverContext.getBean(ServerConfig.class); }就這樣 這個RPC框架的核心部分 已經(jīng)講述完畢了本例程僅為1.0版本
后續(xù)博客中 會加入異常處理 zookeeper支持 負(fù)載均衡策略等
博客:zookeeper支持
歡迎持續(xù)關(guān)注 歡迎star 提issue
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/68280.html
摘要:每個都可以通過其路徑唯一標(biāo)識,同時每個節(jié)點還可以存儲少量數(shù)據(jù)。監(jiān)聽機制,監(jiān)聽某個當(dāng)該發(fā)生變化時,會回調(diào)該,但是這個是一次性的,下次需要監(jiān)聽時還得再注冊一次。 前面的文章中 我用netty實現(xiàn)了一個簡單的一對一的RPC 11個類實現(xiàn)簡單java rpc 接下來的文章中 我將使用zookeeper作為rpc調(diào)用的分布式注冊中心 從而實現(xiàn)多對多(多個調(diào)用者,多個提供者)的rpc調(diào)用,負(fù)載均...
摘要:等之所以支持跨語言,是因為他們自己定義了一套結(jié)構(gòu)化數(shù)據(jù)存儲格式,如的,用于編解碼對象,作為各個語言通信的中間協(xié)議。 前段時間覺得自己一直用別人的框架,站在巨人的肩膀上,也該自己造造輪子了 一時興起 就著手寫起了RPC框架 這里寫了系列博客拿給大家分享下 這篇是開篇的思路篇 項目最終的代碼放在了我的github上https://github.com/wephone/Me... 歡迎sta...
摘要:支持相關(guān)規(guī)范和標(biāo)準(zhǔn),包括同上。支持多種傳輸協(xié)議和協(xié)議綁定數(shù)據(jù)綁定。構(gòu)建端還有其服務(wù)實現(xiàn),接口使用注解,標(biāo)明是一個遠(yuǎn)程服務(wù)接口。然后編寫一個的啟動程序,并運行,我想你會成功的因為我看到了下圖是一種跨平臺的技術(shù)協(xié)議。 本博客 貓叔的博客,轉(zhuǎn)載請申明出處 學(xué)習(xí)系列 RPC框架是啥? RPC框架是啥之Java自帶RPC實現(xiàn),RMI框架入門 Apache CXF一款WebService RP...
摘要:面向服務(wù)面向服務(wù)的基礎(chǔ)面向服務(wù)的三層應(yīng)用層,服務(wù)層,數(shù)據(jù)層應(yīng)用層用于給用戶展示,,,,安卓。在服務(wù)器端,進(jìn)程保持睡眠狀態(tài)直到調(diào)用信息到達(dá)為止。編譯完成,提示我們已經(jīng)在下了。 面向服務(wù) 面向服務(wù)的基礎(chǔ) 面向服務(wù)的三層:應(yīng)用層,服務(wù)層,數(shù)據(jù)層 * 應(yīng)用層:用于給用戶展示,PC,H5,IOS,安卓。 * 服務(wù)層:業(yè)務(wù)邏輯,提供接口(商品,訂單,支付,用戶,物流)。 * 數(shù)據(jù)層:提供數(shù)據(jù)支持(...
閱讀 1609·2023-04-26 01:54
閱讀 1637·2021-09-30 09:55
閱讀 2658·2021-09-22 16:05
閱讀 1874·2021-07-25 21:37
閱讀 2633·2019-08-29 18:45
閱讀 1900·2019-08-29 16:44
閱讀 1896·2019-08-29 12:34
閱讀 1359·2019-08-23 14:02