摘要:就是上面提到的變化后執(zhí)行的程序。這段代碼是的構(gòu)造函數(shù),除了里面的一些賦值函數(shù),重點說下這個函數(shù)它有個參數(shù),第一個參數(shù)是你要的名字,類似這樣的。
昨晚微信開放了JS接口。忙活了一晚上。
然后周六就沒啥斗志了,突然想起第二篇說好要介紹demo的沒介紹,就趕緊來寫了。
先來個 傳送門
這里簡單介紹了下官方demo。用來演示Zookeeper官方推薦的程序框架(Executor、DataMonitor)。
這個Demo實現(xiàn)了如下功能:
監(jiān)視一個結(jié)點,獲取該結(jié)點的數(shù)據(jù),并啟動一個自定義程序
如果這個結(jié)點發(fā)生變化了(數(shù)據(jù)更改,新增或者刪除),則停止這個自定義程序并重新執(zhí)行。
如果這個結(jié)點被刪除了,則停止運行自定義程序
對著Demo看下代碼。首先,官方有這么個章節(jié)
Program DesignConventionally, ZooKeeper applications are broken into two units, one which maintains the connection, and the other which monitors data. In this application, the class called the Executor maintains the ZooKeeper connection, and the class called the DataMonitor monitors the data in the ZooKeeper tree. Also, Executor contains the main thread and contains the execution logic. It is responsible for what little user interaction there is, as well as interaction with the exectuable program you pass in as an argument and which the sample (per the requirements) shuts down and restarts, according to the state of the znode.
通俗翻譯下,官方認為,一個zookeeper應(yīng)用簡單地分為兩個單元即可,一個用來管理連接,另一個用來監(jiān)視數(shù)據(jù)。在我們的Demo中,Executor被用來管理和Zookeeper的連接,DataMonitor用來監(jiān)視Zookeeper中的數(shù)據(jù)樹。另外,Executor同時也包含了主線程的執(zhí)行(也就是Main函數(shù)),然后也包含一點點的執(zhí)行邏輯。Demo對用戶交互的部分非常少,輸入的幾個參數(shù)(好吧,我自己去實現(xiàn)的時候,偷懶去寫死了)就可以開始跑了,Demo還會對znode的變化或者新的znode的連入進行一些響應(yīng)。
看完這段,對原文最下面的完整樣例算了解一些,那這里既然就兩個類,Executor和DataMonitor嘛,對吧。就分別看下。
javapublic Executor(String hostPort, String znode, String filename, String exec[]) throws KeeperException, IOException { this.filename = filename; this.exec = exec; zk = new ZooKeeper(hostPort, 3000, this); dm = new DataMonitor(zk, znode, null, this); }
這個構(gòu)造函數(shù)么實際上做了這么幾個事:
這里的filename是用來記錄znode變化后的data的,隨便填寫,當然你也可以刪了。
exec就是上面提到的 znode變化后執(zhí)行的程序。(我偷懶,指定了一個ls)
zk就是用來初始化ZooKeeper連接的啦,第一個參數(shù)是主機和端口號,第二個是Session超時時間,第三個是DefaultWatcher。DefaultWacheter就是你在不指定wacher對象但是想watch的時候默認的watcher,Wathcer是一個接口。
dm就是初始化DataMoniter用來監(jiān)視znode啦。
Watcher
javapublic void process(WatchedEvent event);
這個是Watcher接口里的唯一函數(shù),用處就是在你關(guān)心的事件被觸發(fā)后,會調(diào)用這個接口,具體參數(shù)內(nèi)容是什么就看文檔吧。
DataMonitor
javapublic DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher, DataMonitorListener listener) { this.zk = zk; this.znode = znode; this.chainedWatcher = chainedWatcher; this.listener = listener; // Get things started by checking if the node exists. We are going // to be completely event driven zk.exists(znode, true, this, null); }
這段代碼是DataMonitor的構(gòu)造函數(shù),除了里面的一些賦值函數(shù),重點說下zk.exists這個函數(shù)
它有4個參數(shù),第一個參數(shù)是你要query的znode名字,類似/zk_test這樣的。第二個參數(shù)是問你要不要watch,就是這個結(jié)點變化后要不要得到通知,第三個是exists結(jié)果的回調(diào),是StatCallback接口,exists目的是想查找這個znode是否存在,結(jié)果從第三個接口返回,
其實這個接口的設(shè)計讓我挺頭疼的,單一職責(zé)原則不見了,這個接口干了兩件事情,第一是告訴你這個znode存在不存在,第二件事是問你要不要監(jiān)控這個znode的變化。OK這里你不提供Watcher的話,就用上面new Zookeeper里面提供的默認Watcher(這里就是我們的Executor對象啦)。
好了讓我們看下工作流吧。
zk.exists第一次調(diào)用的時候,返回的結(jié)果傳給了DataMonitor(實現(xiàn)了StatCallback接口)的processResult函數(shù)
java public void processResult(int rc, String path, Object ctx, Stat stat) { boolean exists; switch (rc) { case Code.Ok: exists = true; break; case Code.NoNode: exists = false; break; case Code.SessionExpired: case Code.NoAuth: dead = true; listener.closing(rc); return; default: // Retry errors zk.exists(znode, true, this, null); return; } byte b[] = null; if (exists) { try { b = zk.getData(znode, false, null); } catch (KeeperException e) { // We don"t need to worry about recovering now. The watch // callbacks will kick off any exception handling e.printStackTrace(); } catch (InterruptedException e) { return; } } if ((b == null && b != prevData) || (b != null && !Arrays.equals(prevData, b))) { listener.exists(b); prevData = b; } }
rc是這次exists查詢的返回碼,具體含義看變量名也大概知道。 最后一個if里面的listener是DataMonitor里面的DataMonitorListener接口,我們的Executor實現(xiàn)了它(其實就是把結(jié)果從DataMonitor再傳出去)。這里實現(xiàn)的效果就是如果結(jié)點存在的話,就把結(jié)點的數(shù)據(jù)傳出去,不然就傳個null,當然如果數(shù)據(jù)沒有變化(用prevData記錄了上一次的數(shù)據(jù))那么就什么都不做。
于是第一次你的exec就執(zhí)行了。
這時候,我用zkCli去刪除了這個監(jiān)視的結(jié)點(我設(shè)置的是/zk_test)或者重新設(shè)置了data,那么我的Watcher都會收到通知(這里我就很奇怪,為什么不是我關(guān)心的事情,比如exists關(guān)心的應(yīng)該是create和delete)
源碼里是這么說的
java/** * Return the stat of the node of the given path. Return null if no such a * node exists. ** If the watch is non-null and the call is successful (no exception is thrown), * a watch will be left on the node with the given path. The watch will be * triggered by a successful operation that creates/delete the node or sets * the data on the node. * * @param path the node path * @param watcher explicit watcher * @return the stat of the node of the given path; return null if no such a * node exists. * @throws KeeperException If the server signals an error * @throws InterruptedException If the server transaction is interrupted. * @throws IllegalArgumentException if an invalid path is specified */ public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException;
OK,我剛剛設(shè)置的Watcher是Executor,實際上Executor是把這個事情傳給了DataMonitor,看process函數(shù)
javapublic void process(WatchedEvent event) { dm.process(event); }
DataMonitor中的process
javapublic void process(WatchedEvent event) { String path = event.getPath(); if (event.getType() == Event.EventType.None) { // We are are being told that the state of the // connection has changed switch (event.getState()) { case SyncConnected: logger.debug("SyncConnected"); // In this particular example we don"t need to do anything // here - watches are automatically re-registered with // server and any watches triggered while the client was // disconnected will be delivered (in order of course) break; case Expired: logger.debug("expired"); // It"s all over dead = true; listener.closing(Code.SESSIONEXPIRED.intValue()); break; } } else { if (path != null && path.equals(znode)) { // Something has changed on the node, let"s find out zk.exists(znode, true, this, null); } } if (chainedWatcher != null) { chainedWatcher.process(event); } }
關(guān)鍵的是第一個if/else, else中處理的事件是我們最關(guān)心的和znode相關(guān)的,我們發(fā)現(xiàn)znode發(fā)生了變化(create,delete,dataChanged,childChanged)就再次調(diào)用exists去進行我們結(jié)點關(guān)心的操作。
好了,說了那么多的代碼其實都是在說框架里的東西。這個demo里最讓人不關(guān)心的是業(yè)務(wù)邏輯部分么= =
請看Executor里面的exists方法
java public void exists(byte[] data) { if (data == null) { if (child != null) { System.out.println("Killing process"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { } } child = null; } else { if (child != null) { System.out.println("Stopping child"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { e.printStackTrace(); } } try { FileOutputStream fos = new FileOutputStream(filename); fos.write(data); fos.close(); } catch (IOException e) { e.printStackTrace(); } try { System.out.println("Starting child"); child = Runtime.getRuntime().exec(exec); new StreamWriter(child.getInputStream(), System.out); new StreamWriter(child.getErrorStream(), System.err); } catch (IOException e) { e.printStackTrace(); } } }
就是根據(jù)我們的要的data來執(zhí)行我們要的代碼之類的。具體大家可以調(diào)試一下。
兩篇文章應(yīng)該夠入個門了= =,接下去我會做一些跟自己工作上業(yè)務(wù)結(jié)合的東西。
真正寫代碼的時候,樣例、文檔、源碼三者聯(lián)合起來看,你才能看的出作者的設(shè)計目的,但是用多了才能知道作者的設(shè)計初衷。
啊,如果有人可以看完這篇blog,在下感謝您的大力支持。
有刺盡管挑= = 大家和諧討論。應(yīng)該還有三的。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/64220.html
摘要:好吧,就是給指定的結(jié)點里面稱之為提供了統(tǒng)一的名稱。分布式鎖服務(wù)這個特性是最吸引我的特性了,如何實現(xiàn)分布式鎖呢,就是使用提供的有序且臨時的特性實現(xiàn)。當然詳細的可以參照分布式鎖避免羊群效應(yīng)這篇文章,同時寫了如何避免羊群效應(yīng)。 最近想學(xué)東西,于是就又拿起前段時間因為沒時間而落下的zookeeper啃了起來,第一次啃完教程發(fā)現(xiàn)什么都不明白,第二次啃完發(fā)現(xiàn),這東西,就這么簡單的東西?。? 先來摘...
摘要:后面聽到的時候,是因為可以作為分布式鎖的一種實現(xiàn)。二為什么能干這么多從上面我們可以知道,可以用來做統(tǒng)一配置管理統(tǒng)一命名服務(wù)分布式鎖集群管理。 前言 只有光頭才能變強。文本已收錄至我的GitHub倉庫,歡迎Star:https://github.com/ZhongFuCheng3y/3y 上次寫了一篇 什么是消息隊列?以后,本來想入門一下Kafka的(裝一下環(huán)境、看看Kafka一些概念...
摘要:項目地址前言大數(shù)據(jù)技術(shù)棧思維導(dǎo)圖大數(shù)據(jù)常用軟件安裝指南一分布式文件存儲系統(tǒng)分布式計算框架集群資源管理器單機偽集群環(huán)境搭建集群環(huán)境搭建常用命令的使用基于搭建高可用集群二簡介及核心概念環(huán)境下的安裝部署和命令行的基本使用常用操作分區(qū)表和分桶表視圖 項目GitHub地址:https://github.com/heibaiying... 前 言 大數(shù)據(jù)技術(shù)棧思維導(dǎo)圖 大數(shù)據(jù)常用軟件安裝指...
閱讀 1898·2021-11-17 09:33
閱讀 6497·2021-10-12 10:20
閱讀 2313·2021-09-22 15:50
閱讀 1802·2021-09-22 15:10
閱讀 636·2021-09-10 10:51
閱讀 640·2021-09-10 10:50
閱讀 3070·2021-08-11 11:19
閱讀 1792·2019-08-30 15:55