成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

JStorm源碼分析系列--01--Nimbus啟動分析

Carbs / 3388人閱讀

摘要:方法首先初始化一個回調(diào)函數(shù),這是當(dāng)一個成為之后就會調(diào)用的一個用于初始化一系列變量的方法,包括拓?fù)淙绾卧诩荷戏峙?,拓?fù)錉顟B(tài)更新,清除函數(shù),還有監(jiān)控線程等。

寫在前面的話,筆者第一次閱讀框架源碼,所以可能有些地方理解錯誤或者沒有詳細(xì)解釋,如果在閱讀過程發(fā)現(xiàn)錯誤很歡迎在文章下面評論指出。文章后續(xù)會陸續(xù)更新,可以關(guān)注或者收藏,轉(zhuǎn)發(fā)請先私信我,謝謝。對了,筆者看的是2.2.1這個版本

概述

??JStorm是一個分布式的實時計算引擎,是阿里巴巴根據(jù)storm的流處理模型進(jìn)行重寫的一個框架,支持相同的邏輯模型(也就是拓?fù)浣Y(jié)構(gòu)),然后底層的實現(xiàn)卻大有不同。不過本文并不是打算對兩個框架進(jìn)行比較,接下來我會從源碼的角度上來解析JStorm是如何工作的。
??作為第一個篇章,筆者先來介紹下nimbus以及它啟動的時候做了什么。JStorm的主節(jié)點上運行著nimbus的守護(hù)進(jìn)程,這個進(jìn)程主要負(fù)責(zé)與ZK通信,分發(fā)代碼,給集群中的從節(jié)點分配任務(wù),監(jiān)視集群狀態(tài)等等。此外nimbus需要維護(hù)的所有狀態(tài)都會存儲在ZK中,JStorm為了減少對ZK的訪問次數(shù)做了一些緩存,這個后續(xù)代碼分析會說到。以上是nimbus功能的簡介,接下來我們從源碼的角度看看Nimbus到底做了什么。首先在Nimbus啟動的時候:

//設(shè)置主線程由于未捕獲異常而突然中止時調(diào)用的默認(rèn)程序
Thread.setDefaultUncaughtExceptionHandler(new DefaultUncaughtExceptionHandler());
//加載集群的配置信息
Map config = Utils.readStormConfig();
//這下面這個方法內(nèi)部注釋掉了,筆者暫時沒有太在意,后續(xù)再補(bǔ)充
JStormServerUtils.startTaobaoJvmMonitor();
//創(chuàng)建一個NimbusServer實例
NimbusServer instance = new NimbusServer();
//創(chuàng)建一個默認(rèn)的nimbus啟動類
INimbus iNimbus = new DefaultInimbus();
//開始進(jìn)行實際的初始化
instance.launchServer(config, iNimbus);

??其實在DefaultUncaughtExceptionHandler中也并沒有太多的處理操作,簡單判斷是否是內(nèi)存溢出,然后正常關(guān)閉,否則就是異常直接拋出然后中斷。讀取配置的過程就不詳細(xì)講解了。NimbusServer這個類主要封裝了一些用于操作Nimbus的成員變量和方法,Nimbus的啟動操作基本都是定義在這個類內(nèi)的(上述代碼就是這個類中的main方法所定義的)。
??最重要的方法是launchServer,接下來就詳細(xì)的解說這個方法的作用,首先來看下launchServer這個方法內(nèi)部的代碼:

private void launchServer(final Map conf, INimbus inimbus) {
    LOG.info("Begin to start nimbus with conf " + conf);
    try {
        //判斷配置模式是否正確
        StormConfig.validate_distributed_mode(conf);
        createPid(conf);
        //設(shè)置退出時的操作
        initShutdownHook();
        //這個方法在默認(rèn)實現(xiàn)中沒有任何操作
        inimbus.prepare(conf, StormConfig.masterInimbus(conf));
        //創(chuàng)建NimbusData對象
        data = createNimbusData(conf, inimbus);
        //這個方法主要負(fù)責(zé)處理當(dāng)nimbus線程稱為leader線程之后的操作
        initFollowerThread(conf);
        int port = ConfigExtension.getNimbusDeamonHttpserverPort(conf);
        hs = new Httpserver(port, conf);
        hs.start();
        //如果集群是運行在yarn上,也需要做一些初始化操作。
        initContainerHBThread(conf);
        serviceHandler = new ServiceHandler(data);
        //thrift是一個分布式的RPC框架
        initThrift(conf);
    } catch (Throwable e) {
        if (e instanceof OutOfMemoryError) {
           LOG.error("Halting due to Out Of Memory Error...");
        }
        LOG.error("Fail to run nimbus ", e);
    } finally {
        cleanup();
    }

    LOG.info("Quit nimbus");
}
判斷配置中的模式

??只是判斷配置信息中的一個字段名為“storm.cluster.mode”是否是“distributed”,本地模式下是“l(fā)ocal”。

initShutdownHook

??添加退出的時候一些操作,包括設(shè)置參數(shù)提醒集群要退出,清除nimbus存儲下的一些工作線程(負(fù)責(zé)處理通信,分發(fā)代碼,心跳的一系列守護(hù)線程),關(guān)閉打開的各種資源等。

createNimbusData

??這個方法用于創(chuàng)建一個NimbusData的對象,這個對象封裝了Nimbus與ZK通信的一些成員變量。下面會在每個方法內(nèi)部逐漸講到NimbusData的一些成員變量以及他們的作用。首先來看看NimbusData的構(gòu)造方法。

public NimbusData(final Map conf, INimbus inimbus) throws Exception {
    this.conf = conf;
    //兩個方法分別處理打開的文件流和blob傳輸流
    createFileHandler();
    mkBlobCacheMap();
    this.nimbusHostPortInfo = NimbusInfo.fromConf(conf);
    this.blobStore = BlobStoreUtils.getNimbusBlobStore(conf, nimbusHostPortInfo);

    this.isLaunchedCleaner = false;
    this.isLaunchedMonitor = false;

    this.submittedCount = new AtomicInteger(0);

    this.stormClusterState = Cluster.mk_storm_cluster_state(conf);

    createCache();

    this.taskHeartbeatsCache = new ConcurrentHashMap>();
    //創(chuàng)建一個調(diào)度線程池,默認(rèn)大小為12
    this.scheduExec = Executors.newScheduledThreadPool(SCHEDULE_THREAD_NUM);

    this.statusTransition = new StatusTransition(this);

    this.startTime = TimeUtils.current_time_secs();

    this.inimubs = inimbus;

    localMode = StormConfig.local_mode(conf);

    this.metricCache = new JStormMetricCache(conf, this.stormClusterState);
    this.clusterName = ConfigExtension.getClusterName(conf);

    pendingSubmitTopologies = new TimeCacheMap(JStormUtils.MIN_10);
    topologyTaskTimeout = new ConcurrentHashMap();
    tasksHeartbeat = new ConcurrentHashMap();

    this.metricsReporter = new JStormMetricsReporter(this);
    this.metricRunnable = ClusterMetricsRunnable.mkInstance(this);
    String configUpdateHandlerClass = ConfigExtension.getNimbusConfigUpdateHandlerClass(conf);
    this.configUpdateHandler = (ConfigUpdateHandler) Utils.newInstance(configUpdateHandlerClass);
        

    if (conf.containsKey(Config.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN)) {
        String string = (String) conf.get(Config.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN);
        nimbusNotify = (ITopologyActionNotifierPlugin) Utils.newInstance(string);
            
    } else {
        nimbusNotify = null;
    }

}

??3.1 createFileHandler:在這方法內(nèi)部,實現(xiàn)了一個匿名的內(nèi)部類ExpiredCallback,在其內(nèi)部實現(xiàn)了一個方法叫expire,利用回調(diào)的方式來關(guān)閉Channel或者BufferFileInputStream實例對象。

public void createFileHandler() {
    ExpiredCallback expiredCallback = new ExpiredCallback() {
        @Override
        public void expire(Object key, Object val) {
            try {
                LOG.info("Close file " + String.valueOf(key));
                if (val != null) {
                    if (val instanceof Channel) {
                        Channel channel = (Channel) val;
                        channel.close();
                    } else if (val instanceof BufferFileInputStream) {
                        BufferFileInputStream is = (BufferFileInputStream) val;
                        is.close();
                    }
                }
            } catch (IOException e) {
                LOG.error(e.getMessage(), e);
            }

        }
    };
    //獲取超時時間
    int file_copy_expiration_secs = JStormUtils.parseInt(conf.get(Config.NIMBUS_FILE_COPY_EXPIRATION_SECS), 30);
    uploaders = new TimeCacheMap(file_copy_expiration_secs, expiredCallback);
    downloaders = new TimeCacheMap(file_copy_expiration_secs, expiredCallback);
}

??然后初始化NimbusData的兩個成員變量uploadersdownloaders,這兩個分別維護(hù)需要上傳的通道和需要下載的通道。TimeCacheMap這個類的主要實現(xiàn)邏輯是在其構(gòu)造函數(shù)內(nèi)部啟動一個守護(hù)線程。首先創(chuàng)建一個緩沖區(qū),只要系統(tǒng)不關(guān)閉,則在守護(hù)線程內(nèi)部不斷的緩沖區(qū)獲取對象,在對象不為空的情況下調(diào)用回調(diào)函數(shù)的expire方法,并執(zhí)行相應(yīng)的操作,這里具體傳進(jìn)來的expire方法是關(guān)閉Channel或者BufferFileInputStream
??3.2. mkBlobCacheMap:和上述的方法非常類似,也是申明一個匿名內(nèi)部類,然后初始化幾個成員變量。代碼幾乎和上個方法一樣就不浪費拌面去貼了。這里expire方法中要關(guān)閉的是兩個流AtomicOutputStreamBufferInputStream,blobUploadersblobDownloaders分別存放著上傳和下載所打開的流。blobListers存放上傳和下載的數(shù)據(jù)。
??3.3. 初始化幾個成員變量,包括NimbusInfo(包含了主機(jī)名,端口和標(biāo)志是否是leader),BlobStore(用來存儲blob數(shù)據(jù)的,使用鍵值存儲,阿里提供了兩個不同的blob存儲方式,一種是本地文件系統(tǒng)存儲,一種的hdfs存儲,兩種方式的區(qū)別在于,由于本地文件存儲并不能保證一致性,所以需要ZK介入來保證,這是JStorm的默認(rèn)配置。如果使用hdfs來存儲,則不需要ZK介入,因為hdfs能保證一致性和正確性),StormClusterState(存儲整個集群的狀態(tài),這個是從ZK上獲取的),為了避免多次向ZK通信,還需要設(shè)置緩存信息,任務(wù)的心跳信息等等。
??3.4. 初始化好metrics相關(guān)的報告線程和監(jiān)聽線程。

initFollowerThread

??4.1. 方法首先初始化一個回調(diào)函數(shù),這是當(dāng)一個nimbus成為leader之后就會調(diào)用的一個用于初始化一系列變量的方法,包括拓?fù)淙绾卧诩荷戏峙?,拓?fù)錉顟B(tài)更新,清除函數(shù),還有監(jiān)控線程等。后續(xù)會有新的篇章來介紹這個init方法,這里先放這個方法的源碼。

private void init(Map conf) throws Exception {
    data.init();
    
    NimbusUtils.cleanupCorruptTopologies(data);
    //拓?fù)浞峙?    initTopologyAssign();
    //狀態(tài)更新
    initTopologyStatus();
    //清除函數(shù)
    initCleaner(conf);
    
    initMetricRunnable();
    
    if (!data.isLocalMode()) {
        initMonitor(conf);
        //mkRefreshConfThread(data);
    }
}

??4.2. 初始化一個Runnable的子類,在構(gòu)造方法中,首先判斷集群并不是使用本地模式,然后更新ZK上的節(jié)點信息(將nimbus注冊到ZK上)。然后通過ZK獲取集群的狀態(tài)信息,畢竟nimbus是需要維護(hù)整個集群的。緊接著判斷是否存在leader,兩次都無法選舉出leader之后,則將ZK上的nimbus信息刪除并退出。如果blobstore使用的是本地文件模式(有本文模式還有hdfs模式兩種)還需要添加一個回調(diào)函數(shù),這個回調(diào)函數(shù)執(zhí)行的操作是,當(dāng)這個nimbus不是leader的時候,對blob進(jìn)行同步。此外還需要將那些active的blob存到ZK中,而將死掉的進(jìn)行清除(原因前文3.3也說到過,本地模式存儲無法保證一致性,所以需要ZK進(jìn)行維護(hù),而hdfs自帶容錯機(jī)制,能保證數(shù)據(jù)的一致性)。
??4.3. 設(shè)置該線程為守護(hù)線程,并啟動這個線程。run方法首先判斷當(dāng)前保存在ZK上的集群中是否有l(wèi)eader,如果沒有則選舉當(dāng)前nimbus為leader線程。如果有了leader線程,則需要判斷是否跟當(dāng)前的nimbus相同,如果不相同則停止當(dāng)前的nimbus,畢竟已經(jīng)有l(wèi)eader存在了。如果是相同的,則需要判斷本地的狀態(tài)中,如果還沒有設(shè)置為leader,表明當(dāng)前nimbus還沒有進(jìn)行初始化,則先設(shè)置nimbus為leader然后回調(diào)函數(shù)進(jìn)行初始化,也就是調(diào)用init(conf)方法。
獲取一個端口(默認(rèn)的端口是7621)用于構(gòu)建HttpServer實例對象??梢杂糜谔幚砗徒邮躷cp連接,啟動一個新的線程進(jìn)行httpserver的監(jiān)聽。(主要作用或者說在哪里用到尚且不明確)。

initContainerHBThread

??這個方法的主要作用是得知是否能在資源管理器(yarn)上運行jstorm集群,如果可以的話,則需要創(chuàng)建一個新的線程用于處理。(其實這里使用容器的目的是可以在一個物理集群上運行多個不一樣的邏輯集群甚至多個JStorm集群,能動態(tài)調(diào)整邏輯集群分到的資源,此外,資源管理器能提供非常強(qiáng)的可擴(kuò)展性)。容器線程會被添加到NimbusServer中,后續(xù)使用到的時候再詳細(xì)講解。這個容器線程也是守護(hù)線程,且馬上就會啟動,這個線程的run方法里面包含兩個處理:
??6.1. handleWriteDir:這個方法的主要作用是清除掉容器上的過期心跳信息,準(zhǔn)確的說,如果JStorm集群容器目錄下的心跳信息大于10,則需要清除(從最老的開始)。
??6.2. handlReadDir:這里主要是用于維護(hù)本地是否能接受到集群上的hb信息,如果多次超時則要拋出異常。

initThrift

??thrift是JStorm使用的一個分布式RPC框架。筆者后續(xù)再添加相應(yīng)的源碼解析。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/66980.html

相關(guān)文章

  • JStorm源碼分析系列--02--拓?fù)浞峙銽opologyAssign

    摘要:下面就來講講第一個初始化操作拓?fù)浞峙洹H绻麤]有舊的分配信息,說明拓?fù)浞峙漕愋蜑椤5竭@里,預(yù)分配,創(chuàng)建拓?fù)浞峙渖舷挛木屯瓿闪?。集群下的分配,見下文講解資源準(zhǔn)備首先第一步是判斷拓?fù)浞峙涞念愋褪欠穹弦螅环蟿t拋出異常。 ??寫在前面的話,筆者第一次閱讀框架源碼,所以可能有些地方理解錯誤或者沒有詳細(xì)解釋,如果在閱讀過程發(fā)現(xiàn)錯誤很歡迎在文章下面評論指出。文章后續(xù)會陸續(xù)更新,可以關(guān)注或者收藏...

    vincent_xyb 評論0 收藏0
  • Docker 調(diào)試技巧

    摘要:即使是容器已經(jīng)退出的也可以看到,所以可以通過這種方式來分析非預(yù)期的退出。也可以直接通過在容器內(nèi)啟動一個更方便地調(diào)試容器,不必一條條執(zhí)行。和獲得容器中進(jìn)程的狀態(tài)和在容器里執(zhí)行的效果類似。通過查看容器的詳細(xì)信息飯后鏡像和容器的詳細(xì)信息。 『重用』容器名 但我們在編寫/調(diào)試Dockerfile的時候我們經(jīng)常會重復(fù)之前的command,比如這種docker run --name jstorm-...

    Coding01 評論0 收藏0
  • 【深度】| 值得收藏的阿里開源技術(shù)

    摘要:淘寶定制基于,是國內(nèi)第一個優(yōu)化定制且開源的服務(wù)器版虛擬機(jī)。數(shù)據(jù)庫開源數(shù)據(jù)庫是基于官方版本的一個分支,由阿里云數(shù)據(jù)庫團(tuán)隊維護(hù),目前也應(yīng)用于阿里巴巴集團(tuán)業(yè)務(wù)以及阿里云數(shù)據(jù)庫服務(wù)。淘寶服務(wù)器是由淘寶網(wǎng)發(fā)起的服務(wù)器項目。 Java JAVA 研發(fā)框架 SOFAStack SOFAStack(Scalable Open Financial Architecture Stack)是用于快速構(gòu)建金融...

    econi 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<