成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

淺析 Flink Table/SQL API

soasme / 3127人閱讀

摘要:對批處理表的查詢不支持,和很多中常見的標量函數(shù)。此外,可以同時在靜態(tài)表和流表上進行查詢,這和的愿景是一樣的,將批處理看做特殊的流處理批看作是有限的流。最后,使用標準進行流處理意味著有很多成熟的工具支持。查詢結(jié)果直接顯示在中。

從何而來

關系型API有很多好處:是聲明式的,用戶只需要告訴需要什么,系統(tǒng)決定如何計算;用戶不必特地實現(xiàn);更方便優(yōu)化,可以執(zhí)行得更高效。本身Flink就是一個統(tǒng)一批和流的分布式計算平臺,所以社區(qū)設計關系型API的目的之一是可以讓關系型API作為統(tǒng)一的一層,兩種查詢擁有同樣的語義和語法。大多數(shù)流處理框架的API都是比較low-level的API,學習成本高而且很多邏輯需要寫到UDF中,所以Apache Flink 添加了SQL-like的API處理關系型數(shù)據(jù)--Table API。這套API中最重要的概念是Table(可以在上面進行關系型操作的結(jié)構(gòu)化的DataSet或DataStream)。Table API 與 DataSetDataStream API 結(jié)合緊密,DataSet 和 DataStream都可以很容易地轉(zhuǎn)換成 Table,同樣轉(zhuǎn)換回來也很方便:

val execEnv = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(execEnv)

// obtain a DataSet from somewhere
val tempData: DataSet[(String, Long, Double)] =

// convert the DataSet to a Table
val tempTable: Table = tempData.toTable(tableEnv, "location, "time, "tempF)
// compute your result
val avgTempCTable: Table = tempTable
 .where("location.like("room%"))
 .select(
   ("time / (3600 * 24)) as "day, 
   "Location as "room, 
   (("tempF - 32) * 0.556) as "tempC
  )
 .groupBy("day, "room)
 .select("day, "room, "tempC.avg as "avgTempC)
// convert result Table back into a DataSet and print it
avgTempCTable.toDataSet[Row].print()

example使用的是Scala的API,Java版API也有同樣的功能。

下圖展示了 Table API 的架構(gòu):

從 DataSet 或 DataStream 創(chuàng)建一個 Table,然后在上面進行關系型操作比如 fliter、join、select。對Table的操作將會轉(zhuǎn)換成邏輯運算符樹。Table 轉(zhuǎn)換回 DataSet 和 DataStream 的時候?qū)D(zhuǎn)換成DataSet 和 DataStream的算子。有些類似 "location.like("room%") 的表達式將會通過 code generation 編譯成Flink的函數(shù)。

然而,最初傳統(tǒng)的Table API 有一定的限制。首先,它不能獨立使用。Table API 的 query 必須嵌入到 DataSet 或 DataStream的程序中。對批處理表的查詢不支持outer join,sorting和很多SQL中常見的標量函數(shù)。對于流處理的查詢只支持filtetr unionprojection,不支持aggregationjoin。而且,轉(zhuǎn)換過程中沒有利用太多查詢優(yōu)化技術,除了適用于所有DataSet程序的優(yōu)化。

Table API 和 SQL 緊密結(jié)合

隨著流處理的日益普及和Flink在該領域的增長,F(xiàn)link社區(qū)認為需要一個更簡單的API使更多的用戶能夠分析流數(shù)據(jù)。一年前Flink社區(qū)決定將Table API提升到一個新的層級,擴展Table API中流處理的能力以及支持SQL。社區(qū)不想重復造輪子,于是決定在 Apache Calcite (一個比較流行的SQL解析和優(yōu)化框架)的基礎上構(gòu)建新的 Table API。Apache Calcite 被用在很多項目中,包括 Apache Hive,Apache Drill,Cascading等等。除此之外,Calcite社區(qū)將 SQL on Stream 寫入它的roadmap,所以Flink的SQL很適合和它結(jié)合。

以Calcite為核心的新架構(gòu)圖:

新架構(gòu)提供兩種API進行關系型查詢,Table API 和 SQL。這兩種API的查詢都會用包含注冊過的Table的catalog進行驗證,然后轉(zhuǎn)換成統(tǒng)一Calcite的logical plan。在這種表示中,stream和batch的查詢看起來完全一樣。下一步,利用 Calcite的 cost-based 優(yōu)化器優(yōu)化轉(zhuǎn)換規(guī)則和logical plan。根據(jù)數(shù)據(jù)源的性質(zhì)(流式和靜態(tài))使用不同的規(guī)則進行優(yōu)化。最終優(yōu)化后的plan轉(zhuǎn)傳成常規(guī)的Flink DataSet 或 DataStream 程序。這步還涉及code generation(將關系表達式轉(zhuǎn)換成Flink函數(shù))。

下面我們舉一個例子來理解新的架構(gòu)。表達式轉(zhuǎn)換成Logical Plan如下圖所示:

調(diào)用Table API 實際上是創(chuàng)建了很多 Table API 的 LogicalNode,創(chuàng)建的過程中對會對整個query進行validate。比如table是CalalogNode,window groupBy之后在select時會創(chuàng)建WindowAggregateProject,where對應Filter。然后用RelBuilder翻譯成Calcite LogicalPlan。如果是SQL API 將直接用Calcite的Parser進行解釋然后validate生成Calcite LogicalPlan。

利用Calcite內(nèi)置的一些rule來優(yōu)化LogicalPlan,也可以自己添加或者覆蓋這些rule。轉(zhuǎn)換成Optimized Calcite Plan后,仍然是Calcite的內(nèi)部表示方式,現(xiàn)在需要transform成DataStream Plan,對應上圖第三列的類,里面封裝了如何translate成普通的DataStream或DataSet程序。隨后調(diào)用相應的tanslateToPlan方法轉(zhuǎn)換和利用CodeGen元編程成Flink的各種算子?,F(xiàn)在就相當于我們直接利用Flink的DataSet或DataStream API開發(fā)的程序。

Table API的新架構(gòu)除了維持最初的原理還改進了很多。為流式數(shù)據(jù)和靜態(tài)數(shù)據(jù)的關系查詢保留統(tǒng)一的接口,而且利用了Calcite的查詢優(yōu)化框架和SQL parser。該設計是基于Flink已構(gòu)建好的API構(gòu)建的,DataStream API 提供低延時高吞吐的流處理能力而且就有exactly-once語義而且可以基于event-time進行處理。而且DataSet擁有穩(wěn)定高效的內(nèi)存算子和流水線式的數(shù)據(jù)交換。Flink的core API和引擎的所有改進都會自動應用到Table API和SQL上。

新的SQL接口集成到了Table API中。DataSteam, DataSet和外部數(shù)據(jù)源可以在TableEnvironment中注冊成表,為了是他們可以通過SQL進行查詢。TableEnvironment.sql()方法用來聲明SQL和將結(jié)果作為Table返回。下面的是一個完整的樣例,從一個JSON編碼的Kafka topic中讀取流表,然后用SQL處理并寫到另一個Kafka topic。

// get environments
val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(execEnv)

// configure Kafka connection
val kafkaProps = ...
// define a JSON encoded Kafka topic as external table
val sensorSource = new KafkaJsonSource[(String, Long, Double)](
    "sensorTopic",
    kafkaProps,
    ("location", "time", "tempF"))

// register external table
tableEnv.registerTableSource("sensorData", sensorSource)

// define query in external table
val roomSensors: Table = tableEnv.sql(
    "SELECT STREAM time, location AS room, (tempF - 32) * 0.556 AS tempC " +
    "FROM sensorData " +
    "WHERE location LIKE "room%""
  )

// define a JSON encoded Kafka topic as external sink
val roomSensorSink = new KafkaJsonSink(...)

// define sink for room sensor data and execute query
roomSensors.toSink(roomSensorSink)
execEnv.execute()

這個樣例中忽略了流處理中最有趣的部分:window aggregate 和 join。這些操作如何用SQL表達呢?Apache Calcite社區(qū)提出了一個proposal來討論SQL on streams的語法和語義。社區(qū)將Calcite的stream SQL描述為標準SQL的擴展而不是另外的 SQL-like語言。這有很多好處,首先,熟悉SQL標準的人能夠在不學習新語法的情況下分析流數(shù)據(jù)。靜態(tài)表和流表的查詢幾乎相同,可以輕松地移植。此外,可以同時在靜態(tài)表和流表上進行查詢,這和flink的愿景是一樣的,將批處理看做特殊的流處理(批看作是有限的流)。最后,使用標準SQL進行流處理意味著有很多成熟的工具支持。

下面的example展示了如何用SQL和Table API進行滑動窗口查詢:

SQL

SELECT STREAM
  TUMBLE_END(time, INTERVAL "1" DAY) AS day,
  location AS room,
  AVG((tempF - 32) * 0.556) AS avgTempC
FROM sensorData
WHERE location LIKE "room%"
GROUP BY TUMBLE(time, INTERVAL "1" DAY), location

Table API

val avgRoomTemp: Table = tableEnv.ingest("sensorData")
  .where("location.like("room%"))
  .partitionBy("location)
  .window(Tumbling every Days(1) on "time as "w)
  .select("w.end, "location, , (("tempF - 32) * 0.556).avg as "avgTempCs)
Table API的現(xiàn)狀 Batch SQL & Table API 支持:

Selection, Projection, Sort, Inner & Outer Joins, Set operations

Windows for Slide, Tumble, Session

Streaming Table API 支持:

Selection, Projection, Union

Windows for Slide, Tumble, Session

Streaming SQL:

Selection, Projection, Union, Tumble

Streaming SQL案例 持續(xù)的ETL和數(shù)據(jù)導入

獲取流式數(shù)據(jù),然后轉(zhuǎn)換這些數(shù)據(jù)(歸一化,聚合...),將其寫入其他系統(tǒng)(File,Kafka,DBMS)。這些query的結(jié)果通常會存儲到log-style的系統(tǒng)。

實時的Dashboards 和 報表

獲取流式數(shù)據(jù),然后對數(shù)據(jù)進行聚合來支持在線系統(tǒng)(dashboard,推薦)或者數(shù)據(jù)分析系統(tǒng)(Tableau)。通常結(jié)果被寫到k-v存儲中(Cassandra,Hbase,可查詢的Flink狀態(tài)),建立索引(Elasticsearch)或者DBMS(MySQL,PostgreSQL...)。這些查詢通??梢员桓?,改進。

即席分析

針對流數(shù)據(jù)的即席查詢,以實時的方式進行分析和瀏覽數(shù)據(jù)。查詢結(jié)果直接顯示在notebook(Apache Zeppelin)中。

Flink社區(qū)還提出來和數(shù)據(jù)庫中Materialized View很相似的Dynamic table 動態(tài)表概念,將在以后的版本中支持,具體細節(jié)將另開文章解釋。

文章版權歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/66881.html

相關文章

  • Flink實戰(zhàn)(六) - Table API & SQL編程

    摘要:每個在簡潔性和表達性之間提供不同的權衡,并針對不同的用例。在這些中處理的數(shù)據(jù)類型在相應的編程語言中表示為類。該是為中心的聲明性表,其可被動態(tài)地改變的表表示流時。這種抽象在語義和表達方面類似于,但是將程序表示為查詢表達式。 1 意義 1.1 分層的 APIs & 抽象層次 Flink提供三層API。 每個API在簡潔性和表達性之間提供不同的權衡,并針對不同的用例。 showImg(ht...

    lifefriend_007 評論0 收藏0
  • Flink1.7穩(wěn)定版發(fā)布:新增功能為企業(yè)生產(chǎn)帶來哪些好處

    摘要:通過狀態(tài)演變,可以在狀態(tài)模式中添加或刪除列,以便更改應用程序部署后應捕獲的業(yè)務功能。本地恢復通過擴展的調(diào)度來完成本地恢復功能,以便在恢復時考慮先前的部署位置。此功能大大提高了恢復速度。問題導讀1.Flink1.7開始支持Scala哪個版本?2.Flink1.7狀態(tài)演變在實際生產(chǎn)中有什么好處?3.支持SQL/Table API中的富集連接可以做那些事情?4.Flink1.7新增了哪些連接器Ap...

    Hwg 評論0 收藏0
  • 《從0到1學習Flink》—— Apache Flink 介紹

    摘要:擴展庫還包括用于復雜事件處理,機器學習,圖形處理和兼容性的專用代碼庫。事件時間機制使得那些事件無序到達甚至延遲到達的數(shù)據(jù)流能夠計算出精確的結(jié)果。負責接受用戶的程序代碼,然后創(chuàng)建數(shù)據(jù)流,將數(shù)據(jù)流提交給以便進一步執(zhí)行。 showImg(https://segmentfault.com/img/remote/1460000016902812); 前言 Flink 是一種流式計算框架,為什么我...

    flyer_dev 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<