成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Zookeeper 客戶端 Api 的基本使用

fizz / 1044人閱讀

零 版本

JDK 版本 : OpenJDK 11.0.1

IDE : idea 2018.3

Zookeeper Server 版本 : 3.5.4-beta

Zookeeper Client 版本 : 3.5.4-beta

Curator 版本 : 4.2.0

一 Zookeeper Client

Zookeeper Client 是 Zookeeper 的經(jīng)典原生客戶端。使用之前需要在 Maven 中導入依賴:


    org.apache.zookeeper
    zookeeper
    3.5.4-beta

代碼:

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class ClientTest {

    public static void main(String[] args) {

        /**
         * 創(chuàng)建一個 Zookeeper 的實例
         * 此處為一個集群,Zookeeper 的 ip 之間用逗號隔開
         *
         * 參數(shù)解釋:
         * param 1 - Zookeeper 的實例 ip ,此處是一個集群,所以配置了多個 ip,用逗號隔開
         * param 2 - session 過期時間,單位秒 (1000)
         * param 3 - 監(jiān)視者,用于獲取監(jiān)控事件 (MyWatch)
         */
        ZooKeeper zooKeeper = null;
        try {
            Watcher createZkWatch = new MyWatch();
            zooKeeper = new ZooKeeper("localhost:2101,localhost:2102,localhost:2103",
                    1000,createZkWatch);
        } catch (IOException e) {
            e.printStackTrace();
        }

        /**
         * 值得注意的是,Zookeeper 對象去連接中間件實例是異步的
         * 所以此處需要做一個死循環(huán)等待它連接完畢
         * 更加優(yōu)雅的做法是使用 CownDownLatch 去做,但是 while 比較簡單
         */
        while(zooKeeper.getState() == ZooKeeper.States.CONNECTING){
            //返回 zookeeper 的狀態(tài)
            System.out.println(zooKeeper.getState());

            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        //如果連接不出錯的話此處狀態(tài)應該為 CONNECTED
        if(zooKeeper.getState() != ZooKeeper.States.CONNECTED)
            return;


        /**
         * 創(chuàng)建 ZooKeeper 節(jié)點
         * 參數(shù)解釋:
         * param 1 - znode 名稱 (/zoo)
         * param 2 - 節(jié)點數(shù)據(jù) (my first data)
         * param 3 - 設置權限 (OPEN_ACL_UNSAFE)
         * param 4 - znode 類型 (PERSISTENT)
         *
         *
         * znode 類型有四種:
         * PERSISTENT - 持久化目錄節(jié)點,客戶端與zookeeper斷開連接后,該節(jié)點依舊存在
         * PERSISTENT_SEQUENTIAL - 持久化,并帶有序列號
         * EPHEMERAL - 臨時目錄節(jié)點,客戶端與zookeeper斷開連接后,該節(jié)點被刪除
         * EPHEMERAL_SEQUENTIAL - 臨時,并帶有序列號
         */
        try {
            String s = zooKeeper.create("/zoo", "my first data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("創(chuàng)建節(jié)點:" + s);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        /**
         * 創(chuàng)建一個二級節(jié)點,參數(shù)同上
         * 需要注意的是,必須要有一級節(jié)點才能有二級節(jié)點,不然會報錯
         */
        try {
            String s = zooKeeper.create("/zoo/zoo_1", "my first data_1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("創(chuàng)建二級節(jié)點:" + s);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        /**
         * 查詢 ZooKeeper 節(jié)點的數(shù)據(jù)
         * 參數(shù)解釋:
         * param 1 - znode 名稱 (/zoo)
         * param 2 - 監(jiān)視者,用于獲取監(jiān)控事件 (MyWatch)
         * param 3 - Zookeeper 實例信息和數(shù)據(jù)信息 (stat)
         *
         * 注意如果后續(xù)需要修改該節(jié)點的值,可以在此處記錄節(jié)點版本 version (非必要操作)
         */
        Integer zooVersion = null;
        try {
            MyWatch getDataWatch = new MyWatch();
            Stat stat = new Stat();
            byte[] data = zooKeeper.getData("/zoo",getDataWatch,stat);
            System.out.println("查詢節(jié)點數(shù)據(jù):" + new String(data));

            //從 stat 中可以獲取很多 Zookeeper 實例的信息
            System.out.println("查詢節(jié)點數(shù)據(jù) czxid:" + stat.getCzxid()); //zxid
            zooVersion = stat.getVersion(); //此處獲取 /zoo 節(jié)點的版本號
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        /**
         * 修改 ZooKeeper 節(jié)點的數(shù)據(jù)
         * 參數(shù)解釋:
         * param 1 - znode 名稱 (/zoo)
         * param 2 - 節(jié)點新數(shù)據(jù) (my first data change)
         * param 3 - 該節(jié)點的版本
         *
         * 在成功修改了節(jié)點的數(shù)據(jù)之后,版本號會自動加一
         * 如果此時不知道節(jié)點的版本,也可以輸入 -1,會默認取最新的節(jié)點版本去修改
         */
        try {
            Stat stat = zooKeeper.setData("/zoo", "my first data change".getBytes(), zooVersion); // zooVersion = -1
            System.out.println("修改節(jié)點數(shù)據(jù) czxid:" + stat.getCzxid());
            System.out.println("修改節(jié)點數(shù)據(jù) version:" + stat.getVersion());
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        /**
         * 查看 ZooKeeper 節(jié)點是否存在
         * 參數(shù)解釋:
         * param 1 - znode 名稱 (/zoo)
         * param 2 - 監(jiān)視者,用于獲取監(jiān)控事件 (MyWatch)
         *
         * 如果不存在,返回的 stat 為 null
         */
        try {
            Stat stat = zooKeeper.exists("/zoo_not_exist", new MyWatch());
            System.out.println("查看節(jié)點是否存在 stat:" + stat);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        /**
         * 刪除 ZooKeeper 節(jié)點
         * 參數(shù)解釋:
         * param 1 - znode 名稱 (/zoo)
         * param 2 - 該節(jié)點的版本
         *
         * 版本號如果不清楚的話可以填入 -1,和上述同理
         * 值得注意的是,如果一個節(jié)點下屬存在子節(jié)點,那么它不能被刪除
         */
        try {
            zooKeeper.delete("/zoo", -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }


    private static class MyWatch implements Watcher{

        public void process(WatchedEvent watchedEvent) {
            System.out.println(watchedEvent);
        }
    }
}
二 Curator

Curator 是 Netfix 開發(fā)的 Zookeeper Client,使用起來更方便,功能更加強大,目前應用更加廣泛。使用之前需要在 Maven 中導入依賴:


    org.apache.curator
    curator-recipes
    4.2.0


    org.apache.curator
    curator-framework
    4.2.0

代碼:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.util.List;

public class CuratorTest {

    public static void main(String[] args) {

        /**
         * 創(chuàng)建客戶端
         *
         * RetryPolicy 接口是重試策略
         */
        /**
         * 指定客戶端的重連策略
         *
         * RetryOneTime(int ms)
         * 休眠一定毫秒數(shù)之后重新連接一次
         *
         * RetryForever(int ms)
         * 和第一種策略的差別是會不斷嘗試重連
         *
         * RetryNTimes(int times,int ms)
         * 和第一種策略的差別是,第一個參數(shù)指定重連次數(shù),第二個參數(shù)指定休眠間隔
         *
         * RetryUntilElapsed(int max_sum_ms,int ms)
         * 第一個參數(shù)指定最大休眠時間,第二個參數(shù)指定休眠間隔,如果休眠時間超出了就不會繼續(xù)重連
         *
         * ExponentialBackoffRetry(int ms,int,int max_ms)
         * 第一個參數(shù)代表最初的重連休眠時間,第二個參數(shù)代表最大重連次數(shù),第三個參數(shù)代表最大重連休眠時間
         * 該策略下重連的休眠時間會隨著重連次數(shù)的增加而增加,從最初休眠時間一直增加到最大休眠時間
         * 最大重連次數(shù)必須小于等于 29,超過的情況下會被自動修改成 29
         *
         * [其它策略不一一列舉]
         */
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(100,3,1000);

        /**
         * 采用 buider 模式創(chuàng)建客戶端
         */
        CuratorFramework client = CuratorFrameworkFactory.builder()
                                        //Zookeeper 的地址
                                        .connectString("localhost:2101,localhost:2102,localhost:2103")
                                        //session 的過期時間(毫秒)
                                        .sessionTimeoutMs(5000)
                                        //連接的超時時間(毫秒)
                                        .connectionTimeoutMs(5000)
                                        //拒絕策略
                                        .retryPolicy(retryPolicy)
                                        //設置該客戶端能夠操作的目錄權限,不設置的話默認可以操作全部
                                        //比如此處設置為 zoo,即為該客戶端對象操作的節(jié)點前面默認會添加 /zoo
                                        .namespace("zoo")
                                        //完成創(chuàng)建
                                        .build();
        //啟動客戶端
        client.start();


        /**
         * 創(chuàng)建節(jié)點
         */
        try {
            String createReturn = client.create()
                                    //節(jié)點類型
                                    //PERSISTENT - 持久化目錄節(jié)點,客戶端與zookeeper斷開連接后,該節(jié)點依舊存在
                                    //PERSISTENT_SEQUENTIAL - 持久化,并帶有序列號
                                    //EPHEMERAL - 臨時目錄節(jié)點,客戶端與zookeeper斷開連接后,該節(jié)點被刪除
                                    //EPHEMERAL_SEQUENTIAL - 臨時,并帶有序列號
                                    .withMode(CreateMode.PERSISTENT)
                                    //由于 namespace 設置為 zoo,所以此處相當于創(chuàng)建 /zoo/zoo_1 節(jié)點
                                    .forPath("/zoo_1", "my first data zoo_1".getBytes());
            System.out.println("創(chuàng)建節(jié)點:" + createReturn);
        } catch (Exception e) {
            e.printStackTrace();
        }

        /**
         * 查詢節(jié)點
         */
        try {
            Stat stat = client.checkExists()
                    //查詢 /zoo/zoo_1 節(jié)點
                    .forPath("/zoo_1");
            //如果不存在,stat 為 null
            System.out.println("查詢節(jié)點:" + stat);
        } catch (Exception e) {
            e.printStackTrace();
        }

        /**
         * 刪除節(jié)點
         */
        try {
            client.delete()
                    //如果該節(jié)點下有子節(jié)點,會拋出異常且刪除失敗
                    .forPath("/zoo_1");
        } catch (Exception e) {
            e.printStackTrace();
        }

        /**
         * 查詢節(jié)點的值
         */
        try {
            Stat stat = new Stat();
            byte[] value = client.getData()
                                    //獲取節(jié)點的 stat
                                    .storingStatIn(stat)
                                    //查詢 /zoo/zoo_1 節(jié)點
                                    .forPath("/zoo_1");
            System.out.println("查詢節(jié)點的值:" + new String(value));
        } catch (Exception e) {
            e.printStackTrace();
        }

        /**
         * 更新節(jié)點的值
         */
        try {
            Stat stat = client.setData()
                                //設置版本值,此選項非必填
                                .withVersion(10086)
                                .forPath("/zoo_1", "zoo_1 new data".getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        }

        /**
         * 獲取節(jié)點的子節(jié)點
         */
        try {
            //獲取所有子節(jié)點的節(jié)點名稱
            List nodes = client.getChildren()
                                    .forPath("/zoo_1");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
三 使用 Curator 實現(xiàn)分布式鎖

Zookeeper 中的分布式鎖實現(xiàn)原理很簡單,就是多個線程一起去創(chuàng)建同一個節(jié)點,誰創(chuàng)建成功鎖就歸誰;使用完之后刪除該節(jié)點,其它節(jié)點再進行一次爭搶。Curator 中有一個寫好的重入鎖 InterProcessMutex,簡單封裝即可使用:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Zookeeper 分布式鎖實現(xiàn)
 */
public class ZkLock implements Lock{

    private InterProcessMutex lock;

    /**
     * 讓使用者方便運用的構(gòu)造方法
     */
    public ZkLock(String zkAddrs){
        this(zkAddrs,
            "/lock_node",
            "lock_base",
            2000,
            new ExponentialBackoffRetry(1000, 10));
    }

    /**
     * 核心構(gòu)造方法,根據(jù)傳入的參數(shù)去構(gòu)造 lock 對象
     * @param zkAddrs  Zookeeper 的服務地址
     * @param lockNode 各個線程要去爭搶創(chuàng)建的 Znode,也就是客戶端有使用權限的 namespace
     * @param baseNode lockNode 的上級 Znode
     * @param sessionOutTimeMs 過期時間
     * @param policy 重連策略
     */
    public ZkLock(String zkAddrs,String lockNode,String baseNode,int sessionOutTimeMs,RetryPolicy policy){

        //有效性驗證
        if(Objects.isNull(zkAddrs)
                || zkAddrs.trim().equals("")
                || Objects.isNull(lockNode)
                || lockNode.trim().equals("")
                || Objects.isNull(policy))
            throw new RuntimeException();


        //通過工廠創(chuàng)建連接
        CuratorFrameworkFactory.Builder cfBuilder = CuratorFrameworkFactory.builder()
                                                                .connectString(zkAddrs)
                                                                .sessionTimeoutMs(sessionOutTimeMs)
                                                                .retryPolicy(policy);
        if(baseNode != null && !baseNode.trim().equals(""))
            cfBuilder.namespace(baseNode);
        CuratorFramework cf = cfBuilder.build();

        //開啟連接
        cf.start();

        //InterProcessMutex 是 Crator 里自帶的一個已經(jīng)實現(xiàn)好的重入鎖
        //只要對其進行簡單封裝即可使用
        lock = new InterProcessMutex(cf,lockNode);
    }

    /**
     * 上鎖方法,死循環(huán)調(diào)用 tryLock() 去上鎖
     */
    @Override
    public void lock() {
        while (!tryLock())
            Thread.yield();
    }

    /**
     * 嘗試獲取鎖,如果沒能獲取到會超時后報錯
     */
    @Override
    public boolean tryLock() {
        try {
            lock.acquire();
        } catch (Exception e) {
            return Boolean.FALSE;
        }
        return Boolean.TRUE;
    }

    /**
     * 嘗試獲取鎖,如果指定時間內(nèi)獲取不到就返回 false
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        try {
            return lock.acquire(time,unit);
        } catch (Exception e) {
            return Boolean.FALSE;
        }
    }

    /**
     * 釋放鎖,如果報錯就會遞歸去釋放
     */
    @Override
    public void unlock() {
        try {
            lock.release();
        } catch (Exception e) {
            unlock();
        }
    }

    //忽略
    @Override
    public Condition newCondition() {
        throw new RuntimeException();
    }

    //忽略
    @Override
    public void lockInterruptibly() throws InterruptedException {
        lock();
    }


    //測試
    public static void main(String[] args) throws Exception {

        //創(chuàng)建一個要被操作的對象
        AtomicInteger count = new AtomicInteger(30);

        //創(chuàng)建一個線程池
        Executor executor = Executors.newFixedThreadPool(10);

        //創(chuàng)建所對象
        Lock lock = new ZkLock("localhost:2101,localhost:2102,localhost:2103");

        //for 循環(huán),把任務丟進線程池里
        for(int i = 0; i < 30; i++){

            executor.execute(()->{
                try {
                    //加鎖
                    lock.lock();

                    //此處開啟業(yè)務邏輯
                    //demo 中簡單模擬,將 count 對象減一
                    int a = count.decrementAndGet();
                    System.out.println(a);

                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        //釋放鎖
                        lock.unlock();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

文章版權歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/74642.html

相關文章

  • 大數(shù)據(jù)入門指南(GitHub開源項目)

    摘要:項目地址前言大數(shù)據(jù)技術棧思維導圖大數(shù)據(jù)常用軟件安裝指南一分布式文件存儲系統(tǒng)分布式計算框架集群資源管理器單機偽集群環(huán)境搭建集群環(huán)境搭建常用命令的使用基于搭建高可用集群二簡介及核心概念環(huán)境下的安裝部署和命令行的基本使用常用操作分區(qū)表和分桶表視圖 項目GitHub地址:https://github.com/heibaiying... 前 言 大數(shù)據(jù)技術棧思維導圖 大數(shù)據(jù)常用軟件安裝指...

    guyan0319 評論0 收藏0
  • 2018年第16周-ZooKeeper基本概念(配搭建過程和Master-Workers例子)

    摘要:有可能是宕機或負荷嚴重的情況導致的。為分布式系統(tǒng)提供了協(xié)調(diào)功能和控制沖突。 背景 隨著計算機的硬件和操作系統(tǒng)兩者相輔相成地發(fā)展,從早期的ENIAC計算機到現(xiàn)在的x86的計算機,從以前的單一控制終端(Single Operator, Single Console, SOSC)的操作系統(tǒng)到現(xiàn)在百花爭鳴的操作系統(tǒng)(如MacOS、Windows、Linux等),現(xiàn)代的操作系統(tǒng)發(fā)展還有一個最重要...

    wemall 評論0 收藏0
  • Zookeeper學習系列【一】 教會你Zookeeper一些基礎概念

    摘要:具有不可分割性即原語的執(zhí)行必須是連續(xù)的,在執(zhí)行過程中不允許被中斷。提供服務主要就是通過數(shù)據(jù)結(jié)構(gòu)原語集機制達到的。子節(jié)點的版本號數(shù)據(jù)節(jié)點版本號版本號創(chuàng)建該節(jié)點的會話的。后位則為遞增序列。 前言 最近加入了部門的技術興趣小組,被分配了Zookeeper的研究任務。在研究過程當中,發(fā)現(xiàn)Zookeeper由于其開源的特性和其卓越的性能特點,在業(yè)界使用廣泛,有很多的應用場景,而這些不同的應用場景...

    DevWiki 評論0 收藏0
  • ZooKeeper 概念與基礎

    摘要:由于分布式系統(tǒng)和應用可以提供更強的計算能力,還能更好地容災和擴展,所以逐漸受到青睞?;A由若干條指令組成,用于完成特定功能的過程稱為原語。 信息飛速膨脹,很多應用無法依賴單個服務器處理龐大的數(shù)據(jù)量。由于分布式系統(tǒng)和應用可以提供更強的計算能力,還能更好地容災和擴展,所以逐漸受到青睞。 在開發(fā)分布式應用時,通常需要花費大量時間和精力來處理異構(gòu)系統(tǒng)中的協(xié)作通信問題。 什么是 ZooKeepe...

    endless_road 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<