日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產(chǎn)品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
ApacheFlink漫談系列(13)-TableAPI概述

一、什么是Table API

創(chuàng)新互聯(lián)專注于如皋企業(yè)網(wǎng)站建設,響應式網(wǎng)站開發(fā),商城網(wǎng)站定制開發(fā)。如皋網(wǎng)站建設公司,為如皋等地區(qū)提供建站服務。全流程按需網(wǎng)站制作,專業(yè)設計,全程項目跟蹤,創(chuàng)新互聯(lián)專業(yè)和態(tài)度為您提供的服務

在《Apache Flink 漫談系列(08) - SQL概覽》中我們概要的向大家介紹了什么是好SQL,SQL和Table API是Apache Flink中的同一層次的API抽象,如下圖所示:

Apache Flink 針對不同的用戶場景提供了三層用戶API,最下層ProcessFunction API可以對State,Timer等復雜機制進行有效的控制,但用戶使用的便捷性很弱,也就是說即使很簡單統(tǒng)計邏輯,也要較多的代碼開發(fā)。第二層DataStream API對窗口,聚合等算子進行了封裝,用戶的便捷性有所增強。最上層是SQL/Table API,Table API是Apache Flink中的聲明式,可被查詢優(yōu)化器優(yōu)化的高級分析API。

二、Table API的特點

Table API和SQL都是Apache Flink中最高層的分析API,SQL所具備的特點Table API也都具有,如下:

  • 聲明式 - 用戶只關心做什么,不用關心怎么做;
  • 高性能 - 支持查詢優(yōu)化,可以獲取最好的執(zhí)行性能;
  • 流批統(tǒng)一 - 相同的統(tǒng)計邏輯,既可以流模式運行,也可以批模式運行;
  • 標準穩(wěn)定 - 語義遵循SQL標準,語法語義明確,不易變動。

當然除了SQL的特性,因為Table API是在Flink中專門設計的,所以Table API還具有自身的特點:

  • 表達方式的擴展性 - 在Flink中可以為Table API開發(fā)很多便捷性功能,如:Row.flatten(), map/flatMap 等
  • 功能的擴展性 - 在Flink中可以為Table API擴展更多的功能,如:Iteration,flatAggregate 等新功能
  • 編譯檢查 - Table API支持java和scala語言開發(fā),支持IDE中進行編譯檢查。

說明:上面說的map/flatMap/flatAggregate都是Apache Flink 社區(qū) FLIP-29 中規(guī)劃的新功能。

三、HelloWorld

在介紹Table API所有算子之前我們先編寫一個簡單的HelloWorld來直觀了解如何進行Table API的開發(fā)。

1. Maven 依賴

在pom文件中增加如下配置,本篇以flink-1.7.0功能為準進行后續(xù)介紹。

 
 
 
 
  1. 1.7.0
  2. org.apache.flink
  3. flink-table_2.11
  4. ${table.version}
  5. org.apache.flink
  6. flink-scala_2.11
  7. ${table.version}
  8. org.apache.flink
  9. flink-streaming-scala_2.11
  10. ${table.version}
  11. org.apache.flink
  12. flink-streaming-java_2.11
  13. ${table.version}

2. 程序結構

在編寫第一Flink Table API job之前我們先簡單了解一下Flink Table API job的結構,如下圖所示:

  • 外部數(shù)據(jù)源,比如Kafka, Rabbitmq, CSV 等等;
  • 查詢計算邏輯,比如最簡單的數(shù)據(jù)導入select,雙流Join,Window Aggregate 等;
  • 外部結果存儲,比如Kafka,Cassandra,CSV等。

說明:1和3 在Apache Flink中統(tǒng)稱為Connector。

3. 主程序

我們以一個統(tǒng)計單詞數(shù)量的業(yè)務場景,編寫第一個HelloWorld程序。

根據(jù)上面Flink job基本結構介紹,要Table API完成WordCount的計算需求,我們需要完成三部分代碼:

  • TableSoruce Code - 用于創(chuàng)建數(shù)據(jù)源的代碼
  • Table API Query - 用于進行word count統(tǒng)計的Table API 查詢邏輯
  • TableSink Code - 用于保存word count計算結果的結果表代碼

(1) 運行模式選擇

一個job我們要選擇是Stream方式運行還是Batch模式運行,所以任何統(tǒng)計job的第一步是進行運行模式選擇,如下我們選擇Stream方式運行。

 
 
 
 
  1. // Stream運行環(huán)境
  2. val env = StreamExecutionEnvironment.getExecutionEnvironment
  3. val tEnv = TableEnvironment.getTableEnvironment(env)

(2) 構建測試Source

我們用最簡單的構建Source方式進行本次測試,代碼如下:

 
 
 
 
  1. // 測試數(shù)據(jù)
  2. val data = Seq("Flink", "Bob", "Bob", "something", "Hello", "Flink", "Bob")
  3. // 最簡單的獲取Source方式
  4. val source = env.fromCollection(data).toTable(tEnv, 'word)

(3) WordCount 統(tǒng)計邏輯

WordCount核心統(tǒng)計邏輯就是按照單詞分組,然后計算每個單詞的數(shù)量,統(tǒng)計邏輯如下:

 
 
 
 
  1. // 單詞統(tǒng)計核心邏輯
  2. val result = source
  3. .groupBy('word) // 單詞分組
  4. .select('word, 'word.count) // 單詞統(tǒng)計

(4) 定義Sink

將WordCount的統(tǒng)計結果寫入Sink中,代碼如下:

 
 
 
 
  1. // 自定義Sink
  2. val sink = new RetractSink // 自定義Sink(下面有完整代碼)
  3. // 計算結果寫入sink
  4. result.toRetractStream[(String, Long)].addSink(sink)

(5) 完整的HelloWord代碼

為了方便大家運行WordCount查詢統(tǒng)計,將完整的代碼分享大家(基于flink-1.7.0),如下:

 
 
 
 
  1. import org.apache.flink.api.scala._
  2. import org.apache.flink.configuration.Configuration
  3. import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
  4. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  5. import org.apache.flink.table.api.TableEnvironment
  6. import org.apache.flink.table.api.scala._
  7. import scala.collection.mutable
  8. object HelloWord {
  9. def main(args: Array[String]): Unit = {
  10. // 測試數(shù)據(jù)
  11. val data = Seq("Flink", "Bob", "Bob", "something", "Hello", "Flink", "Bob")
  12. // Stream運行環(huán)境
  13. val env = StreamExecutionEnvironment.getExecutionEnvironment
  14. val tEnv = TableEnvironment.getTableEnvironment(env)
  15. // 最簡單的獲取Source方式
  16. val source = env.fromCollection(data).toTable(tEnv, 'word)
  17. // 單詞統(tǒng)計核心邏輯
  18. val result = source
  19. .groupBy('word) // 單詞分組
  20. .select('word, 'word.count) // 單詞統(tǒng)計
  21. // 自定義Sink
  22. val sink = new RetractSink
  23. // 計算結果寫入sink
  24. result.toRetractStream[(String, Long)].addSink(sink)
  25. env.execute
  26. }
  27. }
  28. class RetractSink extends RichSinkFunction[(Boolean, (String, Long))] {
  29. private var resultSet: mutable.Set[(String, Long)] = _
  30. override def open(parameters: Configuration): Unit = {
  31. // 初始化內(nèi)存存儲結構
  32. resultSet = new mutable.HashSet[(String, Long)]
  33. }
  34. override def invoke(v: (Boolean, (String, Long)), context: SinkFunction.Context[_]): Unit = {
  35. if (v._1) {
  36. // 計算數(shù)據(jù)
  37. resultSet.add(v._2)
  38. }
  39. else {
  40. // 撤回數(shù)據(jù)
  41. resultSet.remove(v._2)
  42. }
  43. }
  44. override def close(): Unit = {
  45. // 打印寫入sink的結果數(shù)據(jù)
  46. resultSet.foreach(println)
  47. }
  48. }

運行結果如下:

雖然上面用了較長的紙墨介紹簡單的WordCount統(tǒng)計邏輯,但source和sink部分都是可以在學習后面算子中被復用的。本例核心的統(tǒng)計邏輯只有一行代碼:

 
 
 
 
  1. source.groupBy('word).select('word, 'word.count)

所以Table API開發(fā)技術任務非常的簡潔高效。

四、Table API 算子

雖然Table API與SQL的算子語義一致,但在表達方式上面SQL以文本的方式展現(xiàn),Table API是以java或者scala語言的方式進行開發(fā)。為了大家方便閱讀,即便是在《Apache Flink 漫談系列(08) - SQL概覽》中介紹過的算子,在這里也會再次進行介紹,當然對于Table API和SQL不同的地方會進行詳盡介紹。

1. 示例數(shù)據(jù)及測試類

(1) 測試數(shù)據(jù)

  • customer_tab 表 - 客戶表保存客戶id,客戶姓名和客戶描述信息。字段及測試數(shù)據(jù)如下:
  • order_tab 表 - 訂單表保存客戶購買的訂單信息,包括訂單id,訂單時間和訂單描述信息。 字段節(jié)測試數(shù)據(jù)如下:
  • Item_tab商品表, 攜帶商品id,商品類型,出售時間,價格等信息,具體如下:
  • PageAccess_tab頁面訪問表,包含用戶ID,訪問時間,用戶所在地域信息,具體數(shù)據(jù)如下:
  • PageAccessCount_tab頁面訪問表,訪問量,訪問時間,用戶所在地域信息,具體數(shù)據(jù)如下:
  • PageAccessSession_tab頁面訪問表,訪問量,訪問時間,用戶所在地域信息,具體數(shù)據(jù)如下:

(2) 測試類

我們創(chuàng)建一個TableAPIOverviewITCase.scala 用于接下來介紹Flink Table API算子的功能體驗。代碼如下:

 
 
 
 
  1. import org.apache.flink.api.scala._
  2. import org.apache.flink.runtime.state.StateBackend
  3. import org.apache.flink.runtime.state.memory.MemoryStateBackend
  4. import org.apache.flink.streaming.api.TimeCharacteristic
  5. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
  6. import org.apache.flink.streaming.api.functions.source.SourceFunction
  7. import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
  8. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  9. import org.apache.flink.streaming.api.watermark.Watermark
  10. import org.apache.flink.table.api.{Table, TableEnvironment}
  11. import org.apache.flink.table.api.scala._
  12. import org.apache.flink.types.Row
  13. import org.junit.rules.TemporaryFolder
  14. import org.junit.{Rule, Test}
  15. import scala.collection.mutable
  16. import scala.collection.mutable.ArrayBuffer
  17. class Table APIOverviewITCase {
  18. // 客戶表數(shù)據(jù)
  19. val customer_data = new mutable.MutableList[(String, String, String)]
  20. customer_data.+=(("c_001", "Kevin", "from JinLin"))
  21. customer_data.+=(("c_002", "Sunny", "from JinLin"))
  22. customer_data.+=(("c_003", "JinCheng", "from HeBei"))
  23. // 訂單表數(shù)據(jù)
  24. val order_data = new mutable.MutableList[(String, String, String, String)]
  25. order_data.+=(("o_001", "c_002", "2018-11-05 10:01:01", "iphone"))
  26. order_data.+=(("o_002", "c_001", "2018-11-05 10:01:55", "ipad"))
  27. order_data.+=(("o_003", "c_001", "2018-11-05 10:03:44", "flink book"))
  28. // 商品銷售表數(shù)據(jù)
  29. val item_data = Seq(
  30. Left((1510365660000L, (1510365660000L, 20, "ITEM001", "Electronic"))),
  31. Right((1510365660000L)),
  32. Left((1510365720000L, (1510365720000L, 50, "ITEM002", "Electronic"))),
  33. Right((1510365720000L)),
  34. Left((1510365780000L, (1510365780000L, 30, "ITEM003", "Electronic"))),
  35. Left((1510365780000L, (1510365780000L, 60, "ITEM004", "Electronic"))),
  36. Right((1510365780000L)),
  37. Left((1510365900000L, (1510365900000L, 40, "ITEM005", "Electronic"))),
  38. Right((1510365900000L)),
  39. Left((1510365960000L, (1510365960000L, 20, "ITEM006", "Electronic"))),
  40. Right((1510365960000L)),
  41. Left((1510366020000L, (1510366020000L, 70, "ITEM007", "Electronic"))),
  42. Right((1510366020000L)),
  43. Left((1510366080000L, (1510366080000L, 20, "ITEM008", "Clothes"))),
  44. Right((151036608000L)))
  45. // 頁面訪問表數(shù)據(jù)
  46. val pageAccess_data = Seq(
  47. Left((1510365660000L, (1510365660000L, "ShangHai", "U0010"))),
  48. Right((1510365660000L)),
  49. Left((1510365660000L, (1510365660000L, "BeiJing", "U1001"))),
  50. Right((1510365660000L)),
  51. Left((1510366200000L, (1510366200000L, "BeiJing", "U2032"))),
  52. Right((1510366200000L)),
  53. Left((1510366260000L, (1510366260000L, "BeiJing", "U1100"))),
  54. Right((1510366260000L)),
  55. Left((1510373400000L, (1510373400000L, "ShangHai", "U0011"))),
  56. Right((1510373400000L)))
  57. // 頁面訪問量表數(shù)據(jù)2
  58. val pageAccessCount_data = Seq(
  59. Left((1510365660000L, (1510365660000L, "ShangHai", 100))),
  60. Right((1510365660000L)),
  61. Left((1510365660000L, (1510365660000L, "BeiJing", 86))),
  62. Right((1510365660000L)),
  63. Left((1510365960000L, (1510365960000L, "BeiJing", 210))),
  64. Right((1510366200000L)),
  65. Left((1510366200000L, (1510366200000L, "BeiJing", 33))),
  66. Right((1510366200000L)),
  67. Left((1510373400000L, (1510373400000L, "ShangHai", 129))),
  68. Right((1510373400000L)))
  69. // 頁面訪問表數(shù)據(jù)3
  70. val pageAccessSession_data = Seq(
  71. Left((1510365660000L, (1510365660000L, "ShangHai", "U0011"))),
  72. Right((1510365660000L)),
  73. Left((1510365720000L, (1510365720000L, "ShangHai", "U0012"))),
  74. Right((1510365720000L)),
  75. Left((1510365720000L, (1510365720000L, "ShangHai", "U0013"))),
  76. Right((1510365720000L)),
  77. Left((1510365900000L, (1510365900000L, "ShangHai", "U0015"))),
  78. Right((1510365900000L)),
  79. Left((1510366200000L, (1510366200000L, "ShangHai", "U0011"))),
  80. Right((1510366200000L)),
  81. Left((1510366200000L, (1510366200000L, "BeiJing", "U2010"))),
  82. Right((1510366200000L)),
  83. Left((1510366260000L, (1510366260000L, "ShangHai", "U0011"))),
  84. Right((1510366260000L)),
  85. Left((1510373760000L, (1510373760000L, "ShangHai", "U0410"))),
  86. Right((1510373760000L)))
  87. val _tempFolder = new TemporaryFolder
  88. // Streaming 環(huán)境
  89. val env = StreamExecutionEnvironment.getExecutionEnvironment
  90. val tEnv = TableEnvironment.getTableEnvironment(env)
  91. env.setParallelism(1)
  92. env.setStateBackend(getStateBackend)
  93. def getProcTimeTables(): (Table, Table) = {
  94. // 將order_tab, customer_tab 注冊到catalog
  95. val customer = env.fromCollection(customer_data).toTable(tEnv).as('c_id, 'c_name, 'c_desc)
  96. val order = env.fromCollection(order_data).toTable(tEnv).as('o_id, 'c_id, 'o_time, 'o_desc)
  97. (customer, order)
  98. }
  99. def getEventTimeTables(): (Table, Table, Table, Table) = {
  100. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  101. // 將item_tab, pageAccess_tab 注冊到catalog
  102. val item =
  103. env.addSource(new EventTimeSourceFunction[(Long, Int, String, String)](item_data))
  104. .toTable(tEnv, 'onSellTime, 'price, 'itemID, 'itemType, 'rowtime.rowtime)
  105. val pageAccess =
  106. env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccess_data))
  107. .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)
  108. val pageAccessCount =
  109. env.addSource(new EventTimeSourceFunction[(Long, String, Int)](pageAccessCount_data))
  110. .toTable(tEnv, 'accessTime, 'region, 'accessCount, 'rowtime.rowtime)
  111. val pageAccessSession =
  112. env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccessSession_data))
  113. .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)
  114. (item, pageAccess, pageAccessCount, pageAccessSession)
  115. }
  116. @Rule
  117. def tempFolder: TemporaryFolder = _tempFolder
  118. def getStateBackend: StateBackend = {
  119. new MemoryStateBackend()
  120. }
  121. def procTimePrint(result: Table): Unit = {
  122. val sink = new RetractingSink
  123. result.toRetractStream[Row].addSink(sink)
  124. env.execute()
  125. }
  126. def rowTimePrint(result: Table): Unit = {
  127. val sink = new RetractingSink
  128. result.toRetractStream[Row].addSink(sink)
  129. env.execute()
  130. }
  131. @Test
  132. def testProc(): Unit = {
  133. val (customer, order) = getProcTimeTables()
  134. val result = ...// 測試的查詢邏輯
  135. procTimePrint(result)
  136. }
  137. @Test
  138. def testEvent(): Unit = {
  139. val (item, pageAccess, pageAccessCount, pageAccessSession) = getEventTimeTables()
  140. val result = ...// 測試的查詢邏輯
  141. procTimePrint(result)
  142. }
  143. }
  144. // 自定義Sink
  145. final class RetractingSink extends RichSinkFunction[(Boolean, Row)] {
  146. var retractedResults: ArrayBuffer[String] = null
  147. override def open(parameters: Configuration): Unit = {
  148. super.open(parameters)
  149. retractedResults = mutable.ArrayBuffer.empty[String]
  150. }
  151. def invoke(v: (Boolean, Row)) {
  152. retractedResults.synchronized {
  153. val vvalue = v._2.toString
  154. if (v._1) {
  155. retractedResults += value
  156. } else {
  157. val idx = retractedResults.indexOf(value)
  158. if (idx >= 0) {
  159. retractedResults.remove(idx)
  160. } else {
  161. throw new RuntimeException("Tried to retract a value that wasn't added first. " +
  162. "This is probably an incorrectly implemented test. " +
  163. "Try to set the parallelism of the sink to 1.")
  164. }
  165. }
  166. }
  167. }
  168. override def close(): Unit = {
  169. super.close()
  170. retractedResults.sorted.foreach(println(_))
  171. }
  172. }
  173. // Water mark 生成器
  174. class EventTimeSourceFunction[T](
  175. dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
  176. override def run(ctx: SourceContext[T]): Unit = {
  177. dataWithTimestampList.foreach {
  178. case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
  179. case Right(w) => ctx.emitWatermark(new Watermark(w))
  180. }
  181. }
  182. override def cancel(): Unit = ???
  183. }

2. SELECT

SELECT 用于從數(shù)據(jù)集/流中選擇數(shù)據(jù),語義是關系代數(shù)中的投影(Projection),對關系進行垂直分割,消去或增加某些列, 如下圖所示:

(1) Table API 示例

從customer_tab選擇用戶姓名,并用內(nèi)置的CONCAT函數(shù)拼接客戶信息,如下:

 
 
 
 
  1. val result = customer
  2. .select('c_name, concat_ws('c_name, " come ", 'c_desc))

(2) Result

(3) 特別說明

大家看到在 SELECT 不僅可以使用普通的字段選擇,還可以使用ScalarFunction,當然也包括User-Defined Function,同時還可以進行字段的alias設置。其實SELECT可以結合聚合,在GROUPBY部分會進行介紹,一個比較特殊的使用場景是去重的場景,示例如下:

  • Table API示例

在訂單表查詢所有的客戶id,消除重復客戶id, 如下:

 
 
 
 
  1. val result = order
  2. .groupBy('c_id)
  3. .select('c_id)
  • Result

3. WHERE

WHERE 用于從數(shù)據(jù)集/流中過濾數(shù)據(jù),與SELECT一起使用,語義是關系代數(shù)的Selection,根據(jù)某些條件對關系做水平分割,即選擇符合條件的記錄,如下所示:

(1) Table API 示例

在customer_tab查詢客戶id為c_001和c_003的客戶信息,如下:

 
 
 
 
  1. val result = customer
  2. .where("c_|| c_")
  3. .select( 'c_id, 'c_name, 'c_desc)

(2) Result

(3) 特別說明

我們發(fā)現(xiàn)WHERE是對滿足一定條件的數(shù)據(jù)進行過濾,WHERE支持=, <, >, <>, >=, <=以及&&, ||等表達式的組合,最終滿足過濾條件的數(shù)據(jù)會被選擇出來。 SQL中的IN和NOT IN在Table API里面用intersect 和 minus描述(flink-1.7.0版本)。

  • Intersect 示例

Intersect只在Batch模式下進行支持,Stream模式下我們可以利用雙流JOIN來實現(xiàn),如:在customer_tab查詢已經(jīng)下過訂單的客戶信息,如下:

 
 
 
 
  1. // 計算客戶id,并去重
  2. val distinct_cids = order
  3. .groupBy('c_id) // 去重
  4. .select('c_id as 'o_c_id)
  5. val result = customer
  6. .join(distinct_cids, 'c_id === 'o_c_id)
  7. .select('c_id, 'c_name, 'c_desc)
  • Result

  • Minus 示例

Minus只在Batch模式下進行支持,Stream模式下我們可以利用雙流JOIN來實現(xiàn),如:在customer_tab查詢沒有下過訂單的客戶信息,如下:

 
 
 
 
  1. // 查詢下過訂單的客戶id,并去重
  2. val distinct_cids = order
  3. .groupBy('c_id)
  4. .select('c_id as 'o_c_id)
  5. // 查詢沒有下過訂單的客戶信息
  6. val result = customer
  7. .leftOuterJoin(distinct_cids, 'c_id === 'o_c_id)
  8. .where('o_c_id isNull)
  9. .select('c_id, 'c_name, 'c_desc)

說明上面實現(xiàn)邏輯比較復雜,我們后續(xù)考慮如何在流上支持更簡潔的方式。

  • Result

  • Intersect/Minus與關系代數(shù)

如上介紹Intersect是關系代數(shù)中的Intersection, Minus是關系代數(shù)的Difference, 如下圖示意:

a. Intersect(Intersection):

b. Minus(Difference):

4. GROUP BY

GROUP BY 是對數(shù)據(jù)進行分組的操作,比如我需要分別計算一下一個學生表里面女生和男生的人數(shù)分別是多少,如下:

(1) Table API 示例

將order_tab信息按c_id分組統(tǒng)計訂單數(shù)量,簡單示例如下:

 
 
 
 
  1. val result = order
  2. .groupBy('c_id)
  3. .select('c_id, 'o_id.count)

(2) Result

(3) 特別說明

在實際的業(yè)務場景中,GROUP BY除了按業(yè)務字段進行分組外,很多時候用戶也可以用時間來進行分組(相當于劃分窗口),比如統(tǒng)計每分鐘的訂單數(shù)量:

  • Table API 示例

按時間進行分組,查詢每分鐘的訂單數(shù)量,如下:

 
 
 
 
  1. ```
  2. val result = order
  3. .select('o_id, 'c_id, 'o_time.substring(1, 16) as 'o_time_min)
  4. .groupBy('o_time_min)
  5. .select('o_time_min, 'o_id.count)
  6. ```
  • Result

說明:如果我們時間字段是timestamp類型,建議使用內(nèi)置的 DATE_FORMAT 函數(shù)。

5. UNION ALL

UNION ALL 將兩個表合并起來,要求兩個表的字段完全一致,包括字段類型、字段順序,語義對應關系代數(shù)的Union,只是關系代數(shù)是Set集合操作,會有去重復操作,UNION ALL 不進行去重,如下所示:

(1) Table API 示例

我們簡單的將customer_tab查詢2次,將查詢結果合并起來,如下:

 
 
 
 
  1. val result = customer.unionAll(customer)

(2) Result

(3) 特別說明

UNION ALL 對結果數(shù)據(jù)不進行去重,如果想對結果數(shù)據(jù)進行去重,傳統(tǒng)數(shù)據(jù)庫需要進行UNION操作。

6. UNION

UNION 將兩個流給合并起來,要求兩個流的字段完全一致,包括字段類型、字段順序,并其UNION 不同于UNION ALL,UNION會對結果數(shù)據(jù)去重,與關系代數(shù)的Union語義一致,如下:

(1) Table API 示例

我們簡單的將customer_tab查詢2次,將查詢結果合并起來,如下:

 
 
 
 
  1. val result = customer.union(customer)

我們發(fā)現(xiàn)完全一樣的表數(shù)據(jù)進行 UNION之后,數(shù)據(jù)是被去重的,UNION之后的數(shù)據(jù)并沒有增加。

(2) Result

(3) 特別說明

UNION 對結果數(shù)據(jù)進行去重,在實際的實現(xiàn)過程需要對數(shù)據(jù)進行排序操作,所以非必要去重情況請使用UNION ALL操作。

7. JOIN

JOIN 用于把來自兩個表的行聯(lián)合起來形成一個寬表,Apache Flink支持的JOIN類型:

  • JOIN - INNER JOIN
  • LEFT JOIN - LEFT OUTER JOIN
  • RIGHT JOIN - RIGHT OUTER JOIN
  • FULL JOIN - FULL OUTER JOIN

JOIN與關系代數(shù)的Join語義相同,具體如下:

(1) Table API 示例 (JOIN)

INNER JOIN只選擇滿足ON條件的記錄,我們查詢customer_tab 和 order_tab表,將有訂單的客戶和訂單信息選擇出來,如下:

 
 
 
 
  1. val result = customer
  2. .join(order.select('o_id, 'c_id as 'o_c_id, 'o_time, 'o_desc), 'c_id === 'o_c_id)

(2)Result:

(3) Table API 示例 (LEFT JOIN)

LEFT JOIN與INNER JOIN的區(qū)別是當右表沒有與左邊相JOIN的數(shù)據(jù)時候,右邊對應的字段補NULL輸出,語義如下:

對應的SQL語句如下(LEFT JOIN):

 
 
 
 
  1. SELECT ColA, ColB, T2.ColC, ColE FROM TI LEFT JOIN T2 ON T1.ColC = T2.ColC ;

細心的讀者可能發(fā)現(xiàn)上面T2.ColC是添加了前綴T2了,這里需要說明一下,當兩張表有字段名字一樣的時候,我需要指定是從那個表里面投影的。

我們查詢customer_tab 和 order_tab表,將客戶和訂單信息選擇出來如下:

 
 
 
 
  1. val result = customer
  2. .leftOuterJoin(order.select('o_id, 'c_id as 'o_c_id, 'o_time, 'o_desc), 'c_id === 'o_c_id)

(4) Result

(5) 特別說明

RIGHT JOIN 相當于 LEFT JOIN 左右兩個表交互一下位置。FULL JOIN相當于 RIGHT JOIN 和 LEFT JOIN 之后進行UNION ALL操作。

8. Time-Interval JOIN

Time-Interval JOIN 相對于UnBounded的雙流JOIN來說是Bounded JOIN。就是每條流的每一條數(shù)據(jù)會與另一條流上的不同時間區(qū)域的數(shù)據(jù)進行JOIN。對應Apache Flink官方文檔的 Time-windowed JOIN(release-1.7之前都叫Time-Windowed JOIN)。 Time-Interval JOIN的語義和實現(xiàn)原理詳見《Apache Flink 漫談系列(12) - Time Interval(Time-windowed) JOIN》。其Table API核心的語法示例,如下:

 
 
 
 
  1. ...
  2. val result = left
  3. .join(right)
  4. // 定義Time Interval
  5. .where('a === 'd && 'c >= 'f - 5.seconds && 'c < 'f + 6.seconds)
  6. ...

9. Lateral JOIN

Apache Flink Lateral JOIN 是左邊Table與一個UDTF進行JOIN,詳細的語義和實現(xiàn)原理請參考《Apache Flink 漫談系列(10) - JOIN LATERAL》。其Table API核心的語法示例,如下:

 
 
 
 
  1. ...
  2. val udtf = new UDTF
  3. val result = source.join(udtf('c) as ('d, 'e))
  4. ...

10. Temporal Table JOIN

Temporal Table JOIN 是左邊表與右邊一個攜帶版本信息的表進行JOIN,詳細的語法,語義和實現(xiàn)原理詳見《Apache Flink 漫談系列(11) - Temporal Table JOIN》,其Table API核心的語法示例,如下:

 
 
 
 
  1. ...
  2. val rates = tEnv.scan("versonedTable").createTemporalTableFunction('rowtime, 'r_currency)
  3. val result = left.join(rates('o_rowtime), 'r_currency === 'o_currency)...

11. Window

在Apache Flink中有2種類型的Window,一種是OverWindow,即傳統(tǒng)數(shù)據(jù)庫的標準開窗,每一個元素都對應一個窗口。一種是GroupWindow,目前在SQL中GroupWindow都是基于時間進行窗口劃分的。

(1) Over Window

Apache Flink中對OVER Window的定義遵循標準SQL的定義語法。

按ROWS和RANGE分類是傳統(tǒng)數(shù)據(jù)庫的標準分類方法,在Apache Flink中還可以根據(jù)時間類型(ProcTime/EventTime)和窗口的有限和無限(Bounded/UnBounded)進行分類,共計8種類型。為了避免大家對過細分類造成困擾,我們按照確定當前行的不同方式將OVER Window分成兩大類進行介紹,如下:

  • ROWS OVER Window - 每一行元素都視為新的計算行,即,每一行都是一個新的窗口。
  • RANGE OVER Window - 具有相同時間值的所有元素行視為同一計算行,即,具有相同時間值的所有行都是同一個窗口。

(a) Bounded ROWS OVER Window

Bounded ROWS OVER Window 每一行元素都視為新的計算行,即,每一行都是一個新的窗口。

  • 語義

我們以3個元素(2 PRECEDING)的窗口為例,如下圖:

上圖所示窗口 user 1 的 w5和w6, user 2的 窗口 w2 和 w3,雖然有元素都是同一時刻到達,但是他們?nèi)匀皇窃诓煌拇翱?,這一點有別于RANGE OVER Window。

  • Table API 示例

利用item_tab測試數(shù)據(jù),我們統(tǒng)計同類商品中當前和當前商品之前2個商品中的最高價格。

 
 
 
 
  1. val result = item
  2. .window(Over partitionBy 'itemType orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
  3. .select('itemID, 'itemType, 'onSellTime, 'price, 'price.max over 'w as 'maxPrice)
  • Result

(b) Bounded RANGE OVER Window

Bounded RANGE OVER Window 具有相同時間值的所有元素行視為同一計算行,即,具有相同時間值的所有行都是同一個窗口。

  • 語義

我們以3秒中數(shù)據(jù)(INTERVAL '2' SECOND)的窗口為例,如下圖:

注意: 上圖所示窗口 user 1 的 w6, user 2的 窗口 w3,元素都是同一時刻到達,他們是在同一個窗口,這一點有別于ROWS OVER Window。

  • Tabel API 示例

我們統(tǒng)計同類商品中當前和當前商品之前2分鐘商品中的最高價格。

 
 
 
 
  1. val result = item
  2. .window(Over partitionBy 'itemType orderBy 'rowtime preceding 2.minute following CURRENT_RANGE as 'w)
  3. .select('itemID, 'itemType, 'onSellTime, 'price, 'price.max over 'w as 'maxPrice)

(c) Result(Bounded RANGE OVER Window)

  • 特別說明

OverWindow最重要是要理解每一行數(shù)據(jù)都確定一個窗口,同時目前在Apache Flink中只支持按時間字段排序。并且OverWindow開窗與GroupBy方式數(shù)據(jù)分組最大的不同在于,GroupBy數(shù)據(jù)分組統(tǒng)計時候,在SELECT中除了GROUP BY的key,不能直接選擇其他非key的字段,但是OverWindow沒有這個限制,SELECT可以選擇任何字段。比如一張表table(a,b,c,d)4個字段,如果按d分組求c的最大值,兩種寫完如下:

  • GROUP BY - tab.groupBy('d).select(d, MAX(c))
  • OVER Window = tab.window(Over.. as 'w).select('a, 'b, 'c, 'd, c.max over 'w)

如上 OVER Window 雖然PARTITION BY d,但SELECT 中仍然可以選擇 a,b,c字段。但在GROUPBY中,SELECT 只能選擇 d 字段。

(2) Group Window

根據(jù)窗口數(shù)據(jù)劃分的不同,目前Apache Flink有如下3種Bounded Winodw:

  • Tumble - 滾動窗口,窗口數(shù)據(jù)有固定的大小,窗口數(shù)據(jù)無疊加;
  • Hop - 滑動窗口,窗口數(shù)據(jù)有固定大小,并且有固定的窗口重建頻率,窗口數(shù)據(jù)有疊加;
  • Session - 會話窗口,窗口數(shù)據(jù)沒有固定的大小,根據(jù)窗口數(shù)據(jù)活躍程度劃分窗口,窗口數(shù)據(jù)無疊加。

說明: Aapche Flink 還支持UnBounded的 Group Window,也就是全局Window,流上所有數(shù)據(jù)都在一個窗口里面,語義非常簡單,這里不做詳細介紹了。

(a) Tumble

  • 語義

Tumble 滾動窗口有固定size,窗口數(shù)據(jù)不重疊,具體語義如下:

  • Table API 示例

利用pageAccess_tab測試數(shù)據(jù),我們需要按不同地域統(tǒng)計每2分鐘的淘寶首頁的訪問量(PV)。

 
 
 
 
  1. val result = pageAccess
  2. .window(Tumble over 2.minute on 'rowtime as 'w)
  3. .groupBy('w, 'region)
  4. .select('region, 'w.start, 'w.end, 'region.count as 'pv)
  • Result

(b) Hop

Hop 滑動窗口和滾動窗口類似,窗口有固定的size,與滾動窗口不同的是滑動窗口可以通過slide參數(shù)控制滑動窗口的新建頻率。因此當slide值小于窗口size的值的時候多個滑動窗口會重疊。

  • 語義

Hop 滑動窗口語義如下所示:

  • Table API 示例

利用pageAccessCount_tab測試數(shù)據(jù),我們需要每5分鐘統(tǒng)計近10分鐘的頁面訪問量(PV).

 
 
 
 
  1. val result = pageAccessCount
  2. .window(Slide over 10.minute every 5.minute on 'rowtime as 'w)
  3. .groupBy('w)
  4. .select('w.start, 'w.end, 'accessCount.sum as 'accessCount)
  • Result

(c) Session

Seeeion 會話窗口 是沒有固定大小的窗口,通過session的活躍度分組元素。不同于滾動窗口和滑動窗口,會話窗口不重疊,也沒有固定的起止時間。一個會話窗口在一段時間內(nèi)沒有接收到元素時,即當出現(xiàn)非活躍間隙時關閉。一個會話窗口 分配器通過配置session gap來指定非活躍周期的時長.

  • 語義

Session 會話窗口語義如下所示:

 
 
 
 
  1. val result = pageAccessSession
  2. .window(Session withGap 3.minute on 'rowtime as 'w)
  3. .groupBy('w, 'region)
  4. .select('region, 'w.start, 'w.end, 'region.count as 'pv)
  • Result

(d) 嵌套Window

在Window之后再進行Window劃分也是比較常見的統(tǒng)計需求,那么在一個Event-Time的Window之后,如何再寫一個Event-Time的Window呢?一個Window之后再描述一個Event-Time的Window最重要的是Event-time屬性的傳遞,在Table API中我們可以利用'w.rowtime來傳遞時間屬性,比如:Tumble Window之后再接一個Session Window 示例如下:

 
 
 
 
  1. ...
  2. val result = pageAccess
  3. .window(Tumble over 2.minute on 'rowtime as 'w1)
  4. .groupBy('w1)
  5. .select('w1.rowtime as 'rowtime, 'col1.count as 'cnt)
  6. .window(Session withGap 3.minute on 'rowtime as 'w2)
  7. .groupBy('w2)
  8. .select('cnt.sum)
  9. ...

五、Source&Sink

上面我們介紹了Apache Flink Table API核心算子的語義和具體示例,這部分將選取Bounded EventTime Tumble Window為例為大家編寫一個完整的包括Source和Sink定義的Apache Flink Table API Job。假設有一張?zhí)詫氻撁嬖L問表(PageAccess_tab),有地域,用戶ID和訪問時間。我們需要按不同地域統(tǒng)計每2分鐘的淘寶首頁的訪問量(PV)。具體數(shù)據(jù)如下:

1. Source 定義

自定義Apache Flink Stream Source需要實現(xiàn)StreamTableSource, StreamTableSource中通過StreamExecutionEnvironment 的addSource方法獲取DataStream, 所以我們需要自定義一個 SourceFunction, 并且要支持產(chǎn)生WaterMark,也就是要實現(xiàn)DefinedRowtimeAttributes接口。

(1) Source Function定義

支持接收攜帶EventTime的數(shù)據(jù)集合,Either的數(shù)據(jù)結構,Right表示W(wǎng)aterMark和Left表示數(shù)據(jù):

 
 
 
 
  1. class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]])
  2. extends SourceFunction[T] {
  3. override def run(ctx: SourceContext[T]): Unit = {
  4. dataWithTimestampList.foreach {
  5. case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
  6. case Right(w) => ctx.emitWatermark(new Watermark(w))
  7. }
  8. }
  9. override def cancel(): Unit = ???}

(2) 定義 StreamTableSource

我們自定義的Source要攜帶我們測試的數(shù)據(jù),以及對應的WaterMark數(shù)據(jù),具體如下:

 
 
 
 
  1. class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {
  2. val fieldNames = Array("accessTime", "region", "userId")
  3. val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
  4. val rowType = new RowTypeInfo(
  5. Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
  6. fieldNames)
  7. // 頁面訪問表數(shù)據(jù) rows with timestamps and watermarks
  8. val data = Seq(
  9. Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")),
  10. Right(1510365660000L),
  11. Left(1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001")),
  12. Right(1510365660000L),
  13. Left(1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032")),
  14. Right(1510366200000L),
  15. Left(1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100")),
  16. Right(1510366260000L),
  17. Left(1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011")),
  18. Right(1510373400000L)
  19. )
  20. override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
  21. Collections.singletonList(new RowtimeAttributeDescriptor(
  22. "accessTime",
  23. new ExistingField("accessTime"),
  24. PreserveWatermarks.INSTANCE))
  25. }
  26. override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
  27. execEnv.addSource(new MySourceFunction[Row](data)).returns(rowType).setParallelism(1)
  28. }
  29. override def getReturnType: TypeInformation[Row] = rowType
  30. override def getTableSchema: TableSchema = schema
  31. }

(3) Sink 定義

我們簡單的將計算結果寫入到Apache Flink內(nèi)置支持的CSVSink中,定義Sink如下:

 
 
 
 
  1. def getCsvTableSink: TableSink[Row] = {
  2. val tempFile = File.createTempFile("csv_sink_", "tem")
  3. // 打印sink的文件路徑,方便我們查看運行結果
  4. println("Sink path : " + tempFile)
  5. if (tempFile.exists()) {
  6. tempFile.delete
    分享題目:ApacheFlink漫談系列(13)-TableAPI概述
    瀏覽路徑:http://m.5511xx.com/article/ccoscog.html