新聞中心
一、什么是Table API

創(chuàng)新互聯(lián)專注于如皋企業(yè)網(wǎng)站建設,響應式網(wǎng)站開發(fā),商城網(wǎng)站定制開發(fā)。如皋網(wǎng)站建設公司,為如皋等地區(qū)提供建站服務。全流程按需網(wǎng)站制作,專業(yè)設計,全程項目跟蹤,創(chuàng)新互聯(lián)專業(yè)和態(tài)度為您提供的服務
在《Apache Flink 漫談系列(08) - SQL概覽》中我們概要的向大家介紹了什么是好SQL,SQL和Table API是Apache Flink中的同一層次的API抽象,如下圖所示:
Apache Flink 針對不同的用戶場景提供了三層用戶API,最下層ProcessFunction API可以對State,Timer等復雜機制進行有效的控制,但用戶使用的便捷性很弱,也就是說即使很簡單統(tǒng)計邏輯,也要較多的代碼開發(fā)。第二層DataStream API對窗口,聚合等算子進行了封裝,用戶的便捷性有所增強。最上層是SQL/Table API,Table API是Apache Flink中的聲明式,可被查詢優(yōu)化器優(yōu)化的高級分析API。
二、Table API的特點
Table API和SQL都是Apache Flink中最高層的分析API,SQL所具備的特點Table API也都具有,如下:
- 聲明式 - 用戶只關心做什么,不用關心怎么做;
- 高性能 - 支持查詢優(yōu)化,可以獲取最好的執(zhí)行性能;
- 流批統(tǒng)一 - 相同的統(tǒng)計邏輯,既可以流模式運行,也可以批模式運行;
- 標準穩(wěn)定 - 語義遵循SQL標準,語法語義明確,不易變動。
當然除了SQL的特性,因為Table API是在Flink中專門設計的,所以Table API還具有自身的特點:
- 表達方式的擴展性 - 在Flink中可以為Table API開發(fā)很多便捷性功能,如:Row.flatten(), map/flatMap 等
- 功能的擴展性 - 在Flink中可以為Table API擴展更多的功能,如:Iteration,flatAggregate 等新功能
- 編譯檢查 - Table API支持java和scala語言開發(fā),支持IDE中進行編譯檢查。
說明:上面說的map/flatMap/flatAggregate都是Apache Flink 社區(qū) FLIP-29 中規(guī)劃的新功能。
三、HelloWorld
在介紹Table API所有算子之前我們先編寫一個簡單的HelloWorld來直觀了解如何進行Table API的開發(fā)。
1. Maven 依賴
在pom文件中增加如下配置,本篇以flink-1.7.0功能為準進行后續(xù)介紹。
1.7.0 org.apache.flink flink-table_2.11 ${table.version} org.apache.flink flink-scala_2.11 ${table.version} org.apache.flink flink-streaming-scala_2.11 ${table.version} org.apache.flink flink-streaming-java_2.11 ${table.version}
2. 程序結構
在編寫第一Flink Table API job之前我們先簡單了解一下Flink Table API job的結構,如下圖所示:
- 外部數(shù)據(jù)源,比如Kafka, Rabbitmq, CSV 等等;
- 查詢計算邏輯,比如最簡單的數(shù)據(jù)導入select,雙流Join,Window Aggregate 等;
- 外部結果存儲,比如Kafka,Cassandra,CSV等。
說明:1和3 在Apache Flink中統(tǒng)稱為Connector。
3. 主程序
我們以一個統(tǒng)計單詞數(shù)量的業(yè)務場景,編寫第一個HelloWorld程序。
根據(jù)上面Flink job基本結構介紹,要Table API完成WordCount的計算需求,我們需要完成三部分代碼:
- TableSoruce Code - 用于創(chuàng)建數(shù)據(jù)源的代碼
- Table API Query - 用于進行word count統(tǒng)計的Table API 查詢邏輯
- TableSink Code - 用于保存word count計算結果的結果表代碼
(1) 運行模式選擇
一個job我們要選擇是Stream方式運行還是Batch模式運行,所以任何統(tǒng)計job的第一步是進行運行模式選擇,如下我們選擇Stream方式運行。
- // Stream運行環(huán)境
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
(2) 構建測試Source
我們用最簡單的構建Source方式進行本次測試,代碼如下:
- // 測試數(shù)據(jù)
- val data = Seq("Flink", "Bob", "Bob", "something", "Hello", "Flink", "Bob")
- // 最簡單的獲取Source方式
- val source = env.fromCollection(data).toTable(tEnv, 'word)
(3) WordCount 統(tǒng)計邏輯
WordCount核心統(tǒng)計邏輯就是按照單詞分組,然后計算每個單詞的數(shù)量,統(tǒng)計邏輯如下:
- // 單詞統(tǒng)計核心邏輯
- val result = source
- .groupBy('word) // 單詞分組
- .select('word, 'word.count) // 單詞統(tǒng)計
(4) 定義Sink
將WordCount的統(tǒng)計結果寫入Sink中,代碼如下:
- // 自定義Sink
- val sink = new RetractSink // 自定義Sink(下面有完整代碼)
- // 計算結果寫入sink
- result.toRetractStream[(String, Long)].addSink(sink)
(5) 完整的HelloWord代碼
為了方便大家運行WordCount查詢統(tǒng)計,將完整的代碼分享大家(基于flink-1.7.0),如下:
- import org.apache.flink.api.scala._
- import org.apache.flink.configuration.Configuration
- import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
- import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
- import org.apache.flink.table.api.TableEnvironment
- import org.apache.flink.table.api.scala._
- import scala.collection.mutable
- object HelloWord {
- def main(args: Array[String]): Unit = {
- // 測試數(shù)據(jù)
- val data = Seq("Flink", "Bob", "Bob", "something", "Hello", "Flink", "Bob")
- // Stream運行環(huán)境
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- // 最簡單的獲取Source方式
- val source = env.fromCollection(data).toTable(tEnv, 'word)
- // 單詞統(tǒng)計核心邏輯
- val result = source
- .groupBy('word) // 單詞分組
- .select('word, 'word.count) // 單詞統(tǒng)計
- // 自定義Sink
- val sink = new RetractSink
- // 計算結果寫入sink
- result.toRetractStream[(String, Long)].addSink(sink)
- env.execute
- }
- }
- class RetractSink extends RichSinkFunction[(Boolean, (String, Long))] {
- private var resultSet: mutable.Set[(String, Long)] = _
- override def open(parameters: Configuration): Unit = {
- // 初始化內(nèi)存存儲結構
- resultSet = new mutable.HashSet[(String, Long)]
- }
- override def invoke(v: (Boolean, (String, Long)), context: SinkFunction.Context[_]): Unit = {
- if (v._1) {
- // 計算數(shù)據(jù)
- resultSet.add(v._2)
- }
- else {
- // 撤回數(shù)據(jù)
- resultSet.remove(v._2)
- }
- }
- override def close(): Unit = {
- // 打印寫入sink的結果數(shù)據(jù)
- resultSet.foreach(println)
- }
- }
運行結果如下:
雖然上面用了較長的紙墨介紹簡單的WordCount統(tǒng)計邏輯,但source和sink部分都是可以在學習后面算子中被復用的。本例核心的統(tǒng)計邏輯只有一行代碼:
- source.groupBy('word).select('word, 'word.count)
所以Table API開發(fā)技術任務非常的簡潔高效。
四、Table API 算子
雖然Table API與SQL的算子語義一致,但在表達方式上面SQL以文本的方式展現(xiàn),Table API是以java或者scala語言的方式進行開發(fā)。為了大家方便閱讀,即便是在《Apache Flink 漫談系列(08) - SQL概覽》中介紹過的算子,在這里也會再次進行介紹,當然對于Table API和SQL不同的地方會進行詳盡介紹。
1. 示例數(shù)據(jù)及測試類
(1) 測試數(shù)據(jù)
- customer_tab 表 - 客戶表保存客戶id,客戶姓名和客戶描述信息。字段及測試數(shù)據(jù)如下:
- order_tab 表 - 訂單表保存客戶購買的訂單信息,包括訂單id,訂單時間和訂單描述信息。 字段節(jié)測試數(shù)據(jù)如下:
- Item_tab商品表, 攜帶商品id,商品類型,出售時間,價格等信息,具體如下:
- PageAccess_tab頁面訪問表,包含用戶ID,訪問時間,用戶所在地域信息,具體數(shù)據(jù)如下:
- PageAccessCount_tab頁面訪問表,訪問量,訪問時間,用戶所在地域信息,具體數(shù)據(jù)如下:
- PageAccessSession_tab頁面訪問表,訪問量,訪問時間,用戶所在地域信息,具體數(shù)據(jù)如下:
(2) 測試類
我們創(chuàng)建一個TableAPIOverviewITCase.scala 用于接下來介紹Flink Table API算子的功能體驗。代碼如下:
- import org.apache.flink.api.scala._
- import org.apache.flink.runtime.state.StateBackend
- import org.apache.flink.runtime.state.memory.MemoryStateBackend
- import org.apache.flink.streaming.api.TimeCharacteristic
- import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
- import org.apache.flink.streaming.api.functions.source.SourceFunction
- import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
- import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
- import org.apache.flink.streaming.api.watermark.Watermark
- import org.apache.flink.table.api.{Table, TableEnvironment}
- import org.apache.flink.table.api.scala._
- import org.apache.flink.types.Row
- import org.junit.rules.TemporaryFolder
- import org.junit.{Rule, Test}
- import scala.collection.mutable
- import scala.collection.mutable.ArrayBuffer
- class Table APIOverviewITCase {
- // 客戶表數(shù)據(jù)
- val customer_data = new mutable.MutableList[(String, String, String)]
- customer_data.+=(("c_001", "Kevin", "from JinLin"))
- customer_data.+=(("c_002", "Sunny", "from JinLin"))
- customer_data.+=(("c_003", "JinCheng", "from HeBei"))
- // 訂單表數(shù)據(jù)
- val order_data = new mutable.MutableList[(String, String, String, String)]
- order_data.+=(("o_001", "c_002", "2018-11-05 10:01:01", "iphone"))
- order_data.+=(("o_002", "c_001", "2018-11-05 10:01:55", "ipad"))
- order_data.+=(("o_003", "c_001", "2018-11-05 10:03:44", "flink book"))
- // 商品銷售表數(shù)據(jù)
- val item_data = Seq(
- Left((1510365660000L, (1510365660000L, 20, "ITEM001", "Electronic"))),
- Right((1510365660000L)),
- Left((1510365720000L, (1510365720000L, 50, "ITEM002", "Electronic"))),
- Right((1510365720000L)),
- Left((1510365780000L, (1510365780000L, 30, "ITEM003", "Electronic"))),
- Left((1510365780000L, (1510365780000L, 60, "ITEM004", "Electronic"))),
- Right((1510365780000L)),
- Left((1510365900000L, (1510365900000L, 40, "ITEM005", "Electronic"))),
- Right((1510365900000L)),
- Left((1510365960000L, (1510365960000L, 20, "ITEM006", "Electronic"))),
- Right((1510365960000L)),
- Left((1510366020000L, (1510366020000L, 70, "ITEM007", "Electronic"))),
- Right((1510366020000L)),
- Left((1510366080000L, (1510366080000L, 20, "ITEM008", "Clothes"))),
- Right((151036608000L)))
- // 頁面訪問表數(shù)據(jù)
- val pageAccess_data = Seq(
- Left((1510365660000L, (1510365660000L, "ShangHai", "U0010"))),
- Right((1510365660000L)),
- Left((1510365660000L, (1510365660000L, "BeiJing", "U1001"))),
- Right((1510365660000L)),
- Left((1510366200000L, (1510366200000L, "BeiJing", "U2032"))),
- Right((1510366200000L)),
- Left((1510366260000L, (1510366260000L, "BeiJing", "U1100"))),
- Right((1510366260000L)),
- Left((1510373400000L, (1510373400000L, "ShangHai", "U0011"))),
- Right((1510373400000L)))
- // 頁面訪問量表數(shù)據(jù)2
- val pageAccessCount_data = Seq(
- Left((1510365660000L, (1510365660000L, "ShangHai", 100))),
- Right((1510365660000L)),
- Left((1510365660000L, (1510365660000L, "BeiJing", 86))),
- Right((1510365660000L)),
- Left((1510365960000L, (1510365960000L, "BeiJing", 210))),
- Right((1510366200000L)),
- Left((1510366200000L, (1510366200000L, "BeiJing", 33))),
- Right((1510366200000L)),
- Left((1510373400000L, (1510373400000L, "ShangHai", 129))),
- Right((1510373400000L)))
- // 頁面訪問表數(shù)據(jù)3
- val pageAccessSession_data = Seq(
- Left((1510365660000L, (1510365660000L, "ShangHai", "U0011"))),
- Right((1510365660000L)),
- Left((1510365720000L, (1510365720000L, "ShangHai", "U0012"))),
- Right((1510365720000L)),
- Left((1510365720000L, (1510365720000L, "ShangHai", "U0013"))),
- Right((1510365720000L)),
- Left((1510365900000L, (1510365900000L, "ShangHai", "U0015"))),
- Right((1510365900000L)),
- Left((1510366200000L, (1510366200000L, "ShangHai", "U0011"))),
- Right((1510366200000L)),
- Left((1510366200000L, (1510366200000L, "BeiJing", "U2010"))),
- Right((1510366200000L)),
- Left((1510366260000L, (1510366260000L, "ShangHai", "U0011"))),
- Right((1510366260000L)),
- Left((1510373760000L, (1510373760000L, "ShangHai", "U0410"))),
- Right((1510373760000L)))
- val _tempFolder = new TemporaryFolder
- // Streaming 環(huán)境
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setParallelism(1)
- env.setStateBackend(getStateBackend)
- def getProcTimeTables(): (Table, Table) = {
- // 將order_tab, customer_tab 注冊到catalog
- val customer = env.fromCollection(customer_data).toTable(tEnv).as('c_id, 'c_name, 'c_desc)
- val order = env.fromCollection(order_data).toTable(tEnv).as('o_id, 'c_id, 'o_time, 'o_desc)
- (customer, order)
- }
- def getEventTimeTables(): (Table, Table, Table, Table) = {
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- // 將item_tab, pageAccess_tab 注冊到catalog
- val item =
- env.addSource(new EventTimeSourceFunction[(Long, Int, String, String)](item_data))
- .toTable(tEnv, 'onSellTime, 'price, 'itemID, 'itemType, 'rowtime.rowtime)
- val pageAccess =
- env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccess_data))
- .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)
- val pageAccessCount =
- env.addSource(new EventTimeSourceFunction[(Long, String, Int)](pageAccessCount_data))
- .toTable(tEnv, 'accessTime, 'region, 'accessCount, 'rowtime.rowtime)
- val pageAccessSession =
- env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccessSession_data))
- .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)
- (item, pageAccess, pageAccessCount, pageAccessSession)
- }
- @Rule
- def tempFolder: TemporaryFolder = _tempFolder
- def getStateBackend: StateBackend = {
- new MemoryStateBackend()
- }
- def procTimePrint(result: Table): Unit = {
- val sink = new RetractingSink
- result.toRetractStream[Row].addSink(sink)
- env.execute()
- }
- def rowTimePrint(result: Table): Unit = {
- val sink = new RetractingSink
- result.toRetractStream[Row].addSink(sink)
- env.execute()
- }
- @Test
- def testProc(): Unit = {
- val (customer, order) = getProcTimeTables()
- val result = ...// 測試的查詢邏輯
- procTimePrint(result)
- }
- @Test
- def testEvent(): Unit = {
- val (item, pageAccess, pageAccessCount, pageAccessSession) = getEventTimeTables()
- val result = ...// 測試的查詢邏輯
- procTimePrint(result)
- }
- }
- // 自定義Sink
- final class RetractingSink extends RichSinkFunction[(Boolean, Row)] {
- var retractedResults: ArrayBuffer[String] = null
- override def open(parameters: Configuration): Unit = {
- super.open(parameters)
- retractedResults = mutable.ArrayBuffer.empty[String]
- }
- def invoke(v: (Boolean, Row)) {
- retractedResults.synchronized {
- val vvalue = v._2.toString
- if (v._1) {
- retractedResults += value
- } else {
- val idx = retractedResults.indexOf(value)
- if (idx >= 0) {
- retractedResults.remove(idx)
- } else {
- throw new RuntimeException("Tried to retract a value that wasn't added first. " +
- "This is probably an incorrectly implemented test. " +
- "Try to set the parallelism of the sink to 1.")
- }
- }
- }
- }
- override def close(): Unit = {
- super.close()
- retractedResults.sorted.foreach(println(_))
- }
- }
- // Water mark 生成器
- class EventTimeSourceFunction[T](
- dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
- override def run(ctx: SourceContext[T]): Unit = {
- dataWithTimestampList.foreach {
- case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
- case Right(w) => ctx.emitWatermark(new Watermark(w))
- }
- }
- override def cancel(): Unit = ???
- }
2. SELECT
SELECT 用于從數(shù)據(jù)集/流中選擇數(shù)據(jù),語義是關系代數(shù)中的投影(Projection),對關系進行垂直分割,消去或增加某些列, 如下圖所示:
(1) Table API 示例
從customer_tab選擇用戶姓名,并用內(nèi)置的CONCAT函數(shù)拼接客戶信息,如下:
- val result = customer
- .select('c_name, concat_ws('c_name, " come ", 'c_desc))
(2) Result
(3) 特別說明
大家看到在 SELECT 不僅可以使用普通的字段選擇,還可以使用ScalarFunction,當然也包括User-Defined Function,同時還可以進行字段的alias設置。其實SELECT可以結合聚合,在GROUPBY部分會進行介紹,一個比較特殊的使用場景是去重的場景,示例如下:
- Table API示例
在訂單表查詢所有的客戶id,消除重復客戶id, 如下:
- val result = order
- .groupBy('c_id)
- .select('c_id)
- Result
3. WHERE
WHERE 用于從數(shù)據(jù)集/流中過濾數(shù)據(jù),與SELECT一起使用,語義是關系代數(shù)的Selection,根據(jù)某些條件對關系做水平分割,即選擇符合條件的記錄,如下所示:
(1) Table API 示例
在customer_tab查詢客戶id為c_001和c_003的客戶信息,如下:
- val result = customer
- .where("c_|| c_")
- .select( 'c_id, 'c_name, 'c_desc)
(2) Result
(3) 特別說明
我們發(fā)現(xiàn)WHERE是對滿足一定條件的數(shù)據(jù)進行過濾,WHERE支持=, <, >, <>, >=, <=以及&&, ||等表達式的組合,最終滿足過濾條件的數(shù)據(jù)會被選擇出來。 SQL中的IN和NOT IN在Table API里面用intersect 和 minus描述(flink-1.7.0版本)。
- Intersect 示例
Intersect只在Batch模式下進行支持,Stream模式下我們可以利用雙流JOIN來實現(xiàn),如:在customer_tab查詢已經(jīng)下過訂單的客戶信息,如下:
- // 計算客戶id,并去重
- val distinct_cids = order
- .groupBy('c_id) // 去重
- .select('c_id as 'o_c_id)
- val result = customer
- .join(distinct_cids, 'c_id === 'o_c_id)
- .select('c_id, 'c_name, 'c_desc)
- Result
- Minus 示例
Minus只在Batch模式下進行支持,Stream模式下我們可以利用雙流JOIN來實現(xiàn),如:在customer_tab查詢沒有下過訂單的客戶信息,如下:
- // 查詢下過訂單的客戶id,并去重
- val distinct_cids = order
- .groupBy('c_id)
- .select('c_id as 'o_c_id)
- // 查詢沒有下過訂單的客戶信息
- val result = customer
- .leftOuterJoin(distinct_cids, 'c_id === 'o_c_id)
- .where('o_c_id isNull)
- .select('c_id, 'c_name, 'c_desc)
說明上面實現(xiàn)邏輯比較復雜,我們后續(xù)考慮如何在流上支持更簡潔的方式。
- Result
- Intersect/Minus與關系代數(shù)
如上介紹Intersect是關系代數(shù)中的Intersection, Minus是關系代數(shù)的Difference, 如下圖示意:
a. Intersect(Intersection):
b. Minus(Difference):
4. GROUP BY
GROUP BY 是對數(shù)據(jù)進行分組的操作,比如我需要分別計算一下一個學生表里面女生和男生的人數(shù)分別是多少,如下:
(1) Table API 示例
將order_tab信息按c_id分組統(tǒng)計訂單數(shù)量,簡單示例如下:
- val result = order
- .groupBy('c_id)
- .select('c_id, 'o_id.count)
(2) Result
(3) 特別說明
在實際的業(yè)務場景中,GROUP BY除了按業(yè)務字段進行分組外,很多時候用戶也可以用時間來進行分組(相當于劃分窗口),比如統(tǒng)計每分鐘的訂單數(shù)量:
- Table API 示例
按時間進行分組,查詢每分鐘的訂單數(shù)量,如下:
- ```
- val result = order
- .select('o_id, 'c_id, 'o_time.substring(1, 16) as 'o_time_min)
- .groupBy('o_time_min)
- .select('o_time_min, 'o_id.count)
- ```
- Result
說明:如果我們時間字段是timestamp類型,建議使用內(nèi)置的 DATE_FORMAT 函數(shù)。
5. UNION ALL
UNION ALL 將兩個表合并起來,要求兩個表的字段完全一致,包括字段類型、字段順序,語義對應關系代數(shù)的Union,只是關系代數(shù)是Set集合操作,會有去重復操作,UNION ALL 不進行去重,如下所示:
(1) Table API 示例
我們簡單的將customer_tab查詢2次,將查詢結果合并起來,如下:
- val result = customer.unionAll(customer)
(2) Result
(3) 特別說明
UNION ALL 對結果數(shù)據(jù)不進行去重,如果想對結果數(shù)據(jù)進行去重,傳統(tǒng)數(shù)據(jù)庫需要進行UNION操作。
6. UNION
UNION 將兩個流給合并起來,要求兩個流的字段完全一致,包括字段類型、字段順序,并其UNION 不同于UNION ALL,UNION會對結果數(shù)據(jù)去重,與關系代數(shù)的Union語義一致,如下:
(1) Table API 示例
我們簡單的將customer_tab查詢2次,將查詢結果合并起來,如下:
- val result = customer.union(customer)
我們發(fā)現(xiàn)完全一樣的表數(shù)據(jù)進行 UNION之后,數(shù)據(jù)是被去重的,UNION之后的數(shù)據(jù)并沒有增加。
(2) Result
(3) 特別說明
UNION 對結果數(shù)據(jù)進行去重,在實際的實現(xiàn)過程需要對數(shù)據(jù)進行排序操作,所以非必要去重情況請使用UNION ALL操作。
7. JOIN
JOIN 用于把來自兩個表的行聯(lián)合起來形成一個寬表,Apache Flink支持的JOIN類型:
- JOIN - INNER JOIN
- LEFT JOIN - LEFT OUTER JOIN
- RIGHT JOIN - RIGHT OUTER JOIN
- FULL JOIN - FULL OUTER JOIN
JOIN與關系代數(shù)的Join語義相同,具體如下:
(1) Table API 示例 (JOIN)
INNER JOIN只選擇滿足ON條件的記錄,我們查詢customer_tab 和 order_tab表,將有訂單的客戶和訂單信息選擇出來,如下:
- val result = customer
- .join(order.select('o_id, 'c_id as 'o_c_id, 'o_time, 'o_desc), 'c_id === 'o_c_id)
(2)Result:
(3) Table API 示例 (LEFT JOIN)
LEFT JOIN與INNER JOIN的區(qū)別是當右表沒有與左邊相JOIN的數(shù)據(jù)時候,右邊對應的字段補NULL輸出,語義如下:
對應的SQL語句如下(LEFT JOIN):
- SELECT ColA, ColB, T2.ColC, ColE FROM TI LEFT JOIN T2 ON T1.ColC = T2.ColC ;
細心的讀者可能發(fā)現(xiàn)上面T2.ColC是添加了前綴T2了,這里需要說明一下,當兩張表有字段名字一樣的時候,我需要指定是從那個表里面投影的。
我們查詢customer_tab 和 order_tab表,將客戶和訂單信息選擇出來如下:
- val result = customer
- .leftOuterJoin(order.select('o_id, 'c_id as 'o_c_id, 'o_time, 'o_desc), 'c_id === 'o_c_id)
(4) Result
(5) 特別說明
RIGHT JOIN 相當于 LEFT JOIN 左右兩個表交互一下位置。FULL JOIN相當于 RIGHT JOIN 和 LEFT JOIN 之后進行UNION ALL操作。
8. Time-Interval JOIN
Time-Interval JOIN 相對于UnBounded的雙流JOIN來說是Bounded JOIN。就是每條流的每一條數(shù)據(jù)會與另一條流上的不同時間區(qū)域的數(shù)據(jù)進行JOIN。對應Apache Flink官方文檔的 Time-windowed JOIN(release-1.7之前都叫Time-Windowed JOIN)。 Time-Interval JOIN的語義和實現(xiàn)原理詳見《Apache Flink 漫談系列(12) - Time Interval(Time-windowed) JOIN》。其Table API核心的語法示例,如下:
- ...
- val result = left
- .join(right)
- // 定義Time Interval
- .where('a === 'd && 'c >= 'f - 5.seconds && 'c < 'f + 6.seconds)
- ...
9. Lateral JOIN
Apache Flink Lateral JOIN 是左邊Table與一個UDTF進行JOIN,詳細的語義和實現(xiàn)原理請參考《Apache Flink 漫談系列(10) - JOIN LATERAL》。其Table API核心的語法示例,如下:
- ...
- val udtf = new UDTF
- val result = source.join(udtf('c) as ('d, 'e))
- ...
10. Temporal Table JOIN
Temporal Table JOIN 是左邊表與右邊一個攜帶版本信息的表進行JOIN,詳細的語法,語義和實現(xiàn)原理詳見《Apache Flink 漫談系列(11) - Temporal Table JOIN》,其Table API核心的語法示例,如下:
- ...
- val rates = tEnv.scan("versonedTable").createTemporalTableFunction('rowtime, 'r_currency)
- val result = left.join(rates('o_rowtime), 'r_currency === 'o_currency)...
11. Window
在Apache Flink中有2種類型的Window,一種是OverWindow,即傳統(tǒng)數(shù)據(jù)庫的標準開窗,每一個元素都對應一個窗口。一種是GroupWindow,目前在SQL中GroupWindow都是基于時間進行窗口劃分的。
(1) Over Window
Apache Flink中對OVER Window的定義遵循標準SQL的定義語法。
按ROWS和RANGE分類是傳統(tǒng)數(shù)據(jù)庫的標準分類方法,在Apache Flink中還可以根據(jù)時間類型(ProcTime/EventTime)和窗口的有限和無限(Bounded/UnBounded)進行分類,共計8種類型。為了避免大家對過細分類造成困擾,我們按照確定當前行的不同方式將OVER Window分成兩大類進行介紹,如下:
- ROWS OVER Window - 每一行元素都視為新的計算行,即,每一行都是一個新的窗口。
- RANGE OVER Window - 具有相同時間值的所有元素行視為同一計算行,即,具有相同時間值的所有行都是同一個窗口。
(a) Bounded ROWS OVER Window
Bounded ROWS OVER Window 每一行元素都視為新的計算行,即,每一行都是一個新的窗口。
- 語義
我們以3個元素(2 PRECEDING)的窗口為例,如下圖:
上圖所示窗口 user 1 的 w5和w6, user 2的 窗口 w2 和 w3,雖然有元素都是同一時刻到達,但是他們?nèi)匀皇窃诓煌拇翱?,這一點有別于RANGE OVER Window。
- Table API 示例
利用item_tab測試數(shù)據(jù),我們統(tǒng)計同類商品中當前和當前商品之前2個商品中的最高價格。
- val result = item
- .window(Over partitionBy 'itemType orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
- .select('itemID, 'itemType, 'onSellTime, 'price, 'price.max over 'w as 'maxPrice)
- Result
(b) Bounded RANGE OVER Window
Bounded RANGE OVER Window 具有相同時間值的所有元素行視為同一計算行,即,具有相同時間值的所有行都是同一個窗口。
- 語義
我們以3秒中數(shù)據(jù)(INTERVAL '2' SECOND)的窗口為例,如下圖:
注意: 上圖所示窗口 user 1 的 w6, user 2的 窗口 w3,元素都是同一時刻到達,他們是在同一個窗口,這一點有別于ROWS OVER Window。
- Tabel API 示例
我們統(tǒng)計同類商品中當前和當前商品之前2分鐘商品中的最高價格。
- val result = item
- .window(Over partitionBy 'itemType orderBy 'rowtime preceding 2.minute following CURRENT_RANGE as 'w)
- .select('itemID, 'itemType, 'onSellTime, 'price, 'price.max over 'w as 'maxPrice)
(c) Result(Bounded RANGE OVER Window)
- 特別說明
OverWindow最重要是要理解每一行數(shù)據(jù)都確定一個窗口,同時目前在Apache Flink中只支持按時間字段排序。并且OverWindow開窗與GroupBy方式數(shù)據(jù)分組最大的不同在于,GroupBy數(shù)據(jù)分組統(tǒng)計時候,在SELECT中除了GROUP BY的key,不能直接選擇其他非key的字段,但是OverWindow沒有這個限制,SELECT可以選擇任何字段。比如一張表table(a,b,c,d)4個字段,如果按d分組求c的最大值,兩種寫完如下:
- GROUP BY - tab.groupBy('d).select(d, MAX(c))
- OVER Window = tab.window(Over.. as 'w).select('a, 'b, 'c, 'd, c.max over 'w)
如上 OVER Window 雖然PARTITION BY d,但SELECT 中仍然可以選擇 a,b,c字段。但在GROUPBY中,SELECT 只能選擇 d 字段。
(2) Group Window
根據(jù)窗口數(shù)據(jù)劃分的不同,目前Apache Flink有如下3種Bounded Winodw:
- Tumble - 滾動窗口,窗口數(shù)據(jù)有固定的大小,窗口數(shù)據(jù)無疊加;
- Hop - 滑動窗口,窗口數(shù)據(jù)有固定大小,并且有固定的窗口重建頻率,窗口數(shù)據(jù)有疊加;
- Session - 會話窗口,窗口數(shù)據(jù)沒有固定的大小,根據(jù)窗口數(shù)據(jù)活躍程度劃分窗口,窗口數(shù)據(jù)無疊加。
說明: Aapche Flink 還支持UnBounded的 Group Window,也就是全局Window,流上所有數(shù)據(jù)都在一個窗口里面,語義非常簡單,這里不做詳細介紹了。
(a) Tumble
- 語義
Tumble 滾動窗口有固定size,窗口數(shù)據(jù)不重疊,具體語義如下:
- Table API 示例
利用pageAccess_tab測試數(shù)據(jù),我們需要按不同地域統(tǒng)計每2分鐘的淘寶首頁的訪問量(PV)。
- val result = pageAccess
- .window(Tumble over 2.minute on 'rowtime as 'w)
- .groupBy('w, 'region)
- .select('region, 'w.start, 'w.end, 'region.count as 'pv)
- Result
(b) Hop
Hop 滑動窗口和滾動窗口類似,窗口有固定的size,與滾動窗口不同的是滑動窗口可以通過slide參數(shù)控制滑動窗口的新建頻率。因此當slide值小于窗口size的值的時候多個滑動窗口會重疊。
- 語義
Hop 滑動窗口語義如下所示:
- Table API 示例
利用pageAccessCount_tab測試數(shù)據(jù),我們需要每5分鐘統(tǒng)計近10分鐘的頁面訪問量(PV).
- val result = pageAccessCount
- .window(Slide over 10.minute every 5.minute on 'rowtime as 'w)
- .groupBy('w)
- .select('w.start, 'w.end, 'accessCount.sum as 'accessCount)
- Result
(c) Session
Seeeion 會話窗口 是沒有固定大小的窗口,通過session的活躍度分組元素。不同于滾動窗口和滑動窗口,會話窗口不重疊,也沒有固定的起止時間。一個會話窗口在一段時間內(nèi)沒有接收到元素時,即當出現(xiàn)非活躍間隙時關閉。一個會話窗口 分配器通過配置session gap來指定非活躍周期的時長.
- 語義
Session 會話窗口語義如下所示:
- val result = pageAccessSession
- .window(Session withGap 3.minute on 'rowtime as 'w)
- .groupBy('w, 'region)
- .select('region, 'w.start, 'w.end, 'region.count as 'pv)
- Result
(d) 嵌套Window
在Window之后再進行Window劃分也是比較常見的統(tǒng)計需求,那么在一個Event-Time的Window之后,如何再寫一個Event-Time的Window呢?一個Window之后再描述一個Event-Time的Window最重要的是Event-time屬性的傳遞,在Table API中我們可以利用'w.rowtime來傳遞時間屬性,比如:Tumble Window之后再接一個Session Window 示例如下:
- ...
- val result = pageAccess
- .window(Tumble over 2.minute on 'rowtime as 'w1)
- .groupBy('w1)
- .select('w1.rowtime as 'rowtime, 'col1.count as 'cnt)
- .window(Session withGap 3.minute on 'rowtime as 'w2)
- .groupBy('w2)
- .select('cnt.sum)
- ...
五、Source&Sink
上面我們介紹了Apache Flink Table API核心算子的語義和具體示例,這部分將選取Bounded EventTime Tumble Window為例為大家編寫一個完整的包括Source和Sink定義的Apache Flink Table API Job。假設有一張?zhí)詫氻撁嬖L問表(PageAccess_tab),有地域,用戶ID和訪問時間。我們需要按不同地域統(tǒng)計每2分鐘的淘寶首頁的訪問量(PV)。具體數(shù)據(jù)如下:
1. Source 定義
自定義Apache Flink Stream Source需要實現(xiàn)StreamTableSource, StreamTableSource中通過StreamExecutionEnvironment 的addSource方法獲取DataStream, 所以我們需要自定義一個 SourceFunction, 并且要支持產(chǎn)生WaterMark,也就是要實現(xiàn)DefinedRowtimeAttributes接口。
(1) Source Function定義
支持接收攜帶EventTime的數(shù)據(jù)集合,Either的數(shù)據(jù)結構,Right表示W(wǎng)aterMark和Left表示數(shù)據(jù):
- class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]])
- extends SourceFunction[T] {
- override def run(ctx: SourceContext[T]): Unit = {
- dataWithTimestampList.foreach {
- case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
- case Right(w) => ctx.emitWatermark(new Watermark(w))
- }
- }
- override def cancel(): Unit = ???}
(2) 定義 StreamTableSource
我們自定義的Source要攜帶我們測試的數(shù)據(jù),以及對應的WaterMark數(shù)據(jù),具體如下:
- class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {
- val fieldNames = Array("accessTime", "region", "userId")
- val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
- val rowType = new RowTypeInfo(
- Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
- fieldNames)
- // 頁面訪問表數(shù)據(jù) rows with timestamps and watermarks
- val data = Seq(
- Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")),
- Right(1510365660000L),
- Left(1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001")),
- Right(1510365660000L),
- Left(1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032")),
- Right(1510366200000L),
- Left(1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100")),
- Right(1510366260000L),
- Left(1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011")),
- Right(1510373400000L)
- )
- override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
- Collections.singletonList(new RowtimeAttributeDescriptor(
- "accessTime",
- new ExistingField("accessTime"),
- PreserveWatermarks.INSTANCE))
- }
- override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
- execEnv.addSource(new MySourceFunction[Row](data)).returns(rowType).setParallelism(1)
- }
- override def getReturnType: TypeInformation[Row] = rowType
- override def getTableSchema: TableSchema = schema
- }
(3) Sink 定義
我們簡單的將計算結果寫入到Apache Flink內(nèi)置支持的CSVSink中,定義Sink如下:
- def getCsvTableSink: TableSink[Row] = {
- val tempFile = File.createTempFile("csv_sink_", "tem")
- // 打印sink的文件路徑,方便我們查看運行結果
- println("Sink path : " + tempFile)
- if (tempFile.exists()) {
- tempFile.delete
分享題目:ApacheFlink漫談系列(13)-TableAPI概述
瀏覽路徑:http://m.5511xx.com/article/ccoscog.html


咨詢
建站咨詢
