成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Apache Beam訪問HDFS

UCloud / 2829人閱讀

摘要:一直接訪問引入的相關(guān)包使用代替給指定配置與訪問本地文件一樣訪問文件實際測試中發(fā)現(xiàn)本地如能夠成功讀寫,但是集群模式下如讀寫失敗,原因未知。二通過訪問除了直接讀寫的數(shù)據(jù),還可以通過來進行讀寫。

一、直接訪問

1.引入HDFS的相關(guān)jar包:

    
    org.apache.beam
    beam-sdks-java-io-hadoop-file-system
    2.1.0
    

2.使用HadoopFileSystemOptions代替PipelineOptions

public interface WordCountOptions extends HadoopFileSystemOptions {
    @Description("input file")
    @Default.String("hdfs://localhost:9000/tmp/words2")
    String getInputFile();
    void setInputFile(String in);

    @Description("output")
    @Default.String("hdfs://localhost:9000/tmp/hdfsWordCount")
    String getOutput();
    void setOutput(String out);
}

3.給Options指定HDFS配置

    Configuration conf=new Configuration();
    conf.set("fs.default.name", "hdfs://localhost:9000");
    HDFSOption options= PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(HDFSOption.class);
    options.setHdfsConfiguration(ImmutableList.of(conf));
    

4.與訪問本地文件一樣訪問HDFS文件

    Pipeline p = Pipeline.create(options);
    Data = p.apply("Read from HDFS",
            TextIO.read().from(options.getInputFile()));
            

實際測試中發(fā)現(xiàn)本地runner(如Direct, Flink Local, Spark Local...)能夠成功讀寫HDFS,但是集群模式下(如Flink Cluster, Spark Cluster...)讀寫HDFS失敗,原因未知。

二、通過HBase訪問

除了直接讀寫HDFS的數(shù)據(jù),還可以通過HBase來進行讀寫。
1.添加相關(guān)jar包

    
    
        org.apache.beam
        beam-sdks-java-io-hbase
        ${beam.verson}
    

2.設(shè)置HBase連接信息

    Configuration conf = new Configuration();
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
    conf.setStrings("hbase.master.hostname", "localhost");
    conf.setStrings("hbase.regionserver.hostname", "localhost");

3.使用上述的conf讀HBase數(shù)據(jù)

    pipe
            //指定配置和表名
            .apply("Read from HBase",
                    HBaseIO.read().withConfiguration(conf).withTableId("test_tb"))
                       
            .apply(ParDo.of(new DoFn() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                 //讀到的數(shù)據(jù)是HBase API中定義的Result格式,需要按照HBase官方說明進行剝?nèi)?
                    Result result = c.element();
                    String rowkey = Bytes.toString(result.getRow());
                    System.out.println("row key: ");

                    for(Cell cell : result.listCells())
                    {
                        System.out.println("qualifier:"+Bytes.toString(CellUtil.cloneQualifier(cell)));
                        System.out.println("value:"+Bytes.toString(CellUtil.cloneValue(cell)));
                    }

                    c.output(rowkey);
                }
            }));

4.寫入到HBase

    //寫入前需要將string數(shù)據(jù)封裝為Hbase數(shù)據(jù)格式mutation
    .apply(ParDo.of(new DoFn() {
        @ProcessElement
        public void processElement(ProcessContext context) {
            byte[] qual = Bytes.toBytes("qual");
            byte[] cf = Bytes.toBytes("cf");
            byte[] row = Bytes.toBytes("kafka");
            byte[] val = Bytes.toBytes(context.element());
            final Charset UTF_8 = Charset.forName("UTF-8");
            Mutation mutation = new Put(row).addColumn(cf, qual, val);
            context.output(mutation);
        }

    }))
    .apply("write to Hbase",
            HBaseIO.write()
                    .withConfiguration(conf)
                    .withTableId("test_tb"));

經(jīng)測試,無論本地runner還是集群runner都能成功讀寫。
但是發(fā)現(xiàn)一個問題,使用mvn exec:java進行調(diào)試成功,而使用shade插件打包成jar運行卻一直報錯,說Mutation沒有指定coder,beam論壇上求助后得到的回復(fù)是maven-shade-plugin版本太舊,需要更新到3.0.0以上版本,但我改了3.0的版本之后還是一樣的錯誤。后來添加了ServicesResourceTransformer才解決。


    

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/67699.html

相關(guān)文章

  • Apache Beam采坑系列——KafkaIO

    摘要:最近在用做流上的異常檢測,期間遇到了很多問題,但是發(fā)現(xiàn)網(wǎng)上相關(guān)的資料很少,基本只能自己啃文檔和瞎嘗試。其中如有錯漏,歡迎指出。即從一條數(shù)據(jù)中獲得時間戳,然后以的格式返回。丟棄掉中的附加信息使用這一設(shè)置時,得到的中的元素是的和組成的鍵值對。 最近在用Apache beam做流上的異常檢測,期間遇到了很多問題,但是發(fā)現(xiàn)網(wǎng)上相關(guān)的資料很少,基本只能自己啃文檔和瞎嘗試。所以想把自己踩過的坑記錄...

    iliyaku 評論0 收藏0
  • Apache Beam學(xué)習(xí)筆記——幾種常見的處理類Transform

    摘要:要說在中常見的函數(shù)是哪一個,當然是。是一個實現(xiàn)了接口的抽象類,其中是數(shù)據(jù)處理方法,強制子類必須實現(xiàn)。以上為學(xué)習(xí)一天的總結(jié),有錯誤歡迎指正。相同的是這個方法處理的都是中的一個元素。 在閱讀本文前,可先看一下官方的WordCount代碼, 對Apache Beam有大概的了解。 要說在Apache Beam中常見的函數(shù)是哪一個,當然是apply()。常見的寫法如下: [Final Outp...

    Chiclaim 評論0 收藏0
  • Apache Beam的分窗與觸發(fā)器

    摘要:需要注意的是和方法生成的觸發(fā)器是連續(xù)的而不是一次性的。其他的還有一次性觸發(fā)器將一次性觸發(fā)器變?yōu)檫B續(xù)型觸發(fā)器,觸發(fā)后再次等待觸發(fā)。例如與一起用可以實現(xiàn)每個數(shù)據(jù)到達后的分鐘進行處理,經(jīng)常用于全局窗口,可以用觸發(fā)器來設(shè)置停止條件。 本文參考Apache Beam官方編程手冊 可以結(jié)合官方的Mobile Game 代碼閱讀本文。 在默認情況下,Apache Beam是不分窗的,也就是采用Gl...

    NickZhou 評論0 收藏0
  • Apache beam其他學(xué)習(xí)記錄

    摘要:與用于與的轉(zhuǎn)換。其中方法返回的是在中的位置下標。對于設(shè)置了多個觸發(fā)器的,自動選擇最后一個觸發(fā)的結(jié)算結(jié)果。其他不是線程安全的,一般建議處理方法是冪等的。 Combine與GroupByKey GroupByKey是把相關(guān)key的元素聚合到一起,通常是形成一個Iterable的value,如: cat, [1,5,9] dog, [5,2] and, [1,2,6] Combine是對聚...

    jasperyang 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<