摘要:需要注意的是和方法生成的觸發(fā)器是連續(xù)的而不是一次性的。其他的還有一次性觸發(fā)器將一次性觸發(fā)器變?yōu)檫B續(xù)型觸發(fā)器,觸發(fā)后再次等待觸發(fā)。例如與一起用可以實(shí)現(xiàn)每個(gè)數(shù)據(jù)到達(dá)后的分鐘進(jìn)行處理,經(jīng)常用于全局窗口,可以用觸發(fā)器來設(shè)置停止條件。
本文參考Apache Beam官方編程手冊(cè)
可以結(jié)合官方的Mobile Game 代碼閱讀本文。
在默認(rèn)情況下,Apache Beam是不分窗的,也就是采用GlobalWindow,而如果同時(shí)也不設(shè)置自定義的觸發(fā)器,那么Beam會(huì)在所有數(shù)據(jù)都收集到之后才開始對(duì)數(shù)據(jù)進(jìn)行處理。這通常只能適用于有限數(shù)據(jù)且對(duì)實(shí)時(shí)性要求不高的情況。當(dāng)輸入為無限流數(shù)據(jù),我們可以
1)設(shè)置合適的窗口大?。ǜ鶕?jù)時(shí)間戳),在窗口末端進(jìn)行數(shù)據(jù)處理;
2)設(shè)置觸發(fā)器,當(dāng)條件滿足時(shí)觸發(fā),進(jìn)行數(shù)據(jù)處理;
3)同時(shí)設(shè)置窗口和觸發(fā)器。
時(shí)間戳說明:Beam的數(shù)據(jù)都是保存在PCollection中。當(dāng)讀入數(shù)據(jù)時(shí),PCollection為每個(gè)元素都自動(dòng)生成一個(gè)內(nèi)置的時(shí)間戳,對(duì)于無限輸入,數(shù)據(jù)的時(shí)間戳不同。而對(duì)于有限輸入,由于是同時(shí)讀入,所有的元素的時(shí)間戳都是一樣的,這時(shí)候分窗是沒有意義的(都在一個(gè)窗口)。而我們可以手動(dòng)為每個(gè)元素設(shè)置時(shí)間戳,通常采用數(shù)據(jù)中已有的時(shí)間屬性(比如日志中一般都會(huì)帶有事件時(shí)間)??梢栽贒oFn中為數(shù)據(jù)帶上時(shí)間戳,如:
@ProcessElement public void processElement(ProcessContext c) { c.outputWithTimestamp(c.element(), new Instant(XXX)); }窗口類型:
1)全局窗口
就是默認(rèn)不分窗的情況。
apply(Windows.
2)固定時(shí)間大小窗口
最常見的分窗方式,按照時(shí)間戳把數(shù)據(jù)處理窗口分為固定長度。
apply(Windows.
3)滑動(dòng)窗口
需要設(shè)置2個(gè)參數(shù),窗口大小和窗口產(chǎn)生周期。窗口之間有重疊,通常用于計(jì)算平均數(shù)的情況(暫沒用過)
4)會(huì)話窗口
一般用于相同key數(shù)據(jù)聚合,同一個(gè)key的數(shù)據(jù)之間時(shí)間間隔較大的會(huì)被分到不同的窗口。
**
水位線和超時(shí)數(shù)據(jù):**
當(dāng)使用用戶自定義的時(shí)間戳?xí)r,先處理的數(shù)據(jù)并不總是時(shí)間戳較小的,有可能出現(xiàn)時(shí)間戳小的數(shù)據(jù)在后面才產(chǎn)生的情況。Beam通常會(huì)給窗口設(shè)定一個(gè)處理期限時(shí)間(圖中縱軸),當(dāng)超過這個(gè)時(shí)間的數(shù)據(jù)被視為超時(shí)數(shù)據(jù),而這些期限時(shí)間的連線即水位線。
系統(tǒng)會(huì)根據(jù)實(shí)際情況進(jìn)行預(yù)測(cè)生成水位線,在默認(rèn)情況下不對(duì)超時(shí)數(shù)據(jù)進(jìn)行處理,而我們可以通過設(shè)置觸發(fā)器對(duì)超時(shí)數(shù)據(jù)進(jìn)行額外處理。
觸發(fā)器種類1)時(shí)間時(shí)間觸發(fā)器
根據(jù)時(shí)間戳進(jìn)行觸發(fā)。
.triggering(AfterWatermark.pastEndOfWindow()//水位線到達(dá)時(shí)觸發(fā)一次 .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(FIVE_MINUTES))//水位線之前,每次觸發(fā)后第一個(gè)數(shù)據(jù)來到之后的5分鐘時(shí)再觸發(fā) .withLateFirings(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(TEN_MINUTES)))//水位線之后,每次觸發(fā)后第一個(gè)數(shù)據(jù)來到之后的10分鐘時(shí)再觸發(fā)
以上分別對(duì)水位線上中下的3種數(shù)據(jù)進(jìn)行不同的處理。需要注意的是withEarlyFirings和withLateFirings方法生成的觸發(fā)器是連續(xù)的而不是一次性的。
2)處理時(shí)間觸發(fā)器
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES)
如方法的字面意思,僅在第一個(gè)數(shù)據(jù)到達(dá)后的5分鐘時(shí)觸發(fā)一次。
3)數(shù)據(jù)驅(qū)動(dòng)型觸發(fā)器
AfterPane.elementCountAtleast(XX)
當(dāng)處理到XX個(gè)時(shí)觸發(fā)一次。需要注意的是當(dāng)數(shù)據(jù)個(gè)數(shù)小于XX時(shí)永遠(yuǎn)不會(huì)觸發(fā)數(shù)據(jù)處理。
4)混合觸發(fā)器
將多個(gè)觸發(fā)器混合起來,比如1)中的代碼就是3個(gè)觸發(fā)器混合。其他的還有
①Repeatedly.forever(一次性觸發(fā)器)
將一次性觸發(fā)器變?yōu)檫B續(xù)型觸發(fā)器,觸發(fā)后再次等待觸發(fā)。例如與AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES)一起用可以實(shí)現(xiàn)每個(gè)數(shù)據(jù)到達(dá)后的5分鐘進(jìn)行處理,經(jīng)常用于全局窗口,可以用orFinally(觸發(fā)器)來設(shè)置停止條件。
②AfterEach.inOrder(觸發(fā)器1,觸發(fā)器2...)
當(dāng)觸發(fā)器1滿足后等待觸發(fā)器2...知道所有觸發(fā)器滿足后開始數(shù)據(jù)處理。
③AfterFirst(觸發(fā)器1,觸發(fā)器2..)和AfterAll(觸發(fā)器1,觸發(fā)器2..)
這2個(gè)分別為或,與的邏輯。
④orFinally
見①
Accumulating Mode:
If our trigger is set to .accumulatingFiredPanes, the trigger emits the following values each time it fires. Keep in mind that the trigger fires every time three elements arrive:
First trigger firing: [5, 8, 3] Second trigger firing: [5, 8, 3, 15, 19, 23] Third trigger firing: [5, 8, 3, 15, 19, 23, 9, 13, 10]
Discarding Mode:
If our trigger is set to .discardingFiredPanes, the trigger emits the following values on each firing:
First trigger firing: [5, 8, 3] Second trigger firing: [15, 19, 23] Third trigger firing: [9, 13, 10]超時(shí)數(shù)據(jù)處理
.withAllowedLateness(Duration.XXXX(XXX))
可設(shè)置允許超時(shí)多長時(shí)間的數(shù)據(jù)。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/67679.html
摘要:與用于與的轉(zhuǎn)換。其中方法返回的是在中的位置下標(biāo)。對(duì)于設(shè)置了多個(gè)觸發(fā)器的,自動(dòng)選擇最后一個(gè)觸發(fā)的結(jié)算結(jié)果。其他不是線程安全的,一般建議處理方法是冪等的。 Combine與GroupByKey GroupByKey是把相關(guān)key的元素聚合到一起,通常是形成一個(gè)Iterable的value,如: cat, [1,5,9] dog, [5,2] and, [1,2,6] Combine是對(duì)聚...
摘要:最近在用做流上的異常檢測(cè),期間遇到了很多問題,但是發(fā)現(xiàn)網(wǎng)上相關(guān)的資料很少,基本只能自己啃文檔和瞎嘗試。其中如有錯(cuò)漏,歡迎指出。即從一條數(shù)據(jù)中獲得時(shí)間戳,然后以的格式返回。丟棄掉中的附加信息使用這一設(shè)置時(shí),得到的中的元素是的和組成的鍵值對(duì)。 最近在用Apache beam做流上的異常檢測(cè),期間遇到了很多問題,但是發(fā)現(xiàn)網(wǎng)上相關(guān)的資料很少,基本只能自己啃文檔和瞎嘗試。所以想把自己踩過的坑記錄...
摘要:一直接訪問引入的相關(guān)包使用代替給指定配置與訪問本地文件一樣訪問文件實(shí)際測(cè)試中發(fā)現(xiàn)本地如能夠成功讀寫,但是集群模式下如讀寫失敗,原因未知。二通過訪問除了直接讀寫的數(shù)據(jù),還可以通過來進(jìn)行讀寫。 一、直接訪問 1.引入HDFS的相關(guān)jar包: org.apache.beam beam-sdks-java-io-hadoop-file-system 2.1.0...
摘要:要說在中常見的函數(shù)是哪一個(gè),當(dāng)然是。是一個(gè)實(shí)現(xiàn)了接口的抽象類,其中是數(shù)據(jù)處理方法,強(qiáng)制子類必須實(shí)現(xiàn)。以上為學(xué)習(xí)一天的總結(jié),有錯(cuò)誤歡迎指正。相同的是這個(gè)方法處理的都是中的一個(gè)元素。 在閱讀本文前,可先看一下官方的WordCount代碼, 對(duì)Apache Beam有大概的了解。 要說在Apache Beam中常見的函數(shù)是哪一個(gè),當(dāng)然是apply()。常見的寫法如下: [Final Outp...
摘要:主頁暫時(shí)下線社區(qū)暫時(shí)下線知識(shí)庫自媒體平臺(tái)微博知乎簡書博客園我們不是的官方組織機(jī)構(gòu)團(tuán)體,只是技術(shù)棧以及的愛好者合作侵權(quán),請(qǐng)聯(lián)系請(qǐng)抄送一份到基礎(chǔ)編程思想和大數(shù)據(jù)中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔區(qū)塊 【主頁】 apachecn.org 【Github】@ApacheCN 暫時(shí)下線: 社區(qū) 暫時(shí)下線: cwiki 知識(shí)庫 自媒體平臺(tái) ...
閱讀 1238·2021-11-11 16:54
閱讀 887·2021-10-19 11:44
閱讀 1354·2021-09-22 15:18
閱讀 2456·2019-08-29 16:26
閱讀 2961·2019-08-29 13:57
閱讀 3106·2019-08-26 13:32
閱讀 1091·2019-08-26 11:58
閱讀 2341·2019-08-26 10:37