點(diǎn)擊上方“IT那活兒”公眾號(hào),關(guān)注后了解更多內(nèi)容,不管IT什么活兒,干就完了?。?!
基于event-time的窗口操作
Event-time就是事件產(chǎn)生的時(shí)間而不是spark接受到消息的時(shí)間,在結(jié)構(gòu)化數(shù)據(jù)流模型中,產(chǎn)生一個(gè)事件就是一行數(shù)據(jù),event-time就是行中的一列,這就允許基于event-time的窗口聚合操作,每個(gè)窗口都是一個(gè)組,每行數(shù)據(jù)可以屬于多個(gè)窗口,因此基于事件窗口的聚合查詢可以適用于靜態(tài)表和數(shù)據(jù)流。
此外,結(jié)構(gòu)化數(shù)據(jù)流模型很自然的處理基于event-time的延遲數(shù)據(jù),因?yàn)閟park是更新結(jié)果表,只要延遲數(shù)據(jù)到達(dá)就會(huì)刪除舊狀態(tài)進(jìn)行更新,自spark2.1以后可以使用水?。╳atermark)指定延遲數(shù)據(jù)闕值清除舊狀態(tài)。
想象這樣一種場(chǎng)景,spark不斷接受輸入,然后進(jìn)行詞頻統(tǒng)計(jì),輸入包括詞語(yǔ)和詞語(yǔ)產(chǎn)生時(shí)間,我們要統(tǒng)計(jì)每10分鐘之內(nèi)的詞頻統(tǒng)計(jì),每5分鐘統(tǒng)計(jì)一次,模型如下:
如圖可知時(shí)間步長(zhǎng)是5分鐘(每5分鐘統(tǒng)計(jì)一次)每次統(tǒng)計(jì)的是10分鐘之內(nèi)的數(shù)據(jù),應(yīng)用程序12:00啟動(dòng),開始接受數(shù)據(jù),12:00-12:05時(shí)間內(nèi)接到兩條數(shù)據(jù)(產(chǎn)生時(shí)間分別是12:02和12:03),到12:05時(shí)開始第一次統(tǒng)計(jì)數(shù)據(jù),統(tǒng)計(jì)的是12:00-12:10之間接受到的數(shù)據(jù),然后12:05-12:10時(shí)間內(nèi)接受到一條數(shù)據(jù)(產(chǎn)生時(shí)間為12:07),12:10時(shí)第二次統(tǒng)計(jì)數(shù)據(jù),統(tǒng)計(jì)的是12:05-12:15之間接受到的數(shù)據(jù),請(qǐng)注意12:07這條數(shù)據(jù)也屬于12:00-12:10分窗口中的數(shù)據(jù),所以更新了上一個(gè)窗口的數(shù)據(jù),也新增了新窗口的數(shù)據(jù),最后12:10-12:15時(shí)間內(nèi)接受到了兩條數(shù)據(jù)(產(chǎn)生時(shí)間分別為12:11,12:13),12:15進(jìn)行了第三次窗口統(tǒng)計(jì),同樣最后兩條數(shù)據(jù)不僅屬于12:05-12:15窗口也屬于12:10-12:20窗口,所以接受的這兩條數(shù)據(jù)更新了12:05-12:15窗口的結(jié)果也新增了12:10-12:20窗口數(shù)據(jù)。
代碼中可以這樣寫:
現(xiàn)在想象一下,如果某條數(shù)據(jù)產(chǎn)生時(shí)間是12:04,但是spark接受到該條數(shù)據(jù)時(shí)間是12:11,這就屬于遲到數(shù)據(jù),正常情況下該條數(shù)據(jù)到達(dá)時(shí)間與產(chǎn)生時(shí)間基本一致,對(duì)于這種遲到數(shù)據(jù)結(jié)構(gòu)化數(shù)據(jù)模型會(huì)保持這種遲到數(shù)據(jù)再內(nèi)存中,所以該條數(shù)據(jù)還是按照12:04來(lái)處理的。
但是這也存在一個(gè)問題,假如應(yīng)用程序需要長(zhǎng)時(shí)間運(yùn)行,那么內(nèi)存中會(huì)保存大量這種遲到數(shù)據(jù)狀態(tài),所以系統(tǒng)就需要遲到何時(shí)應(yīng)該丟棄遲到數(shù)據(jù),為了解決這個(gè)問題,自spark2.1,引入了watermarking,你可以通過(guò)指定event-time列并且指定數(shù)據(jù)可以遲到時(shí)間闕值,遲到時(shí)間在闕值以內(nèi),watermarking依然會(huì)將其按照正確時(shí)間處理,遲到時(shí)間在闕值之外會(huì)將其丟棄。
通過(guò)一下例子進(jìn)行理解:
如上指定event time列timeStamp,并且指定了遲到時(shí)間闕值為10分鐘。此查詢模式為Update。所以結(jié)果表中將保持更新的狀態(tài)。
藍(lán)色虛線:目前為止可以看到的最大event-time。
紅色實(shí)線:watermarking線=藍(lán)色虛線(最大event-time)-闕值,水印值只能增加,不能減小。
當(dāng)觀察到12:04數(shù)據(jù)時(shí),將下一個(gè)水印設(shè)置為12:04,此水印可以保持10分鐘的中間狀態(tài),以便對(duì)于較晚的數(shù)據(jù)進(jìn)行計(jì)數(shù),例如對(duì)于12:09這條數(shù)據(jù)的延遲,其仍在12:04水印線之前,所以仍保持其中間狀態(tài),但是當(dāng)觀察到12:21數(shù)據(jù)時(shí),水印更新為12:11,并將12:00-12:05窗口的中間狀態(tài)清除,這時(shí)12:04的數(shù)據(jù)就會(huì)被丟棄,可以這樣說(shuō),藍(lán)色線和紅色線中間的數(shù)據(jù)都不會(huì)被丟棄,水印線之下的數(shù)據(jù)都會(huì)被丟棄。
然后再來(lái)看下在Update輸出模式下,每次觸發(fā)后哪些數(shù)據(jù)會(huì)被輸出:
12:05分第一次觸發(fā)后,未觀察到數(shù)據(jù)。
12:10分第二次觸發(fā)時(shí)有兩條數(shù)據(jù)(12:07dog,12:08:owl),這兩條數(shù)據(jù)分別屬于兩個(gè)窗口,12:00-12:10和12:05-12:15,(如圖)此時(shí),這些數(shù)據(jù)都會(huì)被輸出。
12:15第三次觸發(fā)后,又觀察到兩條新數(shù)據(jù)(12:09cat,12:14dog),其中12:09cat數(shù)據(jù)屬于窗口12;00-12:10和12:05-12:15,可以看到這兩個(gè)窗口分別新增了一條數(shù)據(jù)cat(如上圖),12:14dog數(shù)據(jù)屬于窗口12:05-12:15和窗口12:10-12:20,所以12:05-12:15窗口dog計(jì)數(shù)+1,12:10-12:20窗口新增一條dog計(jì)數(shù),此時(shí)這些更新的和新增的數(shù)據(jù)將是被輸出的(紫色的)。
12:20第四次觸發(fā)后,此時(shí)觀察到新增數(shù)據(jù)有4條(12:08dog,12:13owl,12:15cat,12:21owl),12:08dog數(shù)據(jù)屬于窗口12:00-12:10和12:05-12:15,所以這兩個(gè)窗口dog計(jì)數(shù)+1,12:13owl屬于窗口12:05-12:15和12:10-12:20,所以12:05-12:15窗口owl計(jì)數(shù)+1,12:10-12:20窗口新增一條owl計(jì)數(shù),12:15cat屬于12:05-12:15和12:10-12:20窗口,所以12:05-12:15窗口cat計(jì)數(shù)+1,12:10-12:20窗口新增一條cat計(jì)數(shù),12:21owl屬于12:15-12:25和12:20-12:30窗口,所以這兩個(gè)窗口新增一條owl計(jì)數(shù)(圖中未標(biāo)識(shí)出 ),此時(shí),這些更新和新增數(shù)據(jù)將會(huì)被輸出(如圖紫色部分)。
12:25第五次觸發(fā)時(shí)觀察到12:04donkey數(shù)據(jù)(該數(shù)據(jù)太遲被丟棄,不參與計(jì)數(shù))和其他1條數(shù)據(jù)(12:17owl),12:17owl屬于12:10-12:20和12:15-12:25窗口,所以12:10-12:20窗口owl計(jì)數(shù)+1,12:15-12:25窗口owl計(jì)數(shù)+1(圖中未標(biāo)識(shí)出),此時(shí)這些更新數(shù)據(jù)將會(huì)被輸出。
再來(lái)看下Append輸出模式下,該模式下僅將最終數(shù)據(jù)寫入存儲(chǔ)器,如圖:
例如12:25觸發(fā)時(shí),很明顯12:00-12:10窗口的數(shù)據(jù)已經(jīng)確定(水印線值大于窗口endtime),不可能再接受event time在12:00-12:10之間的數(shù)據(jù)了(太遲的數(shù)據(jù)會(huì)被丟棄),此時(shí)窗口計(jì)數(shù)如圖,這也是第一次進(jìn)行輸出。
12:30時(shí)12:05-12:15窗口計(jì)數(shù)已經(jīng)確定,如圖,這次輸出的是圖中紫色部分。每次輸出一個(gè)窗口的計(jì)數(shù)。請(qǐng)注意設(shè)置水印后只支持append和Update模式。
使用水印清除中間狀態(tài)條件
輸出模式必須是Append,Update,因?yàn)閏omplete模式需要保留所有聚合數(shù)據(jù)。
聚合必須有event-time列或者event-time列的窗口。
水印作用的列必須與聚合列保持一致,例如df.withWatermark("time", "1 min").groupBy("time2").count()對(duì)于Append模式不可用。
水印函數(shù)調(diào)用必須在聚合函數(shù)之前。df.groupBy("time").count().withWatermark("time", "1 min")不可用。
水印聚合語(yǔ)義保證
水印延遲(設(shè)置為withWatermark)為“ 2小時(shí)”,確保引擎永遠(yuǎn)不會(huì)丟棄任何少于2小時(shí)的數(shù)據(jù)。換句話說(shuō),任何在此之前處理過(guò)的最新數(shù)據(jù)比事件時(shí)間少2小時(shí)(以事件時(shí)間計(jì))的數(shù)據(jù)都可以保證得到匯總。
保證僅在一個(gè)方向上嚴(yán)格。延遲超過(guò)2小時(shí)的數(shù)據(jù)不能保證被刪除;它可能會(huì)或可能不會(huì)聚合。數(shù)據(jù)延遲更多,引擎處理數(shù)據(jù)的可能性越小。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/129472.html
摘要:基于云遷移的三個(gè)階段細(xì)分為八個(gè)主要步驟,評(píng)估階段主要包括項(xiàng)目啟動(dòng)現(xiàn)狀梳理以及應(yīng)用系統(tǒng)關(guān)聯(lián)關(guān)系分析三個(gè)步驟,設(shè)計(jì)階段包括云架構(gòu)優(yōu)化設(shè)計(jì)和云遷移方案設(shè)計(jì),實(shí)施階段包括目標(biāo)架構(gòu)遷移演練及實(shí)施和試運(yùn)行三個(gè)步驟。 在云計(jì)算市場(chǎng)規(guī)模不斷擴(kuò)大的大背景下,云遷移的需求越來(lái)越大且面臨挑戰(zhàn)。云遷移不是一個(gè)遷移軟件工具,而是一種服務(wù)。前IBM資深架構(gòu)師姜亞杰從云遷移的三個(gè)階段、四個(gè)維度到八個(gè)步驟的方法,簡(jiǎn)述...
摘要:理解數(shù)組實(shí)現(xiàn)的滑動(dòng)窗口,看下邊這個(gè)圖就可以了。第秒,開始計(jì)數(shù),此時(shí)數(shù)組內(nèi)開始存入計(jì)數(shù)周期,保存在數(shù)組第個(gè)位置,表示這是當(dāng)前滑動(dòng)窗口內(nèi)的第個(gè)計(jì)數(shù)周期。在FireflySoft.RateLimit之前的版本中,進(jìn)程內(nèi)滑動(dòng)窗口的實(shí)現(xiàn)是基于MemoryCache做的,雖然能夠正確的實(shí)現(xiàn)滑動(dòng)窗口的算法邏輯,但是性能比較差,其吞吐量只有其它算法的1/4。性能為何如此之差呢?滑動(dòng)窗口的原理我們先來(lái)看下滑動(dòng)...
摘要:兩個(gè)瀏覽器窗口間通信總結(jié)一個(gè)窗口更新,另一個(gè)窗口監(jiān)聽對(duì)象的事件,來(lái)實(shí)現(xiàn)通信。通過(guò)窗口的屬性來(lái)指定哪些窗口能接收到消息事件,其值可以是字符串表示無(wú)限制或者一個(gè)。父窗口先打開一個(gè)子窗口,載入一個(gè)不同源的網(wǎng)頁(yè),該網(wǎng)頁(yè)將信息寫入屬性。 兩個(gè)瀏覽器窗口間通信總結(jié) 1、localStorage 一個(gè)窗口更新localStorage,另一個(gè)窗口監(jiān)聽window對(duì)象的storage事件,來(lái)實(shí)現(xiàn)通信。注...
摘要:代碼實(shí)現(xiàn)代碼實(shí)現(xiàn)接下來(lái)思考一個(gè)熔斷器如何實(shí)現(xiàn)。同時(shí)熔斷器的狀態(tài)也需要依靠指標(biāo)統(tǒng)計(jì)來(lái)實(shí)現(xiàn)可觀測(cè)性,我們實(shí)現(xiàn)任何系統(tǒng)第一步需要考慮就是可觀測(cè)性,不然系統(tǒng)就是一個(gè)黑盒??赡苁?,熔斷器需要實(shí)時(shí)收集此數(shù)據(jù)。熔斷方法,自動(dòng)上報(bào)執(zhí)行結(jié)果自動(dòng)擋。。。為什么需要熔斷微服務(wù)集群中,每個(gè)應(yīng)用基本都會(huì)依賴一定數(shù)量的外部服務(wù)。有可能隨時(shí)都會(huì)遇到網(wǎng)絡(luò)連接緩慢,超時(shí),依賴服務(wù)過(guò)載,服務(wù)不可用的情況,在高并發(fā)場(chǎng)景下如果此時(shí)...
摘要:你只可以看到在滑動(dòng)窗口內(nèi)的數(shù)字?;瑒?dòng)窗口每次只向右移動(dòng)一位。返回滑動(dòng)窗口最大值。算法思路暴力破解法用兩個(gè)指針,分別指向窗口的起始位置和終止位置,然后遍歷窗口中的數(shù)據(jù),求出最大值向前移動(dòng)兩個(gè)指針,然后操作,直到遍歷數(shù)據(jù)完成位置。 Time:2019/4/16Title: Sliding Window MaximumDifficulty: DifficultyAuthor: 小鹿 題目...
閱讀 1356·2023-01-11 13:20
閱讀 1707·2023-01-11 13:20
閱讀 1215·2023-01-11 13:20
閱讀 1906·2023-01-11 13:20
閱讀 4165·2023-01-11 13:20
閱讀 2757·2023-01-11 13:20
閱讀 1402·2023-01-11 13:20
閱讀 3671·2023-01-11 13:20