摘要:這些切片稱為窗口。函數(shù)允許對常規(guī)數(shù)據(jù)流進(jìn)行分組。通常,這是非并行數(shù)據(jù)轉(zhuǎn)換,因為它在非分區(qū)數(shù)據(jù)流上運(yùn)行。
前言
在第一篇介紹 Flink 的文章 《《從0到1學(xué)習(xí)Flink》—— Apache Flink 介紹》 中就說過 Flink 程序的結(jié)構(gòu)
Flink 應(yīng)用程序結(jié)構(gòu)就是如上圖所示:
1、Source: 數(shù)據(jù)源,F(xiàn)link 在流處理和批處理上的 source 大概有 4 類:基于本地集合的 source、基于文件的 source、基于網(wǎng)絡(luò)套接字的 source、自定義的 source。自定義的 source 常見的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,當(dāng)然你也可以定義自己的 source。
2、Transformation:數(shù)據(jù)轉(zhuǎn)換的各種操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操作很多,可以將數(shù)據(jù)轉(zhuǎn)換計算成你想要的數(shù)據(jù)。
3、Sink:接收器,F(xiàn)link 將轉(zhuǎn)換計算后的數(shù)據(jù)發(fā)送的地點 ,你可能需要存儲下來,F(xiàn)link 常見的 Sink 大概有如下幾類:寫入文件、打印出來、寫入 socket 、自定義的 sink 。自定義的 sink 常見的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定義自己的 Sink。
在上四篇文章介紹了 Source 和 Sink:
1、《從0到1學(xué)習(xí)Flink》—— Data Source 介紹
2、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Source ?
3、《從0到1學(xué)習(xí)Flink》—— Data Sink 介紹
4、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Sink ?
那么這篇文章我們就來看下 Flink Data Transformation 吧,數(shù)據(jù)轉(zhuǎn)換操作還是蠻多的,需要好好講講!
Transformation Map這是最簡單的轉(zhuǎn)換之一,其中輸入是一個數(shù)據(jù)流,輸出的也是一個數(shù)據(jù)流:
還是拿上一篇文章的案例來將數(shù)據(jù)進(jìn)行 map 轉(zhuǎn)換操作:
SingleOutputStreamOperatormap = student.map(new MapFunction () { @Override public Student map(Student value) throws Exception { Student s1 = new Student(); s1.id = value.id; s1.name = value.name; s1.password = value.password; s1.age = value.age + 5; return s1; } }); map.print();
將每個人的年齡都增加 5 歲,其他不變。
FlatMapFlatMap 采用一條記錄并輸出零個,一個或多個記錄。
SingleOutputStreamOperatorflatMap = student.flatMap(new FlatMapFunction () { @Override public void flatMap(Student value, Collector out) throws Exception { if (value.id % 2 == 0) { out.collect(value); } } }); flatMap.print();
這里將 id 為偶數(shù)的聚集出來。
FilterFilter 函數(shù)根據(jù)條件判斷出結(jié)果。
SingleOutputStreamOperatorfilter = student.filter(new FilterFunction () { @Override public boolean filter(Student value) throws Exception { if (value.id > 95) { return true; } return false; } }); filter.print();
這里將 id 大于 95 的過濾出來,然后打印出來。
KeyByKeyBy 在邏輯上是基于 key 對流進(jìn)行分區(qū)。在內(nèi)部,它使用 hash 函數(shù)對流進(jìn)行分區(qū)。它返回 KeyedDataStream 數(shù)據(jù)流。
KeyedStreamkeyBy = student.keyBy(new KeySelector () { @Override public Integer getKey(Student value) throws Exception { return value.age; } }); keyBy.print();
上面對 student 的 age 做 KeyBy 操作分區(qū)
ReduceReduce 返回單個的結(jié)果值,并且 reduce 操作每處理一個元素總是創(chuàng)建一個新值。常用的方法有 average, sum, min, max, count,使用 reduce 方法都可實現(xiàn)。
SingleOutputStreamOperatorreduce = student.keyBy(new KeySelector () { @Override public Integer getKey(Student value) throws Exception { return value.age; } }).reduce(new ReduceFunction () { @Override public Student reduce(Student value1, Student value2) throws Exception { Student student1 = new Student(); student1.name = value1.name + value2.name; student1.id = (value1.id + value2.id) / 2; student1.password = value1.password + value2.password; student1.age = (value1.age + value2.age) / 2; return student1; } }); reduce.print();
上面先將數(shù)據(jù)流進(jìn)行 keyby 操作,因為執(zhí)行 reduce 操作只能是 KeyedStream,然后將 student 對象的 age 做了一個求平均值的操作。
FoldFold 通過將最后一個文件夾流與當(dāng)前記錄組合來推出 KeyedStream。 它會發(fā)回數(shù)據(jù)流。
KeyedStream.fold("1", new FoldFunctionAggregations() { @Override public String fold(String accumulator, Integer value) throws Exception { return accumulator + "=" + value; } })
DataStream API 支持各種聚合,例如 min,max,sum 等。 這些函數(shù)可以應(yīng)用于 KeyedStream 以獲得 Aggregations 聚合。
KeyedStream.sum(0) KeyedStream.sum("key") KeyedStream.min(0) KeyedStream.min("key") KeyedStream.max(0) KeyedStream.max("key") KeyedStream.minBy(0) KeyedStream.minBy("key") KeyedStream.maxBy(0) KeyedStream.maxBy("key")
max 和 maxBy 之間的區(qū)別在于 max 返回流中的最大值,但 maxBy 返回具有最大值的鍵, min 和 minBy 同理。
WindowWindow 函數(shù)允許按時間或其他條件對現(xiàn)有 KeyedStream 進(jìn)行分組。 以下是以 10 秒的時間窗口聚合:
inputStream.keyBy(0).window(Time.seconds(10));
Flink 定義數(shù)據(jù)片段以便(可能)處理無限數(shù)據(jù)流。 這些切片稱為窗口。 此切片有助于通過應(yīng)用轉(zhuǎn)換處理數(shù)據(jù)塊。 要對流進(jìn)行窗口化,我們需要分配一個可以進(jìn)行分發(fā)的鍵和一個描述要對窗口化流執(zhí)行哪些轉(zhuǎn)換的函數(shù)
要將流切片到窗口,我們可以使用 Flink 自帶的窗口分配器。 我們有選項,如 tumbling windows, sliding windows, global 和 session windows。 Flink 還允許您通過擴(kuò)展 WindowAssginer 類來編寫自定義窗口分配器。 這里先預(yù)留下篇文章來講解這些不同的 windows 是如何工作的。
WindowAllwindowAll 函數(shù)允許對常規(guī)數(shù)據(jù)流進(jìn)行分組。 通常,這是非并行數(shù)據(jù)轉(zhuǎn)換,因為它在非分區(qū)數(shù)據(jù)流上運(yùn)行。
與常規(guī)數(shù)據(jù)流功能類似,我們也有窗口數(shù)據(jù)流功能。 唯一的區(qū)別是它們處理窗口數(shù)據(jù)流。 所以窗口縮小就像 Reduce 函數(shù)一樣,Window fold 就像 Fold 函數(shù)一樣,并且還有聚合。
inputStream.keyBy(0).windowAll(Time.seconds(10));Union
Union 函數(shù)將兩個或多個數(shù)據(jù)流結(jié)合在一起。 這樣就可以并行地組合數(shù)據(jù)流。 如果我們將一個流與自身組合,那么它會輸出每個記錄兩次。
inputStream.union(inputStream1, inputStream2, ...);Window join
我們可以通過一些 key 將同一個 window 的兩個數(shù)據(jù)流 join 起來。
inputStream.join(inputStream1) .where(0).equalTo(1) .window(Time.seconds(5)) .apply (new JoinFunction () {...});
以上示例是在 5 秒的窗口中連接兩個流,其中第一個流的第一個屬性的連接條件等于另一個流的第二個屬性。
Split此功能根據(jù)條件將流拆分為兩個或多個流。 當(dāng)您獲得混合流并且您可能希望多帶帶處理每個數(shù)據(jù)流時,可以使用此方法。
SplitStreamSelectsplit = inputStream.split(new OutputSelector () { @Override public Iterable select(Integer value) { List output = new ArrayList (); if (value % 2 == 0) { output.add("even"); } else { output.add("odd"); } return output; } });
此功能允許您從拆分流中選擇特定流。
SplitStreamProjectsplit; DataStream even = split.select("even"); DataStream odd = split.select("odd"); DataStream all = split.select("even","odd");
Project 函數(shù)允許您從事件流中選擇屬性子集,并僅將所選元素發(fā)送到下一個處理流。
DataStream> in = // [...] DataStream > out = in.project(3,2);
上述函數(shù)從給定記錄中選擇屬性號 2 和 3。 以下是示例輸入和輸出記錄:
(1,10.0,A,B)=> (B,A) (2,20.0,C,D)=> (D,C)最后
本文主要介紹了 Flink Data 的常用轉(zhuǎn)換方式:Map、FlatMap、Filter、KeyBy、Reduce、Fold、Aggregations、Window、WindowAll、Union、Window Join、Split、Select、Project 等。并用了點簡單的 demo 介紹了如何使用,具體在項目中該如何將數(shù)據(jù)流轉(zhuǎn)換成我們想要的格式,還需要根據(jù)實際情況對待。
關(guān)注我轉(zhuǎn)載請務(wù)必注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/2018/11/04/Flink-Data-transformation/
微信公眾號:zhisheng
另外我自己整理了些 Flink 的學(xué)習(xí)資料,目前已經(jīng)全部放到微信公眾號了。你可以加我的微信:zhisheng_tian,然后回復(fù)關(guān)鍵字:Flink 即可無條件獲取到。
Github 代碼倉庫https://github.com/zhisheng17/flink-learning/
以后這個項目的所有代碼都將放在這個倉庫里,包含了自己學(xué)習(xí) flink 的一些 demo 和博客
相關(guān)文章1、《從0到1學(xué)習(xí)Flink》—— Apache Flink 介紹
2、《從0到1學(xué)習(xí)Flink》—— Mac 上搭建 Flink 1.6.0 環(huán)境并構(gòu)建運(yùn)行簡單程序入門
3、《從0到1學(xué)習(xí)Flink》—— Flink 配置文件詳解
4、《從0到1學(xué)習(xí)Flink》—— Data Source 介紹
5、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Source ?
6、《從0到1學(xué)習(xí)Flink》—— Data Sink 介紹
7、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Sink ?
8、《從0到1學(xué)習(xí)Flink》—— Flink Data transformation(轉(zhuǎn)換)
9、《從0到1學(xué)習(xí)Flink》—— 介紹Flink中的Stream Windows
10、《從0到1學(xué)習(xí)Flink》—— Flink 中的幾種 Time 詳解
11、《從0到1學(xué)習(xí)Flink》—— Flink 寫入數(shù)據(jù)到 ElasticSearch
12、《從0到1學(xué)習(xí)Flink》—— Flink 項目如何運(yùn)行?
13、《從0到1學(xué)習(xí)Flink》—— Flink 寫入數(shù)據(jù)到 Kafka
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/72976.html
showImg(https://segmentfault.com/img/remote/1460000019961426); 今天在 Apache Flink meetup ·北京站進(jìn)行 Flink 1.9 重大新特性進(jìn)行了講解,兩位講師分別是 戴資力/楊克特,zhisheng 我也從看完了整個 1.9 特性解讀的直播,預(yù)計 Flink 1.9 版本正式發(fā)布時間大概是 7 月底 8 月初左右正式發(fā)...
摘要:模塊中的類結(jié)構(gòu)如下博客從到學(xué)習(xí)介紹從到學(xué)習(xí)上搭建環(huán)境并構(gòu)建運(yùn)行簡單程序入門從到學(xué)習(xí)配置文件詳解從到學(xué)習(xí)介紹從到學(xué)習(xí)如何自定義從到學(xué)習(xí)介紹從到學(xué)習(xí)如何自定義從到學(xué)習(xí)轉(zhuǎn)換從到學(xué)習(xí)介紹中的從到學(xué)習(xí)中的幾種詳解從到學(xué)習(xí)讀取數(shù)據(jù)寫入到從到學(xué) Flink-Client 模塊中的類結(jié)構(gòu)如下: https://t.zsxq.com/IMzNZjY showImg(https://segmentfau...
閱讀 3483·2021-09-22 15:02
閱讀 3530·2021-09-02 15:21
閱讀 2144·2019-08-30 15:55
閱讀 2794·2019-08-30 15:44
閱讀 791·2019-08-29 16:56
閱讀 2423·2019-08-23 18:22
閱讀 3351·2019-08-23 12:20
閱讀 3099·2019-08-23 11:28