成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

JStorm源碼分析系列--02--拓?fù)浞峙銽opologyAssign

vincent_xyb / 1910人閱讀

摘要:下面就來講講第一個初始化操作拓?fù)浞峙洹H绻麤]有舊的分配信息,說明拓?fù)浞峙漕愋蜑椤5竭@里,預(yù)分配,創(chuàng)建拓?fù)浞峙渖舷挛木屯瓿闪?。集群下的分配,見下文講解資源準(zhǔn)備首先第一步是判斷拓?fù)浞峙涞念愋褪欠穹弦?,不符合則拋出異常。

??寫在前面的話,筆者第一次閱讀框架源碼,所以可能有些地方理解錯誤或者沒有詳細(xì)解釋,如果在閱讀過程發(fā)現(xiàn)錯誤很歡迎在文章下面評論指出。文章后續(xù)會陸續(xù)更新,可以關(guān)注或者收藏,轉(zhuǎn)發(fā)請先私信我,謝謝。對了,筆者看的是2.2.1這個版本。上一篇博客,JStorm源碼分析系列--01--Nimbus啟動分析筆者講解了Nimbus啟動過程中做的一些基本的操作,在initFollowerThread方法中,如果當(dāng)前的Nimbus變成Leader之后,這個方法內(nèi)會負(fù)責(zé)執(zhí)行一些初始化init操作。下面就來講講第一個初始化操作--拓?fù)浞峙?。本文將詳?xì)(非常長,所以慢慢看)的講解如何去為一個拓?fù)浞峙湎鄳?yīng)的資源。
??從方法initTopologyAssign開始,TopologyAssign是一個單例對象,在這個類的init方法內(nèi),做了簡單的賦值操作之后,并初始化一個調(diào)度器實例對象之后,就建立一個守護線程,這個守護線程的目的是不斷從TopologyAssign內(nèi)部維護的一個阻塞隊列中讀取系統(tǒng)提交的拓?fù)淙蝿?wù),并調(diào)用相應(yīng)的方法doTopologyAssignment進行分配操作。代碼都比較簡單,就不浪費版面去貼了。
??下面是doTopologyAssignment方法的源碼,

    protected boolean doTopologyAssignment(TopologyAssignEvent event) {
        Assignment assignment;
        try {
            Assignment oldAssignment = null;
            boolean isReassign = event.isScratch();
            if (isReassign) {
                //如果存在舊的分配信息,需要先將舊的分配信息存儲下來
                oldAssignment = nimbusData.getStormClusterState().assignment_info(event.getTopologyId(), null);
            }
            //調(diào)用方法執(zhí)行新的分配
            assignment = mkAssignment(event);
            //將task添加到集群的metrics中
            pushTaskStartEvent(oldAssignment, assignment, event);

            if (!isReassign) {
                //如果是新建的拓?fù)?,需要把拓?fù)湓O(shè)置為active狀態(tài)
                setTopologyStatus(event);
            }
        } catch (Throwable e) {
            LOG.error("Failed to assign topology " + event.getTopologyId(), e);
            event.fail(e.getMessage());
            return false;
        }

        if (assignment != null)
            //將拓?fù)鋫浞莸絑K上
            backupAssignment(assignment, event);
        event.done();
        return true;
    }

??所以,最重要的方法還是mkAssignment,這里執(zhí)行了實際的分配操作。下面就來詳細(xì)的介紹這個方法。

prepareTopologyAssign

??prepareTopologyAssign這個方法總體的目的為了初始化拓?fù)浞峙涞纳舷挛男畔?,生成一個TopologyAssignContext的實例對象。這個上下文對象需要存下拓?fù)涞暮芏嚓P(guān)鍵信息,包括拓?fù)涞慕M件信息(用StormTopology對象保存,下文在添加acker的時候會詳細(xì)介紹這個類),拓?fù)涞呐渲眯畔?,拓?fù)渖纤械膖ask id,以及死掉的task id,unstopped task id(這里的解釋是,那些supervisor死掉但是worker還繼續(xù)運行的稱為unstopworker,而包含在unstopworker內(nèi)的task則稱為unstoppedTask)。以及這個拓?fù)淠芊峙涞降膚orker,以上提及的這些信息都會在這個方法內(nèi)慢慢的初始化。下面一步步來看吧。prepareTopologyAssign方法的源碼比較長,一部分一部分來講解。

//創(chuàng)建一個上下文的實例對象
TopologyAssignContext ret = new TopologyAssignContext();

String topologyId = event.getTopologyId();
ret.setTopologyId(topologyId);

int topoMasterId = nimbusData.getTasksHeartbeat().get(topologyId).get_topologyMasterId();
ret.setTopologyMasterTaskId(topoMasterId);
LOG.info("prepareTopologyAssign, topoMasterId={}", topoMasterId);

Map nimbusConf = nimbusData.getConf();
//根據(jù)拓?fù)鋓d從nimbus上讀取拓?fù)涞呐渲眯畔?Map topologyConf = StormConfig.read_nimbus_topology_conf(topologyId, nimbusData.getBlobStore());
//這里讀取拓?fù)渲懈鱾€組件的一個結(jié)構(gòu),后續(xù)會講解這個類的組成
StormTopology rawTopology = StormConfig.read_nimbus_topology_code(topologyId, nimbusData.getBlobStore());
ret.setRawTopology(rawTopology);
//設(shè)置一些配置信息
Map stormConf = new HashMap();
stormConf.putAll(nimbusConf);
stormConf.putAll(topologyConf);
ret.setStormConf(stormConf);

??緊接著,根據(jù)目前集群的狀態(tài),初始化一份集群上所有的supervisor,并獲取所有可用的worker

StormClusterState stormClusterState = nimbusData.getStormClusterState();

// get all running supervisor, don"t need callback to watch supervisor
Map supInfos = Cluster.get_all_SupervisorInfo(stormClusterState, null);
// init all AvailableWorkerPorts
for (Entry supInfo : supInfos.entrySet()) {
     SupervisorInfo supervisor = supInfo.getValue();
     if (supervisor != null)
        //設(shè)置全部的端口都為可用,后面通過HB去除掉那些已經(jīng)被使用的worker
        //supervisor是一個k-v,k是supervisorid,v是保存實例信息
        supervisor.setAvailableWorkerPorts(supervisor.getWorkerPorts());
}
//這個方法就是利用HB去掉那些掛掉的supervisor
//判斷的方法是獲取每個supervisor最近的HB時間,
//由當(dāng)前時間減去最近HB時間和超時時間做對比。
getAliveSupervsByHb(supInfos, nimbusConf);

??接下來獲取拓?fù)渲卸x的taskid對應(yīng)上組件,這里要解釋下,對于一個拓?fù)涠?,taskid總是從1開始分配的,并且,相同的組件taskid是相鄰的。比如你定義了一個SocketSpout(并行度5),一個PrintBolt(并行度4,那么SocketSpout的taskid可能是1-5,PrintBolt的taskid可能是6-9。

//這個k-v,k是taskid,v是拓?fù)鋬?nèi)定義的組件的id。
//寫過應(yīng)用的同學(xué)都應(yīng)該知道,TopologyBuilder在setSpout或者Bolt的時候,需要指定<組件id,對象,和并行度>。
//eg:builder.setSpout("integer", new ReceiverSpout(), 2);
Map taskToComponent = Cluster.get_all_task_component(stormClusterState, topologyId, null);
ret.setTaskToComponent(taskToComponent);

//獲取所有的taskid。
Set allTaskIds = taskToComponent.keySet();
ret.setAllTaskIds(allTaskIds);

??如果原來存在舊的拓?fù)浞峙湫畔ⅲ€需要設(shè)置unstoppedTasks,deadTasks,unstoppedWorkers等信息。然后調(diào)用getFreeSlots方法負(fù)責(zé)去除那些已經(jīng)分配出去的worker。處理過程比較直觀,獲取集群上所有的拓?fù)浞峙湫畔?,然后根?jù)每個分配信息中保存的worker信息,從原先supInfos中移除那些被分配出去的worker。
??如果沒有舊的分配信息,說明拓?fù)浞峙漕愋蜑?b>ASSIGN_TYPE_NEW。如果存在同名的拓?fù)?,也會把同名的拓?fù)湓O(shè)置舊的分配信息,放到上下文中。如果存在舊的分配信息,需要把舊的分配信息放入到上下文中,此外還要判斷是ASSIGN_TYPE_REBALANCE還是ASSIGN_TYPE_MONITOR,因為還需要設(shè)置unstoppedWorkers的信息。到這里,預(yù)分配,創(chuàng)建拓?fù)浞峙渖舷挛木屯瓿闪恕D壳拔覀儙в斜容^重要的信息是拓?fù)渌械膖askid,以及拓?fù)浠镜慕M件信息。

集群assignTasks

??在完成拓?fù)渖舷挛某跏蓟螅_始實際給拓?fù)浞峙湎鄳?yīng)的worker,不過這里需要判斷是本地模式還是集群模式,本地模式下比較簡單,找個一個合適的端口,然后新建一個worker的資源對象ResourceWorkerSlot,將一些關(guān)鍵信息如hostname,port,allTaskId配置好。因為local模式下比較簡單,所以,即使設(shè)置多個worker也不會啟動多個jvm。而在集群模式下,一個worker表示的是一個jvm進程。下面就重點講解集群下的分配情況。我把集群上的分配過程(assignTasks這個方法)分成三個主要的部分,分別是資源準(zhǔn)備,worker分配,task分配。

Set assignments = null;
if (!StormConfig.local_mode(nimbusData.getConf())) {
    IToplogyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME);
    //集群下的分配,見下文講解
    assignments = scheduler.assignTasks(context);
} else {
    assignments = mkLocalAssignment(context);
}
資源準(zhǔn)備

??首先第一步是判斷拓?fù)浞峙涞念愋褪欠穹弦?,不符合則拋出異常。緊接著,根據(jù)上一個方法生成的拓?fù)浞峙渖舷挛膩砩梢粋€默認(rèn)的拓?fù)浞峙渖舷挛膶嵗龑ο?,DefaultTopologyAssignContext這個類的構(gòu)造方法執(zhí)行了很多很細(xì)節(jié)的操作。包括為拓?fù)涮砑痈郊拥慕M件,存儲下taskid和組件的對應(yīng)信息,計算拓?fù)湫枰膚orker數(shù)目,計算unstopworker的數(shù)目等。

//根據(jù)之前的上下文,初始化一個分配的上下文對象
DefaultTopologyAssignContext defaultContext = new DefaultTopologyAssignContext(context);
if (assignType == TopologyAssignContext.ASSIGN_TYPE_REBALANCE) {
    freeUsed(defaultContext);
}

??下面代碼是DefaultTopologyAssignContext的構(gòu)造方法

public DefaultTopologyAssignContext(TopologyAssignContext context){
    super(context);
    try {
        sysTopology = Common.system_topology(stormConf, rawTopology);
    } catch (Exception e) {
        throw new FailedAssignTopologyException("Failed to generate system topology");
    }

    sidToHostname = generateSidToHost();
    hostToSid = JStormUtils.reverse_map(sidToHostname);

    if (oldAssignment != null && oldAssignment.getWorkers() != null) {
        oldWorkers = oldAssignment.getWorkers();
    } else {
        oldWorkers = new HashSet();
    }

    refineDeadTasks();

    componentTasks = JStormUtils.reverse_map(context.getTaskToComponent());

    for (Entry> entry : componentTasks.entrySet()) {
    List componentTaskList = entry.getValue();
    Collections.sort(componentTaskList);
}

    totalWorkerNum = computeWorkerNum();
    unstoppedWorkerNum = computeUnstoppedAssignments();
}
添加附加組件

??從上面的代碼可以看出在DefaultTopologyAssignContext的構(gòu)造方法中,第一句是調(diào)用父類構(gòu)造方法先去初始化一些參數(shù),然后調(diào)用system_topology這個方法。下面來看看這個方法的內(nèi)部。第一個方法就是添加一個acker到原來的拓?fù)渲腥?。拓?fù)渥鳛镴Strom處理的一個邏輯模型,對用戶提供了非常簡單且強大的編程原語,只要分別繼承兩大組件,就可以構(gòu)造一個拓?fù)淠P?,但是實際上,一個實際運行的拓?fù)淠P瓦h遠不止用戶定義的用于處理輸入的spout和用于處理業(yè)務(wù)的bolt,JStorm為了保證消息的可靠性,拓?fù)銶etrics管理,拓?fù)銱B管理,再拓?fù)鋵嶋H模型中添加了幾個非常重要的bolt,下面就詳細(xì)的介紹acker,用于保證消息的可靠性。

public static StormTopology system_topology(Map storm_conf, StormTopology topology) throws InvalidTopologyException {
    StormTopology ret = topology.deepCopy();
    add_acker(storm_conf, ret);
    addTopologyMaster(storm_conf, ret);
    add_metrics_component(ret);
    add_system_components(ret);
    return ret;
}
StormTopology

??這里先來介紹下StormTopology這個類,才能往下理解。StormTopology這個類用于存儲拓?fù)涞慕M件信息,在這個類內(nèi)部,有三個非常重要的成員變量,分別存儲spout和bolt以及state_spout,第三個筆者暫時沒有弄清楚其作用,但是前兩個就非常明顯,分別存儲拓?fù)涞膬纱蠼M件,spout和bolt

  private Map spouts; // required
  private Map bolts; // required
  private Map state_spouts; // required

??Map中的key表示我們定義的組件的id,上文提到過的id。SpoutSpec和Bolt中有兩個重要的成員變量。

  private ComponentObject spout_object; // required
  private ComponentCommon common; // required

??ComponentObject用于存儲序列化后的代碼信息,第二個ComponentCommon用于存儲很重要的配置信息,包括輸入的流,輸出的流和分組信息。有三個重要的成員變量

  //GlobalStreamId有兩個String成員變量,componentId表示這個輸入組件的流來源的那個組件id,
  //streamId表示componentId所輸出的特定的流
  private Map inputs; // 輸入的來源和分組情況
  //StreamInfo有個重要的成員變量List output_fields,表示輸出的域。
  private Map streams; // 輸出的流
  private int parallelism_hint; // 并行度

??根據(jù)上述的結(jié)構(gòu),StormTopology能夠完整的表示拓?fù)渲忻總€組件輸出之后的流所流向的位置。

acker

??這一小節(jié)筆者不打算先從源碼的角度入手,先來將一個acker的作用以及從一個小例子來講解acker是怎樣工作的。我們都知道作為一個流式處理框架,消息的可靠性是一個非常特性之一。除開更加高級的事務(wù)框架能保證消息只被處理一次(exactly-once),JStorm本身也提供了at-least-once,這個機制能保證消息一定會被處理。下面從一個例子的角度來講解,這是如何實現(xiàn)的。

??如上圖所示,integer作為輸入的spout,sliding和printer都是負(fù)責(zé)處理的bolt,F(xiàn)ield表示之間輸出的元組內(nèi)的元素對應(yīng)的key。StreamID為默認(rèn),不指定數(shù)據(jù)流分組的形式,則默認(rèn)情況下shuffle。上述是一個非常簡單的拓?fù)溥壿嫿Y(jié)構(gòu),然后在經(jīng)過add_acker這個方法之后,實際的拓?fù)浣Y(jié)構(gòu)發(fā)生了一些變化,如下圖

??JStrom為原來的拓?fù)浣Y(jié)構(gòu)添加了一個_ack的bolt,負(fù)責(zé)維護拓?fù)涞目煽啃裕笾碌那闆r可以從上圖中看出,每當(dāng)一個元組被發(fā)送到拓?fù)湎掠蝏olt中去的時候,也會發(fā)送到_ack中去保存下來,然后后續(xù)處理的每個bolt每次調(diào)用ack函數(shù)都會發(fā)送給_ack(bolt),在指定時間間隔內(nèi)收到最后處理的ack,那么_ack(bolt)就發(fā)送一個消息給最初的spout,則保證了一個元組的可靠性。所以綜上,_ack這個Bolt就是維護了整個拓?fù)涞目煽啃裕敲醋x者可能會問,_ack里面保存了那么多的消息,如果某個元組經(jīng)過的組件非常多,是否會造成該元組的拓?fù)錁渥兊暮艽蟆_@里阿里利用異或,實現(xiàn)了一個非常簡單且高效低耗的判斷方法。
??其實在_ack中存儲的內(nèi)容非常簡單,就是一個k-v鍵值對,k是一個隨機無重復(fù)的id(root_id),且在元組被處理的整個過程中保持不變,將消息存儲為,random由每個收到元組的組件生成,每經(jīng)過一個組件,random就會改變一次。如上圖,integer在發(fā)送一個給sliding之后,也會發(fā)送一個給_ack,然后sliding經(jīng)過處理之后,發(fā)送給printer,并且發(fā)送一個給_ack,然后當(dāng)printer處理完之后在發(fā)送一個給_ack,此時的_ack內(nèi)部對于root_id這個消息的值是x^x^y^y=0。也就是處理成功,如果達到指定超時時間root_id對應(yīng)的值還不是0,則需要通知給出這個元組的task(_ack也是一個bolt,所以內(nèi)部也有保存某個消息的來源task),要求重發(fā)。以上就是JStorm用于保證消息可靠性所使用的方法,直觀且簡單。
??后續(xù)的幾個方法如addTopologyMaster,add_metrics_component,add_system_components都是添加了相應(yīng)的控件(bolt)來進行協(xié)同操作。比如topology master可以負(fù)責(zé)metrics,也可以負(fù)責(zé)baskpressure(反壓)機制。筆者還沒深入解讀,相應(yīng)部分后續(xù)再做相應(yīng)的添加,這里先挖個坑。

計算worker數(shù)目

??在DefaultTopologyAssignContext的構(gòu)造函數(shù)中,添加完附加的組件之后,緊接著獲取supervisorid和hostname對應(yīng)的鍵值對,如果存在舊的分配信息,則獲取原先所有的worker,如果沒有,則新建一個worker的集合。去除deadtaskid中那些在unstopworker內(nèi)的task(這里的目的是分開處理,如果是new的情況下,這兩個都是空集)。然后計算需要的worker數(shù)目??聪旅娴脑创a,

private int computeWorkerNum() {
    //獲取拓?fù)湓O(shè)置的worker數(shù)目
    Integer settingNum = JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_WORKERS));
    //
    int ret = 0, hintSum = 0, tmCount = 0;

    Map components =     ThriftTopologyUtils.getComponents(sysTopology);
    for (Entry entry : components.entrySet()) {
        String componentName = entry.getKey();
        Object component = entry.getValue();

        ComponentCommon common = null;
        if (component instanceof Bolt) {
            common = ((Bolt) component).get_common();
        }
        if (component instanceof SpoutSpec) {
            common = ((SpoutSpec) component).get_common();
        }
        if (component instanceof StateSpoutSpec) {
            common = ((StateSpoutSpec) component).get_common();
        }
        //獲取每個組件中設(shè)置的并行度
        int hint = common.get_parallelism_hint();
        if (componentName.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) {
            //如果是屬于TM組件,則加到tmCount
            tmCount += hint;
            continue;
        }
        //這個變量存下所有組件并行度的和
        hintSum += hint;
    }
    
    //ret存下較小的值
    if (settingNum == null) {
        ret = hintSum;
    } else {
        ret =  Math.min(settingNum, hintSum);
    }
    //這里還需要判斷主TM是否需要一個獨立的worker節(jié)點用于處理
    Boolean isTmSingleWorker = ConfigExtension.getTopologyMasterSingleWorker(stormConf);
    if (isTmSingleWorker != null) {
        if (isTmSingleWorker == true) {
        ret += tmCount;
        setAssignSingleWorkerForTM(true);
    }
    } else {
        if (ret >= 10) {
            ret += tmCount;
        setAssignSingleWorkerForTM(true);
        }
    }
    return ret;
}
worker分配

??實例化完DefaultTopologyAssignContext之后,如果是rebalance類型,則還需要先將原先占用的那些worker給釋放掉,具體做法就是將worker使用的端口放回可用端口集合中。幾個變量的含義,needAssignTasks:就是指需要分配的task,也就是除去unstopworker中的那些task。allocWorkerNum:等于原先計算好的worker的數(shù)目-減去unstopworker的數(shù)目再減去keepAssigns(只有在拓?fù)漕愋褪茿SSIGN_TYPE_MONITOR才有的)的數(shù)目。實際worker分配中,最重要是方法WorkerScheduler.getAvailableWorkers。下面就來詳細(xì)講解這個方法內(nèi)部怎么實現(xiàn)。

    int workersNum = getAvailableWorkersNum(context);
    if (workersNum < allocWorkerNum) {
        throw new FailedAssignTopologyException("there"s no enough worker.allocWorkerNum="+ allocWorkerNum + ", availableWorkerNum="+ workersNum);
}
    workersNum = allocWorkerNum;
    List assignedWorkers = new ArrayList();

    getRightWorkers(context,needAssign,assignedWorkers,workersNum,getUserDefineWorkers(context, ConfigExtension.getUserDefineAssignment(context.getStormConf())));

??首先得知集群上可用的全部worker,如果可用的worker小于需要分配的worker數(shù),則需要拋出異常。如果足夠,則會分配足量的worker給指定的拓?fù)洹U{(diào)用getRightWorkers這個方法來獲取合適的worker,這里所謂right的worker是指用戶自定義的worker,可以指定worker的資源分配情況。

getRightWorkers

??分為兩部分來講解這個方法,首先是準(zhǔn)備工作--getUserDefineWorkers這個方法,這個方法需要兩個參數(shù),拓?fù)涞纳舷挛男畔ontext,用戶自定義的worker列表workers??聪旅娴脑创a:

private List getUserDefineWorkers(
            DefaultTopologyAssignContext context, List workers) {
    List ret = new ArrayList();
    //如果沒有用戶自定義的worker,則沒必要任何操作
    if (workers == null)
        return ret;
    Map> componentToTask = (HashMap>) ((HashMap>) context
                .getComponentTasks()).clone();
    //如果分配類型不是NEW,則還是從workers資源分配信息列表中去除unstopworker。
    //這里是用戶有指定某些worker資源屬于unstopworker才能去掉。
    if (context.getAssignType() != context.ASSIGN_TYPE_NEW) {
        checkUserDefineWorkers(context, workers, context.getTaskToComponent());
}
    //遍歷用戶定義的worker,去除那些沒有分配task的worker
    //用戶定義的worker中已經(jīng)指定哪些task該分配到哪個worker中
    for (WorkerAssignment worker : workers) {
        ResourceWorkerSlot workerSlot = new ResourceWorkerSlot(worker,componentToTask);
        if (workerSlot.getTasks().size() != 0) {
            ret.add(workerSlot);
        }
    }
return ret;
}

??去除那些沒有指定task的worker之后,真正進入getRightWorkers方法內(nèi)部。源碼如下,這里解釋下五個參數(shù)的含義,context表示之前準(zhǔn)備的拓?fù)渖舷挛男畔ⅲ?b>needAssign表示這個拓?fù)湫枰峙涞母鱾€taskid,assignedWorkers表示用來存儲那些在這個方法內(nèi)分配到的worker資源,workersNum表示需要拓?fù)湫枰峙涞膚orker數(shù)目,workers表示上個方法中用戶自定義的可用的worker資源。簡而言之,這個方法就是從workers中選出已經(jīng)分配了指定的task的worker,然后存到assignedWorkers中去。

private void getRightWorkers(DefaultTopologyAssignContext context,
            Set needAssign, List assignedWorkers,
            int workersNum, Collection workers) {
        Set assigned = new HashSet();
        List users = new ArrayList();
        if (workers == null)
            return;
        for (ResourceWorkerSlot worker : workers) {
            boolean right = true;
            Set tasks = worker.getTasks();
            if (tasks == null)
                continue;
            for (Integer task : tasks) {
                if (!needAssign.contains(task) || assigned.contains(task)) {
                    right = false;
                    break;
                }
            }
            if (right) {
                assigned.addAll(tasks);
                users.add(worker);
            }
        }
        if (users.size() + assignedWorkers.size() > workersNum) {
            LOG.warn(
                    "There are no enough workers for user define scheduler / keeping old assignment, userdefineWorkers={}, assignedWorkers={}, workerNum={}",
                    users, assignedWorkers, workersNum);
            return;
        }

        assignedWorkers.addAll(users);
        needAssign.removeAll(assigned);
    }

??上面代碼主要的處理邏輯是在for循環(huán)中,在這個循環(huán)會去判斷worker內(nèi)是否存有本拓?fù)鋬?nèi)的taskid,如果有則把worker存儲起來,并且從taskid列表中移除掉那些分配出去的task,沒有則直接退出了。

使用舊分配/rebalance

??回到getAvailableWorkers方法內(nèi),看下面這段代碼。

    //如果配置指定要使用舊的分配,則從舊的分配中選出合適的worker。
        if (ConfigExtension.isUseOldAssignment(context.getStormConf())) {
            getRightWorkers(context, needAssign, assignedWorkers, workersNum,
                    context.getOldWorkers());
        } else if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE
                && context.isReassign() == false) {
            //如果是rebalance,且可以使用原來的worker,將原來使用的worker存儲起來。
            int cnt = 0;
            for (ResourceWorkerSlot worker : context.getOldWorkers()) {
                if (cnt < workersNum) {
                    ResourceWorkerSlot resFreeWorker = new ResourceWorkerSlot();
                    resFreeWorker.setPort(worker.getPort());
                    resFreeWorker.setHostname(worker.getHostname());
                    resFreeWorker.setNodeId(worker.getNodeId());
                    assignedWorkers.add(resFreeWorker);
                    cnt++;
                } else {
                    break;
                }
            }
        }
        // 計算TM bolt的個數(shù)
        int workersForSingleTM = 0;
        if (context.getAssignSingleWorkerForTM()) {
            for (Integer taskId : needAssign) {
                String componentName = context.getTaskToComponent().get(taskId);
                if (componentName.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) {
                    workersForSingleTM++;
                }
            }
        }
        int restWokerNum = workersNum - assignedWorkers.size();
        if (restWokerNum < 0)
            throw new FailedAssignTopologyException(
                    "Too much workers are needed for user define or old assignments. workersNum="
                            + workersNum + ", assignedWokersNum="
                            + assignedWorkers.size());

??筆者一開始覺得上述的代碼可能是在判斷restWokerNum < 0是很可能會成立而導(dǎo)致拋出異常的,因為如果用戶一開始就指定了worker分配信息,然后rebalance情況下,不斷去添加舊的worker到assignedWorkers內(nèi),這樣就會導(dǎo)致assignedWorkers的大小比實際需要的worker數(shù)目workersNum大。但是還沒來得及用實際集群去測試,只是在github問了官方的人,如果有更新解決方案會后續(xù)再這里說明。

分配剩下的worker
    //restWokerNum是剩下需要的worker的數(shù)目,直接添加ResourceWorkerSlot實例對象。
    for (int i = 0; i < restWokerNum; i++) {
        assignedWorkers.add(new ResourceWorkerSlot());
    }
    //這里是獲取那些專門指定運行拓?fù)涞膕upervisor節(jié)點。
    List isolationSupervisors = this.getIsolationSupervisors(context);
    if (isolationSupervisors.size() != 0) {
        putAllWorkerToSupervisor(assignedWorkers, getResAvailSupervisors(isolationSupervisors));
    } else {
        putAllWorkerToSupervisor(assignedWorkers, getResAvailSupervisors(context.getCluster()));
    }
    this.setAllWorkerMemAndCpu(context.getStormConf(), assignedWorkers);
    LOG.info("Assigned workers=" + assignedWorkers);
    return assignedWorkers;

??上述代碼中的isolationSupervisors存放的是那些指定給這個拓?fù)涞膕upervisor節(jié)點的id。如果有指定,則在這些特定的節(jié)點上分配,如果沒有指定,那么,就在全局內(nèi)分配。所以實際剩下的分配任務(wù)的是putAllWorkerToSupervisor這個方法,getResAvailSupervisors這個方法負(fù)責(zé)剔除那些無法分配worker的supervisor節(jié)點,因為節(jié)點上分配的worker已經(jīng)滿了。下面來介紹putAllWorkerToSupervisor這個方法的作用。
??putAllWorkerToSupervisor需要兩個參數(shù),第一個是已經(jīng)分配的worker,包含那些還沒有設(shè)定運行在那個節(jié)點的worker(上面直接新建的那些worker),第二個參數(shù)是目前可用的supervisor節(jié)點。下面是這個方法的代碼

private void putAllWorkerToSupervisor( List assignedWorkers, List supervisors) {
    for (ResourceWorkerSlot worker : assignedWorkers) {
        if (worker.getHostname() != null) {
            for (SupervisorInfo supervisor : supervisors) {
                if (NetWorkUtils.equals(supervisor.getHostName(), worker.getHostname()) && supervisor.getAvailableWorkerPorts().size() > 0) {
                    putWorkerToSupervisor(supervisor, worker);
                    break;
                }
            }
        }
    }
    supervisors = getResAvailSupervisors(supervisors);
    Collections.sort(supervisors, new Comparator() {

@Override
        public int compare(SupervisorInfo o1, SupervisorInfo o2) {
            // TODO Auto-generated method stub
            return -NumberUtils.compare( o1.getAvailableWorkerPorts().size(), o2.getAvailableWorkerPorts().size());
        }
    });
    putWorkerToSupervisor(assignedWorkers, supervisors);
}

??進入方法的第一步,首先要做的事情,就是對于那些已經(jīng)分配好節(jié)點的worker,從supervisor節(jié)點上給該worker分配一個合適的端口。putWorkerToSupervisor這方法主要的操作是從supervisor節(jié)點上獲取一個可用的端口,然后設(shè)置worker的端口,并將該端口從supervisor節(jié)點的可用端口列表中移除。代碼結(jié)構(gòu)非常簡單,如下:

private void putWorkerToSupervisor(SupervisorInfo supervisor, ResourceWorkerSlot worker) {
    int port = worker.getPort();
    if (!supervisor.getAvailableWorkerPorts().contains(worker.getPort())) {
        port = supervisor.getAvailableWorkerPorts().iterator().next();
    }
    worker.setPort(port);
    supervisor.getAvailableWorkerPorts().remove(port);
    worker.setNodeId(supervisor.getSupervisorId());
}

??設(shè)置好了一部分已經(jīng)分配好的worker之后,繼續(xù)分配那些沒有指定supervisor的worker。根據(jù)supervisor中可用端口逆序,從大到小排。然后調(diào)用putWorkerToSupervisor這個方法。
??putWorkerToSupervisor方法內(nèi)部首先統(tǒng)計所有已經(jīng)使用的端口,然后計算出一個理論的負(fù)載平均值{(所有使用掉的+將要分配的)/supervisor的個數(shù),就會得到分配后,集群的一個理論上的負(fù)載值theoryAveragePorts,可以平攤到每個supervisor身上}。然后通過遍歷需要分配worker的list,進行第一次分配,可以將worker依次分配到那些負(fù)載值(跟理論值的計算方式一樣)小于理論平均負(fù)載的supervisor上。而超過負(fù)載的,則放進到負(fù)載列表中。經(jīng)過一輪分配之后,如果還存在沒有分配的worker(源碼這里先進行排序再進行判斷,很明顯造成排序時間浪費的可能性)。根據(jù)supervisor中可用端口逆序,從大到小排序。再不斷將worker分配進去。
??到這里,worker的分配就順利結(jié)束了,總結(jié)一下,首先是根據(jù)拓?fù)湫畔⒊跏蓟舷挛男畔?,然后計算出實際使用的worker數(shù)目,如果這些worker有指定運行在某個supervisor節(jié)點上,那么就在節(jié)點上分配合適的worker。如果沒有指定,那么就根據(jù)節(jié)點的負(fù)載情況,盡量平均的分配到每個supervisor節(jié)點上。如果大家的負(fù)載都比較大的情況下,再分配到哪些具有比較多的可用端口的節(jié)點,完成分配。

task分配

??getAvailableWorkers方法完成了worker的分配,以及如果用戶指定了特定的worker上運行指定的task,剩下的taskid將會在接下來的方法中說明如何去分配。主要在TaskScheduler的構(gòu)造函數(shù)中,這里需要三個參數(shù),第一個是拓?fù)涞纳舷挛男畔efaultContext,第二個是需要分配的task的列表needAssignTasks,以及上文中獲取到的合適的worker列表availableWorkers。(ps:記住,前文如果沒有指定特定的worker資源分配的信息,則沒有taskid被分配到worker中去,也就是worker內(nèi)部僅有supervisorid,內(nèi)存,cpu,端口等信息,不存在tasks信息)。接下來看看TaskScheduler的構(gòu)造函數(shù)。

    public TaskScheduler(DefaultTopologyAssignContext context, Set tasks, List workers) {
        this.tasks = tasks;
        LOG.info("Tasks " + tasks + " is going to be assigned in workers " + workers);
        this.context = context;
        this.taskContext =
                new TaskAssignContext(this.buildSupervisorToWorker(workers), Common.buildSpoutOutoputAndBoltInputMap(context), context.getTaskToComponent());
        this.componentSelector = new ComponentNumSelector(taskContext);
        this.inputComponentSelector = new InputComponentNumSelector(taskContext);
        this.totalTaskNumSelector = new TotalTaskNumSelector(taskContext);
        if (tasks.size() == 0)
            return;
        if (context.getAssignType() != TopologyAssignContext.ASSIGN_TYPE_REBALANCE || context.isReassign() != false){
            // warning ! it doesn"t consider HA TM now!!
            if (context.getAssignSingleWorkerForTM() && tasks.contains(context.getTopologyMasterTaskId())) {
                assignForTopologyMaster();
            }
        }

        int taskNum = tasks.size();
        Map workerSlotIntegerMap = taskContext.getWorkerToTaskNum();
        Set preAssignWorkers = new HashSet();
        for (Entry worker : workerSlotIntegerMap.entrySet()) {
            if (worker.getValue() > 0) {
                taskNum += worker.getValue();
                preAssignWorkers.add(worker.getKey());
            }
        }
        setTaskNum(taskNum, workerNum);

        // Check the worker assignment status of pre-assigned workers, e.g user defined or old assignment workers.
        // Remove the workers which have been assigned with enough workers.
        for (ResourceWorkerSlot worker : preAssignWorkers) {
            if (taskContext.getWorkerToTaskNum().keySet().contains(worker)){

                Set doneWorkers = removeWorkerFromSrcPool(taskContext.getWorkerToTaskNum().get(worker), worker);
                if (doneWorkers != null) {
                    for (ResourceWorkerSlot doneWorker : doneWorkers) {
                        taskNum -= doneWorker.getTasks().size();
                        workerNum--;
                    }
                }

            }

        }
        setTaskNum(taskNum, workerNum);

        // For Scale-out case, the old assignment should be kept.
        if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE && context.isReassign() == false) {
            keepAssignment(taskNum, context.getOldAssignment().getWorkers());
        }
    }
初始化

??在這個構(gòu)造函數(shù)中,首先是構(gòu)造一個task分配的上下文信息。這個對象主要需要維護的幾個重要信息是

taskToComponent:一個Map,Key表示taskid,Value表示所對應(yīng)的組件id。

supervisorToWorker:也是一個Map,Key表示這個拓?fù)浞峙涞膕upervisorid,Value表示節(jié)點上分配到的worker列表。

relationship:維護這個拓?fù)涞囊粋€結(jié)構(gòu)信息,依然是個Map,Key表示組件bolt/spout的組件id,Value表示的是,如果Key對應(yīng)組件是一個bolt,則Value存下是所有輸入到組件的對應(yīng)組件的id。如果Key對應(yīng)組件是一個spout,則Value存下是這個組件所有輸出到的組件id。舉個例子,integer(spout)輸出到sliding(bolt),sliding(bolt)輸出到printer(bolt)。則relationship存下的是[{integer,[sliding]},{sliding,[integer]},{printer,[sliding]}]。

workerToTaskNum:Map,Key表示一個worker,Value表示實際在這個worker上運行的task的總數(shù)目。

workerToComponentNum:Map,Key表示一個worker,Value表示一個Map,存下的是組件id,以及對應(yīng)的數(shù)目。

??緊接著初始化三個selector,第一個是ComponentNumSelector(內(nèi)部定義了一二WorkerComparator,負(fù)責(zé)對worker進行比對,對比worker內(nèi)某個組件的task數(shù)目。以及對比每個supervisor上所有worker內(nèi)某個組件的總task和),第二個是InputComponentNumSelector(內(nèi)部也是定義了兩個比對函數(shù),一個是獲取worker內(nèi)某個組件的全部輸入的task個數(shù),以及在整個supervisor上的全部輸入task個數(shù)),第三個是TotalTaskNumSelector(worker內(nèi)全部task的個數(shù),和supervisor上全部task的個數(shù))。這三個selector的目的都是為了后續(xù)合理的將task分配到這些worker上做的準(zhǔn)備。

分配TM bolt

??如果集群資源足夠,用戶定義TM需要多帶帶分配到一個獨立的worker上,則需要調(diào)用assignForTopologyMaster進行多帶帶分配。

private void assignForTopologyMaster() {
        int taskId = context.getTopologyMasterTaskId();
        ResourceWorkerSlot workerAssigned = null;
        int workerNumOfSuperv = 0;
        for (ResourceWorkerSlot workerSlot : taskContext.getWorkerToTaskNum().keySet()){
            List workers = taskContext.getSupervisorToWorker().get(workerSlot.getNodeId());
            if (workers != null && workers.size() > workerNumOfSuperv) {
                for (ResourceWorkerSlot worker : workers) {
                    Set tasks = worker.getTasks();
                    if (tasks == null || tasks.size() == 0) {
                        workerAssigned = worker;
                        workerNumOfSuperv = workers.size();
                        break;
                    }
                }
            }
        }

        if (workerAssigned == null)
            throw new FailedAssignTopologyException("there"s no enough workers for the assignment of topology master");
        updateAssignedTasksOfWorker(taskId, workerAssigned);
        taskContext.getWorkerToTaskNum().remove(workerAssigned);
        assignments.add(workerAssigned);
        tasks.remove(taskId);
        workerNum--;
        LOG.info("assignForTopologyMaster, assignments=" + assignments);
    }

??這個方法首先是找出某個最合適的worker,這個worker符合兩個條件,一是沒有分配其他的task,第二,worker所在的supervisor相對分配了最多的worker,第二點的目的是保證負(fù)載均衡。如果找不到合適的worker,那么就拋出異常。如果能找到的話,就把負(fù)責(zé)TM的task分配給這個worker。updateAssignedTasksOfWorker這個方法的目的就是更新新的分配情況。

task分配

??接下來獲取全部的task數(shù)目,以及已經(jīng)分配出去的worker列表preAssignWorkers。根據(jù)獲得的總task數(shù)目來計算每個worker上平均的task數(shù)目avgTaskNum,以及剩下多少還沒有分配出去的task(總task%總worker,求得余數(shù)leftTaskNum)。然后遍歷preAssignWorkers,調(diào)用方法removeWorkerFromSrcPool來判斷一個worker是否分配了足夠的task,并且移除那些已經(jīng)合理分配的task和worker。

for (ResourceWorkerSlot worker : preAssignWorkers) {
            if (taskContext.getWorkerToTaskNum().keySet().contains(worker)){

                Set doneWorkers = removeWorkerFromSrcPool(taskContext.getWorkerToTaskNum().get(worker), worker);
                if (doneWorkers != null) {
                    for (ResourceWorkerSlot doneWorker : doneWorkers) {
                        taskNum -= doneWorker.getTasks().size();
                        workerNum--;
                    }
                }

            }

        }

??removeWorkerFromSrcPool這個方法挺有趣的,第一次看的時候有點懵逼,但是其實仔細(xì)看下就很明確了。下面我簡單講解下:

private Set removeWorkerFromSrcPool(int taskNum, ResourceWorkerSlot worker) {
        Set ret = new HashSet();

        if (leftTaskNum <= 0) {
            if (taskNum >= avgTaskNum) {
                taskContext.getWorkerToTaskNum().remove(worker);
                assignments.add(worker);
                ret.add(worker);
            }
        } else {
            if (taskNum > avgTaskNum ) {
                taskContext.getWorkerToTaskNum().remove(worker);
                leftTaskNum = leftTaskNum -(taskNum -avgTaskNum);
                assignments.add(worker);
                ret.add(worker);
            }
            if (leftTaskNum <= 0) {
                List needDelete = new ArrayList();
                for (Entry entry : taskContext.getWorkerToTaskNum().entrySet()) {
                    if (avgTaskNum != 0 && entry.getValue() == avgTaskNum)
                        needDelete.add(entry.getKey());
                }
                for (ResourceWorkerSlot workerToDelete : needDelete) {
                    taskContext.getWorkerToTaskNum().remove(workerToDelete);
                    assignments.add(workerToDelete);
                    ret.add(workerToDelete);
                }
            }
        }

        return ret;
    }

??ret保存的是需要返回給調(diào)用者需要移除的worker集合??催@個方法,首先判斷,在剩余數(shù)小于等于0的情況,如果當(dāng)前worker內(nèi)的task數(shù)目大于等于平均數(shù),說明這個worker的確分配了合理的task。(原因是,如果leftTaskNum小于等于0,是不是就看成,平均數(shù)會比正常情況下加1。舉個例子,有3個盒子,10個球放進去,那么,平均數(shù)為3的情況下,余數(shù)為1,如果平均數(shù)為4,那么余數(shù)就是-2了)。如果leftTaskNum大于0,判斷就復(fù)雜一點,首先如果數(shù)目taskNum大于平均的avgTaskNum,說明這個worker多分配了一些task,那么這些多分配的就必須從leftTaskNum減去。甚至可能taskNum的數(shù)目大于avgTaskNum+leftTaskNum的數(shù)目,那么直接導(dǎo)致leftTaskNum小于等于0。在leftTaskNum小于等于0的情況下,找出分配上下文中worker分配的task數(shù)目剛好是平均數(shù)的worker,存在needDelete列表中。然后遍歷這個列表,把這些worker從加到需要移除的集合ret中,并返回。(因為如果有某個worker分配的數(shù)目多于avgTaskNum+leftTaskNum的數(shù)目,那么那些分配數(shù)是平均數(shù)的worker肯定是合理的,剩下那些分配小于平均數(shù)的才是需要調(diào)整的)。
??在執(zhí)行完上述的操作之后,更新下目前的平均數(shù)avgTaskNum和分配剩余的task數(shù)目leftTaskNum。(此刻還有一些task尚未實際分配),完成分配的調(diào)度是在assign方法中。在這個方法內(nèi),如果已經(jīng)沒有需要分配的task,則將原來已經(jīng)分配好的返回就行了。如果還存在需要分配的task,遍歷這個需要分配的task列表,如果task對應(yīng)的組件屬于系統(tǒng)組件(組件id為__acker或者__topology_master的組件),則存下來,如果是一般的task,則調(diào)用chooseWorker方法選擇一個合適的worker,然后將task分配到worker上。(當(dāng)然這里還需要做一些額外的操作,比如清除那些已經(jīng)合理的分配的worker,通過調(diào)用removeWorkerFromSrcPool這個方法去清除)。而chooseWorker這個方法利用的就是前文提到的三個selector來選擇最佳的supervisor,選擇最佳的worker(需要考慮這個task接收的input,需要考慮supervisor節(jié)點的負(fù)載情況和worker內(nèi)的負(fù)載情況)。分配完普通的task之后,在分配系統(tǒng)組件,分配方式也是一樣的。
??至此,task的分配也完成,總結(jié)一下,除開那些已經(jīng)指定的分配外,比較重要的是,定義合理的selector(綜合考慮節(jié)點負(fù)載,worker負(fù)載,已經(jīng)input輸入,考慮本地化)。分配的同時不斷去檢測是否已經(jīng)有worker已經(jīng)合理分配了,就不要在繼續(xù)分配到那個worker上。

HeartBeat操作

??上述完成task和worker的分配之后,回到mkAssignment方法。剩下的操作就是設(shè)置task的HB起始時間和超時時間。這些比較簡單就不再細(xì)說了。

結(jié)束語

??解讀拓?fù)浞峙涞倪^程可以讓我們更加清楚,我們寫的一個邏輯拓?fù)?,實際上是如何變成一個可以實際運行在集群的拓?fù)洹R约巴負(fù)淙绾伪WC負(fù)載均衡等問題。筆者后續(xù)還會更新JStorm幾個比較重要的特性的源碼分析。包括如何實現(xiàn)反壓機制,如何實現(xiàn)nimbus和supervisor容錯,supervisor啟動的時候需要執(zhí)行那些操作。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/67029.html

相關(guān)文章

  • JStorm源碼分析系列--01--Nimbus啟動分析

    摘要:方法首先初始化一個回調(diào)函數(shù),這是當(dāng)一個成為之后就會調(diào)用的一個用于初始化一系列變量的方法,包括拓?fù)淙绾卧诩荷戏峙?,拓?fù)錉顟B(tài)更新,清除函數(shù),還有監(jiān)控線程等。 寫在前面的話,筆者第一次閱讀框架源碼,所以可能有些地方理解錯誤或者沒有詳細(xì)解釋,如果在閱讀過程發(fā)現(xiàn)錯誤很歡迎在文章下面評論指出。文章后續(xù)會陸續(xù)更新,可以關(guān)注或者收藏,轉(zhuǎn)發(fā)請先私信我,謝謝。對了,筆者看的是2.2.1這個版本 概述 ?...

    Carbs 評論0 收藏0
  • Jstorm到Flink 在今日頭條的遷移實踐

    摘要:第二個問題就是說業(yè)務(wù)團隊之間沒有擴大管理,預(yù)算和審核是無頭緒的。支持一些高優(yōu)先級的比如說支持以及窗口等特性包括說。到現(xiàn)在為止,整體遷移完了,還剩下十個左右的作業(yè)沒有遷移完。 作者:張光輝 本文將為大家展示字節(jié)跳動公司怎么把Storm從Jstorm遷移到Flink的整個過程以及后續(xù)的計劃。你可以借此了解字節(jié)跳動公司引入Flink的背景以及Flink集群的構(gòu)建過程。字節(jié)跳動公司是如何兼容以...

    luckyyulin 評論0 收藏0
  • Docker 調(diào)試技巧

    摘要:即使是容器已經(jīng)退出的也可以看到,所以可以通過這種方式來分析非預(yù)期的退出。也可以直接通過在容器內(nèi)啟動一個更方便地調(diào)試容器,不必一條條執(zhí)行。和獲得容器中進程的狀態(tài)和在容器里執(zhí)行的效果類似。通過查看容器的詳細(xì)信息飯后鏡像和容器的詳細(xì)信息。 『重用』容器名 但我們在編寫/調(diào)試Dockerfile的時候我們經(jīng)常會重復(fù)之前的command,比如這種docker run --name jstorm-...

    Coding01 評論0 收藏0
  • Rancher網(wǎng)絡(luò)全解讀

    摘要:在每臺主機上我們執(zhí)行列出主機和網(wǎng)絡(luò)接口。其它的應(yīng)用服務(wù)容器每個容器有兩個地址,一個屬于子網(wǎng),另一個屬于的子網(wǎng)。雖然這會帶來一些性能上的影響,但是可以確保的網(wǎng)絡(luò)默認(rèn)是安全的。 本文中,我們首先將Rancher部署到EC2實例上,并且添加新的主機,之后用Rancher的Catalog啟動了RocketChat應(yīng)用,緊接著對運行中的容器的網(wǎng)絡(luò)接口和其他屬性的進行了分析。 同時,我們簡要介紹了...

    hss01248 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<