成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

Flink 在有贊實(shí)時(shí)計(jì)算的實(shí)踐

fish / 2704人閱讀

摘要:第三個(gè)就是比較重點(diǎn)的內(nèi)容,在有贊的實(shí)踐。第四部分是將實(shí)時(shí)計(jì)算化,界面化的一些實(shí)踐。二有贊實(shí)時(shí)平臺(tái)架構(gòu)有贊的實(shí)時(shí)平臺(tái)架構(gòu)呢有幾個(gè)主要的組成部分。實(shí)時(shí)平臺(tái)提供了集群管理,項(xiàng)目管理,任務(wù)管理和報(bào)警監(jiān)控的功能。。

一、前言

這篇主要由五個(gè)部分來(lái)組成:

首先是有贊的實(shí)時(shí)平臺(tái)架構(gòu)。

其次是在調(diào)研階段我們?yōu)槭裁催x擇了 Flink。在這個(gè)部分,主要是 Flink 與 Spark 的 structured streaming 的一些對(duì)比和選擇 Flink 的原因。

第三個(gè)就是比較重點(diǎn)的內(nèi)容,F(xiàn)link 在有贊的實(shí)踐。這其中包括了我們?cè)谑褂?Flink 的過(guò)程中碰到的一些坑,也有一些具體的經(jīng)驗(yàn)。

第四部分是將實(shí)時(shí)計(jì)算 SQL 化,界面化的一些實(shí)踐。

最后的話就是對(duì) Flink 未來(lái)的一些展望。這塊可以分為兩個(gè)部分,一部分是我們公司接下來(lái)會(huì)怎么去更深入的使用 Flink,另一部分就是 Flink 以后可能會(huì)有的的一些新的特性。


二、有贊實(shí)時(shí)平臺(tái)架構(gòu)

有贊的實(shí)時(shí)平臺(tái)架構(gòu)呢有幾個(gè)主要的組成部分。

首先,對(duì)于實(shí)時(shí)數(shù)據(jù)來(lái)說(shuō),一個(gè)消息中間件肯定是必不可少的。在有贊呢,除了業(yè)界常用的 Kafka 以外,還有 NSQ。與 Kafka 有別的是,NSQ 是使用 Go 開(kāi)發(fā)的,所以公司封了一層 Java 的客戶端是通過(guò) push 和 ack 的模式去保證消息至少投遞一次,所以 Connector 也會(huì)有比較大的差距,尤其是實(shí)現(xiàn)容錯(cuò)的部分。在實(shí)現(xiàn)的過(guò)程中呢,參考了 Flink 官方提供的 Rabbit MQ 的連接器,結(jié)合 NSQ client 的特性做了一些改造。

接下來(lái)就是計(jì)算引擎了,最古老的就是 Storm 了,現(xiàn)在依然還有一些任務(wù)在 Storm 上面跑,至于新的任務(wù)基本已經(jīng)不會(huì)基于它來(lái)開(kāi)發(fā)了,因?yàn)槌碎_(kāi)發(fā)成本高以外,語(yǔ)義的支持,SQL 的支持包括狀態(tài)管理的支持都做得不太好,吞吐量還比較低,將 Storm 的任務(wù)遷移到 Flink 上也是我們接下來(lái)的任務(wù)之一。還有呢就是 Spark Streaming 了,相對(duì)來(lái)說(shuō) Spark 有一個(gè)比較好的生態(tài),但是 Spark Streaming 是微批處理的,這給它帶來(lái)了很多限制,除了延遲高以外還會(huì)比較依賴(lài)外部存儲(chǔ)來(lái)保存中間狀態(tài)。 Flink 在有贊是比較新的引擎,為什么在有了 Spark 和 Storm 的情況下我們還要引入 Flink 呢,下一個(gè)部分我會(huì)提到。

存儲(chǔ)引擎,除了傳統(tǒng)的 MySQL 以外,我們還使用 HBase ,ES 和 ZanKV。ZanKV 是我們公司開(kāi)發(fā)的一個(gè)兼容 Redis 協(xié)議的分布式 KV 數(shù)據(jù)庫(kù),所以姑且就把它當(dāng)成 Redis 來(lái)理解好了。

實(shí)時(shí) OLAP 引擎的話基于 Druid,在多維的統(tǒng)計(jì)上面有非常好的應(yīng)用。

最后是我們的實(shí)時(shí)平臺(tái)。實(shí)時(shí)平臺(tái)提供了集群管理,項(xiàng)目管理,任務(wù)管理和報(bào)警監(jiān)控的功能。。

關(guān)于實(shí)時(shí)平臺(tái)的架構(gòu)就簡(jiǎn)單介紹到這里,接下來(lái)是 Flink 在有贊的探索階段。在這個(gè)部分,我主要會(huì)對(duì)比的 Spark Structured Streaming。


三、為什么選擇引入 Flink?

至于為什么和 Spark Structured Streaming(SSS) 進(jìn)行對(duì)比呢?因?yàn)檫@是實(shí)時(shí)SQL化這個(gè)大背景下比較有代表性的兩個(gè)引擎。

首先是性能上,從幾個(gè)角度來(lái)比較一下。首先是延遲,毫無(wú)疑問(wèn),F(xiàn)link 作為一個(gè)流式引擎是優(yōu)于 SSS 的微批引擎的。雖然說(shuō) Spark 也引入了一個(gè)連續(xù)的計(jì)算引擎,但是不管從語(yǔ)義的保證上,還是從成熟度上,都是不如 Flink 的。據(jù)我所知,他們是通過(guò)將 rdd 長(zhǎng)期分配到一個(gè)結(jié)點(diǎn)上來(lái)實(shí)現(xiàn)的。

其次比較直觀的指標(biāo)就是吞吐了,這一點(diǎn)在某些場(chǎng)景下 Flink 略遜于 Spark 。但是當(dāng)涉及到中間狀態(tài)比較大的任務(wù)呢,F(xiàn)link 基于 RocksDB 的狀態(tài)管理就顯示出了它的優(yōu)勢(shì)。
?
Flink 在中間狀態(tài)的管理上可以使用純內(nèi)存,也可以使用 RocksDB 。至于 RocksDB ,簡(jiǎn)單點(diǎn)理解的話就是一個(gè)帶緩存的嵌入式數(shù)據(jù)庫(kù)。借助持久化到磁盤(pán)的能力,F(xiàn)link 相比 SSS 來(lái)說(shuō)可以保存的狀態(tài)量大得多,并且不容易OOM。并且在做 checkpoint 中選用了增量模式,應(yīng)該是只需要備份與上一次 checkpoint 時(shí)不同的 sst 文件。使用過(guò)程中,發(fā)現(xiàn) RocksDB 作為狀態(tài)管理性能也是可以滿足我們需求的。

聊完性能,接下來(lái)就說(shuō)一說(shuō) SQL 化,這也是現(xiàn)在的一個(gè)大方向吧。我在開(kāi)始嘗試 SSS 的時(shí)候,嘗試了一個(gè) SQL 語(yǔ)句中有多個(gè)聚合操作,但是卻拋了異常。 后面仔細(xì)看了文檔,發(fā)現(xiàn)確實(shí)這在 SSS 中是不支持的。第二個(gè)是 distinct 也是不支持的。這兩點(diǎn) Flink 是遠(yuǎn)優(yōu)于 SSS 的。所以從實(shí)時(shí) SQL 的角度,F(xiàn)link 又為自己贏得了一票。除此之外,F(xiàn)link 有更靈活的窗口。還有輸出的話,同樣參考的是 DataFlow 模型,F(xiàn)link 實(shí)現(xiàn)支持刪除并更新的操作,SSS 僅支持更新的操作。(這邊 SSS 是基于 Spark 的 2.3版本)

API 的靈活性。在 SSS 中,誠(chéng)然 table 帶來(lái)了比較大的方便,但是對(duì)于有一些操作依然會(huì)想通過(guò) DStream 或者 rdd 的形式來(lái)操作,但是 SSS 并沒(méi)有提供這樣的轉(zhuǎn)換,只能編寫(xiě)一些 UDF。但是在 Flink 中,Table 和 DataStream 可以靈活地互相轉(zhuǎn)換,以應(yīng)對(duì)更復(fù)雜的場(chǎng)景。


四、Flink在有贊的實(shí)踐

在真正開(kāi)始使用 Flink 之前呢,第一個(gè)要考慮的就是部署的問(wèn)題。因?yàn)楝F(xiàn)有的技術(shù)棧,所以選擇了部署在 Yarn 上,并且使用的是 Single Job 的模式,雖然會(huì)有更多的 ApplicationMaster,但無(wú)疑是增加了隔離性的。

4.1 問(wèn)題一: FLINK-9567

在開(kāi)始部署的時(shí)候我遇到了一個(gè)比較奇怪的問(wèn)題。先講一下背景吧,因?yàn)檫€處于調(diào)研階段,所以使用的是 Yarn 的默認(rèn)隊(duì)列,優(yōu)先級(jí)比較低,在資源緊張的時(shí)候也容易被搶占。
有一個(gè)上午,我起了一個(gè)任務(wù),申請(qǐng)了5個(gè) Container 來(lái)運(yùn)行 TaskExecutor ,一個(gè)比較簡(jiǎn)單地帶狀態(tài)的流式任務(wù),想多跑一段時(shí)間看看穩(wěn)定不穩(wěn)定。這個(gè) Flink 任務(wù)最后占了100多個(gè) container,還在不停增加,但是只有五個(gè) Container 在工作,其他的 container 都注冊(cè)了 slot,并且 slot 都處于閑置的狀態(tài)。以下兩張圖分別代表正常狀態(tài)下的任務(wù),和出問(wèn)題的任務(wù)。

出錯(cuò)后

在涉及到這個(gè)問(wèn)題細(xì)節(jié)之前,我先介紹一下 Flink 是如何和 Yarn 整合到一塊的。根據(jù)下圖,我們從下往上一個(gè)一個(gè)介紹這些組件是做什么的。

TaskExecutor 是實(shí)際任務(wù)的執(zhí)行者,它可能有多個(gè)槽位,每個(gè)槽位執(zhí)行一個(gè)具體的子任務(wù)。每個(gè) TaskExecutor 會(huì)將自己的槽位注冊(cè)到 SlotManager 上,并匯報(bào)自己的狀態(tài),是忙碌狀態(tài),還是處于一個(gè)閑置的狀態(tài)。

SlotManager 既是 Slot 的管理者,也負(fù)責(zé)給正在運(yùn)行的任務(wù)提供符合需求的槽位。還記錄了當(dāng)前積壓的槽位申請(qǐng)。當(dāng)槽位不夠的時(shí)候向Flink的ResourceManager申請(qǐng)容器。

Pending slots 積壓的 Slot 申請(qǐng)及計(jì)數(shù)器

Flink 的 ResourceManager 則負(fù)責(zé)了與 Yarn 的 ResourceManager 進(jìn)行交互,進(jìn)行一系列例如申請(qǐng)容器,啟動(dòng)容器,處理容器的退出等等操作。因?yàn)椴捎玫氖钱惒缴暾?qǐng)的方式,所以還需要記錄當(dāng)前積壓的容器申請(qǐng),防止接收過(guò)多容器。

Pending container request 積壓容器的計(jì)數(shù)器

AMRMClient 是異步申請(qǐng)的執(zhí)行者,CallbackHandler 則在接收到容器和容器退出的時(shí)候通知 Flink 的 ResourceManager。

Yarn 的 ResourceManager 則像是一個(gè)資源的分發(fā)器,負(fù)責(zé)接收容器請(qǐng)求,并為 Client 準(zhǔn)備好容器。

這邊一下子引入的概念有點(diǎn)多,下面我用一個(gè)簡(jiǎn)單地例子來(lái)描述一下這些組件在運(yùn)行中起到的角色。

首先,我們的配置是3個(gè) TaskManager,每個(gè) TaskManager 有兩個(gè) Slot,也就是總共需要6個(gè)槽位。當(dāng)前已經(jīng)擁有了4個(gè)槽位,任務(wù)的調(diào)度器向 Slot 申請(qǐng)還需要兩個(gè)槽位來(lái)運(yùn)行子任務(wù)。

這時(shí) SlotManager 發(fā)現(xiàn)所有的槽位都已經(jīng)被占用了,所以它將這個(gè) slot 的 request 放入了 pending slots 當(dāng)中。所以可以看到 pending slots 的那個(gè)計(jì)數(shù)器從剛才的0跳轉(zhuǎn)到了現(xiàn)在的2. 之后 SlotManager 就向 Flink 的 ResourceManager 申請(qǐng)一個(gè)新的 TaskExecutor,正好就可以滿足這兩個(gè)槽位的需求。于是 Flink 的 ResourceManager 將 pending container request 加1,并通過(guò) AMRM Client 去向 Yarn 申請(qǐng)資源。

當(dāng) Yarn 將相應(yīng)的 Container 準(zhǔn)備好以后,通過(guò) CallbackHandler 去通知 Flink 的 ResourceManager。Flink 就會(huì)根據(jù)在每一個(gè)收到的 container 中啟動(dòng)一個(gè) TaskExecutor ,并且將 pending container request 減1,當(dāng) pending container request 變?yōu)?之后,即使收到新的 container 也會(huì)馬上退回。

當(dāng) TaskExecutor 啟動(dòng)之后,會(huì)向 SlotManager 注冊(cè)自己的兩個(gè) Slot 可用,SlotManager 便會(huì)將兩個(gè)積壓的 SlotRequest 完成,通知調(diào)度器這兩個(gè)子任務(wù)可以到這個(gè)新的 TaskExecutor 上執(zhí)行,并且 pending requests 也被置為0. 到這兒一切都符合預(yù)期。

那這個(gè)超發(fā)的問(wèn)題又是如何出現(xiàn)的呢?首先我們看一看這就是剛剛那個(gè)正常運(yùn)行的任務(wù)。它占用了6個(gè) Slot。

如果在這個(gè)時(shí)候,出現(xiàn)了一些原因?qū)е铝?TaskExecutor 非正常退出,比如說(shuō) Yarn 將資源給搶占了。這時(shí) Yarn 就會(huì)通知 Flink 的 ResourceManager 這三個(gè) Container 已經(jīng)異常退出。所以 Flink 的 ResourceManager 會(huì)立即申請(qǐng)三個(gè)新的 container。在這兒會(huì)討論的是一個(gè) worst case,因?yàn)檫@個(gè)問(wèn)題其實(shí)也不是穩(wěn)定復(fù)現(xiàn)的。

CallbackHandler 兩次接收到回調(diào)發(fā)現(xiàn) Container 是異常退出,所以立即申請(qǐng)新的 Container,pending container requests 也被置為了3.

如果在這時(shí),任務(wù)重啟,調(diào)度器會(huì)向 SlotManager 申請(qǐng)6個(gè) Slot,SlotManager 中也沒(méi)有可用 Slot,就會(huì)向 Flink 的 ResourceManager 申請(qǐng)3個(gè) Container,這時(shí) pending container requests 變?yōu)榱?.

最后呢結(jié)果就如圖所示,起了6個(gè) TaskExecutor,總共12個(gè) Slot,但是只有6個(gè)是被正常使用的,還有6個(gè)一直處于閑置的狀態(tài)。

在修復(fù)這個(gè)問(wèn)題的過(guò)程中,我有兩次嘗試。第一次嘗試,在 Container 異常退出以后,我不去立即申請(qǐng)新的 container。但是問(wèn)題在于,如果 Container 在啟動(dòng) TaskExecutor 的過(guò)程中出錯(cuò),那么失去了這種補(bǔ)償?shù)臋C(jī)制,有些 Slot Request 會(huì)被一直積壓,因?yàn)?SlotManager 已經(jīng)為它們申請(qǐng)了 Container。
?
第二次嘗試是在 Flink 的 ResourceManager 申請(qǐng)新的 container 之前先去檢查 pending slots,如果當(dāng)前的積壓 slots 已經(jīng)可以被積壓的 container 給滿足,那就沒(méi)有必要申請(qǐng)新的 container 了。

4.2 問(wèn)題二: 監(jiān)控

我們使用過(guò)程中踩到的第二個(gè)坑,其實(shí)是跟延遲監(jiān)控相關(guān)的。例子是一個(gè)很簡(jiǎn)單的任務(wù),兩個(gè) source,兩個(gè)除了 source 之外的 operator,并行度都是2. 每個(gè) source 和 operator 它都有兩個(gè)子任務(wù)。

任務(wù)的邏輯是很簡(jiǎn)單,但是呢當(dāng)我們打開(kāi)延時(shí)監(jiān)控。即使是這么簡(jiǎn)單的一個(gè)任務(wù),它會(huì)記錄每一個(gè) source 的子任務(wù)到每一個(gè)算子的子任務(wù)的延遲數(shù)據(jù)。這個(gè)延遲數(shù)據(jù)里還包含了平均延遲,最大延遲,百分之99的延遲等等等等。那我們可以得出一個(gè)公式,延遲數(shù)據(jù)的數(shù)量是 source 的子任務(wù)數(shù)量乘以的 source 的數(shù)量乘以算子的并行度乘以算子的數(shù)量。N = n(subtasks per source) n(sources) n(subtasks per operator) * n(operator)

這邊我做一個(gè)比較簡(jiǎn)單地假設(shè),那就是 source 的子任務(wù)數(shù)量和算則的子任務(wù)數(shù)量都是 p - 并行度。從下面這個(gè)公式我們可以看出,監(jiān)控的數(shù)量隨著并行度的上升呈平方增長(zhǎng)。N = p^2 n(sources) n(operator)

如果我們把上個(gè)任務(wù)提升到10個(gè)并行度,那么就會(huì)收到400份的延遲數(shù)據(jù)。這可能看起來(lái)還沒(méi)有太大的問(wèn)題,這貌似并不影響組件的正常運(yùn)行。

但是,在 Flink 的 dev mailing list 當(dāng)中,有一個(gè)用戶反饋在開(kāi)啟了延遲監(jiān)控之后,JobMaster 很快就會(huì)掛掉。他收到了24000+的監(jiān)控?cái)?shù)據(jù),并且包含這些數(shù)據(jù)的 ConcurrentHashMap 在內(nèi)存中占用了1.6 G 的內(nèi)存。常規(guī)情況 Flink 的 JobMaster 時(shí)會(huì)給到多少內(nèi)存,我一般會(huì)配1-2 g,最后會(huì)導(dǎo)致長(zhǎng)期 FullGC 和 OOM 的情況。

那怎么去解決這個(gè)問(wèn)題呢?當(dāng)延遲監(jiān)控已經(jīng)開(kāi)始影響到系統(tǒng)的正常工作的時(shí)候,最簡(jiǎn)單的辦法就是把它給關(guān)掉??墒前蜒訒r(shí)監(jiān)控關(guān)掉,一方面我們無(wú)法得知當(dāng)前任務(wù)的延時(shí),另一方面,又沒(méi)有辦法去針對(duì)延時(shí)做一些報(bào)警的功能。
?
所以另一個(gè)解決方案就如下。首先是 Flink-10243,它提供了更多的延遲監(jiān)控粒度的選項(xiàng),從源頭上減少數(shù)量。比如說(shuō)我們使用了 Single 模式去采集這些數(shù)據(jù),那它只會(huì)記錄每個(gè) operator 的子任務(wù)的延遲,忽略是從哪個(gè) source 或是 source 的子任務(wù)中來(lái)。這樣就可以得出這樣一個(gè)公式,也能將之前我們提到的十個(gè)并行度的任務(wù)產(chǎn)生的400個(gè)延時(shí)監(jiān)控降低到了40個(gè)。這個(gè)功能發(fā)布在了1.7.0中,并且 backport 回了1.5.5和1.6.2.
?
此外,F(xiàn)link-10246 提出了改進(jìn) MetricQueryService。它包含了幾個(gè)子任務(wù),前三個(gè)子任務(wù)為監(jiān)控服務(wù)建立了一個(gè)專(zhuān)有的低優(yōu)先級(jí)的 ActorSystem,在這里可以簡(jiǎn)單的理解為一個(gè)獨(dú)立的線程池提供低優(yōu)先級(jí)的線程去處理相關(guān)任務(wù)。它的目的也是為了防止監(jiān)控任務(wù)影響到主要的組件。這個(gè)功能發(fā)布在了1.7.0中。
?
還有一個(gè)就是 Flink-10252,它還依舊處于 review 和改進(jìn)當(dāng)中,目的是為了控制監(jiān)控消息的大小。

? 4.3 具體實(shí)踐一

接下來(lái)會(huì)談一下 Flink 在有贊的一些具體應(yīng)用。
?
首先是 Flink 結(jié)合 Spring。為什么要將這兩者做結(jié)合呢,首先在有贊有很多服務(wù)都只暴露了 Dubbo 的接口,而用戶往往都是通過(guò) Spring 去獲取這個(gè)服務(wù)的 client,在實(shí)時(shí)計(jì)算的一些應(yīng)用中也是如此。
?
另外,有不少數(shù)據(jù)應(yīng)用的開(kāi)發(fā)也是 Java 工程師,他們希望能在 Flink 中使用 Spring 以及生態(tài)中的一些組件去簡(jiǎn)化他們的開(kāi)發(fā)。用戶的需求肯定得得到滿足。接下來(lái)我會(huì)講一些錯(cuò)誤的典型,以及最后是怎么去使用的。

第一個(gè)錯(cuò)誤的典型就是在 Flink 的用戶代碼中啟動(dòng)一個(gè) Spring 環(huán)境,然后在算子中取調(diào)用相關(guān)的 bean。但是事實(shí)上,最后這個(gè) Spring Context 是啟動(dòng)在 client 端的,也就是提交任務(wù)的這一端,在圖中有一個(gè)紅色的方框中間寫(xiě)著 Spring Context 表示了它啟動(dòng)的位置??墒怯脩粼趯?shí)際調(diào)用時(shí)確實(shí)在 TaskManager 的 TaskSlot 中,它們都處在不同的 jvm,這明顯是不合理的。所以呢我們又遇到了第二個(gè)錯(cuò)誤。

第二個(gè)錯(cuò)誤比第一個(gè)錯(cuò)誤看起來(lái)要好多了,我們?cè)谒阕又惺褂昧?RichFunction,并且在 open 方法中通過(guò)配置文件獲取了一個(gè) Spring Context。但是先不說(shuō)一個(gè) TaskManager 中啟動(dòng)幾個(gè) Spring Context 是不是浪費(fèi),一個(gè) Jvm 中啟動(dòng)兩個(gè) Spring Context 就會(huì)出問(wèn)題。可能有用戶就覺(jué)得,那還不簡(jiǎn)單,把 TaskSlot 設(shè)為1不就行了。可是還有 OperatorChain 這個(gè)機(jī)制將幾個(gè)窄依賴(lài)的算子綁定到一塊運(yùn)行在一個(gè) TaskSlot 中。那我們關(guān)閉 OperatorChain 不就行了?還是不行,F(xiàn)link可能會(huì)做基于 CoLocationGroup 的優(yōu)化,將多個(gè) subtask 放到一個(gè) TaskSlot 中輪番執(zhí)行。

但其實(shí)最后的解決方案還是比較容易的,無(wú)非是使用單例模式來(lái)封裝 SpringContext,確保每個(gè)jvm中只有一個(gè),在算子函數(shù)的 open 方法中通過(guò)這個(gè)單例來(lái)獲取相應(yīng)的 Bean。

可是在調(diào)用 Dubbo 服務(wù)的時(shí)候,一次響應(yīng)往往最少也要在10 ms 以上。一個(gè) TaskSlot 最大的吞吐也就在一千,可以說(shuō)對(duì)性能是大大的浪費(fèi)。那么解決這個(gè)問(wèn)題的話可以通過(guò)異步和緩存,對(duì)于多次返回同一個(gè)值的調(diào)用可以使用緩存,提升吞吐我們可以使用異步。

4.4 具體實(shí)踐二

可是如果想同時(shí)使用異步和緩存呢?剛開(kāi)始我覺(jué)得這是一個(gè)挺容易實(shí)現(xiàn)的功能,但在實(shí)際寫(xiě) RichAsyncFunction 的時(shí)候我發(fā)現(xiàn)并沒(méi)有辦法使用 Flink 托管的 KeyedState。所以最初想到的方法就是做一個(gè)類(lèi)似 LRU 的 Cache 去緩存數(shù)據(jù)。但是這完全不能借助到 Flink 的狀態(tài)管理的優(yōu)勢(shì)。所以我研究了一下實(shí)現(xiàn)。

為什么不支持呢?

當(dāng)一條記錄進(jìn)入算子的時(shí)候,F(xiàn)link 會(huì)先將 key 提取出來(lái)并將 KeyedState 指向與這個(gè) key 關(guān)聯(lián)的存儲(chǔ)空間,圖上就指向了 key4 相關(guān)的存儲(chǔ)空間。但是如果此時(shí) key1 關(guān)聯(lián)的異步操作完成了,希望把內(nèi)容緩存起來(lái),會(huì)將內(nèi)容寫(xiě)入到 key4 綁定的存儲(chǔ)空間。當(dāng)下一次 key1 相關(guān)的記錄進(jìn)入算子時(shí),回去 key1 關(guān)聯(lián)的存儲(chǔ)空間查找,可是根本找不到數(shù)據(jù),只好再次請(qǐng)求。

所以解決的方法是定制一個(gè)算子,每條記錄進(jìn)入系統(tǒng),都讓它指向同一個(gè)公用 key 的存儲(chǔ)空間。在這個(gè)空間使用 MapState 來(lái)做緩存。最后算子運(yùn)行的 function 繼承 AbstractRichFunction 在 open 方法中來(lái)獲取 KeyedState,實(shí)現(xiàn) AsyncFunction 接口來(lái)做異步操作。


五、實(shí)時(shí)計(jì)算 SQL 化與界面化

最早我們使用 SDK 的方式來(lái)簡(jiǎn)化 SQL 實(shí)時(shí)任務(wù)的開(kāi)發(fā),但是這對(duì)用戶來(lái)說(shuō)也不算非常友好,所以現(xiàn)在講 SQL 實(shí)時(shí)任務(wù)界面化,用 Flink 作為底層引擎去執(zhí)行這些任務(wù)。

在做 SQL 實(shí)時(shí)任務(wù)時(shí),首先是外部系統(tǒng)的抽象,將數(shù)據(jù)源和數(shù)據(jù)池抽象為流資源,用戶將它們數(shù)據(jù)的 Schema 信息和元信息注冊(cè)到平臺(tái)中,平臺(tái)根據(jù)用戶所在的項(xiàng)目組管理讀寫(xiě)的權(quán)限。在這里消息源的格式如果能做到統(tǒng)一能降低很多復(fù)雜度。比如在有贊,想要接入的用戶必須保證是 Json 格式的消息,通過(guò)一條樣例消息可以直接生成 Schema 信息。

接下來(lái)是根據(jù)用戶選擇的數(shù)據(jù)源和數(shù)據(jù)池,獲取相應(yīng)的 Schema 信息和元信息,在 Flink 任務(wù)中注冊(cè)相應(yīng)的外部系統(tǒng) Table 連接器,再執(zhí)行相應(yīng)的 SQL 語(yǔ)句。

在 SQL 語(yǔ)義不支持的功能上盡量使用 UDF 的方式來(lái)拓展。

有數(shù)據(jù)源和數(shù)據(jù)池之間的元信息,還可以獲取實(shí)時(shí)任務(wù)之間可能存在的依賴(lài)關(guān)系,并且能做到整個(gè)鏈路的監(jiān)控


六、未來(lái)與展望

Flink 的批處理和 ML 模塊的嘗試,會(huì)跟 Spark 進(jìn)行對(duì)比,分析優(yōu)劣勢(shì)。目前還處于調(diào)研階段,目前比較關(guān)注的是 Flink 和 Hive的結(jié)合,對(duì)應(yīng) FLINK-10566 這個(gè) issue。

從 Flink 的發(fā)展來(lái)講呢,我比較關(guān)注并參與接下來(lái)對(duì)于調(diào)度和資源管理的優(yōu)化?,F(xiàn)在 Flink 的調(diào)度和任務(wù)執(zhí)行圖是耦合在一塊的,使用比較簡(jiǎn)單地調(diào)度機(jī)制。通過(guò)將調(diào)度器隔離出來(lái),做成可插拔式的,可以應(yīng)用更多的調(diào)度機(jī)制。此外,基于新的調(diào)度器,還可以去做更靈活的資源補(bǔ)充和減少機(jī)制,實(shí)現(xiàn) Auto Scaling。這可能在接下來(lái)的版本中會(huì)是一個(gè)重要的特性。對(duì)應(yīng) FLINK-10404 和 FLINK-10429 這兩個(gè) issue。


最后打個(gè)小廣告,有贊大數(shù)據(jù)團(tuán)隊(duì)基礎(chǔ)設(shè)施團(tuán)隊(duì),主要負(fù)責(zé)有贊的數(shù)據(jù)平臺(tái)(DP), 實(shí)時(shí)計(jì)算(Storm, Spark Streaming, Flink),離線計(jì)算(HDFS,YARN,HIVE, SPARK SQL),在線存儲(chǔ)(HBase),實(shí)時(shí) OLAP(Druid) 等數(shù)個(gè)技術(shù)產(chǎn)品,歡迎感興趣的小伙伴聯(lián)系 [email protected]

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/11438.html

相關(guān)文章

  • Flink 有贊實(shí)時(shí)計(jì)算實(shí)踐

    摘要:第三個(gè)就是比較重點(diǎn)的內(nèi)容,在有贊的實(shí)踐。第四部分是將實(shí)時(shí)計(jì)算化,界面化的一些實(shí)踐。二有贊實(shí)時(shí)平臺(tái)架構(gòu)有贊的實(shí)時(shí)平臺(tái)架構(gòu)呢有幾個(gè)主要的組成部分。實(shí)時(shí)平臺(tái)提供了集群管理,項(xiàng)目管理,任務(wù)管理和報(bào)警監(jiān)控的功能。。 一、前言 這篇主要由五個(gè)部分來(lái)組成: 首先是有贊的實(shí)時(shí)平臺(tái)架構(gòu)。 其次是在調(diào)研階段我們?yōu)槭裁催x擇了 Flink。在這個(gè)部分,主要是 Flink 與 Spark 的 structure...

    ?。琛?/span> 評(píng)論0 收藏0
  • SparkSQL 有贊實(shí)踐

    摘要:在有贊的技術(shù)演進(jìn)。業(yè)務(wù)數(shù)據(jù)量正在不斷增大,這些任務(wù)會(huì)影響業(yè)務(wù)對(duì)外服務(wù)的承諾。監(jiān)控需要收集上執(zhí)行的的審計(jì)信息,包括提交者執(zhí)行的具體,開(kāi)始結(jié)束時(shí)間,執(zhí)行完成狀態(tài)。還有一點(diǎn)是詳細(xì)介紹了的原理,實(shí)踐中設(shè)置了的比默認(rèn)的減少了以上的時(shí)間。 前言 有贊數(shù)據(jù)平臺(tái)從2017年上半年開(kāi)始,逐步使用 SparkSQL 替代 Hive 執(zhí)行離線任務(wù),目前 SparkSQL 每天的運(yùn)行作業(yè)數(shù)量5000個(gè),占離線...

    hzx 評(píng)論0 收藏0
  • SparkSQL 有贊實(shí)踐

    摘要:在有贊的技術(shù)演進(jìn)。業(yè)務(wù)數(shù)據(jù)量正在不斷增大,這些任務(wù)會(huì)影響業(yè)務(wù)對(duì)外服務(wù)的承諾。監(jiān)控需要收集上執(zhí)行的的審計(jì)信息,包括提交者執(zhí)行的具體,開(kāi)始結(jié)束時(shí)間,執(zhí)行完成狀態(tài)。還有一點(diǎn)是詳細(xì)介紹了的原理,實(shí)踐中設(shè)置了的比默認(rèn)的減少了以上的時(shí)間。 前言 有贊數(shù)據(jù)平臺(tái)從2017年上半年開(kāi)始,逐步使用 SparkSQL 替代 Hive 執(zhí)行離線任務(wù),目前 SparkSQL 每天的運(yùn)行作業(yè)數(shù)量5000個(gè),占離線...

    Xufc 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<