點擊上方“IT那活兒”公眾號,關(guān)注后了解更多內(nèi)容,不管IT什么活兒,干就完了?。?!
結(jié)構(gòu)化數(shù)據(jù)流支持streaming DataFrame/DataSet與靜態(tài)DataFrame/DataSet進(jìn)行JOIN操作,也支持兩個streaming DataFrame/DataSet進(jìn)行JOIN操作,流連接的結(jié)果以增量方式生成,與流聚合結(jié)果類似。
下面主要討論流連接支持的類型,inner,outer,semi連接等。
自Spark 2.0引入以來,支持流數(shù)據(jù)集與靜態(tài)數(shù)據(jù)集進(jìn)行JOIN操作,例如:
請注意流-靜態(tài)連接不是有狀態(tài)的,所以不需要狀態(tài)管理。
在Spark 2.3中,我們添加了對流連接的支持,也就是說,您可以連接兩個流數(shù)據(jù)集/數(shù)據(jù)幀。在兩個數(shù)據(jù)流之間生成連接結(jié)果的挑戰(zhàn)在于,在任何時間點,數(shù)據(jù)集的視圖對于連接的兩側(cè)都是不完整的,這使得查找輸入之間的匹配變得更加困難。從一個輸入流接收到的任何行都可以與將來從另一個輸入流接收到的任何行相匹配。
因此,對于這兩個輸入流,我們將過去的輸入緩沖為流狀態(tài),這樣我們就可以將每個未來的輸入與過去的輸入匹配,并相應(yīng)地生成連接的結(jié)果。
此外,與流式聚合類似,我們自動處理延遲、無序的數(shù)據(jù),并可以使用水印限制狀態(tài)。讓我們討論支持的不同類型的流連接以及如何使用它們。
3.1 基于水印(watermarking)的INNER JOIN操作
支持任何類型的列上的內(nèi)部連接以及任何類型的連接條件,但是,隨著流的運行,流狀態(tài)的大小將無限期的增長,因為所有過去的輸入都必須保存,因為任何輸入都有可能與過去的輸入匹配,為了避免無線的狀態(tài),必須定義額外的連接條件,以便過去的舊輸入無法與將來的輸入匹配,因此可以刪除舊狀態(tài),也就是說,你必須在連接中執(zhí)行以下附加步驟:
1)在兩個流輸入上定義水印,以便引擎知道輸入的延遲程度(類似于流聚合)。
2)定義跨兩個流輸入的事件時間約束,以便引擎知道何時不需要一個舊的輸入行(即不滿足時間約束)來匹配另一個輸入,可以通過以下兩種方式定義此約束:
時間范圍連接條件(例如lefttime between righttime and right+1hour)。
基于事件時間窗口進(jìn)行JOIN。
舉例來進(jìn)行說明
假如我們希望將一系列廣告播放與另一系列用戶點擊廣告的行為連接起來,要允許此連接中的狀態(tài)清理,必須指定水印延遲和時間約束,如下:
水印延遲(watermar delays):比如說在活動期間內(nèi),廣告印象和點擊事件可能分別延遲2小時/3小時。
事件時間范圍條件:再廣告播放0秒到1小時范圍內(nèi)可能發(fā)生點擊。
代碼如下:
3.2 基于水印(watermarking)的OUTER JOIN操作
雖然水印+事件時間約束條件對于inner join 不是必須的,但是對于outer join就必須指定,因為outer join會生成NULL,引擎必須知道輸入行將來何時與任何輸入都不匹配,因此指定水印和事件時間約束來生成正確結(jié)果。
代碼如下:
關(guān)于如何生成外部連接結(jié)果,有幾個重要的特征需要注意:
根據(jù)指定的水印延遲和事件時間范圍條件,將會生成NULL 結(jié)果,這是因為引擎必須等待很長時間以確保沒有匹配項,并且將來不會有更多的匹配項。
在當(dāng)前微批處理引擎中的實現(xiàn)中,水印在微批處理結(jié)束時被提前,下一個微批處理使用更新后的水印來清除狀態(tài)并輸出外部結(jié)果。
由于我們僅在有新數(shù)據(jù)要處理時觸發(fā)微批處理,因此如果流中沒有接收到新數(shù)據(jù),則外部結(jié)果的生成可能會延遲。
簡言之,如果正在連接的兩個輸入流中的任何一個在一段時間內(nèi)沒有接收數(shù)據(jù),則外部(兩種情況下,左或右)輸出可能會延遲。
3.3 基于水?。╳atermarking)的SEMI JOIN操作
半連接返回左側(cè)返回值,也被稱為左半連接,與外部連接類似,其也必須指定水印和事件時間約束,引擎必須知道左側(cè)的輸入行將來何時與右側(cè)的輸入行都不匹配。
支持級聯(lián)連接操作,例如df1.join(df2).join(df3).....................
自spark2.4版本之后,連接操作只支持Append輸出模式。
自spark2.4版本之后,在連接之前不能使用類似于non-map-like的操作,例如在連接之前不能進(jìn)行流聚合操作,在連接之前不能再Update輸出模式下使用mapGroupsWithState 、flatMapGroupsWithState 操作。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/129470.html
摘要:個人博客地址方案項目背景在現(xiàn)在題庫架構(gòu)下,針對新購買的多道數(shù)據(jù)進(jìn)行整合,不影響現(xiàn)有功能。數(shù)據(jù)切分盡量通過數(shù)據(jù)冗余或表分組來降低跨庫的可能。 個人博客地址 https://www.texixi.com/2019/0... 方案 項目背景 在現(xiàn)在題庫架構(gòu)下,針對新購買的1300W多道數(shù)據(jù)進(jìn)行整合,不影響現(xiàn)有功能。由于數(shù)據(jù)量偏多,需要進(jìn)行數(shù)據(jù)的切分 目標(biāo)場景 兼容舊的功能 對1300多W...
閱讀 1356·2023-01-11 13:20
閱讀 1707·2023-01-11 13:20
閱讀 1215·2023-01-11 13:20
閱讀 1906·2023-01-11 13:20
閱讀 4165·2023-01-11 13:20
閱讀 2757·2023-01-11 13:20
閱讀 1402·2023-01-11 13:20
閱讀 3671·2023-01-11 13:20