日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
FlinkSQL知其所以然:SQLDDL!

SQL 語(yǔ)法篇

一、DDL:Create 子句

大家好,我是老羊,今天來(lái)學(xué)一波 Flink SQL 中的 DDL。

創(chuàng)新互聯(lián)專注于高縣網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠(chéng)為您提供高縣營(yíng)銷型網(wǎng)站建設(shè),高縣網(wǎng)站制作、高縣網(wǎng)頁(yè)設(shè)計(jì)、高縣網(wǎng)站官網(wǎng)定制、小程序開(kāi)發(fā)服務(wù),打造高縣網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供高縣網(wǎng)站排名全網(wǎng)營(yíng)銷落地服務(wù)。

CREATE 語(yǔ)句用于向當(dāng)前或指定的 Catalog 中注冊(cè)庫(kù)、表、視圖或函數(shù)。注冊(cè)后的庫(kù)、表、視圖和函數(shù)可以在 SQL 查詢中使用。

目前 Flink SQL 支持下列 CREATE 語(yǔ)句:

  1. CREATE TABLE。
  2. CREATE DATABASE。
  3. CREATE VIEW。
  4. CREATE FUNCTION。

此節(jié)重點(diǎn)介紹建表,建數(shù)據(jù)庫(kù)、視圖和 UDF 會(huì)在后面的擴(kuò)展章節(jié)進(jìn)行介紹。

1、建表語(yǔ)句

下面的 SQL 語(yǔ)句就是建表語(yǔ)句的定義,根據(jù)指定的表名創(chuàng)建一個(gè)表,如果同名表已經(jīng)在 catalog 中存在了,則無(wú)法注冊(cè)。

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
(
{ | | }[ , ...n]
[ ]
[ ][ , ...n]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
[ LIKE source_table [( )] ]
:
column_name column_type [ ] [COMMENT column_comment]
:
[CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED
:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
:
column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]
:
column_name AS computed_column_expression [COMMENT column_comment]
:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
:
[catalog_name.][db_name.]table_name
:
{
{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[, ...]

2、表中的列

  • 常規(guī)列(即物理列)

物理列是數(shù)據(jù)庫(kù)中所說(shuō)的常規(guī)列。其定義了物理介質(zhì)中存儲(chǔ)的數(shù)據(jù)中字段的名稱、類型和順序。

其他類型的列可以在物理列之間聲明,但不會(huì)影響最終的物理列的讀取。

舉一個(gè)僅包含常規(guī)列的表的案例:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING
) WITH (
...
);
  • 元數(shù)據(jù)列

元數(shù)據(jù)列是 SQL 標(biāo)準(zhǔn)的擴(kuò)展,允許訪問(wèn)數(shù)據(jù)源本身具有的一些元數(shù)據(jù)。元數(shù)據(jù)列由 METADATA 關(guān)鍵字標(biāo)識(shí)。

例如,我們可以使用元數(shù)據(jù)列從 Kafka 數(shù)據(jù)中讀取 Kafka 數(shù)據(jù)自帶的時(shí)間戳(這個(gè)時(shí)間戳不是數(shù)據(jù)中的某個(gè)時(shí)間戳字段,而是數(shù)據(jù)寫入 Kafka 時(shí),Kafka 引擎給這條數(shù)據(jù)打上的時(shí)間戳標(biāo)記),然后我們可以在 Flink SQL 中使用這個(gè)時(shí)間戳,比如進(jìn)行基于時(shí)間的窗口操作。

舉例:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 讀取 kafka 本身自帶的時(shí)間戳
`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka'
...
);

元數(shù)據(jù)列可以用于后續(xù)數(shù)據(jù)的處理,或者寫入到目標(biāo)表中。

舉例:

INSERT INTO MyTable 
SELECT
user_id
, name
, record_time + INTERVAL '1' SECOND
FROM MyTable;

如果自定義的列名稱和 Connector 中定義 metadata 字段的名稱一樣的話,F(xiàn)ROM xxx 子句是可以被省略的。

舉例:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 讀取 kafka 本身自帶的時(shí)間戳
`timestamp` TIMESTAMP_LTZ(3) METADATA
) WITH (
'connector' = 'kafka'
...
);

關(guān)于 Flink SQL 的每種 Connector 都提供了哪些 metadata 字段,詳細(xì)可見(jiàn)官網(wǎng)文檔 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/。

如果自定義列的數(shù)據(jù)類型和 Connector 中定義的 metadata 字段的數(shù)據(jù)類型不一致的話,程序運(yùn)行時(shí)會(huì)自動(dòng) cast 強(qiáng)轉(zhuǎn)。但是這要求兩種數(shù)據(jù)類型是可以強(qiáng)轉(zhuǎn)的。舉例如下:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 將時(shí)間戳強(qiáng)轉(zhuǎn)為 BIGINT
`timestamp` BIGINT METADATA
) WITH (
'connector' = 'kafka'
...
);

默認(rèn)情況下,F(xiàn)link SQL planner 認(rèn)為 metadata 列是可以 讀取 也可以 寫入 的。但是有些外部存儲(chǔ)系統(tǒng)的元數(shù)據(jù)信息是只能用于讀取,不能寫入的。

那么在往一個(gè)表寫入的場(chǎng)景下,我們就可以使用 VIRTUAL 關(guān)鍵字來(lái)標(biāo)識(shí)某個(gè)元數(shù)據(jù)列不寫入到外部存儲(chǔ)中(不持久化)。

以 Kafka 舉例:

CREATE TABLE MyTable (
-- sink 時(shí)會(huì)寫入
`timestamp` BIGINT METADATA,
-- sink 時(shí)不寫入
`offset` BIGINT METADATA VIRTUAL,
`user_id` BIGINT,
`name` STRING,
) WITH (
'connector' = 'kafka'
...
);

在上面這個(gè)案例中,Kafka 引擎的 offset 是只讀的。所以我們?cè)诎?MyTable 作為數(shù)據(jù)源(輸入)表時(shí),schema 中是包含 offset 的。在把 MyTable 作為數(shù)據(jù)匯(輸出)表時(shí),schema 中是不包含 offset 的。如下:

-- 當(dāng)做數(shù)據(jù)源(輸入)的 schema
MyTable(`timestamp` BIGINT, `offset` BIGINT, `user_id` BIGINT, `name` STRING)
-- 當(dāng)做數(shù)據(jù)匯(輸出)的 schema
MyTable(`timestamp` BIGINT, `user_id` BIGINT, `name` STRING)

所以這里在寫入時(shí)需要注意,不要在 SQL 的 INSERT INTO 語(yǔ)句中寫入 offset 列,否則 Flink SQL 任務(wù)會(huì)直接報(bào)錯(cuò)。

  • 計(jì)算列

計(jì)算列其實(shí)就是在寫建表的 DDL 時(shí),可以拿已有的一些列經(jīng)過(guò)一些自定義的運(yùn)算生成的新列。這些列本身是沒(méi)有以物理形式存儲(chǔ)到數(shù)據(jù)源中的。

舉例:

CREATE TABLE MyTable (
`user_id` BIGINT,
`price` DOUBLE,
`quantity` DOUBLE,
-- cost 就是使用 price 和 quanitity 生成的計(jì)算列,計(jì)算方式為 price * quanitity
`cost` AS price * quanitity,
) WITH (
'connector' = 'kafka'
...
);

注意!!!

計(jì)算列可以包含其他列、常量或者函數(shù),但是不能寫一個(gè)子查詢進(jìn)去。

小伙伴萌這時(shí)會(huì)問(wèn)到一個(gè)問(wèn)題,既然只能包含列、常量或者函數(shù)計(jì)算,我就直接在 DML query 代碼中寫就完事了唄,為啥還要專門在 DDL 中定義呢?

結(jié)論:沒(méi)錯(cuò),如果只是簡(jiǎn)單的四則運(yùn)算的話直接寫在 DML 中就可以,但是計(jì)算列一般是用于定義時(shí)間屬性的(因?yàn)樵?SQL 任務(wù)中時(shí)間屬性只能在 DDL 中定義,不能在 DML 語(yǔ)句中定義)。比如要把輸入數(shù)據(jù)的時(shí)間格式標(biāo)準(zhǔn)化。處理時(shí)間、事件時(shí)間分別舉例如下:

  • 處理時(shí)間:使用 PROCTIME() 函數(shù)來(lái)定義處理時(shí)間列
  • 事件時(shí)間:事件時(shí)間的時(shí)間戳可以在聲明 Watermark 之前進(jìn)行預(yù)處理。比如如果字段不是 TIMESTAMP(3) 類型或者時(shí)間戳是嵌套在 JSON 字符串中的,則可以使用計(jì)算列進(jìn)行預(yù)處理。

注意!!!和虛擬 metadata 列是類似的,計(jì)算列也是只能讀不能寫的。

也就是說(shuō),我們?cè)诎?MyTable 作為數(shù)據(jù)源(輸入)表時(shí),schema 中是包含 cost 的。

在把 MyTable 作為數(shù)據(jù)匯(輸出)表時(shí),schema 中是不包含 cost 的。舉例:

-- 當(dāng)做數(shù)據(jù)源(輸入)的 schema
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE, `cost` DOUBLE)
-- 當(dāng)做數(shù)據(jù)匯(輸出)的 schema
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE)

3、定義 Watermark

Watermark 是在 Create Table 中進(jìn)行定義的。具體 SQL 語(yǔ)法標(biāo)準(zhǔn)是 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression。

其中:

  1. rowtime_column_name:表的事件時(shí)間屬性字段。該列必須是 TIMESTAMP(3)、TIMESTAMP_LTZ(3) 類,這個(gè)時(shí)間可以是一個(gè)計(jì)算列。
  2. watermark_strategy_expression:定義 Watermark 的生成策略。Watermark 的一般都是由 rowtime_column_name 列減掉一段固定時(shí)間間隔。SQL 中 Watermark 的生產(chǎn)策略是:當(dāng)前 Watermark 大于上次發(fā)出的 Watermark 時(shí)發(fā)出當(dāng)前 Watermark。

注意:

如果你使用的是事件時(shí)間語(yǔ)義,那么必須要設(shè)設(shè)置事件時(shí)間屬性和 WATERMARK 生成策略。

Watermark 的發(fā)出頻率:Watermark 發(fā)出一般是間隔一定時(shí)間的,Watermark 的發(fā)出間隔時(shí)間可以由 pipeline.auto-watermark-interval 進(jìn)行配置,如果設(shè)置為 200ms 則每 200ms 會(huì)計(jì)算一次 Watermark,然如果比之前發(fā)出的 Watermark 大,則發(fā)出。如果間隔設(shè)為 0ms,則 Watermark 只要滿足觸發(fā)條件就會(huì)發(fā)出,不會(huì)受到間隔時(shí)間控制。

Flink SQL 提供了幾種 WATERMARK 生產(chǎn)策略:

  1. 有界無(wú)序:設(shè)置方式為 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit。此類策略就可以用于設(shè)置最大亂序時(shí)間,假如設(shè)置為 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND,則生成的是運(yùn)行 5s 延遲的 Watermark。一般都用這種 Watermark 生成策略,此類 Watermark 生成策略通常用于有數(shù)據(jù)亂序的場(chǎng)景中,而對(duì)應(yīng)到實(shí)際的場(chǎng)景中,數(shù)據(jù)都是會(huì)存在亂序的,所以基本都使用此類策略。
  2. 嚴(yán)格升序:設(shè)置方式為 WATERMARK FOR rowtime_column AS rowtime_column。一般基本不用這種方式。如果你能保證你的數(shù)據(jù)源的時(shí)間戳是嚴(yán)格升序的,那就可以使用這種方式。嚴(yán)格升序代表 Flink 任務(wù)認(rèn)為時(shí)間戳只會(huì)越來(lái)越大,也不存在相等的情況,只要相等或者小于之前的,就認(rèn)為是遲到的數(shù)據(jù)。
  3. 遞增:設(shè)置方式為 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND。一般基本不用這種方式。如果設(shè)置此類,則允許有相同的時(shí)間戳出現(xiàn)。

4、Create Table With 子句

先看一個(gè)案例:

CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)

可以看到 DDL 中 With 子句就是在建表時(shí),描述數(shù)據(jù)源、數(shù)據(jù)匯的具體外部存儲(chǔ)的元數(shù)據(jù)信息的。

一般 With 中的配置項(xiàng)由 Flink SQL 的 Connector(鏈接外部存儲(chǔ)的連接器) 來(lái)定義,每種 Connector 提供的 With 配置項(xiàng)都是不同的。

注意:

Flink SQL 中 Connector 其實(shí)就是 Flink 用于鏈接外部數(shù)據(jù)源的接口。舉一個(gè)類似的例子,在 Java 中想連接到 MySQL,需要使用 mysql-connector-java 包提供的 Java API 去鏈接。映射到 Flink SQL 中,在 Flink SQL 中要連接到 Kafka,需要使用 kafka connector。

Flink SQL 已經(jīng)提供了一系列的內(nèi)置 Connector,具體可見(jiàn) https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/。

回到上述案例中,With 聲明了以下幾項(xiàng)信息:

  1. 'connector' = 'kafka':聲明外部存儲(chǔ)是 Kafka。
  2. 'topic' = 'user_behavior':聲明 Flink SQL 任務(wù)要連接的 Kafka 表的 topic 是 user_behavior。
  3. 'properties.bootstrap.servers' = 'localhost:9092':聲明 Kafka 的 server ip 是 localhost:9092。
  4. 'properties.group.id' = 'testGroup':聲明 Flink SQL 任務(wù)消費(fèi)這個(gè) Kafka topic,會(huì)使用 testGroup 的 group id 去消費(fèi)。
  5. 'scan.startup.mode' = 'earliest-offset':聲明 Flink SQL 任務(wù)消費(fèi)這個(gè) Kafka topic 會(huì)從最早位點(diǎn)開(kāi)始消費(fèi)。
  6. 'format' = 'csv':聲明 Flink SQL 任務(wù)讀入或者寫出時(shí)對(duì)于 Kafka 消息的序列化方式是 csv 格式。

從這里也可以看出來(lái) With 中具體要配置哪些配置項(xiàng)都是和每種 Connector 決定的。

5、Create Table Like 子句

Like 子句是 Create Table 子句的一個(gè)延伸。舉例:

下面定義了一張 Orders 表:

CREATE TABLE Orders (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset'
);

但是忘記定義 Watermark 了,那如果想加上 Watermark,就可以用 Like 子句定義一張帶 Watermark 的新表:

CREATE TABLE Orders_with_watermark (
-- 1. 添加了 WATERMARK 定義
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
-- 2. 覆蓋了原 Orders 表中 scan.startup.mode 參數(shù)
'scan.startup.mode' = 'latest-offset'
)
-- 3. Like 子句聲明是在原來(lái)的 Orders 表的基礎(chǔ)上定義 Orders_with_watermark 表
LIKE Orders;

上面這個(gè)語(yǔ)句的效果就等同于:

CREATE TABLE Orders_with_watermark (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'latest-offset'
);

不過(guò)這種不常使用。就不過(guò)多介紹了。如果小伙伴萌感興趣,直接去官網(wǎng)參考具體注意事項(xiàng):

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#like。


本文題目:FlinkSQL知其所以然:SQLDDL!
分享路徑:http://m.5511xx.com/article/dpoeess.html