成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

中間件 - ZooKeeper應(yīng)用場景實(shí)踐

XFLY / 2496人閱讀

摘要:三分布式鎖這部分是重要功能,在此基礎(chǔ)上實(shí)現(xiàn)諸如,分布式協(xié)調(diào)通知,負(fù)載均衡,選舉等復(fù)雜場景。針對(duì)此情況,改進(jìn)后判斷讀寫順序?yàn)閯?chuàng)建完臨時(shí)順序節(jié)點(diǎn)后,獲取下的所有子節(jié)點(diǎn)。

注:該文章用作回顧記錄

一、準(zhǔn)備工作

預(yù)先下載安裝 ZooKeeper ,簡單配置就能使用了。然后構(gòu)建 Maven 項(xiàng)目,將下面的代碼粘貼到 pom.xml中:

      
        org.apache.zookeeper  
        zookeeper  
        3.4.5  
      
      
        com.101tec  
        zkclient  
        0.5  
     

zkclient 是開源的客戶端工具,其中封裝了很多功能,比如:刪除包含子節(jié)點(diǎn)的父節(jié)點(diǎn),連接重試,異步回調(diào),偏向 Java 寫法的注冊監(jiān)聽等,極大地方便了用戶使用。

下面不過多介紹客戶端操作,只針對(duì)應(yīng)用場景做介紹,該文章會(huì)隨著本人的學(xué)習(xí)持續(xù)補(bǔ)充。

二、數(shù)據(jù)發(fā)布/訂閱

使用 ZooKeeper 節(jié)點(diǎn)監(jiān)聽來實(shí)現(xiàn)該功能:

ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000); // 連接集群

zkClient.createPersistent("/xxx/xxx"); // 創(chuàng)建持久節(jié)點(diǎn)

// 注冊子節(jié)點(diǎn)變更監(jiān)聽,當(dāng)子節(jié)點(diǎn)改變(比如創(chuàng)建了"/xxx/xxx/1")或當(dāng)前節(jié)點(diǎn)刪除等,會(huì)觸發(fā)異步回調(diào)
zkClient.subscribeChildChanges("/xxx/xxx", new IZkChildListener() {
    @Override
    public void handleChildChange(String parentPath, List currentChilds) 
        throws Exception {
    }
});

下面為部分源碼:

package org.I0Itec.zkclient;

public class ZkClient implements Watcher {

    public List watchForChilds(final String path) {
        return retryUntilConnected(new Callable>() {
            @Override
            public List call() throws Exception {
                exists(path, true);
                try {
                    return getChildren(path, true);
                } catch (ZkNoNodeException e) {
                }
                return null;
            }
        });
    }
    
    public  T retryUntilConnected(Callable callable) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
        final long operationStartTime = System.currentTimeMillis();
        while (true) {
            if (_closed) {
                throw new IllegalStateException("ZkClient already closed!");
            }
            try {
                return callable.call();
            } catch (Exception e) {
                throw ExceptionUtil.convertToRuntimeException(e);
            }
        }
    }
}

基于 ZooKeeper 實(shí)現(xiàn)的數(shù)據(jù)發(fā)布/訂閱很簡單吧,快動(dòng)手試試。

三、分布式鎖

這部分是 ZooKeeper 重要功能,在此基礎(chǔ)上實(shí)現(xiàn)諸如,分布式協(xié)調(diào)/通知,負(fù)載均衡,Master選舉等復(fù)雜場景。

1、排它鎖

排它鎖又稱為寫鎖或獨(dú)占鎖。比如事務(wù) T1 對(duì)數(shù)據(jù)對(duì)象 O1 加了排它鎖,那么在整個(gè)加鎖期間,只允許 T1 對(duì) O1 進(jìn)行讀取或更新操作,其它事務(wù)都不能對(duì) O1 操作。

1)獲取鎖

所有客戶端都創(chuàng)建臨時(shí)節(jié)點(diǎn) zkClient.createEphemeral("/xxx/xxx", null);,ZooKeeper 會(huì)保證在所有客戶端中,最終只有一個(gè)客戶端能創(chuàng)建成功,那么就認(rèn)為該客戶端獲取了鎖。同時(shí),所有沒獲取到鎖的客戶端需在/xxx/xxx 上注冊子節(jié)點(diǎn)變更監(jiān)聽,以便實(shí)時(shí)監(jiān)聽節(jié)點(diǎn)變化。如節(jié)點(diǎn)發(fā)生變化,則未獲取到鎖的客戶端再重新獲取鎖。

private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000);
private static final String lockParentPath = "/zk-book/exclusice_lock";

public static void main(String[] args) throws InterruptedException {
    try {
        zkClient.createEphemeral(lockParentPath + "/lock");
        System.out.println("service3 獲取鎖成功");
    } catch (Exception e) {
        System.out.println("service3獲取鎖失敗");
        zkClient.subscribeChildChanges(lockParentPath, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List currentChilds)
                    throws Exception {
                System.out.println("service3再次獲取鎖");
                main(null);
            }
        });
    }
    Thread.sleep(Integer.MAX_VALUE);
}

2)釋放鎖

當(dāng) "/xxx/xxx" 是臨時(shí)節(jié)點(diǎn)時(shí),以下倆種情況都會(huì)釋放鎖。

當(dāng)已獲取鎖的客戶機(jī)宕機(jī),導(dǎo)致連接超時(shí)斷開,那么 ZooKeeper 會(huì)將臨時(shí)節(jié)點(diǎn)刪除。

正常執(zhí)行完邏輯后,客戶端主動(dòng)將臨時(shí)節(jié)點(diǎn)刪除。

2、共享鎖

共享鎖又稱為讀鎖。如果事務(wù) T1 對(duì)數(shù)據(jù)對(duì)象 O1 加了共享鎖,那么 T1 只能對(duì) O1 進(jìn)行讀取操作,其它事務(wù)只能對(duì) O1 加共享鎖,直到 O1 上所有共享鎖都被釋放。

1)獲取鎖

所有客戶端都創(chuàng)建臨時(shí)順序節(jié)點(diǎn) zkClient.createEphemeralSequential("/xxx/xxx", null);,ZooKeeper 會(huì)生成類似下面的節(jié)點(diǎn),已保證節(jié)點(diǎn)的唯一性。

2)判斷讀寫順序

創(chuàng)建完臨時(shí)順序節(jié)點(diǎn)后,獲取 "/xxx" 下的所有子節(jié)點(diǎn),并對(duì)該節(jié)點(diǎn)注冊子節(jié)點(diǎn)變更監(jiān)聽。

確定創(chuàng)建完的臨時(shí)順序節(jié)點(diǎn)在所有節(jié)點(diǎn)中的順序。

對(duì)于讀節(jié)點(diǎn):
沒有比自己序號(hào)小的節(jié)點(diǎn),或比自己序號(hào)小的節(jié)點(diǎn)都是讀節(jié)點(diǎn),則成功獲取到共享鎖。
如果比自己序號(hào)小的節(jié)點(diǎn)中存在寫節(jié)點(diǎn),則需進(jìn)入等待。
對(duì)于寫節(jié)點(diǎn):
如果自己不是序號(hào)最小的節(jié)點(diǎn),則需進(jìn)入等待。

接受到子節(jié)點(diǎn)變更通知后,重復(fù)步驟1

以下為實(shí)現(xiàn)代碼:

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.http.client.ClientProtocolException;

/**
 * 分布式共享鎖
 * @author alexnevsky
 * @date 2018年5月23日
 */
public class SharedLock {
    
    private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000);
    private static final String PARENT_PATH = "/zk-book/shared_lock";
    private static volatile boolean isExecuted = false;
    
    public static void main(String[] args) throws InterruptedException, ClientProtocolException, IOException {
        String nodeTemp = zkClient.createEphemeralSequential(PARENT_PATH + "/w-", null);
        String node = nodeTemp.substring(nodeTemp.lastIndexOf("/") + 1);
        
        List currentChilds = sortNodes(zkClient.getChildren(PARENT_PATH));
        if (currentChilds.size() > 0)
            isExecuted = getLockAndExecute(currentChilds, node);
        
        zkClient.subscribeChildChanges(PARENT_PATH, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List currentChilds)
                    throws Exception {
                if (currentChilds.size() > 0) {
                    currentChilds = sortNodes(currentChilds);
                    isExecuted = getLockAndExecute(currentChilds, node);
                }
            }
        });
        
        while (!isExecuted) {
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
    
    /**
     * 排序節(jié)點(diǎn)
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param nodes
     * @return
     */
    private static List sortNodes(List nodes) {
        Collections.sort(nodes, new Comparator() {
            @Override
            public int compare(String o1, String o2) {
                o1 = o1.indexOf("r-") > -1 ? o1.replaceFirst("r-", "") : o1.replaceFirst("w-", "");
                o2 = o2.indexOf("r-") > -1 ? o2.replaceFirst("r-", "") : o2.replaceFirst("w-", "");
                return Integer.parseInt(o1) - Integer.parseInt(o2); // 比較序列號(hào)
            }
        });
        return nodes;
    }
    
    /**
     * 獲取節(jié)點(diǎn)位置
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param nodes
     * @param node
     * @return
     */
    private static Integer getNodePosition(List nodes, String node) {
        for (int i = 0, size = nodes.size(); i < size; i++) {
            if (nodes.get(i).equals(node))
                return i;
        }
        return null; // 無此數(shù)據(jù)
    }
    
    /**
     * 是否得到鎖
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param nodes
     * @param node
     * @param nodePosition
     * @return
     */
    private static boolean isGetLock(List nodes, String node, int nodePosition) {
        if (nodePosition == 0) // 沒有比此序號(hào)更小的節(jié)點(diǎn) 
            return true;
        if (node.indexOf("r-") > -1) { // 讀節(jié)點(diǎn)
            for (int i = 0; i < nodePosition; i++) { // 遍歷小于次序號(hào)的節(jié)點(diǎn)
                String nodeTemp = nodes.get(i);
                if (nodeTemp.indexOf("w-") > -1)  // 存在寫節(jié)點(diǎn),則進(jìn)入等待鎖
                    return false;
            }
            return true;
        }
        return false;
    }
    
    /**
     * 獲取鎖并執(zhí)行
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param currentChilds
     * @param node
     * @return
     */
    private static boolean getLockAndExecute(List currentChilds, String node) {
        Integer nodePosition = getNodePosition(currentChilds, node);
        if (nodePosition == null) // 子節(jié)點(diǎn)為空
            return false;
        System.out.println("子節(jié)點(diǎn):" + currentChilds.toString() + ", " + node + " 的位置:" + nodePosition);
        boolean isGetLock = isGetLock(currentChilds, node, nodePosition);
        if (isGetLock) {
            System.out.println(node + " 成功獲取到鎖,開始執(zhí)行耗時(shí)任務(wù)");
            doSomething();
            boolean isSuccess = zkClient.delete(PARENT_PATH + "/" + node);
            if (isSuccess)
                System.out.println(node + " 成功執(zhí)行完任務(wù)并刪除節(jié)點(diǎn)");
        } else {
            System.out.println(node + " 未獲取到鎖");
        }
        return isGetLock;
    }
    
    private static void doSomething() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
}

測試以上代碼會(huì)發(fā)現(xiàn),當(dāng)獲取鎖的節(jié)點(diǎn)過多時(shí),某一節(jié)點(diǎn)變更會(huì)通知所有節(jié)點(diǎn),會(huì)對(duì) ZooKeeper 服務(wù)器造成巨大的性能影響和網(wǎng)絡(luò)沖擊,服務(wù)器會(huì)發(fā)送給客戶端大量的事件通知。比如有以下節(jié)點(diǎn),當(dāng) w-24 節(jié)點(diǎn)變更時(shí),會(huì)通知給其余節(jié)點(diǎn)。

因?yàn)楫?dāng)獲取共享鎖時(shí),要判斷比自己序號(hào)小的節(jié)點(diǎn),所以應(yīng)該只給 r-25 節(jié)點(diǎn)發(fā)送通知。針對(duì)此情況,改進(jìn)后判斷讀寫順序?yàn)椋?/p>

創(chuàng)建完臨時(shí)順序節(jié)點(diǎn)后,獲取 "/xxx" 下的所有子節(jié)點(diǎn)。

客戶端調(diào)用 getChildren() 來獲取子節(jié)點(diǎn)列表,注意,這里不注冊任何監(jiān)聽。

如果未獲取到共享鎖,那么找到比自己序號(hào)小的節(jié)點(diǎn)來注冊監(jiān)聽,分為以下倆種情況:
讀節(jié)點(diǎn):比自己序號(hào)小的最后一個(gè)寫節(jié)點(diǎn)注冊監(jiān)聽
寫節(jié)點(diǎn):比自己序號(hào)小的最后一個(gè)節(jié)點(diǎn)注冊監(jiān)聽

等待監(jiān)聽通知,重復(fù)步驟2

改進(jìn)后的共享鎖代碼實(shí)現(xiàn):

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.http.client.ClientProtocolException;

/**
 * 分布式共享鎖最優(yōu)
 * @author alexnevsky
 * @date 2018年5月23日
 */
public class SharedLockOptimal {
    
    private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000);
    private static final String PARENT_PATH = "/zk-book/shared_lock";
    private static String nodeFullPath = zkClient.createEphemeralSequential(PARENT_PATH + "/r-", null);
    
    public static void main(String[] args) throws InterruptedException, ClientProtocolException, IOException {
        List currentChilds = sortNodes(zkClient.getChildren(PARENT_PATH));
        String node = nodeFullPath.substring(nodeFullPath.lastIndexOf("/") + 1);
        
        boolean isReadNode = node.indexOf("r-") > -1 ? true : false, isGetLock = getLock(currentChilds, node);
        System.out.println("當(dāng)前所有節(jié)點(diǎn):" + currentChilds.toString() + ", 該" + (isReadNode ? "讀" : "寫") + "節(jié)點(diǎn):" + node);
        
        if (isGetLock) {
            execute(node);
            System.out.println("退出程序");
            System.exit(1);
        } else {
            String monitorNode = getMonitorNode(currentChilds, node);
            System.out.println(node + " 未獲取到鎖,注冊監(jiān)聽節(jié)點(diǎn):" + monitorNode);
            if (null != monitorNode) {
                zkClient.subscribeChildChanges(PARENT_PATH + "/" + monitorNode, new IZkChildListener() {
                    @Override
                    public void handleChildChange(String parentPath, List currentChilds)
                            throws Exception {
                        main(null); // 遞歸調(diào)用
                    }
                });
            }
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
    
    /**
     * 排序節(jié)點(diǎn)
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param nodes
     * @return
     */
    private static List sortNodes(List nodes) {
        Collections.sort(nodes, new Comparator() {
            @Override
            public int compare(String o1, String o2) {
                o1 = o1.indexOf("r-") > -1 ? o1.replaceFirst("r-", "") : o1.replaceFirst("w-", "");
                o2 = o2.indexOf("r-") > -1 ? o2.replaceFirst("r-", "") : o2.replaceFirst("w-", "");
                return Integer.parseInt(o1) - Integer.parseInt(o2); // 比較序列號(hào)
            }
        });
        return nodes;
    }
    
    /**
     * 獲取節(jié)點(diǎn)位置
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param currentChilds
     * @param node
     * @return
     */
    private static Integer getNodePosition(List currentChilds, String node) {
        for (int i = 0, size = currentChilds.size(); i < size; i++) {
            if (currentChilds.get(i).equals(node))
                return i;
        }
        return null;
    }
    
    /**
     * 獲取監(jiān)聽節(jié)點(diǎn)
     * @author alexnevsky
     * @date 2018年5月25日  
     * @param currentChilds
     * @param node
     * @return
     */
    private static String getMonitorNode(List currentChilds, String node) {
        String monitorNode = null;
        Integer nodePosition = getNodePosition(currentChilds, node);
        if (0 < nodePosition) { // 非首節(jié)點(diǎn)
            if (node.indexOf("r-") > -1) { // 讀節(jié)點(diǎn)
                // 獲取比當(dāng)前序號(hào)小的最后一個(gè)寫節(jié)點(diǎn)
                for (int i = nodePosition - 1; i >= 0; i--) {
                    String tempNode = currentChilds.get(i);
                    if (tempNode.indexOf("w-") > -1) 
                        return tempNode;
                }
            } else {
                // 獲取比當(dāng)前序號(hào)小的最后一個(gè)節(jié)點(diǎn)
                return currentChilds.get(nodePosition - 1);
            }
        }
        return monitorNode;
    }
    
    /**
     * 獲取鎖
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param currentChilds
     * @param node
     * @return
     */
    private static boolean getLock(List currentChilds, String node) {
        Integer nodePosition = getNodePosition(currentChilds, node);
        if (nodePosition == null)
            return false;
        if (nodePosition == 0) // 無序號(hào)更小的節(jié)點(diǎn) 
            return true;
        if (node.indexOf("r-") > -1) { // 讀節(jié)點(diǎn)
            for (int i = 0; i < nodePosition; i++) { // 遍歷前面序號(hào)的節(jié)點(diǎn)
                String tempNode = currentChilds.get(i);
                if (tempNode.indexOf("w-") > -1)  // 存在寫節(jié)點(diǎn),返回失敗
                    return false;
            }
            return true;
        }
        return false;
    }
    
    /**
     * 執(zhí)行
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param node
     * @return
     */
    private static void execute(String node) {
        System.out.println(node + " 成功獲取到鎖,開始執(zhí)行耗時(shí)任務(wù)");
        doSomething();
        boolean isDeletedLock = zkClient.delete(nodeFullPath);
        System.out.println(node + " 成功執(zhí)行完任務(wù),刪除節(jié)點(diǎn)" + (isDeletedLock ? "成功" : "失敗"));
    }
    
    /**
     * 模擬耗時(shí)任務(wù)
     * @author alexnevsky
     * @date 2018年5月25日
     */
    public static void doSomething() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
}

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/69512.html

相關(guān)文章

  • Dubbo Cloud Native 之路的實(shí)踐與思考

    摘要:可簡單地認(rèn)為它是的擴(kuò)展,負(fù)載均衡自然成為不可或缺的特性。是基于開發(fā)的服務(wù)代理組件,在使用場景中,它與和整合,打造具備服務(wù)動(dòng)態(tài)更新和負(fù)載均衡能力的服務(wù)網(wǎng)關(guān)。類似的特性在項(xiàng)目也有體現(xiàn),它是另一種高性能代理的方案,提供服務(wù)發(fā)現(xiàn)健康和負(fù)載均衡。 摘要: Cloud Native 應(yīng)用架構(gòu)隨著云技術(shù)的發(fā)展受到業(yè)界特別重視和關(guān)注,尤其是 CNCF(Cloud Native Computing Fo...

    niceforbear 評(píng)論0 收藏0
  • 【推薦】最新200篇:技術(shù)文章整理

    摘要:作為面試官,我是如何甄別應(yīng)聘者的包裝程度語言和等其他語言的對(duì)比分析和主從復(fù)制的原理詳解和持久化的原理是什么面試中經(jīng)常被問到的持久化與恢復(fù)實(shí)現(xiàn)故障恢復(fù)自動(dòng)化詳解哨兵技術(shù)查漏補(bǔ)缺最易錯(cuò)過的技術(shù)要點(diǎn)大掃盲意外宕機(jī)不難解決,但你真的懂?dāng)?shù)據(jù)恢復(fù)嗎每秒 作為面試官,我是如何甄別應(yīng)聘者的包裝程度Go語言和Java、python等其他語言的對(duì)比分析 Redis和MySQL Redis:主從復(fù)制的原理詳...

    BicycleWarrior 評(píng)論0 收藏0
  • 【推薦】最新200篇:技術(shù)文章整理

    摘要:作為面試官,我是如何甄別應(yīng)聘者的包裝程度語言和等其他語言的對(duì)比分析和主從復(fù)制的原理詳解和持久化的原理是什么面試中經(jīng)常被問到的持久化與恢復(fù)實(shí)現(xiàn)故障恢復(fù)自動(dòng)化詳解哨兵技術(shù)查漏補(bǔ)缺最易錯(cuò)過的技術(shù)要點(diǎn)大掃盲意外宕機(jī)不難解決,但你真的懂?dāng)?shù)據(jù)恢復(fù)嗎每秒 作為面試官,我是如何甄別應(yīng)聘者的包裝程度Go語言和Java、python等其他語言的對(duì)比分析 Redis和MySQL Redis:主從復(fù)制的原理詳...

    tommego 評(píng)論0 收藏0
  • Dubbo Cloud Native 實(shí)踐與思考

    摘要:可簡單地認(rèn)為它是的擴(kuò)展,負(fù)載均衡自然成為不可或缺的特性。類似的特性在項(xiàng)目也有體現(xiàn),它是另一種高性能代理的方案,提供服務(wù)發(fā)現(xiàn)健康和負(fù)載均衡。 Dubbo Cloud Native 實(shí)踐與思考 分享簡介 Cloud Native 應(yīng)用架構(gòu)隨著云技術(shù)的發(fā)展受到業(yè)界特別重視和關(guān)注,尤其是 CNCF(Cloud Native Computing Foundation)項(xiàng)目蓬勃發(fā)展之際。Dubbo...

    邱勇 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<