摘要:三分布式鎖這部分是重要功能,在此基礎(chǔ)上實(shí)現(xiàn)諸如,分布式協(xié)調(diào)通知,負(fù)載均衡,選舉等復(fù)雜場景。針對(duì)此情況,改進(jìn)后判斷讀寫順序?yàn)閯?chuàng)建完臨時(shí)順序節(jié)點(diǎn)后,獲取下的所有子節(jié)點(diǎn)。
注:該文章用作回顧記錄
一、準(zhǔn)備工作預(yù)先下載安裝 ZooKeeper ,簡單配置就能使用了。然后構(gòu)建 Maven 項(xiàng)目,將下面的代碼粘貼到 pom.xml中:
org.apache.zookeeper zookeeper 3.4.5 com.101tec zkclient 0.5
zkclient 是開源的客戶端工具,其中封裝了很多功能,比如:刪除包含子節(jié)點(diǎn)的父節(jié)點(diǎn),連接重試,異步回調(diào),偏向 Java 寫法的注冊監(jiān)聽等,極大地方便了用戶使用。
下面不過多介紹客戶端操作,只針對(duì)應(yīng)用場景做介紹,該文章會(huì)隨著本人的學(xué)習(xí)持續(xù)補(bǔ)充。
二、數(shù)據(jù)發(fā)布/訂閱使用 ZooKeeper 節(jié)點(diǎn)監(jiān)聽來實(shí)現(xiàn)該功能:
ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000); // 連接集群 zkClient.createPersistent("/xxx/xxx"); // 創(chuàng)建持久節(jié)點(diǎn) // 注冊子節(jié)點(diǎn)變更監(jiān)聽,當(dāng)子節(jié)點(diǎn)改變(比如創(chuàng)建了"/xxx/xxx/1")或當(dāng)前節(jié)點(diǎn)刪除等,會(huì)觸發(fā)異步回調(diào) zkClient.subscribeChildChanges("/xxx/xxx", new IZkChildListener() { @Override public void handleChildChange(String parentPath, ListcurrentChilds) throws Exception { } });
下面為部分源碼:
package org.I0Itec.zkclient; public class ZkClient implements Watcher { public ListwatchForChilds(final String path) { return retryUntilConnected(new Callable >() { @Override public List
call() throws Exception { exists(path, true); try { return getChildren(path, true); } catch (ZkNoNodeException e) { } return null; } }); } public T retryUntilConnected(Callable callable) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { final long operationStartTime = System.currentTimeMillis(); while (true) { if (_closed) { throw new IllegalStateException("ZkClient already closed!"); } try { return callable.call(); } catch (Exception e) { throw ExceptionUtil.convertToRuntimeException(e); } } } }
基于 ZooKeeper 實(shí)現(xiàn)的數(shù)據(jù)發(fā)布/訂閱很簡單吧,快動(dòng)手試試。
三、分布式鎖這部分是 ZooKeeper 重要功能,在此基礎(chǔ)上實(shí)現(xiàn)諸如,分布式協(xié)調(diào)/通知,負(fù)載均衡,Master選舉等復(fù)雜場景。
1、排它鎖排它鎖又稱為寫鎖或獨(dú)占鎖。比如事務(wù) T1 對(duì)數(shù)據(jù)對(duì)象 O1 加了排它鎖,那么在整個(gè)加鎖期間,只允許 T1 對(duì) O1 進(jìn)行讀取或更新操作,其它事務(wù)都不能對(duì) O1 操作。
1)獲取鎖
所有客戶端都創(chuàng)建臨時(shí)節(jié)點(diǎn) zkClient.createEphemeral("/xxx/xxx", null);,ZooKeeper 會(huì)保證在所有客戶端中,最終只有一個(gè)客戶端能創(chuàng)建成功,那么就認(rèn)為該客戶端獲取了鎖。同時(shí),所有沒獲取到鎖的客戶端需在/xxx/xxx 上注冊子節(jié)點(diǎn)變更監(jiān)聽,以便實(shí)時(shí)監(jiān)聽節(jié)點(diǎn)變化。如節(jié)點(diǎn)發(fā)生變化,則未獲取到鎖的客戶端再重新獲取鎖。
private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000); private static final String lockParentPath = "/zk-book/exclusice_lock"; public static void main(String[] args) throws InterruptedException { try { zkClient.createEphemeral(lockParentPath + "/lock"); System.out.println("service3 獲取鎖成功"); } catch (Exception e) { System.out.println("service3獲取鎖失敗"); zkClient.subscribeChildChanges(lockParentPath, new IZkChildListener() { @Override public void handleChildChange(String parentPath, ListcurrentChilds) throws Exception { System.out.println("service3再次獲取鎖"); main(null); } }); } Thread.sleep(Integer.MAX_VALUE); }
2)釋放鎖
當(dāng) "/xxx/xxx" 是臨時(shí)節(jié)點(diǎn)時(shí),以下倆種情況都會(huì)釋放鎖。
當(dāng)已獲取鎖的客戶機(jī)宕機(jī),導(dǎo)致連接超時(shí)斷開,那么 ZooKeeper 會(huì)將臨時(shí)節(jié)點(diǎn)刪除。
正常執(zhí)行完邏輯后,客戶端主動(dòng)將臨時(shí)節(jié)點(diǎn)刪除。
2、共享鎖共享鎖又稱為讀鎖。如果事務(wù) T1 對(duì)數(shù)據(jù)對(duì)象 O1 加了共享鎖,那么 T1 只能對(duì) O1 進(jìn)行讀取操作,其它事務(wù)只能對(duì) O1 加共享鎖,直到 O1 上所有共享鎖都被釋放。
1)獲取鎖
所有客戶端都創(chuàng)建臨時(shí)順序節(jié)點(diǎn) zkClient.createEphemeralSequential("/xxx/xxx", null);,ZooKeeper 會(huì)生成類似下面的節(jié)點(diǎn),已保證節(jié)點(diǎn)的唯一性。
2)判斷讀寫順序
創(chuàng)建完臨時(shí)順序節(jié)點(diǎn)后,獲取 "/xxx" 下的所有子節(jié)點(diǎn),并對(duì)該節(jié)點(diǎn)注冊子節(jié)點(diǎn)變更監(jiān)聽。
確定創(chuàng)建完的臨時(shí)順序節(jié)點(diǎn)在所有節(jié)點(diǎn)中的順序。
對(duì)于讀節(jié)點(diǎn):
沒有比自己序號(hào)小的節(jié)點(diǎn),或比自己序號(hào)小的節(jié)點(diǎn)都是讀節(jié)點(diǎn),則成功獲取到共享鎖。
如果比自己序號(hào)小的節(jié)點(diǎn)中存在寫節(jié)點(diǎn),則需進(jìn)入等待。
對(duì)于寫節(jié)點(diǎn):
如果自己不是序號(hào)最小的節(jié)點(diǎn),則需進(jìn)入等待。
接受到子節(jié)點(diǎn)變更通知后,重復(fù)步驟1
以下為實(shí)現(xiàn)代碼:
import java.io.IOException; import java.util.Collections; import java.util.Comparator; import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; import org.apache.http.client.ClientProtocolException; /** * 分布式共享鎖 * @author alexnevsky * @date 2018年5月23日 */ public class SharedLock { private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000); private static final String PARENT_PATH = "/zk-book/shared_lock"; private static volatile boolean isExecuted = false; public static void main(String[] args) throws InterruptedException, ClientProtocolException, IOException { String nodeTemp = zkClient.createEphemeralSequential(PARENT_PATH + "/w-", null); String node = nodeTemp.substring(nodeTemp.lastIndexOf("/") + 1); ListcurrentChilds = sortNodes(zkClient.getChildren(PARENT_PATH)); if (currentChilds.size() > 0) isExecuted = getLockAndExecute(currentChilds, node); zkClient.subscribeChildChanges(PARENT_PATH, new IZkChildListener() { @Override public void handleChildChange(String parentPath, List currentChilds) throws Exception { if (currentChilds.size() > 0) { currentChilds = sortNodes(currentChilds); isExecuted = getLockAndExecute(currentChilds, node); } } }); while (!isExecuted) { Thread.sleep(Integer.MAX_VALUE); } } /** * 排序節(jié)點(diǎn) * @author alexnevsky * @date 2018年5月24日 * @param nodes * @return */ private static List sortNodes(List nodes) { Collections.sort(nodes, new Comparator () { @Override public int compare(String o1, String o2) { o1 = o1.indexOf("r-") > -1 ? o1.replaceFirst("r-", "") : o1.replaceFirst("w-", ""); o2 = o2.indexOf("r-") > -1 ? o2.replaceFirst("r-", "") : o2.replaceFirst("w-", ""); return Integer.parseInt(o1) - Integer.parseInt(o2); // 比較序列號(hào) } }); return nodes; } /** * 獲取節(jié)點(diǎn)位置 * @author alexnevsky * @date 2018年5月24日 * @param nodes * @param node * @return */ private static Integer getNodePosition(List nodes, String node) { for (int i = 0, size = nodes.size(); i < size; i++) { if (nodes.get(i).equals(node)) return i; } return null; // 無此數(shù)據(jù) } /** * 是否得到鎖 * @author alexnevsky * @date 2018年5月24日 * @param nodes * @param node * @param nodePosition * @return */ private static boolean isGetLock(List nodes, String node, int nodePosition) { if (nodePosition == 0) // 沒有比此序號(hào)更小的節(jié)點(diǎn) return true; if (node.indexOf("r-") > -1) { // 讀節(jié)點(diǎn) for (int i = 0; i < nodePosition; i++) { // 遍歷小于次序號(hào)的節(jié)點(diǎn) String nodeTemp = nodes.get(i); if (nodeTemp.indexOf("w-") > -1) // 存在寫節(jié)點(diǎn),則進(jìn)入等待鎖 return false; } return true; } return false; } /** * 獲取鎖并執(zhí)行 * @author alexnevsky * @date 2018年5月24日 * @param currentChilds * @param node * @return */ private static boolean getLockAndExecute(List currentChilds, String node) { Integer nodePosition = getNodePosition(currentChilds, node); if (nodePosition == null) // 子節(jié)點(diǎn)為空 return false; System.out.println("子節(jié)點(diǎn):" + currentChilds.toString() + ", " + node + " 的位置:" + nodePosition); boolean isGetLock = isGetLock(currentChilds, node, nodePosition); if (isGetLock) { System.out.println(node + " 成功獲取到鎖,開始執(zhí)行耗時(shí)任務(wù)"); doSomething(); boolean isSuccess = zkClient.delete(PARENT_PATH + "/" + node); if (isSuccess) System.out.println(node + " 成功執(zhí)行完任務(wù)并刪除節(jié)點(diǎn)"); } else { System.out.println(node + " 未獲取到鎖"); } return isGetLock; } private static void doSomething() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }
測試以上代碼會(huì)發(fā)現(xiàn),當(dāng)獲取鎖的節(jié)點(diǎn)過多時(shí),某一節(jié)點(diǎn)變更會(huì)通知所有節(jié)點(diǎn),會(huì)對(duì) ZooKeeper 服務(wù)器造成巨大的性能影響和網(wǎng)絡(luò)沖擊,服務(wù)器會(huì)發(fā)送給客戶端大量的事件通知。比如有以下節(jié)點(diǎn),當(dāng) w-24 節(jié)點(diǎn)變更時(shí),會(huì)通知給其余節(jié)點(diǎn)。
因?yàn)楫?dāng)獲取共享鎖時(shí),要判斷比自己序號(hào)小的節(jié)點(diǎn),所以應(yīng)該只給 r-25 節(jié)點(diǎn)發(fā)送通知。針對(duì)此情況,改進(jìn)后判斷讀寫順序?yàn)椋?/p>
創(chuàng)建完臨時(shí)順序節(jié)點(diǎn)后,獲取 "/xxx" 下的所有子節(jié)點(diǎn)。
客戶端調(diào)用 getChildren() 來獲取子節(jié)點(diǎn)列表,注意,這里不注冊任何監(jiān)聽。
如果未獲取到共享鎖,那么找到比自己序號(hào)小的節(jié)點(diǎn)來注冊監(jiān)聽,分為以下倆種情況:
讀節(jié)點(diǎn):比自己序號(hào)小的最后一個(gè)寫節(jié)點(diǎn)注冊監(jiān)聽
寫節(jié)點(diǎn):比自己序號(hào)小的最后一個(gè)節(jié)點(diǎn)注冊監(jiān)聽
等待監(jiān)聽通知,重復(fù)步驟2
改進(jìn)后的共享鎖代碼實(shí)現(xiàn):
import java.io.IOException; import java.util.Collections; import java.util.Comparator; import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; import org.apache.http.client.ClientProtocolException; /** * 分布式共享鎖最優(yōu) * @author alexnevsky * @date 2018年5月23日 */ public class SharedLockOptimal { private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000); private static final String PARENT_PATH = "/zk-book/shared_lock"; private static String nodeFullPath = zkClient.createEphemeralSequential(PARENT_PATH + "/r-", null); public static void main(String[] args) throws InterruptedException, ClientProtocolException, IOException { ListcurrentChilds = sortNodes(zkClient.getChildren(PARENT_PATH)); String node = nodeFullPath.substring(nodeFullPath.lastIndexOf("/") + 1); boolean isReadNode = node.indexOf("r-") > -1 ? true : false, isGetLock = getLock(currentChilds, node); System.out.println("當(dāng)前所有節(jié)點(diǎn):" + currentChilds.toString() + ", 該" + (isReadNode ? "讀" : "寫") + "節(jié)點(diǎn):" + node); if (isGetLock) { execute(node); System.out.println("退出程序"); System.exit(1); } else { String monitorNode = getMonitorNode(currentChilds, node); System.out.println(node + " 未獲取到鎖,注冊監(jiān)聽節(jié)點(diǎn):" + monitorNode); if (null != monitorNode) { zkClient.subscribeChildChanges(PARENT_PATH + "/" + monitorNode, new IZkChildListener() { @Override public void handleChildChange(String parentPath, List currentChilds) throws Exception { main(null); // 遞歸調(diào)用 } }); } Thread.sleep(Integer.MAX_VALUE); } } /** * 排序節(jié)點(diǎn) * @author alexnevsky * @date 2018年5月24日 * @param nodes * @return */ private static List sortNodes(List nodes) { Collections.sort(nodes, new Comparator () { @Override public int compare(String o1, String o2) { o1 = o1.indexOf("r-") > -1 ? o1.replaceFirst("r-", "") : o1.replaceFirst("w-", ""); o2 = o2.indexOf("r-") > -1 ? o2.replaceFirst("r-", "") : o2.replaceFirst("w-", ""); return Integer.parseInt(o1) - Integer.parseInt(o2); // 比較序列號(hào) } }); return nodes; } /** * 獲取節(jié)點(diǎn)位置 * @author alexnevsky * @date 2018年5月24日 * @param currentChilds * @param node * @return */ private static Integer getNodePosition(List currentChilds, String node) { for (int i = 0, size = currentChilds.size(); i < size; i++) { if (currentChilds.get(i).equals(node)) return i; } return null; } /** * 獲取監(jiān)聽節(jié)點(diǎn) * @author alexnevsky * @date 2018年5月25日 * @param currentChilds * @param node * @return */ private static String getMonitorNode(List currentChilds, String node) { String monitorNode = null; Integer nodePosition = getNodePosition(currentChilds, node); if (0 < nodePosition) { // 非首節(jié)點(diǎn) if (node.indexOf("r-") > -1) { // 讀節(jié)點(diǎn) // 獲取比當(dāng)前序號(hào)小的最后一個(gè)寫節(jié)點(diǎn) for (int i = nodePosition - 1; i >= 0; i--) { String tempNode = currentChilds.get(i); if (tempNode.indexOf("w-") > -1) return tempNode; } } else { // 獲取比當(dāng)前序號(hào)小的最后一個(gè)節(jié)點(diǎn) return currentChilds.get(nodePosition - 1); } } return monitorNode; } /** * 獲取鎖 * @author alexnevsky * @date 2018年5月24日 * @param currentChilds * @param node * @return */ private static boolean getLock(List currentChilds, String node) { Integer nodePosition = getNodePosition(currentChilds, node); if (nodePosition == null) return false; if (nodePosition == 0) // 無序號(hào)更小的節(jié)點(diǎn) return true; if (node.indexOf("r-") > -1) { // 讀節(jié)點(diǎn) for (int i = 0; i < nodePosition; i++) { // 遍歷前面序號(hào)的節(jié)點(diǎn) String tempNode = currentChilds.get(i); if (tempNode.indexOf("w-") > -1) // 存在寫節(jié)點(diǎn),返回失敗 return false; } return true; } return false; } /** * 執(zhí)行 * @author alexnevsky * @date 2018年5月24日 * @param node * @return */ private static void execute(String node) { System.out.println(node + " 成功獲取到鎖,開始執(zhí)行耗時(shí)任務(wù)"); doSomething(); boolean isDeletedLock = zkClient.delete(nodeFullPath); System.out.println(node + " 成功執(zhí)行完任務(wù),刪除節(jié)點(diǎn)" + (isDeletedLock ? "成功" : "失敗")); } /** * 模擬耗時(shí)任務(wù) * @author alexnevsky * @date 2018年5月25日 */ public static void doSomething() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/69512.html
摘要:可簡單地認(rèn)為它是的擴(kuò)展,負(fù)載均衡自然成為不可或缺的特性。是基于開發(fā)的服務(wù)代理組件,在使用場景中,它與和整合,打造具備服務(wù)動(dòng)態(tài)更新和負(fù)載均衡能力的服務(wù)網(wǎng)關(guān)。類似的特性在項(xiàng)目也有體現(xiàn),它是另一種高性能代理的方案,提供服務(wù)發(fā)現(xiàn)健康和負(fù)載均衡。 摘要: Cloud Native 應(yīng)用架構(gòu)隨著云技術(shù)的發(fā)展受到業(yè)界特別重視和關(guān)注,尤其是 CNCF(Cloud Native Computing Fo...
摘要:作為面試官,我是如何甄別應(yīng)聘者的包裝程度語言和等其他語言的對(duì)比分析和主從復(fù)制的原理詳解和持久化的原理是什么面試中經(jīng)常被問到的持久化與恢復(fù)實(shí)現(xiàn)故障恢復(fù)自動(dòng)化詳解哨兵技術(shù)查漏補(bǔ)缺最易錯(cuò)過的技術(shù)要點(diǎn)大掃盲意外宕機(jī)不難解決,但你真的懂?dāng)?shù)據(jù)恢復(fù)嗎每秒 作為面試官,我是如何甄別應(yīng)聘者的包裝程度Go語言和Java、python等其他語言的對(duì)比分析 Redis和MySQL Redis:主從復(fù)制的原理詳...
摘要:作為面試官,我是如何甄別應(yīng)聘者的包裝程度語言和等其他語言的對(duì)比分析和主從復(fù)制的原理詳解和持久化的原理是什么面試中經(jīng)常被問到的持久化與恢復(fù)實(shí)現(xiàn)故障恢復(fù)自動(dòng)化詳解哨兵技術(shù)查漏補(bǔ)缺最易錯(cuò)過的技術(shù)要點(diǎn)大掃盲意外宕機(jī)不難解決,但你真的懂?dāng)?shù)據(jù)恢復(fù)嗎每秒 作為面試官,我是如何甄別應(yīng)聘者的包裝程度Go語言和Java、python等其他語言的對(duì)比分析 Redis和MySQL Redis:主從復(fù)制的原理詳...
摘要:可簡單地認(rèn)為它是的擴(kuò)展,負(fù)載均衡自然成為不可或缺的特性。類似的特性在項(xiàng)目也有體現(xiàn),它是另一種高性能代理的方案,提供服務(wù)發(fā)現(xiàn)健康和負(fù)載均衡。 Dubbo Cloud Native 實(shí)踐與思考 分享簡介 Cloud Native 應(yīng)用架構(gòu)隨著云技術(shù)的發(fā)展受到業(yè)界特別重視和關(guān)注,尤其是 CNCF(Cloud Native Computing Foundation)項(xiàng)目蓬勃發(fā)展之際。Dubbo...
閱讀 1316·2021-11-22 09:34
閱讀 2178·2021-10-08 10:18
閱讀 1737·2021-09-29 09:35
閱讀 2471·2019-08-29 17:20
閱讀 2149·2019-08-29 15:36
閱讀 3412·2019-08-29 13:52
閱讀 791·2019-08-29 12:29
閱讀 1196·2019-08-28 18:10