摘要:在每個(gè)事件上,觸發(fā)器都可以決定觸發(fā)即清除刪除窗口并丟棄其內(nèi)容,或者啟動(dòng)并清除窗口。請(qǐng)注意,指定的觸發(fā)器不會(huì)添加其他觸發(fā)條件,但會(huì)替換當(dāng)前觸發(fā)器。結(jié)論對(duì)于現(xiàn)代流處理器來說,支持連續(xù)數(shù)據(jù)流上的各種類型的窗口是必不可少的。
前言
目前有許多數(shù)據(jù)分析的場(chǎng)景從批處理到流處理的演變, 雖然可以將批處理作為流處理的特殊情況來處理,但是分析無窮集的流數(shù)據(jù)通常需要思維方式的轉(zhuǎn)變并且具有其自己的術(shù)語(例如,“windowing(窗口化)”、“at-least-once(至少一次)”、“exactly-once(只有一次)” )。
對(duì)于剛剛接觸流處理的人來說,這種轉(zhuǎn)變和新術(shù)語可能會(huì)非常混亂。 Apache Flink 是一個(gè)為生產(chǎn)環(huán)境而生的流處理器,具有易于使用的 API,可以用于定義高級(jí)流分析程序。
Flink 的 API 在數(shù)據(jù)流上具有非常靈活的窗口定義,使其在其他開源流處理框架中脫穎而出。
在這篇文章中,我們將討論用于流處理的窗口的概念,介紹 Flink 的內(nèi)置窗口,并解釋它對(duì)自定義窗口語義的支持。
什么是 Windows?下面我們結(jié)合一個(gè)現(xiàn)實(shí)的例子來說明。
就拿交通傳感器的示例:統(tǒng)計(jì)經(jīng)過某紅綠燈的汽車數(shù)量之和?
假設(shè)在一個(gè)紅綠燈處,我們每隔 15 秒統(tǒng)計(jì)一次通過此紅綠燈的汽車數(shù)量,如下圖:
可以把汽車的經(jīng)過看成一個(gè)流,無窮的流,不斷有汽車經(jīng)過此紅綠燈,因此無法統(tǒng)計(jì)總共的汽車數(shù)量。但是,我們可以換一種思路,每隔 15 秒,我們都將與上一次的結(jié)果進(jìn)行 sum 操作(滑動(dòng)聚合),如下:
這個(gè)結(jié)果似乎還是無法回答我們的問題,根本原因在于流是無界的,我們不能限制流,但可以在有一個(gè)有界的范圍內(nèi)處理無界的流數(shù)據(jù)。
因此,我們需要換一個(gè)問題的提法:每分鐘經(jīng)過某紅綠燈的汽車數(shù)量之和?
這個(gè)問題,就相當(dāng)于一個(gè)定義了一個(gè) Window(窗口),window 的界限是1分鐘,且每分鐘內(nèi)的數(shù)據(jù)互不干擾,因此也可以稱為翻滾(不重合)窗口,如下圖:
第一分鐘的數(shù)量為8,第二分鐘是22,第三分鐘是27。。。這樣,1個(gè)小時(shí)內(nèi)會(huì)有60個(gè)window。
再考慮一種情況,每30秒統(tǒng)計(jì)一次過去1分鐘的汽車數(shù)量之和:
此時(shí),window 出現(xiàn)了重合。這樣,1個(gè)小時(shí)內(nèi)會(huì)有120個(gè) window。
擴(kuò)展一下,我們可以在某個(gè)地區(qū),收集每一個(gè)紅綠燈處汽車經(jīng)過的數(shù)量,然后每個(gè)紅綠燈處都做一次基于1分鐘的window統(tǒng)計(jì),即并行處理:
它有什么作用?通常來講,Window 就是用來對(duì)一個(gè)無限的流設(shè)置一個(gè)有限的集合,在有界的數(shù)據(jù)集上進(jìn)行操作的一種機(jī)制。window 又可以分為基于時(shí)間(Time-based)的 window 以及基于數(shù)量(Count-based)的 window。
Flink 自帶的 windowFlink DataStream API 提供了 Time 和 Count 的 window,同時(shí)增加了基于 Session 的 window。同時(shí),由于某些特殊的需要,DataStream API 也提供了定制化的 window 操作,供用戶自定義 window。
下面,主要介紹 Time-Based window 以及 Count-Based window,以及自定義的 window 操作,Session-Based Window 操作將會(huì)在后續(xù)的文章中講到。
Time Windows正如命名那樣,Time Windows 根據(jù)時(shí)間來聚合流數(shù)據(jù)。例如:一分鐘的 tumbling time window 收集一分鐘的元素,并在一分鐘過后對(duì)窗口中的所有元素應(yīng)用于一個(gè)函數(shù)。
在 Flink 中定義 tumbling time windows(翻滾時(shí)間窗口) 和 sliding time windows(滑動(dòng)時(shí)間窗口) 非常簡(jiǎn)單:
tumbling time windows(翻滾時(shí)間窗口)
輸入一個(gè)時(shí)間參數(shù)
data.keyBy(1) .timeWindow(Time.minutes(1)) //tumbling time window 每分鐘統(tǒng)計(jì)一次數(shù)量和 .sum(1);
sliding time windows(滑動(dòng)時(shí)間窗口)
輸入兩個(gè)時(shí)間參數(shù)
data.keyBy(1) .timeWindow(Time.minutes(1), Time.seconds(30)) //sliding time window 每隔 30s 統(tǒng)計(jì)過去一分鐘的數(shù)量和 .sum(1);
有一點(diǎn)我們還沒有討論,即“收集一分鐘的元素”的確切含義,它可以歸結(jié)為一個(gè)問題,“流處理器如何解釋時(shí)間?”
Apache Flink 具有三個(gè)不同的時(shí)間概念,即 processing time, event time 和 ingestion time。
這里可以參考我下一篇文章:
《從0到1學(xué)習(xí)Flink》—— 介紹Flink中的Event Time、Processing Time和Ingestion Time
Count WindowsApache Flink 還提供計(jì)數(shù)窗口功能。如果計(jì)數(shù)窗口設(shè)置的為 100 ,那么將會(huì)在窗口中收集 100 個(gè)事件,并在添加第 100 個(gè)元素時(shí)計(jì)算窗口的值。
在 Flink 的 DataStream API 中,tumbling count window 和 sliding count window 的定義如下:
tumbling count window
輸入一個(gè)時(shí)間參數(shù)
data.keyBy(1) .countWindow(100) //統(tǒng)計(jì)每 100 個(gè)元素的數(shù)量之和 .sum(1);
sliding count window
輸入兩個(gè)時(shí)間參數(shù)
data.keyBy(1) .countWindow(100, 10) //每 10 個(gè)元素統(tǒng)計(jì)過去 100 個(gè)元素的數(shù)量之和 .sum(1);解剖 Flink 的窗口機(jī)制
Flink 的內(nèi)置 time window 和 count window 已經(jīng)覆蓋了大多數(shù)應(yīng)用場(chǎng)景,但是有時(shí)候也需要定制窗口邏輯,此時(shí) Flink 的內(nèi)置的 window 無法解決這些問題。為了還支持自定義 window 實(shí)現(xiàn)不同的邏輯,DataStream API 為其窗口機(jī)制提供了接口。
下圖描述了 Flink 的窗口機(jī)制,并介紹了所涉及的組件:
到達(dá)窗口操作符的元素被傳遞給 WindowAssigner。WindowAssigner 將元素分配給一個(gè)或多個(gè)窗口,可能會(huì)創(chuàng)建新的窗口。
窗口本身只是元素列表的標(biāo)識(shí)符,它可能提供一些可選的元信息,例如 TimeWindow 中的開始和結(jié)束時(shí)間。注意,元素可以被添加到多個(gè)窗口,這也意味著一個(gè)元素可以同時(shí)在多個(gè)窗口存在。
每個(gè)窗口都擁有一個(gè) Trigger(觸發(fā)器),該 Trigger(觸發(fā)器) 決定何時(shí)計(jì)算和清除窗口。當(dāng)先前注冊(cè)的計(jì)時(shí)器超時(shí)時(shí),將為插入窗口的每個(gè)元素調(diào)用觸發(fā)器。在每個(gè)事件上,觸發(fā)器都可以決定觸發(fā)(即、清除(刪除窗口并丟棄其內(nèi)容),或者啟動(dòng)并清除窗口。一個(gè)窗口可以被求值多次,并且在被清除之前一直存在。注意,在清除窗口之前,窗口將一直消耗內(nèi)存。
當(dāng) Trigger(觸發(fā)器) 觸發(fā)時(shí),可以將窗口元素列表提供給可選的 Evictor,Evictor 可以遍歷窗口元素列表,并可以決定從列表的開頭刪除首先進(jìn)入窗口的一些元素。然后其余的元素被賦給一個(gè)計(jì)算函數(shù),如果沒有定義 Evictor,觸發(fā)器直接將所有窗口元素交給計(jì)算函數(shù)。
計(jì)算函數(shù)接收 Evictor 過濾后的窗口元素,并計(jì)算窗口的一個(gè)或多個(gè)元素的結(jié)果。 DataStream API 接受不同類型的計(jì)算函數(shù),包括預(yù)定義的聚合函數(shù),如 sum(),min(),max(),以及 ReduceFunction,F(xiàn)oldFunction 或 WindowFunction。
這些是構(gòu)成 Flink 窗口機(jī)制的組件。 接下來我們逐步演示如何使用 DataStream API 實(shí)現(xiàn)自定義窗口邏輯。 我們從 DataStream [IN] 類型的流開始,并使用 key 選擇器函數(shù)對(duì)其分組,該函數(shù)將 key 相同類型的數(shù)據(jù)分組在一塊。
SingleOutputStreamOperator如何自定義 Window?data = env.addSource(...); data.keyBy()
1、Window Assigner
負(fù)責(zé)將元素分配到不同的 window。
Window API 提供了自定義的 WindowAssigner 接口,我們可以實(shí)現(xiàn) WindowAssigner 的
public abstract CollectionassignWindows(T element, long timestamp)
方法。同時(shí),對(duì)于基于 Count 的 window 而言,默認(rèn)采用了 GlobalWindow 的 window assigner,例如:
keyBy.window(GlobalWindows.create())
2、Trigger
Trigger 即觸發(fā)器,定義何時(shí)或什么情況下移除 window
我們可以指定觸發(fā)器來覆蓋 WindowAssigner 提供的默認(rèn)觸發(fā)器。 請(qǐng)注意,指定的觸發(fā)器不會(huì)添加其他觸發(fā)條件,但會(huì)替換當(dāng)前觸發(fā)器。
3、Evictor(可選)
驅(qū)逐者,即保留上一 window 留下的某些元素
4、通過 apply WindowFunction 來返回 DataStream 類型數(shù)據(jù)。
利用 Flink 的內(nèi)部窗口機(jī)制和 DataStream API 可以實(shí)現(xiàn)自定義的窗口邏輯,例如 session window。
結(jié)論對(duì)于現(xiàn)代流處理器來說,支持連續(xù)數(shù)據(jù)流上的各種類型的窗口是必不可少的。 Apache Flink 是一個(gè)具有強(qiáng)大功能集的流處理器,包括一個(gè)非常靈活的機(jī)制,可以在連續(xù)數(shù)據(jù)流上構(gòu)建窗口。 Flink 為常見場(chǎng)景提供內(nèi)置的窗口運(yùn)算符,以及允許用戶自定義窗口邏輯。
參考1、[https://flink.apache.org/news...]()
2、[https://blog.csdn.net/lmalds/...]()
關(guān)注我轉(zhuǎn)載請(qǐng)務(wù)必注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/2018/12/08/Flink-Stream-Windows/
微信公眾號(hào):zhisheng
另外我自己整理了些 Flink 的學(xué)習(xí)資料,目前已經(jīng)全部放到微信公眾號(hào)了。你可以加我的微信:zhisheng_tian,然后回復(fù)關(guān)鍵字:Flink 即可無條件獲取到。
Github 代碼倉庫https://github.com/zhisheng17/flink-learning/
以后這個(gè)項(xiàng)目的所有代碼都將放在這個(gè)倉庫里,包含了自己學(xué)習(xí) flink 的一些 demo 和博客
相關(guān)文章1、《從0到1學(xué)習(xí)Flink》—— Apache Flink 介紹
2、《從0到1學(xué)習(xí)Flink》—— Mac 上搭建 Flink 1.6.0 環(huán)境并構(gòu)建運(yùn)行簡(jiǎn)單程序入門
3、《從0到1學(xué)習(xí)Flink》—— Flink 配置文件詳解
4、《從0到1學(xué)習(xí)Flink》—— Data Source 介紹
5、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Source ?
6、《從0到1學(xué)習(xí)Flink》—— Data Sink 介紹
7、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Sink ?
8、《從0到1學(xué)習(xí)Flink》—— Flink Data transformation(轉(zhuǎn)換)
9、《從0到1學(xué)習(xí)Flink》—— 介紹Flink中的Stream Windows
10、《從0到1學(xué)習(xí)Flink》—— Flink 中的幾種 Time 詳解
11、《從0到1學(xué)習(xí)Flink》—— Flink 寫入數(shù)據(jù)到 ElasticSearch
12、《從0到1學(xué)習(xí)Flink》—— Flink 項(xiàng)目如何運(yùn)行?
13、《從0到1學(xué)習(xí)Flink》—— Flink 寫入數(shù)據(jù)到 Kafka
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/73007.html
摘要:另外,將機(jī)制發(fā)揚(yáng)光大,對(duì)有著非常好的支持。系統(tǒng)也注意到并討論了和的問題??偨Y(jié)本文分享了四本相關(guān)的書籍和一份領(lǐng)域相關(guān)的論文列表篇,涉及的設(shè)計(jì),實(shí)現(xiàn),故障恢復(fù),彈性擴(kuò)展等各方面。 前言 之前也分享了不少自己的文章,但是對(duì)于 Flink 來說,還是有不少新入門的朋友,這里給大家分享點(diǎn) Flink 相關(guān)的資料(國(guó)外數(shù)據(jù) pdf 和流處理相關(guān)的 Paper),期望可以幫你更好的理解 Flink。...
摘要:從到學(xué)習(xí)介紹從到學(xué)習(xí)介紹其中包括了和的,后面我也講了下如何自定義自己的和。這個(gè)問題可是線上很容易遇到的關(guān)注我轉(zhuǎn)載請(qǐng)務(wù)必注明原創(chuàng)地址為微信公眾號(hào)另外我自己整理了些的學(xué)習(xí)資料,目前已經(jīng)全部放到微信公眾號(hào)了。 showImg(https://segmentfault.com/img/remote/1460000017935460?w=1280&h=853); 前言 前面 FLink 的文章中...
閱讀 3231·2021-11-25 09:43
閱讀 3444·2021-11-11 16:54
閱讀 875·2021-11-02 14:42
閱讀 3794·2021-09-30 09:58
閱讀 3717·2021-09-29 09:44
閱讀 1326·2019-08-30 15:56
閱讀 2128·2019-08-30 15:54
閱讀 3017·2019-08-30 15:43