之前項(xiàng)目一直用的Flink-1.72版本,大多數(shù)用的流api進(jìn)行開(kāi)發(fā)的需求,現(xiàn)在掃描漏洞的時(shí)候必須升級(jí)到Flink-1.12.0或Flink-1.11.3,所以直接升級(jí)到Flink-1.12.0,發(fā)現(xiàn)之前用的api(assignTimestampsAndWatermarks)被設(shè)置為廢棄了。
先來(lái)看看項(xiàng)目之前用的:
后來(lái)查資料發(fā)現(xiàn)Flink在1.11版本中為了實(shí)現(xiàn)水印的通用以及方便,對(duì)水印進(jìn)行了重構(gòu)。
新版本的Flink在類classDataStream
WatermarkStrategy接口繼承了接口TimestampAssignerSupplier
先看一下interfaceTimestampAssignerSupplier
是創(chuàng)建一個(gè)TimestampAssigner
有一個(gè)longextractTimestamp方法,作用是從Flink消費(fèi)的記錄中抽取時(shí)間,既可以理解為我們?nèi)绻ㄟ^(guò)業(yè)務(wù)時(shí)間進(jìn)行統(tǒng)計(jì)時(shí),需要通過(guò)該方法對(duì)來(lái)提取記錄的業(yè)務(wù)時(shí)間。所以用到業(yè)務(wù)時(shí)間的話,一定要根據(jù)自己的業(yè)務(wù)場(chǎng)景對(duì)該方法進(jìn)行具體的實(shí)現(xiàn)。否則Flink會(huì)提供一個(gè)默認(rèn)的實(shí)現(xiàn)RecordTimestampAssigner<>()
而默認(rèn)實(shí)現(xiàn)的內(nèi)容也十分簡(jiǎn)單,一起看一下,必須是記錄中已經(jīng)注冊(cè)了時(shí)間屬性。
接下來(lái)interfaceWatermarkGeneratorSupplier
是返回一個(gè)WatermarkGenerator
提供了兩個(gè)水印發(fā)送的方式,接下來(lái)對(duì)這兩個(gè)方式進(jìn)行說(shuō)明:
onEvent:每條記錄進(jìn)來(lái)都會(huì)調(diào)用一次這個(gè)方法,入?yún)⒂?個(gè),第一個(gè)是記錄,第二個(gè)是記錄攜帶的時(shí)間,如果注冊(cè)了時(shí)間就會(huì)有,第三個(gè)參數(shù)時(shí)水印發(fā)射器WatermarkOutputoutput,可以通過(guò)這個(gè)參數(shù)對(duì)水印進(jìn)行發(fā)射,用戶可以根據(jù)自己的業(yè)務(wù)場(chǎng)景來(lái)編寫(xiě)自己的水印生成以及發(fā)射邏輯。該方法的重點(diǎn)是每條記錄都會(huì)調(diào)用.
onPeriodicEmit: 該方法是Flink提供的一個(gè)定時(shí)器方法,每隔一段時(shí)間會(huì)調(diào)用此方法,入?yún)⑹荳atermarkOutputoutput,用戶可以通過(guò)這個(gè)方法每隔一段時(shí)間發(fā)送一次水印,當(dāng)記錄數(shù)過(guò)多時(shí),每條記錄都發(fā)送一次水印明顯不合適,也影響性能,此時(shí)可以通過(guò)這個(gè)方法進(jìn)行水印的定時(shí)發(fā)送,而onEvent只記錄當(dāng)前水印而選擇不發(fā)射出去。該方法的參數(shù)配置為env.getConfig().setAutoWatermarkInterval(300L),入?yún)⑹呛撩霐?shù),表示隔多少毫秒向下游算子發(fā)送一次水印。
而WatermarkStrategy中也提供了一些常用的WatermarkGenerator
BoundedOutOfOrdernessWatermarks
使用方法也十分的簡(jiǎn)單,提供的是一個(gè)靜態(tài)方法,只需直接調(diào)用即可
WatermarkStrategy.
最后結(jié)合項(xiàng)目的需求將原來(lái)的使用水印的地方改成如下了
類圖及FLINK水印算子簡(jiǎn)要流程
先上類圖,方便理解
接著簡(jiǎn)單介紹下流程
首先TimestampsAndWatermarksOperator算子會(huì)在open方法中初始化用戶定義的水印邏輯及方式,并且如果需要定時(shí)發(fā)送水印會(huì),注冊(cè)一個(gè)定時(shí)器觸發(fā)水印定時(shí)發(fā)送。
當(dāng)元素到達(dá)算子后會(huì)調(diào)用processElement(StreamRecord
方法很簡(jiǎn)單,如果元素已經(jīng)被注冊(cè)了時(shí)間,就直接獲取時(shí)間,或者設(shè)置為L(zhǎng)ONG.MIN_VALUE,然后根據(jù)用戶定義的timestampAssigner.extractTimestamp從記錄中抽取時(shí)間屬性,然后再將時(shí)間寫(xiě)入元素中,最后調(diào)用用戶定義的watermarkGenerator.onEvent方法,根據(jù)用戶的邏輯選擇刷新水印以及是否發(fā)射水印。
上面初始化中提到了,如果需要定時(shí)發(fā)送水印,則會(huì)注冊(cè)一個(gè)定時(shí)器,而定時(shí)器的方法如下
通過(guò)onProcessingTime來(lái)觸發(fā)定時(shí)器的內(nèi)容,而內(nèi)容也十分簡(jiǎn)單,先調(diào)用用戶定義的watermarkGenerator.onPeriodicEmit方法發(fā)送水印,然后獲取當(dāng)前時(shí)間,最后注冊(cè)當(dāng)前時(shí)間加水印定時(shí)發(fā)送間隔的定時(shí)觸發(fā)器,等待下次觸發(fā)該方法。
參考資料
https://zhuanlan.zhihu.com/p/158951593
https://blog.csdn.net/zhaoyuqiang/article/details/107453466
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/130018.html
摘要:由于配置流是從關(guān)系型數(shù)據(jù)庫(kù)中讀取,速度較慢,導(dǎo)致實(shí)時(shí)數(shù)據(jù)流流入數(shù)據(jù)的時(shí)候,配置信息還未發(fā)送,這樣會(huì)導(dǎo)致有些實(shí)時(shí)數(shù)據(jù)讀取不到配置信息。從數(shù)據(jù)庫(kù)中解析出來(lái),再去統(tǒng)計(jì)近兩周占比。 Flink 學(xué)習(xí) https://github.com/zhisheng17/flink-learning 麻煩路過(guò)的各位親給這個(gè)項(xiàng)目點(diǎn)個(gè) star,太不易了,寫(xiě)了這么多,算是對(duì)我堅(jiān)持下來(lái)的一種鼓勵(lì)吧! showI...
摘要:在這種情況下,清除僅指窗口中的數(shù)據(jù)元,而不是窗口元數(shù)據(jù)。紫色圓圈表示流的數(shù)據(jù)元,這些數(shù)據(jù)元由某個(gè)鍵在這種情況下是用戶,用戶和用戶劃分。 0 相關(guān)源碼 掌握Flink中三種常用的Time處理方式,掌握Flink中滾動(dòng)窗口以及滑動(dòng)窗口的使用,了解Flink中的watermark。 Flink 在流處理工程中支持不同的時(shí)間概念。 1 處理時(shí)間(Processing time) 執(zhí)行相應(yīng)算子...
摘要:另外,將機(jī)制發(fā)揚(yáng)光大,對(duì)有著非常好的支持。系統(tǒng)也注意到并討論了和的問(wèn)題??偨Y(jié)本文分享了四本相關(guān)的書(shū)籍和一份領(lǐng)域相關(guān)的論文列表篇,涉及的設(shè)計(jì),實(shí)現(xiàn),故障恢復(fù),彈性擴(kuò)展等各方面。 前言 之前也分享了不少自己的文章,但是對(duì)于 Flink 來(lái)說(shuō),還是有不少新入門(mén)的朋友,這里給大家分享點(diǎn) Flink 相關(guān)的資料(國(guó)外數(shù)據(jù) pdf 和流處理相關(guān)的 Paper),期望可以幫你更好的理解 Flink。...
摘要:每小時(shí)窗口將包括在系統(tǒng)時(shí)鐘指示整個(gè)小時(shí)之間到達(dá)特定操作的所有事件。平行流中的水印水印是在源函數(shù)處生成的,或直接在源函數(shù)之后生成的。源函數(shù)的每個(gè)并行子任務(wù)通常獨(dú)立生成其水印。由于其輸入流更新其事件時(shí)間,因此操作員也是如此。 showImg(https://segmentfault.com/img/remote/1460000017877320?w=1280&h=857); 前言 Flin...
摘要:由于配置流是從關(guān)系型數(shù)據(jù)庫(kù)中讀取,速度較慢,導(dǎo)致實(shí)時(shí)數(shù)據(jù)流流入數(shù)據(jù)的時(shí)候,配置信息還未發(fā)送,這樣會(huì)導(dǎo)致有些實(shí)時(shí)數(shù)據(jù)讀取不到配置信息。從數(shù)據(jù)庫(kù)中解析出來(lái),再去統(tǒng)計(jì)近兩周占比。 showImg(https://segmentfault.com/img/remote/1460000019367651); Flink 學(xué)習(xí)項(xiàng)目代碼 https://github.com/zhisheng17/f...
閱讀 1356·2023-01-11 13:20
閱讀 1707·2023-01-11 13:20
閱讀 1215·2023-01-11 13:20
閱讀 1906·2023-01-11 13:20
閱讀 4165·2023-01-11 13:20
閱讀 2757·2023-01-11 13:20
閱讀 1402·2023-01-11 13:20
閱讀 3671·2023-01-11 13:20