摘要:實(shí)際上,本身就預(yù)留了與外部元數(shù)據(jù)對(duì)接的能力,分別提供了和這兩個(gè)抽象。對(duì)接外部數(shù)據(jù)源搞清楚了注冊(cè)庫(kù)表的過(guò)程,給我們帶來(lái)這樣一個(gè)思路如果外部元數(shù)據(jù)創(chuàng)建的表也能被轉(zhuǎn)換成可識(shí)別的,那么就能被無(wú)縫地注冊(cè)到。
本文整理自 2019 年 4 月 13 日在深圳舉行的 Flink Meetup 會(huì)議,分享嘉賓張俊,目前擔(dān)任 OPPO 大數(shù)據(jù)平臺(tái)研發(fā)負(fù)責(zé)人,也是 Apache Flink contributor。本文主要內(nèi)容如下:
OPPO 實(shí)時(shí)數(shù)倉(cāng)的演進(jìn)思路;
基于 Flink SQL 的擴(kuò)展工作;
構(gòu)建實(shí)時(shí)數(shù)倉(cāng)的應(yīng)用案例;
未來(lái)工作的思考和展望。
一.OPPO 實(shí)時(shí)數(shù)倉(cāng)的演進(jìn)思路
1.1.OPPO 業(yè)務(wù)與數(shù)據(jù)規(guī)模
大家都知道 OPPO 是做智能手機(jī)的,但并不知道 OPPO 與互聯(lián)網(wǎng)以及大數(shù)據(jù)有什么關(guān)系,下圖概要介紹了 OPPO 的業(yè)務(wù)與數(shù)據(jù)情況:
OPPO 作為手機(jī)廠(chǎng)商,基于 Android 定制了自己的 ColorOS 系統(tǒng),當(dāng)前日活躍用戶(hù)超過(guò) 2 億。圍繞 ColorOS,OPPO 構(gòu)建了很多互聯(lián)網(wǎng)應(yīng)用,比如應(yīng)用商店、瀏覽器、信息流等。在運(yùn)營(yíng)這些互聯(lián)網(wǎng)應(yīng)用的過(guò)程中,OPPO 積累了大量的數(shù)據(jù),上圖右邊是整體數(shù)據(jù)規(guī)模的演進(jìn):從 2012 年開(kāi)始每年都是 2~3 倍的增長(zhǎng)速度,截至目前總數(shù)據(jù)量已經(jīng)超過(guò) 100PB,日增數(shù)據(jù)量超過(guò) 200TB。
要支撐這么大的一個(gè)數(shù)據(jù)量,OPPO 研發(fā)出一整套的數(shù)據(jù)系統(tǒng)與服務(wù),并逐漸形成了自己的數(shù)據(jù)中臺(tái)體系。
1.2.OPPO 數(shù)據(jù)中臺(tái)
今年大家都在談數(shù)據(jù)中臺(tái),OPPO 是如何理解數(shù)據(jù)中臺(tái)的呢?我們把它分成了 4 個(gè)層次:
最下層是統(tǒng)一工具體系,涵蓋了"接入 - 治理 - 開(kāi)發(fā) - 消費(fèi)"全數(shù)據(jù)鏈路;
基于工具體系之上構(gòu)建了數(shù)據(jù)倉(cāng)庫(kù),劃分成"原始層 - 明細(xì)層 - 匯總層 - 應(yīng)用層",這也是經(jīng)典的數(shù)倉(cāng)架構(gòu);
再往上是全域的數(shù)據(jù)體系,什么是全域呢?就是把公司所有的業(yè)務(wù)數(shù)據(jù)都打通,形成統(tǒng)一的數(shù)據(jù)資產(chǎn),比如 ID-Mapping、用戶(hù)標(biāo)簽等;
最終,數(shù)據(jù)要能被業(yè)務(wù)用起來(lái),需要場(chǎng)景驅(qū)動(dòng)的數(shù)據(jù)產(chǎn)品與服務(wù)。
以上就是 OPPO 數(shù)據(jù)中臺(tái)的整個(gè)體系,而數(shù)據(jù)倉(cāng)庫(kù)在其中處于非?;A(chǔ)與核心的位置。
1.3. 構(gòu)建 OPPO 離線(xiàn)數(shù)倉(cāng)
過(guò)往 2、3 年,我們的重點(diǎn)聚焦在離線(xiàn)數(shù)倉(cāng)的構(gòu)建。上圖大致描述了整個(gè)構(gòu)建過(guò)程:首先,數(shù)據(jù)來(lái)源基本是手機(jī)、日志文件以及 DB 數(shù)據(jù)庫(kù),我們基于 Apache NiFi 打造了高可用、高吞吐的接入系統(tǒng),將數(shù)據(jù)統(tǒng)一落入 HDFS,形成原始層;緊接著,基于 Hive 的小時(shí)級(jí) ETL 與天級(jí)匯總 Hive 任務(wù),分別負(fù)責(zé)計(jì)算生成明細(xì)層與匯總層;最后,應(yīng)用層是基于 OPPO 內(nèi)部研發(fā)的數(shù)據(jù)產(chǎn)品,主要是報(bào)表分析、用戶(hù)畫(huà)像以及接口服務(wù)。此外,中間的明細(xì)層還支持基于 Presto 的即席查詢(xún)與自助提數(shù)。
伴隨著離線(xiàn)數(shù)倉(cāng)的逐步完善,業(yè)務(wù)對(duì)實(shí)時(shí)數(shù)倉(cāng)的訴求也愈發(fā)強(qiáng)烈。
1.4. 數(shù)倉(cāng)實(shí)時(shí)化的訴求
對(duì)于數(shù)倉(cāng)實(shí)時(shí)化的訴求,大家通常都是從業(yè)務(wù)視角來(lái)看,但其實(shí)站在平臺(tái)的角度,實(shí)時(shí)化也能帶來(lái)切實(shí)的好處。首先,從業(yè)務(wù)側(cè)來(lái)看,報(bào)表、標(biāo)簽、接口等都會(huì)有實(shí)時(shí)的應(yīng)用場(chǎng)景,分別參見(jiàn)上圖左邊的幾個(gè)案例;其次,對(duì)平臺(tái)側(cè)來(lái)說(shuō),我們可以從三個(gè)案例來(lái)看:第一,OPPO 大量的批量任務(wù)都是從 0 點(diǎn)開(kāi)始啟動(dòng),都是通過(guò) T+1 的方式去做數(shù)據(jù)處理,這會(huì)導(dǎo)致計(jì)算負(fù)載集中爆發(fā),對(duì)集群的壓力很大;第二,標(biāo)簽導(dǎo)入也屬于一種 T+1 批量任務(wù),每次全量導(dǎo)入都會(huì)耗費(fèi)很長(zhǎng)的時(shí)間;第三,數(shù)據(jù)質(zhì)量的監(jiān)控也必須是 T+1 的,導(dǎo)致沒(méi)辦法及時(shí)發(fā)現(xiàn)數(shù)據(jù)的一些問(wèn)題。
既然業(yè)務(wù)側(cè)和平臺(tái)側(cè)都有實(shí)時(shí)化的這個(gè)訴求,那 OPPO 是如何來(lái)構(gòu)建自己的實(shí)時(shí)數(shù)倉(cāng)呢?
1.5. 離線(xiàn)到實(shí)時(shí)的平滑遷移
無(wú)論是一個(gè)平臺(tái)還是一個(gè)系統(tǒng),都離不開(kāi)上下兩個(gè)層次的構(gòu)成:上層是 API,是面向用戶(hù)的編程抽象與接口;下層是 Runtime,是面向內(nèi)核的執(zhí)行引擎。我們希望從離線(xiàn)到實(shí)時(shí)的遷移是平滑的,是什么意思呢?從 API 這層來(lái)看,數(shù)倉(cāng)的抽象是 Table、編程接口是 SQL+UDF,離線(xiàn)數(shù)倉(cāng)時(shí)代用戶(hù)已經(jīng)習(xí)慣了這樣的 API,遷移到實(shí)時(shí)數(shù)倉(cāng)后最好也能保持一致。而從 Runtime 這層來(lái)看,計(jì)算引擎從 Hive 演進(jìn)到了 Flink,存儲(chǔ)引擎從 HDFS 演進(jìn)到了 Kafka。
基于以上的思路,只需要把之前提到的離線(xiàn)數(shù)倉(cāng) pipeline 改造下,就得到了實(shí)時(shí)數(shù)倉(cāng) pipeline。
1.6. 構(gòu)建 OPPO 實(shí)時(shí)數(shù)倉(cāng)
從上圖可以看到,整個(gè) pipeline 與離線(xiàn)數(shù)倉(cāng)基本相似,只是把 Hive 替換為 Flink,把 HDFS 替換為 Kafka。從總體流程來(lái)看,基本模型是不變的,還是由原始層、明細(xì)層、匯總層、應(yīng)用層的級(jí)聯(lián)計(jì)算來(lái)構(gòu)成。
因此,這里的核心問(wèn)題是如何基于 Flink 構(gòu)建出這個(gè) pipeline,下面就介紹下我們基于 Flink SQL 所做的一些工作。
二. 基于 Flink SQL 的擴(kuò)展工作
2.1.Why Flink SQL
首先,為什么要用 Flink SQL? 下圖展示了 Flink 框架的基本結(jié)構(gòu),最下面是 Runtime,這個(gè)執(zhí)行引擎我們認(rèn)為最核心的優(yōu)勢(shì)是四個(gè):第一,低延遲,高吞吐;第二,端到端的 Exactly-once;第三,可容錯(cuò)的狀態(tài)管理;第四,Window & Event time 的支持?;?Runtime 抽象出 3 個(gè)層次的 API,SQL 處于最上層。
Flink SQL API 有哪些優(yōu)勢(shì)呢?我們也從四個(gè)方面去看:第一,支持 ANSI SQL 的標(biāo)準(zhǔn);第二,支持豐富的數(shù)據(jù)類(lèi)型與內(nèi)置函數(shù),包括常見(jiàn)的算術(shù)運(yùn)算與統(tǒng)計(jì)聚合;第三,可自定義 Source/Sink,基于此可以靈活地?cái)U(kuò)展上下游;第四,批流統(tǒng)一,同樣的 SQL,既可以跑離線(xiàn)也可以跑實(shí)時(shí)。
那么,基于 Flink SQL API 如何編程呢?下面是一個(gè)簡(jiǎn)單的演示:
首先是定義與注冊(cè)輸入 / 輸出表,這里創(chuàng)建了 2 張 Kakfa 的表,指定 kafka 版本是什么、對(duì)應(yīng)哪個(gè) topic;接下來(lái)是注冊(cè) UDF,篇幅原因這里沒(méi)有列出 UDF 的定義;最后是才是執(zhí)行真正的 SQL??梢钥吹?,為了執(zhí)行 SQL,需要做這么多的編碼工作,這并不是我們希望暴露給用戶(hù)的接口。
2.2. 基于 WEB 的開(kāi)發(fā) IDE
前面提到過(guò),數(shù)倉(cāng)的抽象是 Table,編程接口是 SQL+UDF。對(duì)于用戶(hù)來(lái)說(shuō),平臺(tái)提供的編程界面應(yīng)該是類(lèi)似上圖的那種,有用過(guò) HUE 做交互查詢(xún)的應(yīng)該很熟悉。左邊的菜單是 Table 列表,右邊是 SQL 編輯器,可以在上面直接寫(xiě) SQL,然后提交執(zhí)行。要實(shí)現(xiàn)這樣一種交互方式,F(xiàn)link SQL 默認(rèn)是無(wú)法實(shí)現(xiàn)的,中間存在 gap,總結(jié)下來(lái)就 2 點(diǎn):第一,元數(shù)據(jù)的管理,怎么去創(chuàng)建庫(kù)表,怎么去上傳 UDF,使得之后在 SQL 中可直接引用;第二,SQL 作業(yè)的管理,怎么去編譯 SQL,怎么去提交作業(yè)。
在技術(shù)調(diào)研過(guò)程中,我們發(fā)現(xiàn)了 Uber 在 2017 年開(kāi)源的 AthenaX 框架。
2.3.AthenaX:基于 REST 的 SQL 管理器
AthenaX 可以看作是一個(gè)基于 REST 的 SQL 管理器,它是怎么實(shí)現(xiàn) SQL 作業(yè)與元數(shù)據(jù)管理的呢?
對(duì)于 SQL 作業(yè)提交,AthenaX 中有一個(gè) Job 的抽象,封裝了要執(zhí)行的 SQL 以及作業(yè)資源等信息。所有的 Job 由一個(gè) JobStore 來(lái)托管,它定期跟 YARN 當(dāng)中處于 Running 狀態(tài)的 App 做一個(gè)匹配。如果不一致,就會(huì)向 YARN 提交對(duì)應(yīng)的 Job。
對(duì)于元數(shù)據(jù)管理,核心的問(wèn)題是如何將外部創(chuàng)建的庫(kù)表注入 Flink,使得 SQL 中可以識(shí)別到。實(shí)際上,F(xiàn)link 本身就預(yù)留了與外部元數(shù)據(jù)對(duì)接的能力,分別提供了 ExternalCatalog 和 ExternalCatalogTable 這兩個(gè)抽象。AthenaX 在此基礎(chǔ)上再封裝出一個(gè) TableCatalog,在接口層面做了一定的擴(kuò)展。在提交 SQL 作業(yè)的階段,AthenaX 會(huì)自動(dòng)將 TableCatalog 注冊(cè)到 Flink,再調(diào)用 Flink SQL 的接口將 SQL 編譯為 Flink 的可執(zhí)行單元 JobGraph,并最終提交到 YARN 生成新的 App。
AthenaX 雖然定義好了 TableCatalog 接口,但并沒(méi)有提供可直接使用的實(shí)現(xiàn)。那么,我們?cè)趺磥?lái)實(shí)現(xiàn),以便對(duì)接到我們已有的元數(shù)據(jù)系統(tǒng)呢?
2.4.Flink SQL 注冊(cè)庫(kù)表的過(guò)程
首先,我們得搞清楚 Flink SQL 內(nèi)部是如何注冊(cè)庫(kù)表的。整個(gè)過(guò)程涉及到三個(gè)基本的抽象:TableDescriptor、TableFactory 以及 TableEnvironment。
TableDescriptor 顧名思義,是對(duì)表的描述,它由三個(gè)子描述符構(gòu)成:第一是 Connector,描述數(shù)據(jù)的來(lái)源,比如 Kafka、ES 等;第二是 Format,描述數(shù)據(jù)的格式,比如 csv、json、avro 等;第三是 Schema,描述每個(gè)字段的名稱(chēng)與類(lèi)型。TableDescriptor 有兩個(gè)基本的實(shí)現(xiàn)——ConnectTableDescriptor 用于描述內(nèi)部表,也就是編程方式創(chuàng)建的表;ExternalCatalogTable 用于描述外部表。
有了 TableDescriptor,接下來(lái)需要 TableFactory 根據(jù)描述信息來(lái)實(shí)例化 Table。不同的描述信息需要不同的 TableFactory 來(lái)處理,F(xiàn)link 如何找到匹配的 TableFactory 實(shí)現(xiàn)呢?實(shí)際上,為了保證框架的可擴(kuò)展性,F(xiàn)link 采用了 Java SPI 機(jī)制來(lái)加載所有聲明過(guò)的 TableFactory,通過(guò)遍歷的方式去尋找哪個(gè) TableFactory 是匹配該 TableDescriptor 的。TableDescriptor 在傳遞給 TableFactory 前,被轉(zhuǎn)換成一個(gè) map,所有的描述信息都用 key-value 形式來(lái)表達(dá)。TableFactory 定義了兩個(gè)用于過(guò)濾匹配的方法——一個(gè)是 requiredContext(),用于檢測(cè)某些特定 key 的 value 是否匹配,比如 connector.type 是否為 kakfa;另一個(gè)是 supportedProperties(),用于檢測(cè) key 是否能識(shí)別,如果出現(xiàn)不識(shí)別的 key,說(shuō)明無(wú)法匹配。
匹配到了正確的 TableFactory,接下來(lái)就是創(chuàng)建真正的 Table,然后將其通過(guò) TableEnvironment 注冊(cè)。最終注冊(cè)成功的 Table,才能在 SQL 中引用。
2.5.Flink SQL 對(duì)接外部數(shù)據(jù)源
搞清楚了 Flink SQL 注冊(cè)庫(kù)表的過(guò)程,給我們帶來(lái)這樣一個(gè)思路:如果外部元數(shù)據(jù)創(chuàng)建的表也能被轉(zhuǎn)換成 TableFactory 可識(shí)別的 map,那么就能被無(wú)縫地注冊(cè)到 TableEnvironment?;谶@個(gè)思路,我們實(shí)現(xiàn)了 Flink SQL 與已有元數(shù)據(jù)中心的對(duì)接,大致過(guò)程參見(jiàn)下圖:
通過(guò)元數(shù)據(jù)中心創(chuàng)建的表,都會(huì)將元數(shù)據(jù)信息存儲(chǔ)到 MySQL,我們用一張表來(lái)記錄 Table 的基本信息,然后另外三張表分別記錄 Connector、Format、Schema 轉(zhuǎn)換成 key-value 后的描述信息。之所以拆開(kāi)成三張表,是為了能夠能獨(dú)立的更新這三種描述信息。接下來(lái)是定制實(shí)現(xiàn)的 ExternalCatalog,能夠讀取 MySQL 這四張表,并轉(zhuǎn)換成 map 結(jié)構(gòu)。
2.6. 實(shí)時(shí)表 - 維表關(guān)聯(lián)
到目前為止,我們的平臺(tái)已經(jīng)具備了元數(shù)據(jù)管理與 SQL 作業(yè)管理的能力,但是要真正開(kāi)放給用戶(hù)使用,還有一點(diǎn)基本特性存在缺失。通過(guò)我們?nèi)?gòu)建數(shù)倉(cāng),星型模型是無(wú)法避免的。這里有一個(gè)比較簡(jiǎn)單的案例:中間的事實(shí)表記錄了廣告點(diǎn)擊流,周邊是關(guān)于用戶(hù)、廣告、產(chǎn)品、渠道的維度表。
假定我們有一個(gè) SQL 分析,需要將點(diǎn)擊流表與用戶(hù)維表進(jìn)行關(guān)聯(lián),這個(gè)目前在 Flink SQL 中應(yīng)該怎么來(lái)實(shí)現(xiàn)?我們有兩種實(shí)現(xiàn)方式,一個(gè)基于 UDF,一個(gè)基于 SQL 轉(zhuǎn)換,下面分別展開(kāi)來(lái)講一下。
2.7. 基于 UDF 的維表關(guān)聯(lián)
首先是基于 UDF 的實(shí)現(xiàn),需要用戶(hù)將原始 SQL 改寫(xiě)為帶 UDF 調(diào)用的 SQL,這里是 userDimFunc,上圖右邊是它的代碼實(shí)現(xiàn)。UserDimFunc 繼承了 Flink SQL 抽象的 TableFunction,它是其中一種 UDF 類(lèi)型,可以將任意一行數(shù)據(jù)轉(zhuǎn)換成一行或多行數(shù)據(jù)。為了實(shí)現(xiàn)維表關(guān)聯(lián),在 UDF 初始化時(shí)需要從 MySQL 全量加載維表的數(shù)據(jù),緩存在內(nèi)存 cache 中。后續(xù)對(duì)每行數(shù)據(jù)的處理,TableFunction 會(huì)調(diào)用 eval() 方法,在 eval() 中根據(jù) user_id 去查找 cache,從而實(shí)現(xiàn)關(guān)聯(lián)。當(dāng)然,這里是假定維表數(shù)據(jù)比較小,如果數(shù)據(jù)量很大,不適合全量的加載與緩存,這里不做展開(kāi)了。
基于 UDF 的實(shí)現(xiàn),對(duì)用戶(hù)和平臺(tái)來(lái)說(shuō)都不太友好:用戶(hù)需要寫(xiě)奇怪的 SQL 語(yǔ)句,比如圖中的 LATERAL TABLE;平臺(tái)需要為每個(gè)關(guān)聯(lián)場(chǎng)景定制特定的 UDF,維護(hù)成本太高。有沒(méi)有更好的方式呢?下面我們來(lái)看看基于 SQL 轉(zhuǎn)換的實(shí)現(xiàn)。
2.8. 基于 SQL 轉(zhuǎn)換的維表關(guān)聯(lián)
我們希望解決基于 UDF 實(shí)現(xiàn)所帶來(lái)的問(wèn)題,用戶(hù)不需要改寫(xiě)原始 SQL,平臺(tái)不需要開(kāi)發(fā)很多 UDF。有一種思路是,是否可以在 SQL 交給 Flink 編譯之前,加一層 SQL 的解析與改寫(xiě),自動(dòng)實(shí)現(xiàn)維表的關(guān)聯(lián)?經(jīng)過(guò)一定的技術(shù)調(diào)研與 POC,我們發(fā)現(xiàn)是行得通的,所以稱(chēng)之為基于 SQL 轉(zhuǎn)換的實(shí)現(xiàn)。下面將該思路展開(kāi)解釋下。
首先,增加的 SQL 解析是為了識(shí)別 SQL 中是否存在預(yù)先定義的維度表,比如上圖中的 user_dim。一旦識(shí)別到維表,將觸發(fā) SQL 改寫(xiě)的流程,將紅框標(biāo)注的 join 語(yǔ)句改寫(xiě)成新的 Table,這個(gè) Table 怎么得到呢?我們知道,流計(jì)算領(lǐng)域近年來(lái)發(fā)展出“流表二象性”的理念,F(xiàn)link 也是該理念的踐行者。這意味著,在 Flink 中 Stream 與 Table 之間是可以相互轉(zhuǎn)換的。我們把 ad_clicks 對(duì)應(yīng)的 Table 轉(zhuǎn)換成 Stream,再調(diào)用 flatmap 形成另一個(gè) Stream,最后再轉(zhuǎn)換回 Table,就得到了 ad_clicks_user。最后的問(wèn)題是,flatmap 是如何實(shí)現(xiàn)維表關(guān)聯(lián)的?
Flink 中對(duì)于 Stream 的 flatmap 操作,實(shí)際上是執(zhí)行一個(gè) RichFlatmapFunciton,每來(lái)一行數(shù)據(jù)就調(diào)用其 flatmap() 方法做轉(zhuǎn)換。那么,我們可以定制一個(gè) RichFlatmapFunction,來(lái)實(shí)現(xiàn)維表數(shù)據(jù)的加載、緩存、查找以及關(guān)聯(lián),功能與基于 UDF 的 TableFunction 實(shí)現(xiàn)類(lèi)似。
既然 RichFlatmapFunciton 的實(shí)現(xiàn)邏輯與 TableFunction 相似,那為什么相比基于 UDF 的方式,這種實(shí)現(xiàn)能更加通用呢?核心的點(diǎn)在于多了一層 SQL 解析,可以將維表的信息獲取出來(lái)(比如維表名、關(guān)聯(lián)字段、select 字段等),再封裝成 JoinContext 傳遞給 RichFlatmapFunciton,使得的表達(dá)能力就具備通用性了。
二.構(gòu)建實(shí)時(shí)數(shù)倉(cāng)的應(yīng)用案例
下面分享幾個(gè)典型的應(yīng)用案例,都是在我們的平臺(tái)上用 Flink SQL 來(lái)實(shí)現(xiàn)的。
3.1. 實(shí)時(shí) ETL 拆分
這里是一個(gè)典型的實(shí)時(shí) ETL 鏈路,從大表中拆分出各業(yè)務(wù)對(duì)應(yīng)的小表:
OPPO 的最大數(shù)據(jù)來(lái)源是手機(jī)端埋點(diǎn),從手機(jī) APP 過(guò)來(lái)的數(shù)據(jù)有一個(gè)特點(diǎn),所有的數(shù)據(jù)是通過(guò)統(tǒng)一的幾個(gè)通道上報(bào)過(guò)來(lái)。因?yàn)椴豢赡苊恳淮螛I(yè)務(wù)有新的埋點(diǎn),都要去升級(jí)客戶(hù)端,去增加新的通道。比如我們有個(gè) sdk_log 通道,所有 APP 應(yīng)用的埋點(diǎn)都往這個(gè)通道上報(bào)數(shù)據(jù),導(dǎo)致這個(gè)通道對(duì)應(yīng)的原始層表巨大,一天幾十個(gè) TB。但實(shí)際上,每個(gè)業(yè)務(wù)只關(guān)心它自身的那部分?jǐn)?shù)據(jù),這就要求我們?cè)谠紝舆M(jìn)行 ETL 拆分。
這個(gè) SQL 邏輯比較簡(jiǎn)單,無(wú)非是根據(jù)某些業(yè)務(wù)字段做篩選,插入到不同的業(yè)務(wù)表中去。它的特點(diǎn)是,多行 SQL 最終合并成一個(gè) SQL 提交給 Flink 執(zhí)行。大家擔(dān)心的是,包含了 4 個(gè) SQL,會(huì)不會(huì)對(duì)同一份數(shù)據(jù)重復(fù)讀取 4 次?其實(shí),在 Flink 編譯 SQL 的階段是會(huì)做一些優(yōu)化的,因?yàn)樽罱K指向的是同一個(gè) kafka topic,所以只會(huì)讀取 1 次數(shù)據(jù)。
另外,同樣的 Flink SQL,我們同時(shí)用于離線(xiàn)與實(shí)時(shí)數(shù)倉(cāng)的 ETL 拆分,分別落入 HDFS 與 Kafka。Flink 中本身支持寫(xiě)入 HDFS 的 Sink,比如 RollingFileSink。
3.2. 實(shí)時(shí)指標(biāo)統(tǒng)計(jì)
這里是一個(gè)典型的計(jì)算信息流 CTR 的這個(gè)案例,分別計(jì)算一定時(shí)間段內(nèi)的曝光與點(diǎn)擊次數(shù),相除得到點(diǎn)擊率導(dǎo)入 Mysql,然后通過(guò)我們內(nèi)部的報(bào)表系統(tǒng)來(lái)可視化。這個(gè) SQL 的特點(diǎn)是它用到了窗口 (Tumbling Window) 以及子查詢(xún)。
3.3. 實(shí)時(shí)標(biāo)簽導(dǎo)入
這里是一個(gè)實(shí)時(shí)標(biāo)簽導(dǎo)入的案例,手機(jī)端實(shí)時(shí)感知到當(dāng)前用戶(hù)的經(jīng)緯度,轉(zhuǎn)換成具體 POI 后導(dǎo)入 ES,最終在標(biāo)簽系統(tǒng)上做用戶(hù)定向。
這個(gè) SQL 的特點(diǎn)是用了 AggregateFunction,在 5 分鐘的窗口內(nèi),我們只關(guān)心用戶(hù)最新一次上報(bào)的經(jīng)緯度。AggregateFunction 是一種 UDF 類(lèi)型,通常是用于聚合指標(biāo)的統(tǒng)計(jì),比如計(jì)算 sum 或者 average。在這個(gè)示例中,由于我們只關(guān)心最新的經(jīng)緯度,所以每次都替換老的數(shù)據(jù)即可。
四. 未來(lái)工作的思考和展望
最后,給大家分享一下關(guān)于未來(lái)工作,我們的一些思考與規(guī)劃,還不是太成熟,拋出來(lái)和大家探討一下。
4.1. 端到端的實(shí)時(shí)流處理
什么是端到端?一端是采集到的原始數(shù)據(jù),另一端是報(bào)表 / 標(biāo)簽 / 接口這些對(duì)數(shù)據(jù)的呈現(xiàn)與應(yīng)用,連接兩端的是中間實(shí)時(shí)流。當(dāng)前我們基于 SQL 的實(shí)時(shí)流處理,源表是 Kafka,目標(biāo)表也是 Kafka,統(tǒng)一經(jīng)過(guò) Kafka 后再導(dǎo)入到 Druid/ES/HBase。這樣設(shè)計(jì)的目的是提高整體流程的穩(wěn)定性與可用性:首先,kafka 作為下游系統(tǒng)的緩沖,可以避免下游系統(tǒng)的異常影響實(shí)時(shí)流的計(jì)算(一個(gè)系統(tǒng)保持穩(wěn)定,比起多個(gè)系統(tǒng)同時(shí)穩(wěn)定,概率上更高點(diǎn));其次,kafka 到 kafka 的實(shí)時(shí)流,exactly-once 語(yǔ)義是比較成熟的,一致性上有保證。
然后,上述的端到端其實(shí)是由割裂的三個(gè)步驟來(lái)完成的,每一步可能需要由不同角色人去負(fù)責(zé)處理:數(shù)據(jù)處理需要數(shù)據(jù)開(kāi)發(fā)人員,數(shù)據(jù)導(dǎo)入需要引擎開(kāi)發(fā)人員,數(shù)據(jù)資產(chǎn)化需要產(chǎn)品開(kāi)發(fā)人員。
我們的平臺(tái)能否把端到端給自動(dòng)化起來(lái),只需要一次 SQL 提交就能打通處理、導(dǎo)入、資產(chǎn)化這三步?在這個(gè)思路下,數(shù)據(jù)開(kāi)發(fā)中看到的不再是 Kafka Table,而應(yīng)該是面向場(chǎng)景的展示表 / 標(biāo)簽表 / 接口表。比如對(duì)于展示表,創(chuàng)建表的時(shí)候只要指定維度、指標(biāo)等字段,平臺(tái)會(huì)將實(shí)時(shí)流結(jié)果數(shù)據(jù)從 Kafka 自動(dòng)導(dǎo)入 Druid,再在報(bào)表系統(tǒng)自動(dòng)導(dǎo)入 Druid 數(shù)據(jù)源,甚至自動(dòng)生成報(bào)表模板。
4.2. 實(shí)時(shí)流的血緣分析
關(guān)于血緣分析,做過(guò)離線(xiàn)數(shù)倉(cāng)的朋友都很清楚它的重要性,它在數(shù)據(jù)治理中都起著不可或缺的關(guān)鍵作用。對(duì)于實(shí)時(shí)數(shù)倉(cāng)來(lái)說(shuō)也莫不如此。我們希望構(gòu)建端到端的血緣關(guān)系,從采集系統(tǒng)的接入通道開(kāi)始,到中間流經(jīng)的實(shí)時(shí)表與實(shí)時(shí)作業(yè),再到消費(fèi)數(shù)據(jù)的產(chǎn)品,都能很清晰地展現(xiàn)出來(lái)?;谘夑P(guān)系的分析,我們才能評(píng)估數(shù)據(jù)的應(yīng)用價(jià)值,核算數(shù)據(jù)的計(jì)算成本。
4.3. 離線(xiàn) - 實(shí)時(shí)數(shù)倉(cāng)一體化
最后提一個(gè)方向是離線(xiàn)實(shí)時(shí)數(shù)倉(cāng)的一體化。我們認(rèn)為短期內(nèi),實(shí)時(shí)數(shù)倉(cāng)無(wú)法替代離線(xiàn)數(shù)倉(cāng),兩者并存是新常態(tài)。在離線(xiàn)數(shù)倉(cāng)時(shí)代,我們積累的工具體系,如何去適配實(shí)時(shí)數(shù)倉(cāng),如何實(shí)現(xiàn)離線(xiàn)與實(shí)時(shí)數(shù)倉(cāng)的一體化管理?理論上來(lái)講,它們的數(shù)據(jù)來(lái)源是一致的,上層抽象也都是 Table 與 SQL,但本質(zhì)上也有不同的點(diǎn),比如時(shí)間粒度以及計(jì)算模式。對(duì)于數(shù)據(jù)工具與產(chǎn)品來(lái)說(shuō),需要做哪些改造來(lái)實(shí)現(xiàn)完全的一體化,這也是我們?cè)谔剿骱退伎嫉摹?/p>
更多資訊請(qǐng)?jiān)L問(wèn) Apache Flink 中文社區(qū)網(wǎng)站
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/36025.html
摘要:模塊中的類(lèi)結(jié)構(gòu)如下博客從到學(xué)習(xí)介紹從到學(xué)習(xí)上搭建環(huán)境并構(gòu)建運(yùn)行簡(jiǎn)單程序入門(mén)從到學(xué)習(xí)配置文件詳解從到學(xué)習(xí)介紹從到學(xué)習(xí)如何自定義從到學(xué)習(xí)介紹從到學(xué)習(xí)如何自定義從到學(xué)習(xí)轉(zhuǎn)換從到學(xué)習(xí)介紹中的從到學(xué)習(xí)中的幾種詳解從到學(xué)習(xí)讀取數(shù)據(jù)寫(xiě)入到從到學(xué) Flink-Client 模塊中的類(lèi)結(jié)構(gòu)如下: https://t.zsxq.com/IMzNZjY showImg(https://segmentfau...
摘要:模塊中的類(lèi)結(jié)構(gòu)如下博客從到學(xué)習(xí)介紹從到學(xué)習(xí)上搭建環(huán)境并構(gòu)建運(yùn)行簡(jiǎn)單程序入門(mén)從到學(xué)習(xí)配置文件詳解從到學(xué)習(xí)介紹從到學(xué)習(xí)如何自定義從到學(xué)習(xí)介紹從到學(xué)習(xí)如何自定義從到學(xué)習(xí)轉(zhuǎn)換從到學(xué)習(xí)介紹中的從到學(xué)習(xí)中的幾種詳解從到學(xué)習(xí)讀取數(shù)據(jù)寫(xiě)入到從到學(xué) Flink-Annotations 模塊中的類(lèi)結(jié)構(gòu)如下: https://t.zsxq.com/f6eAu3J showImg(https://segme...
摘要:博客從到學(xué)習(xí)介紹從到學(xué)習(xí)上搭建環(huán)境并構(gòu)建運(yùn)行簡(jiǎn)單程序入門(mén)從到學(xué)習(xí)配置文件詳解從到學(xué)習(xí)介紹從到學(xué)習(xí)如何自定義從到學(xué)習(xí)介紹從到學(xué)習(xí)如何自定義從到學(xué)習(xí)轉(zhuǎn)換從到學(xué)習(xí)介紹中的從到學(xué)習(xí)中的幾種詳解從到學(xué)習(xí)讀取數(shù)據(jù)寫(xiě)入到從到學(xué)習(xí)項(xiàng)目如何運(yùn)行從到學(xué) https://t.zsxq.com/UnA2jIi 博客 1、Flink 從0到1學(xué)習(xí) —— Apache Flink 介紹 2、Flink 從0到1學(xué)...
摘要:博客從到學(xué)習(xí)介紹從到學(xué)習(xí)上搭建環(huán)境并構(gòu)建運(yùn)行簡(jiǎn)單程序入門(mén)從到學(xué)習(xí)配置文件詳解從到學(xué)習(xí)介紹從到學(xué)習(xí)如何自定義從到學(xué)習(xí)介紹從到學(xué)習(xí)如何自定義從到學(xué)習(xí)轉(zhuǎn)換從到學(xué)習(xí)介紹中的從到學(xué)習(xí)中的幾種詳解從到學(xué)習(xí)讀取數(shù)據(jù)寫(xiě)入到從到學(xué)習(xí)項(xiàng)目如何運(yùn)行從到學(xué) JobGraph https://t.zsxq.com/naaMf6y 博客 1、Flink 從0到1學(xué)習(xí) —— Apache Flink 介紹 2、Fl...
摘要:處理博客從到學(xué)習(xí)介紹從到學(xué)習(xí)上搭建環(huán)境并構(gòu)建運(yùn)行簡(jiǎn)單程序入門(mén)從到學(xué)習(xí)配置文件詳解從到學(xué)習(xí)介紹從到學(xué)習(xí)如何自定義從到學(xué)習(xí)介紹從到學(xué)習(xí)如何自定義從到學(xué)習(xí)轉(zhuǎn)換從到學(xué)習(xí)介紹中的從到學(xué)習(xí)中的幾種詳解從到學(xué)習(xí)讀取數(shù)據(jù)寫(xiě)入到從到學(xué)習(xí)項(xiàng)目如何運(yùn)行從 JobManager 處理 SubmitJobhttps://t.zsxq.com/3JQJMzZ 博客 1、Flink 從0到1學(xué)習(xí) —— Apache...
閱讀 1294·2023-04-26 03:05
閱讀 740·2021-10-19 11:43
閱讀 2995·2021-09-26 09:55
閱讀 792·2019-08-30 15:56
閱讀 961·2019-08-30 15:44
閱讀 1191·2019-08-30 15:44
閱讀 2683·2019-08-30 14:23
閱讀 3213·2019-08-30 13:13