新聞中心
SQL 的時(shí)間語義

hello,我是老羊,今天跟著老羊的思路學(xué)習(xí) Flink SQL 的時(shí)間語義:
- 與離線處理中常見的時(shí)間分區(qū)字段一樣,在實(shí)時(shí)處理中,時(shí)間屬性也是一個(gè)核心概念。Flink 支持 處理時(shí)間、事件時(shí)間、攝入時(shí)間 三種時(shí)間語義。
- 下文會(huì)分別介紹三種時(shí)間語義的應(yīng)用場(chǎng)景及案例。三種時(shí)間在生產(chǎn)環(huán)境的使用頻次 事件時(shí)間(SQL 常用) > 處理時(shí)間(SQL 幾乎不用,DataStream 少用) > 攝入時(shí)間(不用)
一、Flink 三種時(shí)間屬性簡介
time
- 事件時(shí)間:指的是數(shù)據(jù)本身攜帶的時(shí)間,這個(gè)時(shí)間是在事件產(chǎn)生時(shí)的時(shí)間,而且在 Flink SQL 觸發(fā)計(jì)算時(shí),也使用數(shù)據(jù)本身攜帶的時(shí)間。這就叫做 事件時(shí)間。目前生產(chǎn)環(huán)境中用的最多。
- 處理時(shí)間:指的是具體算子計(jì)算數(shù)據(jù)執(zhí)行時(shí)的機(jī)器時(shí)間(例如在算子中 Java 取 System.currentTimeMillis()) ),在生產(chǎn)環(huán)境中用的次多。
- 攝入時(shí)間:指的是數(shù)據(jù)從數(shù)據(jù)源進(jìn)入 Flink 的時(shí)間。攝入時(shí)間用的最少,可以說基本不使用。
小伙伴萌要注意到:
- 上述的三種時(shí)間概念不是由于有了數(shù)據(jù)而誕生的,而是有了 Flink 之后根據(jù)實(shí)際的應(yīng)用場(chǎng)景而誕生的。以事件時(shí)間舉個(gè)例子,如果只是數(shù)據(jù)攜帶了時(shí)間,F(xiàn)link 也消費(fèi)了這個(gè)數(shù)據(jù),但是在 Flink 中沒有使用數(shù)據(jù)的這個(gè)時(shí)間作為計(jì)算的觸發(fā)條件,也不能把這個(gè) Flink 任務(wù)叫做事件時(shí)間的任務(wù)。
- 其次,要認(rèn)識(shí)到,一般一個(gè) Flink 任務(wù)只會(huì)有一個(gè)時(shí)間屬性,所以時(shí)間屬性通常認(rèn)為是一個(gè)任務(wù)粒度的。舉例:我們可以說 A 任務(wù)是事件時(shí)間語義的任務(wù),B 任務(wù)是處理時(shí)間語義的任務(wù)。當(dāng)然了,一個(gè)任務(wù)也可以存在多個(gè)時(shí)間屬性。
二、Flink 三種時(shí)間屬性的應(yīng)用場(chǎng)景
講到這里,xdm 會(huì)問,博主上面寫的 3 種時(shí)間屬性到底對(duì)我們的任務(wù)有啥影響呢?3 種時(shí)間屬性的應(yīng)用場(chǎng)景是啥?
先說結(jié)論,在 Flink 中時(shí)間的作用:
- 主要體現(xiàn)在包含時(shí)間窗口的計(jì)算中:用于標(biāo)識(shí)任務(wù)的時(shí)間進(jìn)度,來判斷是否需要觸發(fā)窗口的計(jì)算。比如常用的滾動(dòng)窗口、滑動(dòng)窗口等都需要時(shí)間推動(dòng)觸發(fā)。這些窗口的應(yīng)用場(chǎng)景后續(xù)會(huì)詳細(xì)介紹。
- 次要體現(xiàn)在自定義時(shí)間語義的計(jì)算中:舉個(gè)例子,比如用戶可以自定義每隔 10s 的本地時(shí)間,或者消費(fèi)到的數(shù)據(jù)的時(shí)間戳每增大 10s,就把計(jì)算結(jié)果輸出一次,時(shí)間在此類應(yīng)用中也是一種標(biāo)識(shí)任務(wù)進(jìn)度的作用。
博主以 滾動(dòng)窗口 的聚合任務(wù)為例來介紹一下事件時(shí)間和處理時(shí)間的對(duì)比區(qū)別。
1. 事件時(shí)間案例:還是以之前的 clicks 表拿來舉例。
tumble window
上面這個(gè)案例的窗口大小是 1 小時(shí),需求方需要按照用戶點(diǎn)擊時(shí)間戳 cTime 劃分?jǐn)?shù)據(jù)(劃分滾動(dòng)窗口),然后計(jì)算出 count 聚合結(jié)果(這樣計(jì)算能反映出事件的真實(shí)發(fā)生時(shí)間),那么就需要把 cTime 設(shè)置為窗口的劃分時(shí)間戳,即代碼中 tumble(cTime, interval '1' hour)。
上面這種就叫做事件時(shí)間。即用數(shù)據(jù)中自帶的時(shí)間戳進(jìn)行窗口的劃分(點(diǎn)擊操作真實(shí)的發(fā)生時(shí)間)。
后續(xù) Flink SQL 任務(wù)在運(yùn)行的過程中也會(huì)實(shí)際按照 cTime 的當(dāng)前時(shí)間作為一小時(shí)窗口結(jié)束觸發(fā)條件并計(jì)算一個(gè)小時(shí)窗口內(nèi)的數(shù)據(jù)。
2.處理時(shí)間案例:還是以之前的 clicks 表拿來舉例。
還是上面那個(gè)案例,但是這次需求方不需要按照數(shù)據(jù)上的時(shí)間戳劃分?jǐn)?shù)據(jù)(劃分滾動(dòng)窗口),只需要數(shù)據(jù)來了之后, 在 Flink 機(jī)器上的時(shí)間作為一小時(shí)窗口結(jié)束的書法條件并計(jì)算。
那么這種觸發(fā)機(jī)制就是處理時(shí)間。
3. 攝入時(shí)間案例:在 Flink 從外部數(shù)據(jù)源讀取到數(shù)據(jù)時(shí),給這條數(shù)據(jù)帶上的當(dāng)前數(shù)據(jù)源算子的本地時(shí)間戳。下游可以用這個(gè)時(shí)間戳進(jìn)行窗口聚合,不過這種幾乎不使用。
三、SQL 指定時(shí)間屬性的兩種方式
如果要滿足 Flink SQL 時(shí)間窗口類的聚合操作,SQL 或 Table API 中的 數(shù)據(jù)源表 就需要提供時(shí)間屬性(相當(dāng)于我們把這個(gè)時(shí)間屬性在 數(shù)據(jù)源表 上面進(jìn)行聲明),以及支持時(shí)間相關(guān)的操作。
那么來看看 Flink SQL 為我們提供的兩種指定時(shí)間戳的方式:
- CREATE TABLE DDL 創(chuàng)建表的時(shí)候指定。
- 可以在 DataStream 中指定,在后續(xù)的 DataStream 轉(zhuǎn)的 Table 中使用。
一旦時(shí)間屬性定義好,它就可以像普通列一樣使用,也可以在時(shí)間相關(guān)的操作中使用。
四、SQL 事件時(shí)間案例
來看看 Flink 中如何指定事件時(shí)間。
1. CREATE TABLE DDL 指定時(shí)間戳的方式。
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- 使用下面這句來將 user_action_time 聲明為事件時(shí)間,并且聲明 watermark 的生成規(guī)則,即 user_action_time 減 5 秒
-- 事件時(shí)間列的字段類型必須是 TIMESTAMP 或者 TIMESTAMP_LTZ 類型
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
從上面這條語句可以看到,如果想使用事件時(shí)間,那么我們的時(shí)間戳類型必須是 TIMESTAMP 或者 TIMESTAMP_LTZ 類型。很多小伙伴會(huì)想到,我們的時(shí)間戳一般不都是秒或者是毫秒(BIGINT 類型)嘛,那這種情況怎么辦?
解決方案必須要有啊。如下。
CREATE TABLE user_actions (
user_name STRING,
data STRING,
-- 1. 這個(gè) ts 就是常見的毫秒級(jí)別時(shí)間戳
ts BIGINT,
-- 2. 將毫秒時(shí)間戳轉(zhuǎn)換成 TIMESTAMP_LTZ 類型
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
-- 3. 使用下面這句來將 user_action_time 聲明為事件時(shí)間,并且聲明 watermark 的生成規(guī)則,即 user_action_time 減 5 秒
-- 事件時(shí)間列的字段類型必須是 TIMESTAMP 或者 TIMESTAMP_LTZ 類型
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
...
);
SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
2.DataStream 中指定事件時(shí)間。
之前介紹了 Table 和 DataStream 可以互轉(zhuǎn),那么 Flink 也提供了一個(gè)能力,就是在 Table 轉(zhuǎn)為 DataStream 時(shí),指定時(shí)間戳字段。如下案例:
public class DataStreamSourceEventTimeTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
// 1. 分配 watermark
DataStream r = env.addSource(new UserDefinedSource())
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.minutes(0L)) {
@Override
public long extractTimestamp(Row element) {
return (long) element.getField("f2");
}
});
// 2. 使用 f2.rowtime 的方式將 f2 字段指為事件時(shí)間時(shí)間戳
Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2.rowtime");
tEnv.createTemporaryView("source_table", sourceTable);
// 3. 在 tumble window 中使用 f2
String tumbleWindowSql =
"SELECT TUMBLE_START(f2, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"
+ "FROM source_table\n"
+ "GROUP BY TUMBLE(f2, INTERVAL '5' SECOND)"
;
Table resultTable = tEnv.sqlQuery(tumbleWindowSql);
tEnv.toDataStream(resultTable, Row.class).print();
env.execute();
}
private static class UserDefinedSource implements SourceFunction, ResultTypeQueryable {
private volatile boolean isCancel;
@Override
public void run(SourceContext sourceContext) throws Exception {
int i = 0;
while (!this.isCancel) {
sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));
Thread.sleep(10L);
i++;
}
}
@Override
public void cancel() {
this.isCancel = true;
}
@Override
public TypeInformation getProducedType() {
return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),
TypeInformation.of(Long.class));
}
}
}
五、SQL 處理時(shí)間案例
來看看 Flink SQL 中如何指定處理時(shí)間。
1.CREATE TABLE DDL 指定時(shí)間戳的方式。
CREATE TABLE user_actions (
user_name STRING,
data STRING,
-- 使用下面這句來將 user_action_time 聲明為處理時(shí)間
user_action_time AS PROCTIME()
) WITH (
...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
? DataStream 中指定處理時(shí)間。
public class DataStreamSourceProcessingTimeTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
// 1. 分配 watermark
DataStream r = env.addSource(new UserDefinedSource());
// 2. 使用 proctime.proctime 的方式將 f2 字段指為處理時(shí)間時(shí)間戳
Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2, proctime.proctime");
tEnv.createTemporaryView("source_table", sourceTable);
// 3. 在 tumble window 中使用 f2
String tumbleWindowSql =
"SELECT TUMBLE_START(proctime, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"
+ "FROM source_table\n"
+ "GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND)"
;
Table resultTable = tEnv.sqlQuery(tumbleWindowSql);
tEnv.toDataStream(resultTable, Row.class).print();
env.execute();
}
private static class UserDefinedSource implements SourceFunction, ResultTypeQueryable {
private volatile boolean isCancel;
@Override
public void run(SourceContext sourceContext) throws Exception {
int i = 0;
while (!this.isCancel) {
sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));
Thread.sleep(10L);
i++;
}
}
@Override
public void cancel() {
this.isCancel = true;
}
@Override
public TypeInformation getProducedType() {
return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),
TypeInformation.of(Long.class));
}
}
}
新聞標(biāo)題:Flink SQL 知其所以然:SQL 的時(shí)間語義!
URL地址:http://m.5511xx.com/article/cdscjei.html


咨詢
建站咨詢
