摘要:第二個(gè)問題就是說業(yè)務(wù)團(tuán)隊(duì)之間沒有擴(kuò)大管理,預(yù)算和審核是無頭緒的。支持一些高優(yōu)先級的比如說支持以及窗口等特性包括說。到現(xiàn)在為止,整體遷移完了,還剩下十個(gè)左右的作業(yè)沒有遷移完。
作者:張光輝
本文將為大家展示字節(jié)跳動(dòng)公司怎么把Storm從Jstorm遷移到Flink的整個(gè)過程以及后續(xù)的計(jì)劃。你可以借此了解字節(jié)跳動(dòng)公司引入Flink的背景以及Flink集群的構(gòu)建過程。字節(jié)跳動(dòng)公司是如何兼容以前的Jstorm作業(yè)以及基于Flink做一個(gè)任務(wù)管理平臺的呢?本文將一一為你揭開這些神秘的面紗。
本文內(nèi)容如下:
引入Flink的背景
Flink集群的構(gòu)建過程
構(gòu)建流式管理平臺
引入Flink的背景下面這幅圖展示的是字節(jié)跳動(dòng)公司的業(yè)務(wù)場景
首先,應(yīng)用層有廣告,也有AB測,也有推送和數(shù)據(jù)倉庫的一些業(yè)務(wù)。然后在使用J storm的過程中,增加了一層模板主要應(yīng)用于storm的計(jì)算模型,使用的語言是python。所以說中間相對抽象了一個(gè)schema,跑在最下面一層J storm計(jì)算引擎的上面。
字節(jié)跳動(dòng)公司有很多J-storm集群,在當(dāng)時(shí)17年7月份的時(shí)候,也就是在計(jì)劃遷移到Flink之前,J storm集群的規(guī)模大概是下圖所示的規(guī)模級別,當(dāng)時(shí)已經(jīng)有5000臺機(jī)器左右了。
接下來,介紹下遷移Flink的整個(gè)過程。先詳細(xì)地介紹一下當(dāng)時(shí)J-Storm是怎么用的。
上面是一個(gè)word count的例子:左邊是一個(gè)目錄結(jié)構(gòu),這個(gè)目錄結(jié)構(gòu)在resources下面,里面的Spout/Bolt的邏輯都是一些python腳本寫的。然后在最外層還有一個(gè)topology_online.yaml配置文件。
這個(gè)配置文件是用來干什么的?就是把所有的Spout和Bolt串聯(lián)起來構(gòu)成一個(gè)有向無關(guān)圖,也就是DAG圖。這就是使用J storm時(shí)的整個(gè)目錄結(jié)構(gòu),大部分用戶都是這樣用的。右邊是Spout和Bolt的邏輯,其實(shí)是抽象出來了一個(gè)函數(shù),就在這里面寫業(yè)務(wù)方面的函數(shù),然后將tuple_batch也就是上游流下來的數(shù)據(jù)去做一些計(jì)算邏輯。
下面詳細(xì)介紹一下配置文件的信息,其實(shí)我們有整個(gè)拓?fù)浣Y(jié)構(gòu)拓?fù)涞男畔ⅲ热缯f作業(yè)名叫什么,作業(yè)需要多少資源,需要多少work數(shù)。這里面會(huì)有單個(gè)的spout和Bolt的配置信息,比如是消費(fèi)的topic還是一些并發(fā)度?
除了這些信息還有整個(gè)這個(gè)數(shù)據(jù)流的流轉(zhuǎn),比如說spout的輸出,輸出messsage的消息等等。最后還有整個(gè)的Spout到Bolt之間的shuffle邏輯。這就是我們之前Jstorm的整個(gè)使用方式。最后會(huì)把整個(gè)目錄結(jié)構(gòu)里面的內(nèi)容去解析出來,根據(jù)配置文件把整個(gè)storm的拓?fù)浣Y(jié)構(gòu)構(gòu)建出來,然后提交到集群上面去跑。
使用Jstorm集群遇到了什么問題呢?第一個(gè)問題,因?yàn)槲覀儺?dāng)時(shí)是用使用python寫的代碼,整個(gè)集群是沒有內(nèi)存隔離的,job和work之間是沒有內(nèi)存限制的。比如說在實(shí)際過程中會(huì)經(jīng)常遇到一個(gè)用戶,他可能代碼寫的有問題導(dǎo)致一個(gè)work可能占了70G內(nèi)存,把機(jī)器的內(nèi)存占了1/3。第二個(gè)問題就是說業(yè)務(wù)團(tuán)隊(duì)之間沒有擴(kuò)大管理,預(yù)算和審核是無頭緒的。我們當(dāng)時(shí)都是都是跑在一個(gè)大集群上面,然后個(gè)別業(yè)務(wù)是多帶帶跑在一些小集群,但是我們每次都是資源不足,也沒辦法梳理這個(gè)預(yù)算。
第三個(gè)問題就是集群過多,運(yùn)維平臺化做得不太好,都是靠人來運(yùn)維的。這個(gè)時(shí)候集群多了基本上是管不過來的。
第四個(gè)問題就是說我們用python寫的代碼,有些性能比較差。但是我們在Storm的基礎(chǔ)上面去推廣這個(gè)Java也比較難,因?yàn)槲覀儾糠滞聦?shí)際上是不認(rèn)可Java的,因?yàn)樗X得java開發(fā)速度太慢了。
我們當(dāng)時(shí)想解決上面的問題,一個(gè)思路是把Jstorm放在yarn上面,直接把Jstorm在yarn上面兼容做這一套。后來因?yàn)橹腊⒗镌谟肍link所以去調(diào)研Flink,發(fā)現(xiàn)了Flink的一些優(yōu)勢,所以想嘗試用Flink解決存在的問題。
使用Flink首先第一個(gè)問題可以成功解決,因?yàn)镕link作業(yè)是跑在yarn上面的,這就解決了內(nèi)存隔離的問題。然后Yarn也是支持隊(duì)列的,我們可以根據(jù)業(yè)務(wù)去劃分隊(duì)列,這樣我們的擴(kuò)大預(yù)算審核的問題得到解決了。我們也不需要自己運(yùn)維一個(gè)集群了,因?yàn)橛衴arn去管理我們的資源,這樣也節(jié)省了運(yùn)維成員。在此基礎(chǔ)上還可以做一些物理隔離隊(duì)列,其實(shí)物理隔離隊(duì)列現(xiàn)在也遇到了問題。因?yàn)槲锢砀綦x隊(duì)列只是說這個(gè)機(jī)器隔離了,但是相當(dāng)于是機(jī)柜也沒有隔離網(wǎng)絡(luò)帶寬也沒有隔離,所以說即使是物理隔離隊(duì)列,現(xiàn)在也遇到比如說和離線作業(yè)共用機(jī)柜的時(shí)候,這個(gè)機(jī)柜的出口帶寬被打滿的問題。針對這些問題,我們后續(xù)可能想在這個(gè)離線離線集群上面做QOS這種流量級別的方式來解決這個(gè)問題。
Flink實(shí)際上是可以兼容Storm的,比如說之前的歷史作業(yè)是可以遷移過來的,不需要維護(hù)兩套計(jì)算引擎。Flink支持一些高優(yōu)先級的API比如說支持SQL以及窗口等特性包括說checkpoint。我們頭條的業(yè)務(wù)對exactly-once的需求不是特別的強(qiáng)烈。
以上就是Flink的優(yōu)勢,于是我們就決定從J storm往Flink去遷移。
Flink集群的構(gòu)建過程在遷移的過程中,第一件事情是先把Flink集群建立起來。一開始肯定都是追求穩(wěn)定性,比如說把離線的yarn集群隔離開,然后不依賴于HDFS也可以把Hdfs線上的name node, name space隔離出來。然后我們梳理了原來storm上面的作業(yè),哪些作業(yè)屬于不同的業(yè)務(wù),然后映射到不同的隊(duì)列里面去,最后把一些特殊的隊(duì)列也隔離開來。這是我們準(zhǔn)備這個(gè)Fink集群的時(shí)候考慮的幾個(gè)點(diǎn)。
下面就考慮Flink怎么兼容J storm,然后把它遷移過來。
我們當(dāng)時(shí)Flink用的是1.32版本,因?yàn)镕link有Flink-storm這個(gè)工程,它能把Storm作業(yè)轉(zhuǎn)化成Flink作業(yè),我們就借鑒這些技術(shù)上實(shí)現(xiàn)了一個(gè)Flink –jstorm。相當(dāng)于把一個(gè)J storm的拓?fù)浣Y(jié)構(gòu)轉(zhuǎn)化成了一個(gè)Flink job。只做完這件事情是不夠的,因?yàn)槲覀冇幸幌盗械耐鈬ぞ咝枰R。比如說之前提交作業(yè)的時(shí)候是通過一個(gè)腳本提交的讓用戶去屏蔽一些其他的參數(shù)。使用 flink的話我們同樣也是需要構(gòu)建這么一個(gè)腳本,然后去提交Flink Job,最后停止flink Job。第三點(diǎn)是構(gòu)建flink job外圍工具,自動(dòng)注冊報(bào)警,比如說消費(fèi)延遲報(bào)警,自動(dòng)注冊這個(gè)Dashboard以及一些log service,所有的這些為外圍工具都要和原來的服務(wù)去對齊。
對齊完之后,我們需要構(gòu)建一個(gè)遷移腳本,遷移的過程中最困難的是資源配置這一塊。因?yàn)樵瓉鞸torm用了多少資源,Storm怎么配,這對于遷移的用戶來說,如果是第一次做肯定是不了解這些東西。因此我們寫這么一個(gè)腳本,幫用戶生成它Flink集群里面對應(yīng)的資源使用情況。這些工作做完了之后,我們就開始去遷移。到現(xiàn)在為止,整體遷移完了,還剩下十個(gè)左右的作業(yè)沒有遷移完。現(xiàn)在集群規(guī)模達(dá)到了大概是6000多臺。
在遷移的過程中我們有一些其他優(yōu)化,比如說J storm是能夠支持task和work維度的重啟的,F(xiàn)link這一塊做得不是特別好。我們在這方面做了一些優(yōu)化實(shí)現(xiàn)了一個(gè)single task和single tm粒度的重啟,這樣就解決部分作業(yè)因?yàn)閠ask重啟導(dǎo)致整個(gè)作業(yè)全部重啟。
構(gòu)建流式管理平臺遷移完之后,我們又構(gòu)建了一個(gè)流式管理平臺。這個(gè)平臺是為了解決實(shí)際過程中遇到了一些問題,比如說整個(gè)機(jī)群掛了無法確定哪些作業(yè)在上面跑著,也通知不到具體的用戶,有些用戶作業(yè)都不知道自己提交了哪些作業(yè)。我們構(gòu)建流式作業(yè)的時(shí)候目標(biāo)實(shí)際上就是和其他的管理平臺是一樣的,比如說我們提供一些界面操作,然后提供一個(gè)版本管理,就是為了方便方便用戶升級和回滾的操作,我們還提供了一站式的查問題的工具:把一些用戶需要的信息都聚合在一個(gè)頁面上面,防止用戶不斷跳來跳去以及避免不同系統(tǒng)之間的切換。有一些歷史記錄之前不管是跑在yarn上面還是跑到storm上面,我一個(gè)作業(yè)被別人kill到了,其實(shí)我都是不知道的。針對這個(gè)問題我們提供了一些歷史操作記錄的一些目標(biāo)。
設(shè)計(jì)這個(gè)管理平臺的時(shí)候,我們考慮到提供這么一個(gè)前端管理平臺可能只是針對公司內(nèi)部的一部分產(chǎn)品,其他的產(chǎn)品也做了自己的一套前端。他們可以用一個(gè)模板,根據(jù)自己的邏輯去生成一個(gè)storm任務(wù)?;诖?,我們把整個(gè)管理平臺抽象了兩層:最上一層實(shí)際上相當(dāng)于一個(gè)面向用戶或者說是類似于前端的一個(gè)產(chǎn)品。中間這一層實(shí)際上是一個(gè)類似于提交作業(yè)調(diào)度任務(wù),這一層只負(fù)責(zé)提任務(wù),然后停任務(wù),管理生命周期以及因?yàn)楣收蠈?dǎo)致作業(yè)失敗了,將作業(yè)重新拉起來。這是中間層TSS層做的事情。
這樣,我們就可以對接到所有的前端平臺。通過一個(gè)RPC進(jìn)行TSS通信,就把所有的底層的服務(wù)和Filnk和Yarn還有HDFS這些交互的底層的邏輯完全屏蔽開來了。
接下來,用戶寫一個(gè)作業(yè)就比較簡單了,流程如下:
第一步用戶先要生成自己的一個(gè)作業(yè)模板,我們這邊通過maven提供的腳本架去生成一些作業(yè)的schema,這個(gè)作業(yè)執(zhí)行完之后,它會(huì)把幫你把一些porm文件,還有一些類似于kafkasource這種常規(guī)的組件都幫你準(zhǔn)備好,然后你直接在這個(gè)模板里面填自己的主要邏輯就可以了。因?yàn)槲覀儗慗ava程序遇到最多的一個(gè)問題就是包沖突問題。所以porm文件幫助用戶把一些可能沖突的一些jar包都給以exclude掉,這樣包沖突的概率會(huì)越來越小。
我們測試作業(yè)基本上是用IDEA或者local模式去測試,也提供了一個(gè)腳本去提交作業(yè),通過這個(gè)腳本提交到stage環(huán)境上面。在提交注冊在平臺上面去注冊這個(gè)作業(yè),然后添加一些配置信息。
下面是一個(gè)代碼版本管理的界面:
把整個(gè)作業(yè)提交之后如下圖所示:
提交完一個(gè)作業(yè)之后,用戶可能想看作業(yè)運(yùn)行的狀態(tài)怎么樣,我們通過四種方式去給用戶展示他的作業(yè)運(yùn)行狀態(tài)的。
第一個(gè)是Flink UI,也就是官方自帶的UI用戶可以去看。第二個(gè)是Dashboard,我們展示了作業(yè)里面的task維度,QPS以及task之間的網(wǎng)絡(luò)buffer,這些重要的信息匯聚到一起創(chuàng)建了一個(gè)Dashboard,這樣可能查問題的時(shí)候方便一些。第三個(gè)是錯(cuò)誤日志,其實(shí)和大家的思路一樣,把一個(gè)分布式的日志然后聚合起來,然后寫到ES上面去。第四是做了一個(gè)Jobtrace的工具,就是我們把Flink里面常見的一些異常匹配出來,然后直接給用戶一個(gè)wiki的使用指南,告訴用戶比如說你的作業(yè)OM了需要擴(kuò)大內(nèi)存。只要用戶的作業(yè)出現(xiàn)了某些問題,我們把已知的所有的異常都會(huì)匹配給用戶。
下面是ES的kibana:
這是我們Jobtrace的功能,我們把Flink的這些常見的異常都匹配出來,每一個(gè)異常其實(shí)對應(yīng)了一個(gè)wiki然后去讓用戶去解決自己的問題。
最后分享下我們的近期規(guī)劃,前面的基本做完并且趨于穩(wěn)定了,但是現(xiàn)在又遇到了一些新的問題。比如資源使用率這個(gè)問題,因?yàn)橛脩籼峤蛔鳂I(yè)的時(shí)候,用戶對資源不是特別敏感就隨意把一個(gè)資源提上去了,可能他明明需要兩個(gè)CPU,但是他提了四個(gè)CPU。我們想通過一個(gè)工具能夠監(jiān)控到他需要多少資源,然后通知yarn去把這個(gè)資源給重置了。就是動(dòng)態(tài)調(diào)整job資源,自動(dòng)把資源重置。
第二個(gè)問題是優(yōu)化作業(yè)重啟速度。我們這邊好多業(yè)務(wù)是根據(jù)流式計(jì)算的指標(biāo)來監(jiān)控它業(yè)務(wù)的穩(wěn)定性,如果最上游重啟一個(gè)作業(yè),底下一群人收到報(bào)警說線上出現(xiàn)一些問題了。原因是最上游某一個(gè)作業(yè)再重啟。我們想把重啟時(shí)間間隔去做到最短或者是無縫重啟,這是下一階段需要去探索探索的一個(gè)問題。
第四點(diǎn):Flink SQL也剛上線,可能需要一些精力投入去推廣。
最后一點(diǎn),我們希望在此抽象出更多的模式作業(yè)模型來,因?yàn)槲覀儽旧硎怯幸恍┍热缯fkafka2ES,kafka2hdfs這些需求,能不能把他們抽象成一個(gè)schema,然后去對外提供一些服務(wù)。
以上就是我本次分享的主要內(nèi)容,感謝Flink的舉辦者和參與者,感謝我們的同事,因?yàn)橐陨系姆窒韮?nèi)容是我和我們同事一起做的。
更多資訊請?jiān)L問 Apache Flink 中文社區(qū)網(wǎng)站
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/36000.html
摘要:在有贊的技術(shù)演進(jìn)。業(yè)務(wù)數(shù)據(jù)量正在不斷增大,這些任務(wù)會(huì)影響業(yè)務(wù)對外服務(wù)的承諾。監(jiān)控需要收集上執(zhí)行的的審計(jì)信息,包括提交者執(zhí)行的具體,開始結(jié)束時(shí)間,執(zhí)行完成狀態(tài)。還有一點(diǎn)是詳細(xì)介紹了的原理,實(shí)踐中設(shè)置了的比默認(rèn)的減少了以上的時(shí)間。 前言 有贊數(shù)據(jù)平臺從2017年上半年開始,逐步使用 SparkSQL 替代 Hive 執(zhí)行離線任務(wù),目前 SparkSQL 每天的運(yùn)行作業(yè)數(shù)量5000個(gè),占離線...
摘要:在有贊的技術(shù)演進(jìn)。業(yè)務(wù)數(shù)據(jù)量正在不斷增大,這些任務(wù)會(huì)影響業(yè)務(wù)對外服務(wù)的承諾。監(jiān)控需要收集上執(zhí)行的的審計(jì)信息,包括提交者執(zhí)行的具體,開始結(jié)束時(shí)間,執(zhí)行完成狀態(tài)。還有一點(diǎn)是詳細(xì)介紹了的原理,實(shí)踐中設(shè)置了的比默認(rèn)的減少了以上的時(shí)間。 前言 有贊數(shù)據(jù)平臺從2017年上半年開始,逐步使用 SparkSQL 替代 Hive 執(zhí)行離線任務(wù),目前 SparkSQL 每天的運(yùn)行作業(yè)數(shù)量5000個(gè),占離線...
閱讀 1587·2021-09-26 09:46
閱讀 2675·2021-09-07 09:59
閱讀 2760·2021-09-07 09:59
閱讀 1887·2019-08-30 14:20
閱讀 936·2019-08-26 13:39
閱讀 3183·2019-08-26 12:24
閱讀 781·2019-08-26 11:55
閱讀 1222·2019-08-23 16:49