成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Apache Beam采坑系列——KafkaIO

iliyaku / 946人閱讀

摘要:最近在用做流上的異常檢測,期間遇到了很多問題,但是發(fā)現(xiàn)網(wǎng)上相關(guān)的資料很少,基本只能自己啃文檔和瞎嘗試。其中如有錯(cuò)漏,歡迎指出。即從一條數(shù)據(jù)中獲得時(shí)間戳,然后以的格式返回。丟棄掉中的附加信息使用這一設(shè)置時(shí),得到的中的元素是的和組成的鍵值對。

最近在用Apache beam做流上的異常檢測,期間遇到了很多問題,但是發(fā)現(xiàn)網(wǎng)上相關(guān)的資料很少,基本只能自己啃文檔和瞎嘗試。
所以想把自己踩過的坑記錄下來,希望能對大家有所幫助。
其中如有錯(cuò)漏,歡迎指出。

KafkaIO

顧名思義,是從kafka上讀取數(shù)據(jù)到beam上或者將beam上的數(shù)據(jù)寫入到kafka中。官方文檔中沒有直接的教程,要在GitHub上的源碼中找到相關(guān)使用說明。
Github上的Kafka源碼

這里僅說明讀數(shù)據(jù)部分。
maven依賴示例


    org.apache.beam
    beam-sdks-java-io-kafka
    ...

讀數(shù)據(jù)示例

PCollection> lines = //這里kV后說明kafka中的key和value均為String類型
                p.apply(KafkaIO.read()
                .withBootstrapServers("hadoop1:9092, hadoop2:9092")//必需,設(shè)置kafka的服務(wù)器地址和端口
                .withTopic("mytopic")//必需,設(shè)置要讀取的kafka的topic名稱
                .withKeyDeserializer(StringDeserializer.class)//必需
                .withValueDeserializer(StringDeserializer.class)//必需
                .withMaxNumRecords(301)
                .withTimestampFn(new MyTimestampFunction())
                .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
                .withoutMetadata()
        )

以下分別后面非必需的一些設(shè)置

1.設(shè)置最大記錄條數(shù)

.withMaxNumRecords(301)

通過這個(gè)函數(shù),可以設(shè)置最大讀取的記錄條數(shù)。

2.設(shè)置PCollection中元素對應(yīng)的時(shí)間戳

.withTimestampFn(new MyTimestampFunction())

當(dāng)不進(jìn)行這個(gè)設(shè)置的時(shí)候,beam會(huì)根據(jù)當(dāng)前的系統(tǒng)時(shí)間為每個(gè)元素分配一個(gè)時(shí)間戳。
而有的時(shí)候,我們希望用kafka的數(shù)據(jù)中自身帶有的時(shí)間戳來作為PCollection中元素的時(shí)間戳,從而進(jìn)行后續(xù)的窗口操作。這時(shí)就需要通過上面的函數(shù)來達(dá)到這一目的。
其中MyTimestampFunction()是我們自定義的一個(gè)函數(shù),其要實(shí)現(xiàn)SerializableFunction, Instant>這個(gè)接口。
即從一條kafka數(shù)據(jù)中獲得時(shí)間戳,然后以Instant(org.joda.time.Instant)的格式返回。

public class MyTimestampFunction implements SerializableFunction, Instant> {

    public Instant apply(KV input){
        String[] temps = input.getValue().split(",");
        DateTime t = new DateTime(Long.valueOf(temps[1]));
        return t.toInstant();
    }
}

3.設(shè)置讀kafka數(shù)據(jù)的順序

updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))

KafkaIO默認(rèn)的數(shù)據(jù)讀取順序是從最新的數(shù)據(jù)開始。當(dāng)我們開發(fā)測試的時(shí)候,如果沒有一個(gè)生產(chǎn)者同步向kafka生產(chǎn)數(shù)據(jù),那么這里就拿不到數(shù)據(jù)。(在這坑了很久,才發(fā)現(xiàn)這個(gè)原因...)
當(dāng)我們想實(shí)現(xiàn)類似于kafka shell中的--from-beginning的功能的時(shí)候,即從最早的數(shù)據(jù)開始讀,就需要進(jìn)行這一設(shè)置。
這里不僅可以改變讀取數(shù)據(jù)的順序,按照類似的方式,還可以進(jìn)行其他設(shè)置。

4.丟棄掉kafka中的附加信息

.withoutMetadata()

使用這一設(shè)置時(shí),得到的PCollection中的元素是kafka的key和value組成的鍵值對。
當(dāng)不使用其時(shí),得到的PCollection中的元素是KafkaRecord。會(huì)附件很多元數(shù)據(jù)。

5.其他設(shè)置

// custom function for watermark (default is record timestamp)
 *       .withWatermarkFn(new MyWatermarkFunction())
 *
 *       // restrict reader to committed messages on Kafka (see method documentation).
 *       .withReadCommitted()
 *

在源碼的使用說明中還提到另外的兩個(gè)設(shè)置,但因?yàn)闀簳r(shí)沒用到,這里就暫且省略了。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/35904.html

相關(guān)文章

  • Apache Beam訪問HDFS

    摘要:一直接訪問引入的相關(guān)包使用代替給指定配置與訪問本地文件一樣訪問文件實(shí)際測試中發(fā)現(xiàn)本地如能夠成功讀寫,但是集群模式下如讀寫失敗,原因未知。二通過訪問除了直接讀寫的數(shù)據(jù),還可以通過來進(jìn)行讀寫。 一、直接訪問 1.引入HDFS的相關(guān)jar包: org.apache.beam beam-sdks-java-io-hadoop-file-system 2.1.0...

    UCloud 評論0 收藏0
  • Apache Beam學(xué)習(xí)筆記——幾種常見的處理類Transform

    摘要:要說在中常見的函數(shù)是哪一個(gè),當(dāng)然是。是一個(gè)實(shí)現(xiàn)了接口的抽象類,其中是數(shù)據(jù)處理方法,強(qiáng)制子類必須實(shí)現(xiàn)。以上為學(xué)習(xí)一天的總結(jié),有錯(cuò)誤歡迎指正。相同的是這個(gè)方法處理的都是中的一個(gè)元素。 在閱讀本文前,可先看一下官方的WordCount代碼, 對Apache Beam有大概的了解。 要說在Apache Beam中常見的函數(shù)是哪一個(gè),當(dāng)然是apply()。常見的寫法如下: [Final Outp...

    Chiclaim 評論0 收藏0
  • Apache Beam的分窗與觸發(fā)器

    摘要:需要注意的是和方法生成的觸發(fā)器是連續(xù)的而不是一次性的。其他的還有一次性觸發(fā)器將一次性觸發(fā)器變?yōu)檫B續(xù)型觸發(fā)器,觸發(fā)后再次等待觸發(fā)。例如與一起用可以實(shí)現(xiàn)每個(gè)數(shù)據(jù)到達(dá)后的分鐘進(jìn)行處理,經(jīng)常用于全局窗口,可以用觸發(fā)器來設(shè)置停止條件。 本文參考Apache Beam官方編程手冊 可以結(jié)合官方的Mobile Game 代碼閱讀本文。 在默認(rèn)情況下,Apache Beam是不分窗的,也就是采用Gl...

    NickZhou 評論0 收藏0
  • Apache beam其他學(xué)習(xí)記錄

    摘要:與用于與的轉(zhuǎn)換。其中方法返回的是在中的位置下標(biāo)。對于設(shè)置了多個(gè)觸發(fā)器的,自動(dòng)選擇最后一個(gè)觸發(fā)的結(jié)算結(jié)果。其他不是線程安全的,一般建議處理方法是冪等的。 Combine與GroupByKey GroupByKey是把相關(guān)key的元素聚合到一起,通常是形成一個(gè)Iterable的value,如: cat, [1,5,9] dog, [5,2] and, [1,2,6] Combine是對聚...

    jasperyang 評論0 收藏0

發(fā)表評論

0條評論

閱讀需要支付1元查看
<