摘要:下面就來講講第一個初始化操作拓?fù)浞峙洹H绻麤]有舊的分配信息,說明拓?fù)浞峙漕愋蜑椤5竭@里,預(yù)分配,創(chuàng)建拓?fù)浞峙渖舷挛木屯瓿闪?。集群下的分配,見下文講解資源準(zhǔn)備首先第一步是判斷拓?fù)浞峙涞念愋褪欠穹弦?,不符合則拋出異常。
??寫在前面的話,筆者第一次閱讀框架源碼,所以可能有些地方理解錯誤或者沒有詳細(xì)解釋,如果在閱讀過程發(fā)現(xiàn)錯誤很歡迎在文章下面評論指出。文章后續(xù)會陸續(xù)更新,可以關(guān)注或者收藏,轉(zhuǎn)發(fā)請先私信我,謝謝。對了,筆者看的是2.2.1這個版本。上一篇博客,JStorm源碼分析系列--01--Nimbus啟動分析筆者講解了Nimbus啟動過程中做的一些基本的操作,在initFollowerThread方法中,如果當(dāng)前的Nimbus變成Leader之后,這個方法內(nèi)會負(fù)責(zé)執(zhí)行一些初始化init操作。下面就來講講第一個初始化操作--拓?fù)浞峙?。本文將詳?xì)(非常長,所以慢慢看)的講解如何去為一個拓?fù)浞峙湎鄳?yīng)的資源。
??從方法initTopologyAssign開始,TopologyAssign是一個單例對象,在這個類的init方法內(nèi),做了簡單的賦值操作之后,并初始化一個調(diào)度器實例對象之后,就建立一個守護線程,這個守護線程的目的是不斷從TopologyAssign內(nèi)部維護的一個阻塞隊列中讀取系統(tǒng)提交的拓?fù)淙蝿?wù),并調(diào)用相應(yīng)的方法doTopologyAssignment進行分配操作。代碼都比較簡單,就不浪費版面去貼了。
??下面是doTopologyAssignment方法的源碼,
protected boolean doTopologyAssignment(TopologyAssignEvent event) { Assignment assignment; try { Assignment oldAssignment = null; boolean isReassign = event.isScratch(); if (isReassign) { //如果存在舊的分配信息,需要先將舊的分配信息存儲下來 oldAssignment = nimbusData.getStormClusterState().assignment_info(event.getTopologyId(), null); } //調(diào)用方法執(zhí)行新的分配 assignment = mkAssignment(event); //將task添加到集群的metrics中 pushTaskStartEvent(oldAssignment, assignment, event); if (!isReassign) { //如果是新建的拓?fù)?,需要把拓?fù)湓O(shè)置為active狀態(tài) setTopologyStatus(event); } } catch (Throwable e) { LOG.error("Failed to assign topology " + event.getTopologyId(), e); event.fail(e.getMessage()); return false; } if (assignment != null) //將拓?fù)鋫浞莸絑K上 backupAssignment(assignment, event); event.done(); return true; }
??所以,最重要的方法還是mkAssignment,這里執(zhí)行了實際的分配操作。下面就來詳細(xì)的介紹這個方法。
prepareTopologyAssign??prepareTopologyAssign這個方法總體的目的為了初始化拓?fù)浞峙涞纳舷挛男畔?,生成一個TopologyAssignContext的實例對象。這個上下文對象需要存下拓?fù)涞暮芏嚓P(guān)鍵信息,包括拓?fù)涞慕M件信息(用StormTopology對象保存,下文在添加acker的時候會詳細(xì)介紹這個類),拓?fù)涞呐渲眯畔?,拓?fù)渖纤械膖ask id,以及死掉的task id,unstopped task id(這里的解釋是,那些supervisor死掉但是worker還繼續(xù)運行的稱為unstopworker,而包含在unstopworker內(nèi)的task則稱為unstoppedTask)。以及這個拓?fù)淠芊峙涞降膚orker,以上提及的這些信息都會在這個方法內(nèi)慢慢的初始化。下面一步步來看吧。prepareTopologyAssign方法的源碼比較長,一部分一部分來講解。
//創(chuàng)建一個上下文的實例對象 TopologyAssignContext ret = new TopologyAssignContext(); String topologyId = event.getTopologyId(); ret.setTopologyId(topologyId); int topoMasterId = nimbusData.getTasksHeartbeat().get(topologyId).get_topologyMasterId(); ret.setTopologyMasterTaskId(topoMasterId); LOG.info("prepareTopologyAssign, topoMasterId={}", topoMasterId); Map
??緊接著,根據(jù)目前集群的狀態(tài),初始化一份集群上所有的supervisor,并獲取所有可用的worker
StormClusterState stormClusterState = nimbusData.getStormClusterState(); // get all running supervisor, don"t need callback to watch supervisor MapsupInfos = Cluster.get_all_SupervisorInfo(stormClusterState, null); // init all AvailableWorkerPorts for (Entry supInfo : supInfos.entrySet()) { SupervisorInfo supervisor = supInfo.getValue(); if (supervisor != null) //設(shè)置全部的端口都為可用,后面通過HB去除掉那些已經(jīng)被使用的worker //supervisor是一個k-v,k是supervisorid,v是保存實例信息 supervisor.setAvailableWorkerPorts(supervisor.getWorkerPorts()); } //這個方法就是利用HB去掉那些掛掉的supervisor //判斷的方法是獲取每個supervisor最近的HB時間, //由當(dāng)前時間減去最近HB時間和超時時間做對比。 getAliveSupervsByHb(supInfos, nimbusConf);
??接下來獲取拓?fù)渲卸x的taskid對應(yīng)上組件,這里要解釋下,對于一個拓?fù)涠?,taskid總是從1開始分配的,并且,相同的組件taskid是相鄰的。比如你定義了一個SocketSpout(并行度5),一個PrintBolt(并行度4,那么SocketSpout的taskid可能是1-5,PrintBolt的taskid可能是6-9。
//這個k-v,k是taskid,v是拓?fù)鋬?nèi)定義的組件的id。 //寫過應(yīng)用的同學(xué)都應(yīng)該知道,TopologyBuilder在setSpout或者Bolt的時候,需要指定<組件id,對象,和并行度>。 //eg:builder.setSpout("integer", new ReceiverSpout(), 2); MaptaskToComponent = Cluster.get_all_task_component(stormClusterState, topologyId, null); ret.setTaskToComponent(taskToComponent); //獲取所有的taskid。 Set allTaskIds = taskToComponent.keySet(); ret.setAllTaskIds(allTaskIds);
??如果原來存在舊的拓?fù)浞峙湫畔ⅲ€需要設(shè)置unstoppedTasks,deadTasks,unstoppedWorkers等信息。然后調(diào)用getFreeSlots方法負(fù)責(zé)去除那些已經(jīng)分配出去的worker。處理過程比較直觀,獲取集群上所有的拓?fù)浞峙湫畔?,然后根?jù)每個分配信息中保存的worker信息,從原先supInfos中移除那些被分配出去的worker。
??如果沒有舊的分配信息,說明拓?fù)浞峙漕愋蜑?b>ASSIGN_TYPE_NEW。如果存在同名的拓?fù)?,也會把同名的拓?fù)湓O(shè)置舊的分配信息,放到上下文中。如果存在舊的分配信息,需要把舊的分配信息放入到上下文中,此外還要判斷是ASSIGN_TYPE_REBALANCE還是ASSIGN_TYPE_MONITOR,因為還需要設(shè)置unstoppedWorkers的信息。到這里,預(yù)分配,創(chuàng)建拓?fù)浞峙渖舷挛木屯瓿闪恕D壳拔覀儙в斜容^重要的信息是拓?fù)渌械膖askid,以及拓?fù)浠镜慕M件信息。
??在完成拓?fù)渖舷挛某跏蓟螅_始實際給拓?fù)浞峙湎鄳?yīng)的worker,不過這里需要判斷是本地模式還是集群模式,本地模式下比較簡單,找個一個合適的端口,然后新建一個worker的資源對象ResourceWorkerSlot,將一些關(guān)鍵信息如hostname,port,allTaskId配置好。因為local模式下比較簡單,所以,即使設(shè)置多個worker也不會啟動多個jvm。而在集群模式下,一個worker表示的是一個jvm進程。下面就重點講解集群下的分配情況。我把集群上的分配過程(assignTasks這個方法)分成三個主要的部分,分別是資源準(zhǔn)備,worker分配,task分配。
Set資源準(zhǔn)備assignments = null; if (!StormConfig.local_mode(nimbusData.getConf())) { IToplogyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME); //集群下的分配,見下文講解 assignments = scheduler.assignTasks(context); } else { assignments = mkLocalAssignment(context); }
??首先第一步是判斷拓?fù)浞峙涞念愋褪欠穹弦?,不符合則拋出異常。緊接著,根據(jù)上一個方法生成的拓?fù)浞峙渖舷挛膩砩梢粋€默認(rèn)的拓?fù)浞峙渖舷挛膶嵗龑ο?,DefaultTopologyAssignContext這個類的構(gòu)造方法執(zhí)行了很多很細(xì)節(jié)的操作。包括為拓?fù)涮砑痈郊拥慕M件,存儲下taskid和組件的對應(yīng)信息,計算拓?fù)湫枰膚orker數(shù)目,計算unstopworker的數(shù)目等。
//根據(jù)之前的上下文,初始化一個分配的上下文對象 DefaultTopologyAssignContext defaultContext = new DefaultTopologyAssignContext(context); if (assignType == TopologyAssignContext.ASSIGN_TYPE_REBALANCE) { freeUsed(defaultContext); }
??下面代碼是DefaultTopologyAssignContext的構(gòu)造方法
public DefaultTopologyAssignContext(TopologyAssignContext context){ super(context); try { sysTopology = Common.system_topology(stormConf, rawTopology); } catch (Exception e) { throw new FailedAssignTopologyException("Failed to generate system topology"); } sidToHostname = generateSidToHost(); hostToSid = JStormUtils.reverse_map(sidToHostname); if (oldAssignment != null && oldAssignment.getWorkers() != null) { oldWorkers = oldAssignment.getWorkers(); } else { oldWorkers = new HashSet(); } refineDeadTasks(); componentTasks = JStormUtils.reverse_map(context.getTaskToComponent()); for (Entry > entry : componentTasks.entrySet()) { List componentTaskList = entry.getValue(); Collections.sort(componentTaskList); } totalWorkerNum = computeWorkerNum(); unstoppedWorkerNum = computeUnstoppedAssignments(); }
??從上面的代碼可以看出在DefaultTopologyAssignContext的構(gòu)造方法中,第一句是調(diào)用父類構(gòu)造方法先去初始化一些參數(shù),然后調(diào)用system_topology這個方法。下面來看看這個方法的內(nèi)部。第一個方法就是添加一個acker到原來的拓?fù)渲腥?。拓?fù)渥鳛镴Strom處理的一個邏輯模型,對用戶提供了非常簡單且強大的編程原語,只要分別繼承兩大組件,就可以構(gòu)造一個拓?fù)淠P?,但是實際上,一個實際運行的拓?fù)淠P瓦h遠不止用戶定義的用于處理輸入的spout和用于處理業(yè)務(wù)的bolt,JStorm為了保證消息的可靠性,拓?fù)銶etrics管理,拓?fù)銱B管理,再拓?fù)鋵嶋H模型中添加了幾個非常重要的bolt,下面就詳細(xì)的介紹acker,用于保證消息的可靠性。
public static StormTopology system_topology(Map storm_conf, StormTopology topology) throws InvalidTopologyException { StormTopology ret = topology.deepCopy(); add_acker(storm_conf, ret); addTopologyMaster(storm_conf, ret); add_metrics_component(ret); add_system_components(ret); return ret; }
??這里先來介紹下StormTopology這個類,才能往下理解。StormTopology這個類用于存儲拓?fù)涞慕M件信息,在這個類內(nèi)部,有三個非常重要的成員變量,分別存儲spout和bolt以及state_spout,第三個筆者暫時沒有弄清楚其作用,但是前兩個就非常明顯,分別存儲拓?fù)涞膬纱蠼M件,spout和bolt
private Mapspouts; // required private Map bolts; // required private Map state_spouts; // required
??Map中的key表示我們定義的組件的id,上文提到過的id。SpoutSpec和Bolt中有兩個重要的成員變量。
private ComponentObject spout_object; // required private ComponentCommon common; // required
??ComponentObject用于存儲序列化后的代碼信息,第二個ComponentCommon用于存儲很重要的配置信息,包括輸入的流,輸出的流和分組信息。有三個重要的成員變量
//GlobalStreamId有兩個String成員變量,componentId表示這個輸入組件的流來源的那個組件id, //streamId表示componentId所輸出的特定的流 private Mapinputs; // 輸入的來源和分組情況 //StreamInfo有個重要的成員變量List output_fields,表示輸出的域。 private Map streams; // 輸出的流 private int parallelism_hint; // 并行度
??根據(jù)上述的結(jié)構(gòu),StormTopology能夠完整的表示拓?fù)渲忻總€組件輸出之后的流所流向的位置。
??這一小節(jié)筆者不打算先從源碼的角度入手,先來將一個acker的作用以及從一個小例子來講解acker是怎樣工作的。我們都知道作為一個流式處理框架,消息的可靠性是一個非常特性之一。除開更加高級的事務(wù)框架能保證消息只被處理一次(exactly-once),JStorm本身也提供了at-least-once,這個機制能保證消息一定會被處理。下面從一個例子的角度來講解,這是如何實現(xiàn)的。
??如上圖所示,integer作為輸入的spout,sliding和printer都是負(fù)責(zé)處理的bolt,F(xiàn)ield表示之間輸出的元組內(nèi)的元素對應(yīng)的key。StreamID為默認(rèn),不指定數(shù)據(jù)流分組的形式,則默認(rèn)情況下shuffle。上述是一個非常簡單的拓?fù)溥壿嫿Y(jié)構(gòu),然后在經(jīng)過add_acker這個方法之后,實際的拓?fù)浣Y(jié)構(gòu)發(fā)生了一些變化,如下圖
??JStrom為原來的拓?fù)浣Y(jié)構(gòu)添加了一個_ack的bolt,負(fù)責(zé)維護拓?fù)涞目煽啃裕笾碌那闆r可以從上圖中看出,每當(dāng)一個元組被發(fā)送到拓?fù)湎掠蝏olt中去的時候,也會發(fā)送到_ack中去保存下來,然后后續(xù)處理的每個bolt每次調(diào)用ack函數(shù)都會發(fā)送給_ack(bolt),在指定時間間隔內(nèi)收到最后處理的ack,那么_ack(bolt)就發(fā)送一個消息給最初的spout,則保證了一個元組的可靠性。所以綜上,_ack這個Bolt就是維護了整個拓?fù)涞目煽啃裕敲醋x者可能會問,_ack里面保存了那么多的消息,如果某個元組經(jīng)過的組件非常多,是否會造成該元組的拓?fù)錁渥兊暮艽蟆_@里阿里利用異或,實現(xiàn)了一個非常簡單且高效低耗的判斷方法。
??其實在_ack中存儲的內(nèi)容非常簡單,就是一個k-v鍵值對,k是一個隨機無重復(fù)的id(root_id),且在元組被處理的整個過程中保持不變,將消息存儲為
??后續(xù)的幾個方法如addTopologyMaster,add_metrics_component,add_system_components都是添加了相應(yīng)的控件(bolt)來進行協(xié)同操作。比如topology master可以負(fù)責(zé)metrics,也可以負(fù)責(zé)baskpressure(反壓)機制。筆者還沒深入解讀,相應(yīng)部分后續(xù)再做相應(yīng)的添加,這里先挖個坑。
??在DefaultTopologyAssignContext的構(gòu)造函數(shù)中,添加完附加的組件之后,緊接著獲取supervisorid和hostname對應(yīng)的鍵值對,如果存在舊的分配信息,則獲取原先所有的worker,如果沒有,則新建一個worker的集合。去除deadtaskid中那些在unstopworker內(nèi)的task(這里的目的是分開處理,如果是new的情況下,這兩個都是空集)。然后計算需要的worker數(shù)目??聪旅娴脑创a,
private int computeWorkerNum() { //獲取拓?fù)湓O(shè)置的worker數(shù)目 Integer settingNum = JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_WORKERS)); // int ret = 0, hintSum = 0, tmCount = 0; Mapworker分配components = ThriftTopologyUtils.getComponents(sysTopology); for (Entry entry : components.entrySet()) { String componentName = entry.getKey(); Object component = entry.getValue(); ComponentCommon common = null; if (component instanceof Bolt) { common = ((Bolt) component).get_common(); } if (component instanceof SpoutSpec) { common = ((SpoutSpec) component).get_common(); } if (component instanceof StateSpoutSpec) { common = ((StateSpoutSpec) component).get_common(); } //獲取每個組件中設(shè)置的并行度 int hint = common.get_parallelism_hint(); if (componentName.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) { //如果是屬于TM組件,則加到tmCount tmCount += hint; continue; } //這個變量存下所有組件并行度的和 hintSum += hint; } //ret存下較小的值 if (settingNum == null) { ret = hintSum; } else { ret = Math.min(settingNum, hintSum); } //這里還需要判斷主TM是否需要一個獨立的worker節(jié)點用于處理 Boolean isTmSingleWorker = ConfigExtension.getTopologyMasterSingleWorker(stormConf); if (isTmSingleWorker != null) { if (isTmSingleWorker == true) { ret += tmCount; setAssignSingleWorkerForTM(true); } } else { if (ret >= 10) { ret += tmCount; setAssignSingleWorkerForTM(true); } } return ret; }
??實例化完DefaultTopologyAssignContext之后,如果是rebalance類型,則還需要先將原先占用的那些worker給釋放掉,具體做法就是將worker使用的端口放回可用端口集合中。幾個變量的含義,needAssignTasks:就是指需要分配的task,也就是除去unstopworker中的那些task。allocWorkerNum:等于原先計算好的worker的數(shù)目-減去unstopworker的數(shù)目再減去keepAssigns(只有在拓?fù)漕愋褪茿SSIGN_TYPE_MONITOR才有的)的數(shù)目。實際worker分配中,最重要是方法WorkerScheduler.getAvailableWorkers。下面就來詳細(xì)講解這個方法內(nèi)部怎么實現(xiàn)。
int workersNum = getAvailableWorkersNum(context); if (workersNum < allocWorkerNum) { throw new FailedAssignTopologyException("there"s no enough worker.allocWorkerNum="+ allocWorkerNum + ", availableWorkerNum="+ workersNum); } workersNum = allocWorkerNum; ListassignedWorkers = new ArrayList (); getRightWorkers(context,needAssign,assignedWorkers,workersNum,getUserDefineWorkers(context, ConfigExtension.getUserDefineAssignment(context.getStormConf())));
??首先得知集群上可用的全部worker,如果可用的worker小于需要分配的worker數(shù),則需要拋出異常。如果足夠,則會分配足量的worker給指定的拓?fù)洹U{(diào)用getRightWorkers這個方法來獲取合適的worker,這里所謂right的worker是指用戶自定義的worker,可以指定worker的資源分配情況。
??分為兩部分來講解這個方法,首先是準(zhǔn)備工作--getUserDefineWorkers這個方法,這個方法需要兩個參數(shù),拓?fù)涞纳舷挛男畔ontext,用戶自定義的worker列表workers??聪旅娴脑创a:
private ListgetUserDefineWorkers( DefaultTopologyAssignContext context, List workers) { List ret = new ArrayList (); //如果沒有用戶自定義的worker,則沒必要任何操作 if (workers == null) return ret; Map > componentToTask = (HashMap >) ((HashMap >) context .getComponentTasks()).clone(); //如果分配類型不是NEW,則還是從workers資源分配信息列表中去除unstopworker。 //這里是用戶有指定某些worker資源屬于unstopworker才能去掉。 if (context.getAssignType() != context.ASSIGN_TYPE_NEW) { checkUserDefineWorkers(context, workers, context.getTaskToComponent()); } //遍歷用戶定義的worker,去除那些沒有分配task的worker //用戶定義的worker中已經(jīng)指定哪些task該分配到哪個worker中 for (WorkerAssignment worker : workers) { ResourceWorkerSlot workerSlot = new ResourceWorkerSlot(worker,componentToTask); if (workerSlot.getTasks().size() != 0) { ret.add(workerSlot); } } return ret; }
??去除那些沒有指定task的worker之后,真正進入getRightWorkers方法內(nèi)部。源碼如下,這里解釋下五個參數(shù)的含義,context表示之前準(zhǔn)備的拓?fù)渖舷挛男畔ⅲ?b>needAssign表示這個拓?fù)湫枰峙涞母鱾€taskid,assignedWorkers表示用來存儲那些在這個方法內(nèi)分配到的worker資源,workersNum表示需要拓?fù)湫枰峙涞膚orker數(shù)目,workers表示上個方法中用戶自定義的可用的worker資源。簡而言之,這個方法就是從workers中選出已經(jīng)分配了指定的task的worker,然后存到assignedWorkers中去。
private void getRightWorkers(DefaultTopologyAssignContext context, SetneedAssign, List assignedWorkers, int workersNum, Collection workers) { Set assigned = new HashSet (); List users = new ArrayList (); if (workers == null) return; for (ResourceWorkerSlot worker : workers) { boolean right = true; Set tasks = worker.getTasks(); if (tasks == null) continue; for (Integer task : tasks) { if (!needAssign.contains(task) || assigned.contains(task)) { right = false; break; } } if (right) { assigned.addAll(tasks); users.add(worker); } } if (users.size() + assignedWorkers.size() > workersNum) { LOG.warn( "There are no enough workers for user define scheduler / keeping old assignment, userdefineWorkers={}, assignedWorkers={}, workerNum={}", users, assignedWorkers, workersNum); return; } assignedWorkers.addAll(users); needAssign.removeAll(assigned); }
??上面代碼主要的處理邏輯是在for循環(huán)中,在這個循環(huán)會去判斷worker內(nèi)是否存有本拓?fù)鋬?nèi)的taskid,如果有則把worker存儲起來,并且從taskid列表中移除掉那些分配出去的task,沒有則直接退出了。
??回到getAvailableWorkers方法內(nèi),看下面這段代碼。
//如果配置指定要使用舊的分配,則從舊的分配中選出合適的worker。 if (ConfigExtension.isUseOldAssignment(context.getStormConf())) { getRightWorkers(context, needAssign, assignedWorkers, workersNum, context.getOldWorkers()); } else if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE && context.isReassign() == false) { //如果是rebalance,且可以使用原來的worker,將原來使用的worker存儲起來。 int cnt = 0; for (ResourceWorkerSlot worker : context.getOldWorkers()) { if (cnt < workersNum) { ResourceWorkerSlot resFreeWorker = new ResourceWorkerSlot(); resFreeWorker.setPort(worker.getPort()); resFreeWorker.setHostname(worker.getHostname()); resFreeWorker.setNodeId(worker.getNodeId()); assignedWorkers.add(resFreeWorker); cnt++; } else { break; } } } // 計算TM bolt的個數(shù) int workersForSingleTM = 0; if (context.getAssignSingleWorkerForTM()) { for (Integer taskId : needAssign) { String componentName = context.getTaskToComponent().get(taskId); if (componentName.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) { workersForSingleTM++; } } } int restWokerNum = workersNum - assignedWorkers.size(); if (restWokerNum < 0) throw new FailedAssignTopologyException( "Too much workers are needed for user define or old assignments. workersNum=" + workersNum + ", assignedWokersNum=" + assignedWorkers.size());
??筆者一開始覺得上述的代碼可能是在判斷restWokerNum < 0是很可能會成立而導(dǎo)致拋出異常的,因為如果用戶一開始就指定了worker分配信息,然后rebalance情況下,不斷去添加舊的worker到assignedWorkers內(nèi),這樣就會導(dǎo)致assignedWorkers的大小比實際需要的worker數(shù)目workersNum大。但是還沒來得及用實際集群去測試,只是在github問了官方的人,如果有更新解決方案會后續(xù)再這里說明。
//restWokerNum是剩下需要的worker的數(shù)目,直接添加ResourceWorkerSlot實例對象。 for (int i = 0; i < restWokerNum; i++) { assignedWorkers.add(new ResourceWorkerSlot()); } //這里是獲取那些專門指定運行拓?fù)涞膕upervisor節(jié)點。 ListisolationSupervisors = this.getIsolationSupervisors(context); if (isolationSupervisors.size() != 0) { putAllWorkerToSupervisor(assignedWorkers, getResAvailSupervisors(isolationSupervisors)); } else { putAllWorkerToSupervisor(assignedWorkers, getResAvailSupervisors(context.getCluster())); } this.setAllWorkerMemAndCpu(context.getStormConf(), assignedWorkers); LOG.info("Assigned workers=" + assignedWorkers); return assignedWorkers;
??上述代碼中的isolationSupervisors存放的是那些指定給這個拓?fù)涞膕upervisor節(jié)點的id。如果有指定,則在這些特定的節(jié)點上分配,如果沒有指定,那么,就在全局內(nèi)分配。所以實際剩下的分配任務(wù)的是putAllWorkerToSupervisor這個方法,getResAvailSupervisors這個方法負(fù)責(zé)剔除那些無法分配worker的supervisor節(jié)點,因為節(jié)點上分配的worker已經(jīng)滿了。下面來介紹putAllWorkerToSupervisor這個方法的作用。
??putAllWorkerToSupervisor需要兩個參數(shù),第一個是已經(jīng)分配的worker,包含那些還沒有設(shè)定運行在那個節(jié)點的worker(上面直接新建的那些worker),第二個參數(shù)是目前可用的supervisor節(jié)點。下面是這個方法的代碼
private void putAllWorkerToSupervisor( ListassignedWorkers, List supervisors) { for (ResourceWorkerSlot worker : assignedWorkers) { if (worker.getHostname() != null) { for (SupervisorInfo supervisor : supervisors) { if (NetWorkUtils.equals(supervisor.getHostName(), worker.getHostname()) && supervisor.getAvailableWorkerPorts().size() > 0) { putWorkerToSupervisor(supervisor, worker); break; } } } } supervisors = getResAvailSupervisors(supervisors); Collections.sort(supervisors, new Comparator () { @Override public int compare(SupervisorInfo o1, SupervisorInfo o2) { // TODO Auto-generated method stub return -NumberUtils.compare( o1.getAvailableWorkerPorts().size(), o2.getAvailableWorkerPorts().size()); } }); putWorkerToSupervisor(assignedWorkers, supervisors); }
??進入方法的第一步,首先要做的事情,就是對于那些已經(jīng)分配好節(jié)點的worker,從supervisor節(jié)點上給該worker分配一個合適的端口。putWorkerToSupervisor這方法主要的操作是從supervisor節(jié)點上獲取一個可用的端口,然后設(shè)置worker的端口,并將該端口從supervisor節(jié)點的可用端口列表中移除。代碼結(jié)構(gòu)非常簡單,如下:
private void putWorkerToSupervisor(SupervisorInfo supervisor, ResourceWorkerSlot worker) { int port = worker.getPort(); if (!supervisor.getAvailableWorkerPorts().contains(worker.getPort())) { port = supervisor.getAvailableWorkerPorts().iterator().next(); } worker.setPort(port); supervisor.getAvailableWorkerPorts().remove(port); worker.setNodeId(supervisor.getSupervisorId()); }
??設(shè)置好了一部分已經(jīng)分配好的worker之后,繼續(xù)分配那些沒有指定supervisor的worker。根據(jù)supervisor中可用端口逆序,從大到小排。然后調(diào)用putWorkerToSupervisor這個方法。
??putWorkerToSupervisor方法內(nèi)部首先統(tǒng)計所有已經(jīng)使用的端口,然后計算出一個理論的負(fù)載平均值{(所有使用掉的+將要分配的)/supervisor的個數(shù),就會得到分配后,集群的一個理論上的負(fù)載值theoryAveragePorts,可以平攤到每個supervisor身上}。然后通過遍歷需要分配worker的list,進行第一次分配,可以將worker依次分配到那些負(fù)載值(跟理論值的計算方式一樣)小于理論平均負(fù)載的supervisor上。而超過負(fù)載的,則放進到負(fù)載列表中。經(jīng)過一輪分配之后,如果還存在沒有分配的worker(源碼這里先進行排序再進行判斷,很明顯造成排序時間浪費的可能性)。根據(jù)supervisor中可用端口逆序,從大到小排序。再不斷將worker分配進去。
??到這里,worker的分配就順利結(jié)束了,總結(jié)一下,首先是根據(jù)拓?fù)湫畔⒊跏蓟舷挛男畔?,然后計算出實際使用的worker數(shù)目,如果這些worker有指定運行在某個supervisor節(jié)點上,那么就在節(jié)點上分配合適的worker。如果沒有指定,那么就根據(jù)節(jié)點的負(fù)載情況,盡量平均的分配到每個supervisor節(jié)點上。如果大家的負(fù)載都比較大的情況下,再分配到哪些具有比較多的可用端口的節(jié)點,完成分配。
??getAvailableWorkers方法完成了worker的分配,以及如果用戶指定了特定的worker上運行指定的task,剩下的taskid將會在接下來的方法中說明如何去分配。主要在TaskScheduler的構(gòu)造函數(shù)中,這里需要三個參數(shù),第一個是拓?fù)涞纳舷挛男畔efaultContext,第二個是需要分配的task的列表needAssignTasks,以及上文中獲取到的合適的worker列表availableWorkers。(ps:記住,前文如果沒有指定特定的worker資源分配的信息,則沒有taskid被分配到worker中去,也就是worker內(nèi)部僅有supervisorid,內(nèi)存,cpu,端口等信息,不存在tasks信息)。接下來看看TaskScheduler的構(gòu)造函數(shù)。
public TaskScheduler(DefaultTopologyAssignContext context, Settasks, List workers) { this.tasks = tasks; LOG.info("Tasks " + tasks + " is going to be assigned in workers " + workers); this.context = context; this.taskContext = new TaskAssignContext(this.buildSupervisorToWorker(workers), Common.buildSpoutOutoputAndBoltInputMap(context), context.getTaskToComponent()); this.componentSelector = new ComponentNumSelector(taskContext); this.inputComponentSelector = new InputComponentNumSelector(taskContext); this.totalTaskNumSelector = new TotalTaskNumSelector(taskContext); if (tasks.size() == 0) return; if (context.getAssignType() != TopologyAssignContext.ASSIGN_TYPE_REBALANCE || context.isReassign() != false){ // warning ! it doesn"t consider HA TM now!! if (context.getAssignSingleWorkerForTM() && tasks.contains(context.getTopologyMasterTaskId())) { assignForTopologyMaster(); } } int taskNum = tasks.size(); Map workerSlotIntegerMap = taskContext.getWorkerToTaskNum(); Set preAssignWorkers = new HashSet (); for (Entry worker : workerSlotIntegerMap.entrySet()) { if (worker.getValue() > 0) { taskNum += worker.getValue(); preAssignWorkers.add(worker.getKey()); } } setTaskNum(taskNum, workerNum); // Check the worker assignment status of pre-assigned workers, e.g user defined or old assignment workers. // Remove the workers which have been assigned with enough workers. for (ResourceWorkerSlot worker : preAssignWorkers) { if (taskContext.getWorkerToTaskNum().keySet().contains(worker)){ Set doneWorkers = removeWorkerFromSrcPool(taskContext.getWorkerToTaskNum().get(worker), worker); if (doneWorkers != null) { for (ResourceWorkerSlot doneWorker : doneWorkers) { taskNum -= doneWorker.getTasks().size(); workerNum--; } } } } setTaskNum(taskNum, workerNum); // For Scale-out case, the old assignment should be kept. if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE && context.isReassign() == false) { keepAssignment(taskNum, context.getOldAssignment().getWorkers()); } }
??在這個構(gòu)造函數(shù)中,首先是構(gòu)造一個task分配的上下文信息。這個對象主要需要維護的幾個重要信息是
taskToComponent:一個Map,Key表示taskid,Value表示所對應(yīng)的組件id。
supervisorToWorker:也是一個Map,Key表示這個拓?fù)浞峙涞膕upervisorid,Value表示節(jié)點上分配到的worker列表。
relationship:維護這個拓?fù)涞囊粋€結(jié)構(gòu)信息,依然是個Map,Key表示組件bolt/spout的組件id,Value表示的是,如果Key對應(yīng)組件是一個bolt,則Value存下是所有輸入到組件的對應(yīng)組件的id。如果Key對應(yīng)組件是一個spout,則Value存下是這個組件所有輸出到的組件id。舉個例子,integer(spout)輸出到sliding(bolt),sliding(bolt)輸出到printer(bolt)。則relationship存下的是[{integer,[sliding]},{sliding,[integer]},{printer,[sliding]}]。
workerToTaskNum:Map,Key表示一個worker,Value表示實際在這個worker上運行的task的總數(shù)目。
workerToComponentNum:Map,Key表示一個worker,Value表示一個Map,存下的是組件id,以及對應(yīng)的數(shù)目。
??緊接著初始化三個selector,第一個是ComponentNumSelector(內(nèi)部定義了一二WorkerComparator,負(fù)責(zé)對worker進行比對,對比worker內(nèi)某個組件的task數(shù)目。以及對比每個supervisor上所有worker內(nèi)某個組件的總task和),第二個是InputComponentNumSelector(內(nèi)部也是定義了兩個比對函數(shù),一個是獲取worker內(nèi)某個組件的全部輸入的task個數(shù),以及在整個supervisor上的全部輸入task個數(shù)),第三個是TotalTaskNumSelector(worker內(nèi)全部task的個數(shù),和supervisor上全部task的個數(shù))。這三個selector的目的都是為了后續(xù)合理的將task分配到這些worker上做的準(zhǔn)備。
??如果集群資源足夠,用戶定義TM需要多帶帶分配到一個獨立的worker上,則需要調(diào)用assignForTopologyMaster進行多帶帶分配。
private void assignForTopologyMaster() { int taskId = context.getTopologyMasterTaskId(); ResourceWorkerSlot workerAssigned = null; int workerNumOfSuperv = 0; for (ResourceWorkerSlot workerSlot : taskContext.getWorkerToTaskNum().keySet()){ Listworkers = taskContext.getSupervisorToWorker().get(workerSlot.getNodeId()); if (workers != null && workers.size() > workerNumOfSuperv) { for (ResourceWorkerSlot worker : workers) { Set tasks = worker.getTasks(); if (tasks == null || tasks.size() == 0) { workerAssigned = worker; workerNumOfSuperv = workers.size(); break; } } } } if (workerAssigned == null) throw new FailedAssignTopologyException("there"s no enough workers for the assignment of topology master"); updateAssignedTasksOfWorker(taskId, workerAssigned); taskContext.getWorkerToTaskNum().remove(workerAssigned); assignments.add(workerAssigned); tasks.remove(taskId); workerNum--; LOG.info("assignForTopologyMaster, assignments=" + assignments); }
??這個方法首先是找出某個最合適的worker,這個worker符合兩個條件,一是沒有分配其他的task,第二,worker所在的supervisor相對分配了最多的worker,第二點的目的是保證負(fù)載均衡。如果找不到合適的worker,那么就拋出異常。如果能找到的話,就把負(fù)責(zé)TM的task分配給這個worker。updateAssignedTasksOfWorker這個方法的目的就是更新新的分配情況。
??接下來獲取全部的task數(shù)目,以及已經(jīng)分配出去的worker列表preAssignWorkers。根據(jù)獲得的總task數(shù)目來計算每個worker上平均的task數(shù)目avgTaskNum,以及剩下多少還沒有分配出去的task(總task%總worker,求得余數(shù)leftTaskNum)。然后遍歷preAssignWorkers,調(diào)用方法removeWorkerFromSrcPool來判斷一個worker是否分配了足夠的task,并且移除那些已經(jīng)合理分配的task和worker。
for (ResourceWorkerSlot worker : preAssignWorkers) { if (taskContext.getWorkerToTaskNum().keySet().contains(worker)){ SetdoneWorkers = removeWorkerFromSrcPool(taskContext.getWorkerToTaskNum().get(worker), worker); if (doneWorkers != null) { for (ResourceWorkerSlot doneWorker : doneWorkers) { taskNum -= doneWorker.getTasks().size(); workerNum--; } } } }
??removeWorkerFromSrcPool這個方法挺有趣的,第一次看的時候有點懵逼,但是其實仔細(xì)看下就很明確了。下面我簡單講解下:
private SetremoveWorkerFromSrcPool(int taskNum, ResourceWorkerSlot worker) { Set ret = new HashSet (); if (leftTaskNum <= 0) { if (taskNum >= avgTaskNum) { taskContext.getWorkerToTaskNum().remove(worker); assignments.add(worker); ret.add(worker); } } else { if (taskNum > avgTaskNum ) { taskContext.getWorkerToTaskNum().remove(worker); leftTaskNum = leftTaskNum -(taskNum -avgTaskNum); assignments.add(worker); ret.add(worker); } if (leftTaskNum <= 0) { List needDelete = new ArrayList (); for (Entry entry : taskContext.getWorkerToTaskNum().entrySet()) { if (avgTaskNum != 0 && entry.getValue() == avgTaskNum) needDelete.add(entry.getKey()); } for (ResourceWorkerSlot workerToDelete : needDelete) { taskContext.getWorkerToTaskNum().remove(workerToDelete); assignments.add(workerToDelete); ret.add(workerToDelete); } } } return ret; }
??ret保存的是需要返回給調(diào)用者需要移除的worker集合??催@個方法,首先判斷,在剩余數(shù)小于等于0的情況,如果當(dāng)前worker內(nèi)的task數(shù)目大于等于平均數(shù),說明這個worker的確分配了合理的task。(原因是,如果leftTaskNum小于等于0,是不是就看成,平均數(shù)會比正常情況下加1。舉個例子,有3個盒子,10個球放進去,那么,平均數(shù)為3的情況下,余數(shù)為1,如果平均數(shù)為4,那么余數(shù)就是-2了)。如果leftTaskNum大于0,判斷就復(fù)雜一點,首先如果數(shù)目taskNum大于平均的avgTaskNum,說明這個worker多分配了一些task,那么這些多分配的就必須從leftTaskNum減去。甚至可能taskNum的數(shù)目大于avgTaskNum+leftTaskNum的數(shù)目,那么直接導(dǎo)致leftTaskNum小于等于0。在leftTaskNum小于等于0的情況下,找出分配上下文中worker分配的task數(shù)目剛好是平均數(shù)的worker,存在needDelete列表中。然后遍歷這個列表,把這些worker從加到需要移除的集合ret中,并返回。(因為如果有某個worker分配的數(shù)目多于avgTaskNum+leftTaskNum的數(shù)目,那么那些分配數(shù)是平均數(shù)的worker肯定是合理的,剩下那些分配小于平均數(shù)的才是需要調(diào)整的)。
??在執(zhí)行完上述的操作之后,更新下目前的平均數(shù)avgTaskNum和分配剩余的task數(shù)目leftTaskNum。(此刻還有一些task尚未實際分配),完成分配的調(diào)度是在assign方法中。在這個方法內(nèi),如果已經(jīng)沒有需要分配的task,則將原來已經(jīng)分配好的返回就行了。如果還存在需要分配的task,遍歷這個需要分配的task列表,如果task對應(yīng)的組件屬于系統(tǒng)組件(組件id為__acker或者__topology_master的組件),則存下來,如果是一般的task,則調(diào)用chooseWorker方法選擇一個合適的worker,然后將task分配到worker上。(當(dāng)然這里還需要做一些額外的操作,比如清除那些已經(jīng)合理的分配的worker,通過調(diào)用removeWorkerFromSrcPool這個方法去清除)。而chooseWorker這個方法利用的就是前文提到的三個selector來選擇最佳的supervisor,選擇最佳的worker(需要考慮這個task接收的input,需要考慮supervisor節(jié)點的負(fù)載情況和worker內(nèi)的負(fù)載情況)。分配完普通的task之后,在分配系統(tǒng)組件,分配方式也是一樣的。
??至此,task的分配也完成,總結(jié)一下,除開那些已經(jīng)指定的分配外,比較重要的是,定義合理的selector(綜合考慮節(jié)點負(fù)載,worker負(fù)載,已經(jīng)input輸入,考慮本地化)。分配的同時不斷去檢測是否已經(jīng)有worker已經(jīng)合理分配了,就不要在繼續(xù)分配到那個worker上。
??上述完成task和worker的分配之后,回到mkAssignment方法。剩下的操作就是設(shè)置task的HB起始時間和超時時間。這些比較簡單就不再細(xì)說了。
結(jié)束語??解讀拓?fù)浞峙涞倪^程可以讓我們更加清楚,我們寫的一個邏輯拓?fù)?,實際上是如何變成一個可以實際運行在集群的拓?fù)洹R约巴負(fù)淙绾伪WC負(fù)載均衡等問題。筆者后續(xù)還會更新JStorm幾個比較重要的特性的源碼分析。包括如何實現(xiàn)反壓機制,如何實現(xiàn)nimbus和supervisor容錯,supervisor啟動的時候需要執(zhí)行那些操作。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/67029.html
摘要:方法首先初始化一個回調(diào)函數(shù),這是當(dāng)一個成為之后就會調(diào)用的一個用于初始化一系列變量的方法,包括拓?fù)淙绾卧诩荷戏峙?,拓?fù)錉顟B(tài)更新,清除函數(shù),還有監(jiān)控線程等。 寫在前面的話,筆者第一次閱讀框架源碼,所以可能有些地方理解錯誤或者沒有詳細(xì)解釋,如果在閱讀過程發(fā)現(xiàn)錯誤很歡迎在文章下面評論指出。文章后續(xù)會陸續(xù)更新,可以關(guān)注或者收藏,轉(zhuǎn)發(fā)請先私信我,謝謝。對了,筆者看的是2.2.1這個版本 概述 ?...
摘要:第二個問題就是說業(yè)務(wù)團隊之間沒有擴大管理,預(yù)算和審核是無頭緒的。支持一些高優(yōu)先級的比如說支持以及窗口等特性包括說。到現(xiàn)在為止,整體遷移完了,還剩下十個左右的作業(yè)沒有遷移完。 作者:張光輝 本文將為大家展示字節(jié)跳動公司怎么把Storm從Jstorm遷移到Flink的整個過程以及后續(xù)的計劃。你可以借此了解字節(jié)跳動公司引入Flink的背景以及Flink集群的構(gòu)建過程。字節(jié)跳動公司是如何兼容以...
摘要:即使是容器已經(jīng)退出的也可以看到,所以可以通過這種方式來分析非預(yù)期的退出。也可以直接通過在容器內(nèi)啟動一個更方便地調(diào)試容器,不必一條條執(zhí)行。和獲得容器中進程的狀態(tài)和在容器里執(zhí)行的效果類似。通過查看容器的詳細(xì)信息飯后鏡像和容器的詳細(xì)信息。 『重用』容器名 但我們在編寫/調(diào)試Dockerfile的時候我們經(jīng)常會重復(fù)之前的command,比如這種docker run --name jstorm-...
摘要:在每臺主機上我們執(zhí)行列出主機和網(wǎng)絡(luò)接口。其它的應(yīng)用服務(wù)容器每個容器有兩個地址,一個屬于子網(wǎng),另一個屬于的子網(wǎng)。雖然這會帶來一些性能上的影響,但是可以確保的網(wǎng)絡(luò)默認(rèn)是安全的。 本文中,我們首先將Rancher部署到EC2實例上,并且添加新的主機,之后用Rancher的Catalog啟動了RocketChat應(yīng)用,緊接著對運行中的容器的網(wǎng)絡(luò)接口和其他屬性的進行了分析。 同時,我們簡要介紹了...
閱讀 1632·2021-11-22 09:34
閱讀 3367·2021-09-29 09:35
閱讀 632·2021-09-04 16:40
閱讀 2954·2019-08-30 15:53
閱讀 2631·2019-08-30 15:44
閱讀 2631·2019-08-30 14:10
閱讀 1373·2019-08-29 18:43
閱讀 2255·2019-08-29 13:26