摘要:層根據(jù)不同的目錄可以有服務(wù)提供者服務(wù)消費(fèi)者路由規(guī)則配置規(guī)則。通過這樣的方式,可以處理類似服務(wù)提供者為空的情況。
注冊中心——zookeeper
目標(biāo):解釋以為zookeeper實(shí)現(xiàn)的注冊中心原理,解讀duubo-registry-zookeeper的源碼
這篇文章是講解注冊中心的最后一篇文章。這篇文章講的是dubbo的注冊中心用zookeeper來實(shí)現(xiàn)。這種實(shí)現(xiàn)注冊中心的方法也是dubbo推薦的方法。為了能更加理解zookeeper在dubbo中的應(yīng)用,接下來我先簡單的介紹一下zookeeper。
因?yàn)閐ubbo是一個分布式的RPC開源框架,各個服務(wù)之間多帶帶部署,就會出現(xiàn)資源之間不一致的問題。而zookeeper就有保證分布式一致性的特性。ZooKeeper是一種為分布式應(yīng)用所設(shè)計的高可用、高性能且一致的開源協(xié)調(diào)服務(wù)。關(guān)于dubbo為什么會推薦使用zookeeper作為它的注冊中心實(shí)現(xiàn),有很多書籍以及博客講解了zookeeper的特性以及優(yōu)勢,這不是本章的重點(diǎn),我要講的是zookeeper的數(shù)據(jù)結(jié)構(gòu),dubbo服務(wù)是如何被zookeeper的數(shù)據(jù)結(jié)構(gòu)存儲管理的,因?yàn)檫@影響到下面源碼的解讀。zookeeper采用的是樹形結(jié)構(gòu)來組織數(shù)據(jù)節(jié)點(diǎn),它類似于一個標(biāo)準(zhǔn)的文件系統(tǒng)。先來看看下面這張圖:
該圖是官方文檔里面的一張圖,展示了dubbo在zookeeper中存儲的形式以及節(jié)點(diǎn)層級,
dubbo的Root層是根目錄,通過
Service層是服務(wù)接口的全名。
Type層是分類,一共有四種分類,分別是providers(服務(wù)提供者列表)、consumers(服務(wù)消費(fèi)者列表)、routes(路由規(guī)則列表)、configurations(配置規(guī)則列表)。
URL層:根據(jù)不同的Type目錄:可以有服務(wù)提供者 URL 、服務(wù)消費(fèi)者 URL 、路由規(guī)則 URL 、配置規(guī)則 URL 。不同的Type關(guān)注的URL不同。
zookeeper以每個斜杠來分割每一層的znode,比如第一層根節(jié)點(diǎn)dubbo就是“/dubbo”,而第二層的Service層就是/com.foo.Barservice,zookeeper的每個節(jié)點(diǎn)通過路徑來表示以及訪問,例如服務(wù)提供者啟動時,向/dubbo/com.foo.Barservice/providers目錄下寫入自己的URL地址。關(guān)于流程調(diào)用說明,見官方文檔:
文檔地址:http://dubbo.apache.org/zh-cn...
了解了dubbo在zookeeper中的節(jié)點(diǎn)層級,就可以看相關(guān)的源碼了,下圖是包的結(jié)構(gòu):
跟前面三種實(shí)現(xiàn)方式一樣的目錄,也就兩個類,看起來非常的舒服,接下來就來解析這兩個類。
(一)ZookeeperRegistry該類繼承了FailbackRegistry類,該類就是針對注冊中心核心的功能注冊、訂閱、取消注冊、取消訂閱,查詢注冊列表進(jìn)行展開,基于zookeeper來實(shí)現(xiàn)。
1.屬性// 日志記錄 private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class); // 默認(rèn)的zookeeper端口 private final static int DEFAULT_ZOOKEEPER_PORT = 2181; // 默認(rèn)zookeeper根節(jié)點(diǎn) private final static String DEFAULT_ROOT = "dubbo"; // zookeeper根節(jié)點(diǎn) private final String root; // 服務(wù)接口集合 private final SetanyServices = new ConcurrentHashSet (); // 監(jiān)聽器集合 private final ConcurrentMap > zkListeners = new ConcurrentHashMap >(); // zookeeper客戶端實(shí)例 private final ZookeeperClient zkClient;
其實(shí)你會發(fā)現(xiàn)zookeeper雖然是最被推薦的,反而它的實(shí)現(xiàn)邏輯相對簡單,因?yàn)檎{(diào)用了zookeeper服務(wù)組件,很多的邏輯不需要在dubbo中自己去實(shí)現(xiàn)。上面的屬性介紹也很簡單,不需要多說,更多的是調(diào)用zookeeper客戶端。
2.構(gòu)造方法public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } // 獲得url攜帶的分組配置,并且作為zookeeper的根節(jié)點(diǎn) String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } this.root = group; // 創(chuàng)建zookeeper client zkClient = zookeeperTransporter.connect(url); // 添加狀態(tài)監(jiān)聽器,當(dāng)狀態(tài)為重連的時候調(diào)用恢復(fù)方法 zkClient.addStateListener(new StateListener() { @Override public void stateChanged(int state) { if (state == RECONNECTED) { try { // 恢復(fù) recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); }
這里有以下幾個關(guān)注點(diǎn):
參數(shù)中ZookeeperTransporter是一個接口,并且在dubbo中有ZkclientZookeeperTransporter和CuratorZookeeperTransporter兩個實(shí)現(xiàn)類,ZookeeperTransporter還是一個可擴(kuò)展的接口,基于 Dubbo SPI Adaptive 機(jī)制,會根據(jù)url中攜帶的參數(shù)去選擇用哪個實(shí)現(xiàn)類。
上面我說明了dubbo在zookeeper節(jié)點(diǎn)層級有一層是root層,該層是通過group屬性來設(shè)置的。
給客戶端添加一個監(jiān)聽器,當(dāng)狀態(tài)為重連的時候調(diào)用FailbackRegistry的恢復(fù)方法
3.appendDefaultPortstatic String appendDefaultPort(String address) { if (address != null && address.length() > 0) { int i = address.indexOf(":"); // 如果地址本身沒有端口,則使用默認(rèn)端口2181 if (i < 0) { return address + ":" + DEFAULT_ZOOKEEPER_PORT; } else if (Integer.parseInt(address.substring(i + 1)) == 0) { return address.substring(0, i + 1) + DEFAULT_ZOOKEEPER_PORT; } } return address; }
該方法是拼接使用默認(rèn)的zookeeper端口,就是方地址本身沒有端口的時候才使用默認(rèn)端口。
4.isAvailable && destroy@Override public boolean isAvailable() { return zkClient.isConnected(); } @Override public void destroy() { super.destroy(); try { zkClient.close(); } catch (Exception e) { logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e); } }
這里兩個方法分別是檢測zookeeper是否連接以及銷毀連接,很簡單,都是調(diào)用了zookeeper客戶端封裝好的方法。
5.doRegister && doUnregister@Override protected void doRegister(URL url) { try { // 創(chuàng)建URL節(jié)點(diǎn),也就是URL層的節(jié)點(diǎn) zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } @Override protected void doUnregister(URL url) { try { // 刪除節(jié)點(diǎn) zkClient.delete(toUrlPath(url)); } catch (Throwable e) { throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
這兩個方法分別是注冊和取消注冊,也很簡單,調(diào)用都是客戶端create和delete方法,一個是創(chuàng)建一個節(jié)點(diǎn),另一個是刪除節(jié)點(diǎn),該操作都在URL層。
6.doSubscribe@Override protected void doSubscribe(final URL url, final NotifyListener listener) { try { // 處理所有Service層發(fā)起的訂閱,例如監(jiān)控中心的訂閱 if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { // 獲得根目錄 String root = toRootPath(); // 獲得url對應(yīng)的監(jiān)聽器集合 ConcurrentMaplisteners = zkListeners.get(url); // 不存在就創(chuàng)建監(jiān)聽器集合 if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap ()); listeners = zkListeners.get(url); } // 獲得節(jié)點(diǎn)監(jiān)聽器 ChildListener zkListener = listeners.get(listener); // 如果該節(jié)點(diǎn)監(jiān)聽器為空,則創(chuàng)建 if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { @Override public void childChanged(String parentPath, List currentChilds) { // 遍歷現(xiàn)有的節(jié)點(diǎn),如果現(xiàn)有的服務(wù)集合中沒有該節(jié)點(diǎn),則加入該節(jié)點(diǎn),然后訂閱該節(jié)點(diǎn) for (String child : currentChilds) { // 解碼 child = URL.decode(child); if (!anyServices.contains(child)) { anyServices.add(child); subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false)), listener); } } } }); // 重新獲取,為了保證一致性 zkListener = listeners.get(listener); } // 創(chuàng)建service節(jié)點(diǎn),該節(jié)點(diǎn)為持久節(jié)點(diǎn) zkClient.create(root, false); // 向zookeeper的service節(jié)點(diǎn)發(fā)起訂閱,獲得Service接口全名數(shù)組 List services = zkClient.addChildListener(root, zkListener); if (services != null && !services.isEmpty()) { // 遍歷Service接口全名數(shù)組 for (String service : services) { service = URL.decode(service); anyServices.add(service); // 發(fā)起該service層的訂閱 subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, Constants.CHECK_KEY, String.valueOf(false)), listener); } } } else { // 處理指定 Service 層的發(fā)起訂閱,例如服務(wù)消費(fèi)者的訂閱 List urls = new ArrayList (); // 遍歷分類數(shù)組 for (String path : toCategoriesPath(url)) { // 獲得監(jiān)聽器集合 ConcurrentMap listeners = zkListeners.get(url); // 如果沒有則創(chuàng)建 if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap ()); listeners = zkListeners.get(url); } // 獲得節(jié)點(diǎn)監(jiān)聽器 ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { @Override public void childChanged(String parentPath, List currentChilds) { // 通知服務(wù)變化 回調(diào)NotifyListener ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); } }); // 重新獲取節(jié)點(diǎn)監(jiān)聽器,保證一致性 zkListener = listeners.get(listener); } // 創(chuàng)建type節(jié)點(diǎn),該節(jié)點(diǎn)為持久節(jié)點(diǎn) zkClient.create(path, false); // 向zookeeper的type節(jié)點(diǎn)發(fā)起訂閱 List children = zkClient.addChildListener(path, zkListener); if (children != null) { // 加入到自子節(jié)點(diǎn)數(shù)據(jù)數(shù)組 urls.addAll(toUrlsWithEmpty(url, path, children)); } } // 通知數(shù)據(jù)變化 notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
這個方法是訂閱,邏輯實(shí)現(xiàn)比較多,可以分兩段來看,這里的實(shí)現(xiàn)把所有Service層發(fā)起的訂閱以及指定的Service層發(fā)起的訂閱分開處理。所有Service層類似于監(jiān)控中心發(fā)起的訂閱。指定的Service層發(fā)起的訂閱可以看作是服務(wù)消費(fèi)者的訂閱。訂閱的大致邏輯類似,不過還是有幾個區(qū)別:
所有Service層發(fā)起的訂閱中的ChildListener是在在 Service 層發(fā)生變更時,才會做出解碼,用anyServices屬性判斷是否是新增的服務(wù),最后調(diào)用父類的subscribe訂閱。而指定的Service層發(fā)起的訂閱是在URL層發(fā)生變更的時候,調(diào)用notify,回調(diào)回調(diào)NotifyListener的邏輯,做到通知服務(wù)變更。
所有Service層發(fā)起的訂閱中客戶端創(chuàng)建的節(jié)點(diǎn)是Service節(jié)點(diǎn),該節(jié)點(diǎn)為持久節(jié)點(diǎn),而指定的Service層發(fā)起的訂閱中創(chuàng)建的節(jié)點(diǎn)是Type節(jié)點(diǎn),該節(jié)點(diǎn)也是持久節(jié)點(diǎn)。這里補(bǔ)充一下zookeeper的持久節(jié)點(diǎn)是節(jié)點(diǎn)創(chuàng)建后,就一直存在,直到有刪除操作來主動清除這個節(jié)點(diǎn),不會因?yàn)閯?chuàng)建該節(jié)點(diǎn)的客戶端會話失效而消失。而臨時節(jié)點(diǎn)的生命周期和客戶端會話綁定。也就是說,如果客戶端會話失效,那么這個節(jié)點(diǎn)就會自動被清除掉。注意,這里提到的是會話失效,而非連接斷開。另外,在臨時節(jié)點(diǎn)下面不能創(chuàng)建子節(jié)點(diǎn)。
指定的Service層發(fā)起的訂閱中調(diào)用了兩次notify,第一次是增量的通知,也就是只是通知這次增加的服務(wù)節(jié)點(diǎn),而第二個是全量的通知。
7.doUnsubscribe@Override protected void doUnsubscribe(URL url, NotifyListener listener) { // 獲得監(jiān)聽器集合 ConcurrentMaplisteners = zkListeners.get(url); if (listeners != null) { // 獲得子節(jié)點(diǎn)的監(jiān)聽器 ChildListener zkListener = listeners.get(listener); if (zkListener != null) { // 如果為全部的服務(wù)接口,例如監(jiān)控中心 if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { // 獲得根目錄 String root = toRootPath(); // 移除監(jiān)聽器 zkClient.removeChildListener(root, zkListener); } else { // 遍歷分類數(shù)組進(jìn)行移除監(jiān)聽器 for (String path : toCategoriesPath(url)) { zkClient.removeChildListener(path, zkListener); } } } } }
該方法是取消訂閱,也是分為兩種情況,所有的Service發(fā)起的取消訂閱還是指定的Service發(fā)起的取消訂閱??梢钥吹剿械腟ervice發(fā)起的取消訂閱就直接移除了根目錄下所有的監(jiān)聽器,而指定的Service發(fā)起的取消訂閱是移除了該Service層下面的所有Type節(jié)點(diǎn)監(jiān)聽器。如果不太明白再回去看看前面的那個節(jié)點(diǎn)層級圖。
8.lookup@Override public Listlookup(URL url) { if (url == null) { throw new IllegalArgumentException("lookup url == null"); } try { List providers = new ArrayList (); // 遍歷分組類別 for (String path : toCategoriesPath(url)) { // 獲得子節(jié)點(diǎn) List children = zkClient.getChildren(path); if (children != null) { providers.addAll(children); } } // 獲得 providers 中,和 consumer 匹配的 URL 數(shù)組 return toUrlsWithoutEmpty(url, providers); } catch (Throwable e) { throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
該方法就是查詢符合條件的已經(jīng)注冊的服務(wù)。調(diào)用了toUrlsWithoutEmpty方法,在后面會講到。
9.toServicePathprivate String toServicePath(URL url) { String name = url.getServiceInterface(); // 如果是包括所有服務(wù),則返回根節(jié)點(diǎn) if (Constants.ANY_VALUE.equals(name)) { return toRootPath(); } return toRootDir() + URL.encode(name); }
該方法是獲得服務(wù)路徑,拼接規(guī)則:Root + Type。
10.toCategoriesPathprivate String[] toCategoriesPath(URL url) { String[] categories; // 如果url攜帶的分類配置為*,則創(chuàng)建包括所有分類的數(shù)組 if (Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))) { categories = new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY, Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY}; } else { // 返回url攜帶的分類配置 categories = url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY}); } String[] paths = new String[categories.length]; for (int i = 0; i < categories.length; i++) { // 加上服務(wù)路徑 paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categories[i]; } return paths; } private String toCategoryPath(URL url) { return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); }
第一個方法是獲得分類數(shù)組,也就是url攜帶的服務(wù)下的所有Type節(jié)點(diǎn)數(shù)組。第二個是獲得分類路徑,分類路徑拼接規(guī)則:Root + Service + Type
11.toUrlPathprivate String toUrlPath(URL url) { return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString()); }
該方法是獲得URL路徑,拼接規(guī)則是Root + Service + Type + URL
12.toUrlsWithoutEmpty && toUrlsWithEmptyprivate ListtoUrlsWithoutEmpty(URL consumer, List providers) { List urls = new ArrayList (); if (providers != null && !providers.isEmpty()) { // 遍歷服務(wù)提供者 for (String provider : providers) { // 解碼 provider = URL.decode(provider); if (provider.contains("://")) { // 把服務(wù)轉(zhuǎn)化成url的形式 URL url = URL.valueOf(provider); // 判斷是否匹配,如果匹配, 則加入到集合中 if (UrlUtils.isMatch(consumer, url)) { urls.add(url); } } } } return urls; } private List toUrlsWithEmpty(URL consumer, String path, List providers) { // 返回和服務(wù)消費(fèi)者匹配的服務(wù)提供者url List urls = toUrlsWithoutEmpty(consumer, providers); // 如果不存在,則創(chuàng)建`empty://` 的 URL返回 if (urls == null || urls.isEmpty()) { int i = path.lastIndexOf("/"); String category = i < 0 ? path : path.substring(i + 1); URL empty = consumer.setProtocol(Constants.EMPTY_PROTOCOL).addParameter(Constants.CATEGORY_KEY, category); urls.add(empty); } return urls; }
第一個toUrlsWithoutEmpty方法是獲得 providers 中,和 consumer 匹配的 URL 數(shù)組,第二個toUrlsWithEmpty方法是調(diào)用了第一個方法后增加了若不存在匹配,則創(chuàng)建 empty:// 的 URL返回。通過這樣的方式,可以處理類似服務(wù)提供者為空的情況。
(二)ZookeeperRegistryFactory該類繼承了AbstractRegistryFactory類,實(shí)現(xiàn)了AbstractRegistryFactory抽象出來的createRegistry方法,看一下原代碼:
public class ZookeeperRegistryFactory extends AbstractRegistryFactory { private ZookeeperTransporter zookeeperTransporter; public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) { this.zookeeperTransporter = zookeeperTransporter; } @Override public Registry createRegistry(URL url) { return new ZookeeperRegistry(url, zookeeperTransporter); } }
可以看到就是實(shí)例化了ZookeeperRegistry而已,所有這里就不解釋了。
后記該部分相關(guān)的源碼解析地址:https://github.com/CrazyHZM/i...
該文章講解了dubbo利用zookeeper來實(shí)現(xiàn)注冊中心,其中關(guān)鍵的是需要弄明白dubbo在zookeeper中存儲的節(jié)點(diǎn)層級意義,也就是root層、service層、type層以及url層分別代表什么,其他的邏輯并不復(fù)雜大多數(shù)調(diào)用了zookeeper客戶端的能力,有興趣的同學(xué)也可以深入的去了解zookeeper。如果我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見,我的私人微信號碼:HUA799695226。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/72402.html
摘要:英文全名為,也叫遠(yuǎn)程過程調(diào)用,其實(shí)就是一個計算機(jī)通信協(xié)議,它是一種通過網(wǎng)絡(luò)從遠(yuǎn)程計算機(jī)程序上請求服務(wù)而不需要了解底層網(wǎng)絡(luò)技術(shù)的協(xié)議。 Hello,Dubbo 你好,dubbo,初次見面,我想和你交個朋友。 Dubbo你到底是什么? 先給出一套官方的說法:Apache Dubbo是一款高性能、輕量級基于Java的RPC開源框架。 那么什么是RPC? 文檔地址:http://dubbo.a...
摘要:服務(wù)暴露過程目標(biāo)從源碼的角度分析服務(wù)暴露過程。導(dǎo)出服務(wù),包含暴露服務(wù)到本地,和暴露服務(wù)到遠(yuǎn)程兩個過程。其中服務(wù)暴露的第八步已經(jīng)沒有了。將泛化調(diào)用版本號或者等信息加入獲得服務(wù)暴露地址和端口號,利用內(nèi)數(shù)據(jù)組裝成。 dubbo服務(wù)暴露過程 目標(biāo):從源碼的角度分析服務(wù)暴露過程。 前言 本來這一篇一個寫異步化改造的內(nèi)容,但是最近我一直在想,某一部分的優(yōu)化改造該怎么去撰寫才能更加的讓讀者理解。我覺...
摘要:大揭秘目標(biāo)了解的新特性,以及版本升級的引導(dǎo)。四元數(shù)據(jù)改造我們知道以前的版本只有注冊中心,注冊中心的有數(shù)十個的鍵值對,包含了一個服務(wù)所有的元數(shù)據(jù)。 DUBBO——2.7大揭秘 目標(biāo):了解2.7的新特性,以及版本升級的引導(dǎo)。 前言 我們知道Dubbo在2011年開源,停止更新了一段時間。在2017 年 9 月 7 日,Dubbo 悄悄的在 GitHub 發(fā)布了 2.5.4 版本。隨后,版本...
摘要:面試題服務(wù)提供者能實(shí)現(xiàn)失效踢出是什么原理高頻題服務(wù)宕機(jī)的時候,該節(jié)點(diǎn)由于是持久節(jié)點(diǎn)會永遠(yuǎn)存在,而且當(dāng)服務(wù)再次重啟的時候會將重新注冊一個新節(jié)點(diǎn)。 Dubbo 2.7 版本增加新特性,新系統(tǒng)開始使用 Dubbo 2.7.1 嘗鮮新功能。使用過程中不慎踩到這個版本的 Bug。 系統(tǒng)架構(gòu) Spring Boot 2.14-Release + Dubbo 2.7.1 現(xiàn)象 Dubbo 服務(wù)者啟動...
閱讀 1215·2021-11-10 11:35
閱讀 2953·2021-09-24 10:35
閱讀 2981·2021-09-22 15:38
閱讀 2820·2019-08-30 15:43
閱讀 1355·2019-08-29 18:39
閱讀 2607·2019-08-29 15:22
閱讀 2807·2019-08-28 18:17
閱讀 623·2019-08-26 13:37