摘要:是螞蟻金服開源的一個框架,相比它沒有這么多歷史的包袱,代碼更加簡潔,設計思路更加清晰,更加容易去理解其中的代碼。
原文鏈接:https://www.cnblogs.com/luozh...
這幾天離職在家,正好沒事可以瘋狂的輸出一下,本來想寫DUBBO的源碼解析的,但是發(fā)現(xiàn)寫DUBBO源碼的太多了,所以找一個寫的不那么多的框架,所以就選中SOFARPC這個框架了。
SOFARPC是螞蟻金服開源的一個RPC框架,相比DUBBO它沒有這么多歷史的包袱,代碼更加簡潔,設計思路更加清晰,更加容易去理解其中的代碼。
那么為什么要去重寫原生的SPI呢?官方給出了如下解釋:
按需加載
可以有別名
可以有優(yōu)先級進行排序和覆蓋
可以控制是否單例
可以在某些場景下使用編碼
可以指定擴展配置位置
可以排斥其他擴展點
整個流程如下:
我們以ConsumerBootstrap為例:
先要有一個抽象類:
@Extensible(singleton = false) public abstract class ConsumerBootstrap{ .... }
指定擴展實現(xiàn)類:
@Extension("sofa") public class DefaultConsumerBootstrapextends ConsumerBootstrap { ... }
擴展描述文件META-INF/services/sofa-rpc/com.alipay.sofa.rpc.bootstrap.ConsumerBootstrap
sofa=com.alipay.sofa.rpc.bootstrap.DefaultConsumerBootstrap
當這些準備完成后,直接調用即可。
ConsumerBootstrap sofa = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class).getExtension("sofa");
接下來我們看看ExtensionLoaderFactory的源碼
/** * All extension loader {Class : ExtensionLoader} * 這個map里面裝的是所有ExtensionLoader */ private static final ConcurrentMapLOADER_MAP = new ConcurrentHashMap (); public static ExtensionLoader getExtensionLoader(Class clazz, ExtensionLoaderListener listener) { ExtensionLoader loader = LOADER_MAP.get(clazz); if (loader == null) { //get不到則加上鎖 synchronized (ExtensionLoaderFactory.class) { //防止其他線程操作再get一次 loader = LOADER_MAP.get(clazz); if (loader == null) { loader = new ExtensionLoader (clazz, listener); LOADER_MAP.put(clazz, loader); } } } return loader; }
然后我們看一下ExtensionLoader這個類的構造器
protected ExtensionLoader(ClassinterfaceClass, boolean autoLoad, ExtensionLoaderListener listener) { //如果正在執(zhí)行關閉,則將屬性置空后直接返回 if (RpcRunningState.isShuttingDown()) { this.interfaceClass = null; this.interfaceName = null; this.listener = null; this.factory = null; this.extensible = null; this.all = null; return; } // 接口為空,既不是接口,也不是抽象類 if (interfaceClass == null || !(interfaceClass.isInterface() || Modifier.isAbstract(interfaceClass.getModifiers()))) { throw new IllegalArgumentException("Extensible class must be interface or abstract class!"); } //當前加載的接口類名 this.interfaceClass = interfaceClass; //接口名字 this.interfaceName = ClassTypeUtils.getTypeStr(interfaceClass); this.listener = listener; //接口上必須要有Extensible注解 Extensible extensible = interfaceClass.getAnnotation(Extensible.class); if (extensible == null) { throw new IllegalArgumentException( "Error when load extensible interface " + interfaceName + ", must add annotation @Extensible."); } else { this.extensible = extensible; } // 如果是單例,那么factory不為空 this.factory = extensible.singleton() ? new ConcurrentHashMap () : null; //這個屬性里面是這個接口的所有實現(xiàn)類 this.all = new ConcurrentHashMap >(); if (autoLoad) { //獲取到擴展點加載的路徑 List paths = RpcConfigs.getListValue(RpcOptions.EXTENSION_LOAD_PATH); for (String path : paths) { //根據(jù)路徑加載文件 loadFromFile(path); } } }
拿到所有的擴展點加載的路徑后進入到loadFromFile中進行文件的加載
protected synchronized void loadFromFile(String path) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Loading extension of extensible {} from path: {}", interfaceName, path); } // 默認如果不指定文件名字,就是接口名 String file = StringUtils.isBlank(extensible.file()) ? interfaceName : extensible.file().trim(); String fullFileName = path + file; try { ClassLoader classLoader = ClassLoaderUtils.getClassLoader(getClass()); loadFromClassLoader(classLoader, fullFileName); } catch (Throwable t) { if (LOGGER.isErrorEnabled()) { LOGGER.error("Failed to load extension of extensible " + interfaceName + " from path:" + fullFileName, t); } } } protected void loadFromClassLoader(ClassLoader classLoader, String fullFileName) throws Throwable { Enumerationurls = classLoader != null ? classLoader.getResources(fullFileName) : ClassLoader.getSystemResources(fullFileName); // 可能存在多個文件。 if (urls != null) { while (urls.hasMoreElements()) { // 讀取一個文件 URL url = urls.nextElement(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Loading extension of extensible {} from classloader: {} and file: {}", interfaceName, classLoader, url); } BufferedReader reader = null; try { reader = new BufferedReader(new InputStreamReader(url.openStream(), "UTF-8")); String line; while ((line = reader.readLine()) != null) { readLine(url, line); } } catch (Throwable t) { if (LOGGER.isWarnEnabled()) { LOGGER.warn("Failed to load extension of extensible " + interfaceName + " from classloader: " + classLoader + " and file:" + url, t); } } finally { if (reader != null) { reader.close(); } } } } }
接下來進入到readLine,這個方法主要是讀取prop文件里面的每一行記錄,并加載該實現(xiàn)類的類文件校驗完后將文件添加到all屬性中
protected void readLine(URL url, String line) { //讀取文件里面的一行記錄,并將這行記錄用=號分割 String[] aliasAndClassName = parseAliasAndClassName(line); if (aliasAndClassName == null || aliasAndClassName.length != 2) { return; } //別名 String alias = aliasAndClassName[0]; //包名 String className = aliasAndClassName[1]; // 讀取配置的實現(xiàn)類 Class tmp; try { tmp = ClassUtils.forName(className, false); } catch (Throwable e) { if (LOGGER.isWarnEnabled()) { LOGGER.warn("Extension {} of extensible {} is disabled, cause by: {}", className, interfaceName, ExceptionUtils.toShortString(e, 2)); } if (LOGGER.isDebugEnabled()) { LOGGER.debug("Extension " + className + " of extensible " + interfaceName + " is disabled.", e); } return; } if (!interfaceClass.isAssignableFrom(tmp)) { throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName + " from file:" + url + ", " + className + " is not subtype of interface."); } Class extends T> implClass = (Class extends T>) tmp; // 檢查是否有可擴展標識 Extension extension = implClass.getAnnotation(Extension.class); if (extension == null) { throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName + " from file:" + url + ", " + className + " must add annotation @Extension."); } else { String aliasInCode = extension.value(); if (StringUtils.isBlank(aliasInCode)) { // 擴展實現(xiàn)類未配置@Extension 標簽 throw new IllegalArgumentException("Error when load extension of extensible " + interfaceClass + " from file:" + url + ", " + className + ""s alias of @Extension is blank"); } if (alias == null) { // spi文件里沒配置,用代碼里的 alias = aliasInCode; } else { // spi文件里配置的和代碼里的不一致 if (!aliasInCode.equals(alias)) { throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName + " from file:" + url + ", aliases of " + className + " are " + "not equal between " + aliasInCode + "(code) and " + alias + "(file)."); } } // 接口需要編號,實現(xiàn)類沒設置 if (extensible.coded() && extension.code() < 0) { throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName + " from file:" + url + ", code of @Extension must >=0 at " + className + "."); } } // 不可以是default和* if (StringUtils.DEFAULT.equals(alias) || StringUtils.ALL.equals(alias)) { throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName + " from file:" + url + ", alias of @Extension must not "default" and "*" at " + className + "."); } // 檢查是否有存在同名的 ExtensionClass old = all.get(alias); ExtensionClassextensionClass = null; if (old != null) { // 如果當前擴展可以覆蓋其它同名擴展 if (extension.override()) { // 如果優(yōu)先級還沒有舊的高,則忽略 if (extension.order() < old.getOrder()) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Extension of extensible {} with alias {} override from {} to {} failure, " + "cause by: order of old extension is higher", interfaceName, alias, old.getClazz(), implClass); } } else { if (LOGGER.isInfoEnabled()) { LOGGER.info("Extension of extensible {} with alias {}: {} has been override to {}", interfaceName, alias, old.getClazz(), implClass); } // 如果當前擴展可以覆蓋其它同名擴展 extensionClass = buildClass(extension, implClass, alias); } } // 如果舊擴展是可覆蓋的 else { if (old.isOverride() && old.getOrder() >= extension.order()) { // 如果已加載覆蓋擴展,再加載到原始擴展 if (LOGGER.isInfoEnabled()) { LOGGER.info("Extension of extensible {} with alias {}: {} has been loaded, ignore origin {}", interfaceName, alias, old.getClazz(), implClass); } } else { // 如果不能被覆蓋,拋出已存在異常 throw new IllegalStateException( "Error when load extension of extensible " + interfaceClass + " from file:" + url + ", Duplicate class with same alias: " + alias + ", " + old.getClazz() + " and " + implClass); } } } else { extensionClass = buildClass(extension, implClass, alias); } if (extensionClass != null) { // 檢查是否有互斥的擴展點 for (Map.Entry > entry : all.entrySet()) { ExtensionClass existed = entry.getValue(); if (extensionClass.getOrder() >= existed.getOrder()) { // 新的優(yōu)先級 >= 老的優(yōu)先級,檢查新的擴展是否排除老的擴展 String[] rejection = extensionClass.getRejection(); if (CommonUtils.isNotEmpty(rejection)) { for (String rej : rejection) { existed = all.get(rej); if (existed == null || extensionClass.getOrder() < existed.getOrder()) { continue; } ExtensionClass removed = all.remove(rej); if (removed != null) { if (LOGGER.isInfoEnabled()) { LOGGER.info( "Extension of extensible {} with alias {}: {} has been reject by new {}", interfaceName, removed.getAlias(), removed.getClazz(), implClass); } } } } } else { String[] rejection = existed.getRejection(); if (CommonUtils.isNotEmpty(rejection)) { for (String rej : rejection) { if (rej.equals(extensionClass.getAlias())) { // 被其它擴展排掉 if (LOGGER.isInfoEnabled()) { LOGGER.info( "Extension of extensible {} with alias {}: {} has been reject by old {}", interfaceName, alias, implClass, existed.getClazz()); return; } } } } } } loadSuccess(alias, extensionClass); } }
加載完文件后我們再回到
ConsumerBootstrap sofa = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class).getExtension("sofa");
進入到getExtension方法中
public ExtensionClassgetExtensionClass(String alias) { return all == null ? null : all.get(alias); } public T getExtension(String alias) { //從all屬性中拿到加載的class ExtensionClass extensionClass = getExtensionClass(alias); if (extensionClass == null) { throw new SofaRpcRuntimeException("Not found extension of " + interfaceName + " named: "" + alias + ""!"); } else { //在加載class的時候,校驗了是否是單例,如果是單例,那么factory不為null if (extensible.singleton() && factory != null) { T t = factory.get(alias); if (t == null) { synchronized (this) { t = factory.get(alias); if (t == null) { //實例化 t = extensionClass.getExtInstance(); //放入到factory,單例的class下次直接拿就好了,不需要重新創(chuàng)建 factory.put(alias, t); } } } return t; } else { //實例化 return extensionClass.getExtInstance(); } } }
我們進入到ExtensionClass看看getExtInstance方法
/** * 服務端實例對象(只在是單例的時候保留) * 用volatile修飾,保證了可見性 */ private volatile transient T instance; /** * 得到服務端實例對象,如果是單例則返回單例對象,如果不是則返回新創(chuàng)建的實例對象 * * @param argTypes 構造函數(shù)參數(shù)類型 * @param args 構造函數(shù)參數(shù)值 * @return 擴展點對象實例 ext instance */ public T getExtInstance(Class[] argTypes, Object[] args) { if (clazz != null) { try { if (singleton) { // 如果是單例 if (instance == null) { synchronized (this) { if (instance == null) { //通過反射創(chuàng)建實例 instance = ClassUtils.newInstanceWithArgs(clazz, argTypes, args); } } } return instance; // 保留單例 } else { //通過反射創(chuàng)建實例 return ClassUtils.newInstanceWithArgs(clazz, argTypes, args); } } catch (Exception e) { throw new SofaRpcRuntimeException("create " + clazz.getCanonicalName() + " instance error", e); } } throw new SofaRpcRuntimeException("Class of ExtensionClass is null"); }
看完了SOFARPC的擴展類實現(xiàn)后感覺代碼寫的非常的整潔,邏輯非常的清晰,里面有很多可以學習的地方,比如線程安全用到了雙重檢查鎖和volatile保證可見性。
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://systransis.cn/yun/76179.html
摘要:二注解該注解為了保證在內部調用具體實現(xiàn)的時候不是硬編碼來指定引用哪個實現(xiàn),也就是為了適配一個接口的多種實現(xiàn),這樣做符合模塊接口設計的可插拔原則,也增加了整個框架的靈活性,該注解也實現(xiàn)了擴展點自動裝配的特性。 Dubbo擴展機制SPI 前一篇文章《dubbo源碼解析(一)Hello,Dubbo》是對dubbo整個項目大體的介紹,而從這篇文章開始,我將會從源碼來解讀dubbo再各個模塊的實...
摘要:在螞蟻金服內部是被所有在線應用的使用的服務調用框架,截止年雙十一,已經(jīng)被螞蟻多個系統(tǒng)所使用,生產(chǎn)環(huán)境發(fā)布的服務數(shù)量超過了個。 原文地址:http://www.sohu.com/a/2288043... 我們很高興地宣布,今天螞蟻金服啟動分布式中間件(Scalable Open Financial Architecture,以下簡稱 SOFA 中間件)的開源計劃! SOFA 是螞蟻金服自...
摘要:今天我想聊聊的另一個很棒的特性就是它的可擴展性。的擴展機制在的官網(wǎng)上,描述自己是一個高性能的框架。接下來的章節(jié)中我們會慢慢揭開擴展機制的神秘面紗。擴展擴展點的實現(xiàn)類。的定義在配置文件中可以看到文件中定義了個的擴展實現(xiàn)。 摘要: 在Dubbo的官網(wǎng)上,Dubbo描述自己是一個高性能的RPC框架。今天我想聊聊Dubbo的另一個很棒的特性, 就是它的可擴展性。 Dubbo的擴展機制 在Dub...
閱讀 3325·2021-11-12 10:36
閱讀 2483·2021-11-02 14:43
閱讀 2156·2019-08-30 14:23
閱讀 3471·2019-08-30 13:08
閱讀 928·2019-08-28 18:09
閱讀 3141·2019-08-26 12:22
閱讀 3155·2019-08-23 18:24
閱讀 2025·2019-08-23 18:17