日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
ApacheFlinkCEP實(shí)現(xiàn)超時狀態(tài)監(jiān)控的步驟詳解

 

成都創(chuàng)新互聯(lián)公司是一家專業(yè)提供門頭溝企業(yè)網(wǎng)站建設(shè),專注與成都網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè)、H5開發(fā)、小程序制作等業(yè)務(wù)。10年已為門頭溝眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)絡(luò)公司優(yōu)惠進(jìn)行中。

CEP – Complex Event Processing復(fù)雜事件處理。

訂單下單后超過一定時間還未進(jìn)行支付確認(rèn)。

打車訂單生成后超過一定時間沒有確認(rèn)上車。

外賣超過預(yù)定送達(dá)時間一定時限還沒有確認(rèn)送達(dá)。

apache FlinkCEP API

CEPTimeoutEventJob

FlinkCEP源碼簡析

DataStream和PatternStream

DataStream 一般由相同類型事件或元素組成,一個DataStream可以通過一系列的轉(zhuǎn)換操作如Filter、Map等轉(zhuǎn)換為另一個DataStream。

PatternStream 是對CEP模式匹配的流的抽象,把DataStream和Pattern組合在一塊,然后對外提供select和flatSelect等方法。PatternStream并不是DataStream,它提供方法把匹配的模式序列和與其相關(guān)聯(lián)的事件組成的映射(就是Map<模式名稱,List<事件>>)發(fā)出去,發(fā)到SingleOutputStreamOperator里面,SingleOutputStreamOperator是DataStream。

CEPOperatorUtils工具類里的方法和變量使用了「PatternStream」來命名,比如:

public

static

SingleOutputStreamOperator
createPatternStream(…){…}
public

static

SingleOutputStreamOperator
createTimeoutPatternStream(…){…}

final

SingleOutputStreamOperator
patternStream;

SingleOutputStreamOperator

@Public

public

class

SingleOutputStreamOperator

extends

DataStream
{…}

PatternStream的構(gòu)造方法:

PatternStream
(
final
 
DataStream
 inputStream, 
final
 
Pattern
 pattern) {

  
this
.inputStream = inputStream;

  
this
.pattern = pattern;

  
this
.comparator = 
null
;

}



PatternStream
(
final
 
DataStream
 inputStream, 
final
 
Pattern
 pattern, 
final
 
EventComparator
 comparator) {

  
this
.inputStream = inputStream;

  
this
.pattern = pattern;

  
this
.comparator = comparator;

}

Pattern、Quantifier和EventComparator

Pattern是模式定義的Base Class,Builder模式,定義好的模式會被NFACompiler用來生成NFA。

如果想要自己實(shí)現(xiàn)類似next和followedBy這種方法,比如timeEnd,對Pattern進(jìn)行擴(kuò)展重寫應(yīng)該是可行的。

public
class
Pattern
extends
T> {
/** 模式名稱 */
private
final
String
name;
/** 前面一個模式 */
private
final
Pattern
extends
T> previous;
/** 一個事件如果要被當(dāng)前模式匹配到,必須滿足的約束條件 */
private
IterativeCondition
condition;
/** 時間窗口長度,在時間長度內(nèi)進(jìn)行模式匹配 */
private
Time
windowTime;
/** 模式量詞,意思是一個模式匹配幾個事件等 默認(rèn)是匹配到一個 */
private
Quantifier
quantifier =
Quantifier
.one(
ConsumingStrategy
.STRICT);
/** 停止將事件收集到循環(huán)狀態(tài)時,事件必須滿足的條件 */
private
IterativeCondition
untilCondition;
/**
* 適用于{@code times}模式,用來維護(hù)模式里事件可以連續(xù)發(fā)生的次數(shù)
*/
private
Times
times;
// 匹配到事件之后的跳過策略
private
final
AfterMatchSkipStrategy
afterMatchSkipStrategy;

}

Quantifier是用來描述具體模式行為的,主要有三大類:

Single-單一匹配、Looping-循環(huán)匹配、Times-一定次數(shù)或者次數(shù)范圍內(nèi)都能匹配到。

每一個模式Pattern可以是optional可選的(單一匹配或循環(huán)匹配),并可以設(shè)置ConsumingStrategy。

循環(huán)和次數(shù)也有一個額外的內(nèi)部ConsumingStrategy,用在模式中接收的事件之間。

public
class
Quantifier
 {
  ...
/**
   * 5個屬性,可以組合,但并非所有的組合都是有效的
   */
public
enum
QuantifierProperty
 {
    SINGLE,
    LOOPING,
    TIMES,
    OPTIONAL,
    GREEDY
  }
/**
   * 描述在此模式中匹配哪些事件的策略
   */
public
enum
ConsumingStrategy
 {
    STRICT,
    SKIP_TILL_NEXT,
    SKIP_TILL_ANY,
    NOT_FOLLOW,
    NOT_NEXT
  }
/**
   * 描述當(dāng)前模式里事件可以連續(xù)發(fā)生的次數(shù);舉個例子,模式條件無非就是boolean,滿足true條件的事件連續(xù)出現(xiàn)times次,或者一個次數(shù)范圍,比如2~4次,2次,3次,4次都會被當(dāng)前模式匹配出來,因此同一個事件會被重復(fù)匹配到
   */
public
static
class
Times
 {
private
final
int
 from;
private
final
int
 to;
private
Times
(
int
 from, 
int
 to) {
Preconditions
.checkArgument(from > 
0
, 
"The from should be a positive number greater than 0."
);
Preconditions
.checkArgument(to >= from, 
"The to should be a number greater than or equal to from: "
 + from + 
"."
);
this
.from = from;
this
.to = to;
    }
public
int
 getFrom() {
return
 from;
    }
public
int
 getTo() {
return
 to;
    }
// 次數(shù)范圍
public
static
Times
 of(
int
 from, 
int
 to) {
return
new
Times
(from, to);
    }
// 指定具體次數(shù)
public
static
Times
 of(
int
 times) {
return
new
Times
(times, times);
    }
@Override
public
boolean
 equals(
Object
 o) {
if
 (
this
 == o) {
return
true
;
      }
if
 (o == 
null
 || getClass() != o.getClass()) {
return
false
;
      }
Times
 times = (
Times
) o;
return
 from == times.from &&
        to == times.to;
    }
@Override
public
int
 hashCode() {
return
Objects
.hash(from, to);
    }
  }
  ...
}

EventComparator,自定義事件比較器,實(shí)現(xiàn)EventComparator接口。

public

interface

EventComparator

extends

Comparator
,
Serializable
{
long
serialVersionUID =
1L
;
}

NFACompiler和NFA

NFACompiler提供將Pattern編譯成NFA或者NFAFactory的方法,使用NFAFactory可以創(chuàng)建多個NFA。

public
class
NFACompiler
{

/**
* NFAFactory 創(chuàng)建NFA的接口
*
* @param Type of the input events which are processed by the NFA
*/
public
interface
NFAFactory

extends
Serializable
{
NFA createNFA();
}

/**
* NFAFactory的具體實(shí)現(xiàn)NFAFactoryImpl
*
*

The implementation takes the input type serializer, the window time and the set of
* states and their transitions to be able to create an NFA from them.
*
* @param Type of the input events which are processed by the NFA
*/
private
static
class
NFAFactoryImpl

implements
NFAFactory
{

private
static
final
long
serialVersionUID =
8939783698296714379L
;

private
final
long
windowTime;
private
final
Collection
<
State
> states;
private
final
boolean
timeoutHandling;

private
NFAFactoryImpl
(
long
windowTime,
Collection
<
State
> states,
boolean
timeoutHandling) {

this
.windowTime = windowTime;
this
.states = states;
this
.timeoutHandling = timeoutHandling;
}

@Override
public
NFA createNFA() {
// 一個NFA由狀態(tài)集合、時間窗口的長度和是否處理超時組成
return
new
NFA<>(states, windowTime, timeoutHandling);
}
}
}

NFA:Non-deterministic finite automaton – 非確定的有限(狀態(tài))自動機(jī)。

更多內(nèi)容參見

https://zh.wikipedia.org/wiki/非確定有限狀態(tài)自動機(jī)

public
class
NFA {
/**
* NFACompiler返回的所有有效的NFA狀態(tài)集合
* These are directly derived from the user-specified pattern.
*/
private
final
Map
<
String
,
State
> states;

/**
* Pattern.within(Time)指定的時間窗口長度
*/
private
final
long
windowTime;

/**
* 一個超時匹配的標(biāo)記
*/
private
final
boolean
handleTimeout;

}

 

PatternSelectFunction和PatternFlatSelectFunction

當(dāng)一個包含被匹配到的事件的映射能夠通過模式名稱訪問到的時候,PatternSelectFunction的select()方法會被調(diào)用。模式名稱是由Pattern定義的時候指定的。select()方法恰好返回一個結(jié)果,如果需要返回多個結(jié)果,則可以實(shí)現(xiàn)PatternFlatSelectFunction。

public

interface

PatternSelectFunction

extends

Function
,
Serializable
{

/**

* 從給到的事件映射中生成一個結(jié)果。這些事件使用他們關(guān)聯(lián)的模式名稱作為唯一標(biāo)識

*/

OUT select(
Map
<
String
,
List
> pattern)
throws

Exception
;

}

 

PatternFlatSelectFunction,不是返回一個OUT,而是使用Collector 把匹配到的事件收集起來。

public
interface
PatternFlatSelectFunction

extends
Function
,
Serializable
{

/**
* 生成一個或多個結(jié)果
*/
void
flatSelect(
Map
<
String
,
List
> pattern,
Collector
out)
throws
Exception
;
}

SelectTimeoutCepOperator、PatternTimeoutFunction

SelectTimeoutCepOperator是在CEPOperatorUtils中調(diào)用createTimeoutPatternStream()方法時創(chuàng)建出來。

SelectTimeoutCepOperator中會被算子迭代調(diào)用的方法是processMatchedSequences()和processTimedOutSequences()。

模板方法…對應(yīng)到抽象類AbstractKeyedCEPPatternOperator中processEvent()方法和advanceTime()方法。

還有FlatSelectTimeoutCepOperator和對應(yīng)的PatternFlatTimeoutFunction。

public
class
SelectTimeoutCepOperator

extends
AbstractKeyedCEPPatternOperator
SelectTimeoutCepOperator
.
SelectWrapper
> {
private
OutputTag
timedOutOutputTag;
public
SelectTimeoutCepOperator
(
TypeSerializer
inputSerializer,
boolean
isProcessingTime,
NFACompiler
.
NFAFactory
nfaFactory,
final
EventComparator
comparator,
AfterMatchSkipStrategy
skipStrategy,
// 參數(shù)命名混淆了flat…包括SelectWrapper類中的成員命名…
PatternSelectFunction
flatSelectFunction,
PatternTimeoutFunction
flatTimeoutFunction,
OutputTag
outputTag,
OutputTag
lateDataOutputTag) {
super
(
inputSerializer,
isProcessingTime,
nfaFactory,
comparator,
skipStrategy,
new
SelectWrapper
<>(flatSelectFunction, flatTimeoutFunction),
lateDataOutputTag);
this
.timedOutOutputTag = outputTag;
}

}
public
interface
PatternTimeoutFunction

extends
Function
,
Serializable
{
OUT timeout(
Map
<
String
,
List
> pattern,
long
timeoutTimestamp)
throws
Exception
;
}
public
interface
PatternFlatTimeoutFunction

extends
Function
,
Serializable
{
void
timeout(
Map
<
String
,
List
> pattern,
long
timeoutTimestamp,
Collector
out)
throws
Exception
;
}

 

CEP和CEPOperatorUtils

CEP是創(chuàng)建PatternStream的工具類,PatternStream只是DataStream和Pattern的組合。

public
class
CEP {

public
static

PatternStream
pattern(
DataStream
input,
Pattern
pattern) {
return
new
PatternStream
<>(input, pattern);
}

public
static

PatternStream
pattern(
DataStream
input,
Pattern
pattern,
EventComparator
comparator) {
return
new
PatternStream
<>(input, pattern, comparator);
}
}

 

CEPOperatorUtils是在PatternStream的select()方法和flatSelect()方法被調(diào)用的時候,去創(chuàng)建SingleOutputStreamOperator(DataStream)。

public
class
CEPOperatorUtils
{

private
static

SingleOutputStreamOperator
createPatternStream(
final
DataStream
inputStream,
final
Pattern
pattern,
final
TypeInformation
outTypeInfo,
final
boolean
timeoutHandling,
final
EventComparator
comparator,
final
OperatorBuilder
operatorBuilder) {
final
TypeSerializer
inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());

// check whether we use processing time
final
boolean
isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() ==
TimeCharacteristic
.
ProcessingTime
;

// compile our pattern into a NFAFactory to instantiate NFAs later on
final
NFACompiler
.
NFAFactory
nfaFactory =
NFACompiler
.compileFactory(pattern, timeoutHandling);

final
SingleOutputStreamOperator
patternStream;

if
(inputStream
instanceof
KeyedStream
) {
KeyedStream
keyedStream = (
KeyedStream
) inputStream;
patternStream = keyedStream.transform(
operatorBuilder.getKeyedOperatorName(),
outTypeInfo,
operatorBuilder.build(
inputSerializer,
isProcessingTime,
nfaFactory,
comparator,
pattern.getAfterMatchSkipStrategy()));
}
else
{
KeySelector
Byte
> keySelector =
new
NullByteKeySelector
<>();
patternStream = inputStream.keyBy(keySelector).transform(
operatorBuilder.getOperatorName(),
outTypeInfo,
operatorBuilder.build(
inputSerializer,
isProcessingTime,
nfaFactory,
comparator,
pattern.getAfterMatchSkipStrategy()
)).forceNonParallel();
}

return
patternStream;
}

}

FlinkCEP實(shí)現(xiàn)步驟

  1. IN: DataSource -> DataStream -> Transformations -> DataStream
  2. Pattern: Pattern.begin.where.next.where…times…
  3. PatternStream: CEP.pattern(DataStream, Pattern)
  4. DataStream: PatternStream.select(PatternSelectFunction) PatternStream.flatSelect(PatternSelectFunction)
  5. OUT: DataStream -> Transformations -> DataStream -> DataSink

FlinkCEP匹配超時實(shí)現(xiàn)步驟

TimeoutCEP的流需要keyBy,即KeyedStream,如果inputStream不是KeyedStream,會new一個0字節(jié)的Key(上面CEPOperatorUtils源碼里有提到)。

KeySelector
Byte
> keySelector =
new

NullByteKeySelector
<>();

Pattern最后調(diào)用within設(shè)置窗口時間。 如果是對主鍵進(jìn)行分組,一個時間窗口內(nèi)最多只會匹配出一個超時事件,使用PatternStream.select(…)就可以了。

  1. IN: DataSource -> DataStream -> Transformations -> DataStream -> keyBy -> KeyedStream
  2. Pattern: Pattern.begin.where.next.where…within(Time windowTime)
  3. PatternStream: CEP.pattern(KeyedStream, Pattern)
  4. OutputTag: new OutputTag(…)
  5. SingleOutputStreamOperator: PatternStream.flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)
  6. DataStream: SingleOutputStreamOperator.getSideOutput(OutputTag)
  7. OUT: DataStream -> Transformations -> DataStream -> DataSink

FlinkCEP超時不足

和Flink窗口聚合類似,如果使用事件時間和依賴事件生成的水印向前推進(jìn),需要后續(xù)的事件到達(dá),才會觸發(fā)窗口進(jìn)行計(jì)算和輸出結(jié)果。

FlinkCEP超時完整demo

public
class
CEPTimeoutEventJob
{
private
static
final
String
LOCAL_KAFKA_BROKER =
“l(fā)ocalhost:9092”
;
private
static
final
String
GROUP_ID =
CEPTimeoutEventJob
.
class
.getSimpleName();
private
static
final
String
GROUP_TOPIC = GROUP_ID;

public
static
void
main(
String
[] args)
throws
Exception
{
// 參數(shù)
ParameterTool
params =
ParameterTool
.fromArgs(args);

StreamExecutionEnvironment
env =
StreamExecutionEnvironment
.getExecutionEnvironment();
// 使用事件時間
env.setStreamTimeCharacteristic(
TimeCharacteristic
.
EventTime
);
env.enableCheckpointing(
5000
);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig
.
ExternalizedCheckpointCleanup
.RETAIN_ON_CANCELLATION);
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(
RestartStrategies
.fixedDelayRestart(
5
,
10000
));

// 不使用POJO的時間
final
AssignerWithPeriodicWatermarks
extractor =
new
IngestionTimeExtractor
();

// 與Kafka Topic的Partition保持一致
env.setParallelism(
3
);

Properties
kafkaProps =
new
Properties
();
kafkaProps.setProperty(
“bootstrap.servers”
, LOCAL_KAFKA_BROKER);
kafkaProps.setProperty(
“group.id”
, GROUP_ID);

// 接入Kafka的消息
FlinkKafkaConsumer011
consumer =
new
FlinkKafkaConsumer011
<>(GROUP_TOPIC,
new
POJOSchema
(), kafkaProps);
DataStream
pojoDataStream = env.addSource(consumer)
.assignTimestampsAndWatermarks(extractor);
pojoDataStream.print();

// 根據(jù)主鍵aid分組 即對每一個POJO事件進(jìn)行匹配檢測【不同類型的POJO,可以采用不同的within時間】
// 1.
DataStream
keyedPojos = pojoDataStream
.keyBy(
“aid”
);

// 從初始化到終態(tài)-一個完整的POJO事件序列
// 2.
Pattern
completedPojo =
Pattern
.begin(
“init”
)
.where(
new
SimpleCondition
() {
private
static
final
long
serialVersionUID = –
6847788055093903603L
;

@Override
public
boolean
filter(POJO pojo)
throws
Exception
{
return
“02”
.equals(pojo.getAstatus());
}
})
.followedBy(
“end”
)
// .next(“end”)
.where(
new
SimpleCondition
() {
private
static
final
long
serialVersionUID = –
2655089736460847552L
;

@Override
public
boolean
filter(POJO pojo)
throws
Exception
{
return
“00”
.equals(pojo.getAstatus()) ||
“01”
.equals(pojo.getAstatus());
}
});

// 找出1分鐘內(nèi)【便于測試】都沒有到終態(tài)的事件aid
// 如果針對不同類型有不同within時間,比如有的是超時1分鐘,有的可能是超時1個小時 則生成多個PatternStream
// 3.
PatternStream
patternStream = CEP.pattern(keyedPojos, completedPojo.within(
Time
.minutes(
1
)));

// 定義側(cè)面輸出timedout
// 4.
OutputTag
timedout =
new
OutputTag
(
“timedout”
) {
private
static
final
long
serialVersionUID =
773503794597666247L
;
};

// OutputTag timeoutOutputTag, PatternFlatTimeoutFunction patternFlatTimeoutFunction, PatternFlatSelectFunction patternFlatSelectFunction
// 5.
SingleOutputStreamOperator
timeoutPojos = patternStream.flatSelect(
timedout,
new
POJOTimedOut
(),
new
FlatSelectNothing
()
);

// 打印輸出超時的POJO
// 6.7.
timeoutPojos.getSideOutput(timedout).print();
timeoutPojos.print();
env.execute(
CEPTimeoutEventJob
.
class
.getSimpleName());
}

/**
* 把超時的事件收集起來
*/
public
static
class
POJOTimedOut
implements
PatternFlatTimeoutFunction
{
private
static
final
long
serialVersionUID = –
4214641891396057732L
;

@Override
public
void
timeout(
Map
<
String
,
List
> map,
long
l,
Collector
collector)
throws
Exception
{
if
(
null
!= map.get(
“init”
)) {
for
(POJO pojoInit : map.get(
“init”
)) {
System
.out.println(
“timeout init:”
+ pojoInit.getAid());
collector.collect(pojoInit);
}
}
// 因?yàn)閑nd超時了,還沒收到end,所以這里是拿不到end的
System
.out.println(
“timeout end: ”
+ map.get(
“end”
));
}
}

/**
* 通常什么都不做,但也可以把所有匹配到的事件發(fā)往下游;如果是寬松臨近,被忽略或穿透的事件就沒辦法選中發(fā)往下游了
* 一分鐘時間內(nèi)走完init和end的數(shù)據(jù)
*
* @param
*/
public
static
class
FlatSelectNothing

implements
PatternFlatSelectFunction
{
private
static
final
long
serialVersionUID = –
3029589950677623844L
;

@Override
public
void
flatSelect(
Map
<
String
,
List
> pattern,
Collector
collector) {
System
.out.println(
“flatSelect: ”
+ pattern);
}
}
}

測試結(jié)果(followedBy):

3
> POJO{aid=
‘ID000-0’
, astyle=
‘STYLE000-0’
, aname=
‘NAME-0′
, logTime=
1563419728242
, energy=
529.00
, age=
0
, tt=
2019

07

18
, astatus=
’02’
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
‘ID000-1’
, astyle=
‘STYLE000-2’
, aname=
‘NAME-1′
, logTime=
1563419728783
, energy=
348.00
, age=
26
, tt=
2019

07

18
, astatus=
’02’
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
‘ID000-0’
, astyle=
‘STYLE000-0’
, aname=
‘NAME-0′
, logTime=
1563419749259
, energy=
492.00
, age=
0
, tt=
2019

07

18
, astatus=
’00’
, createTime=
null
, updateTime=
null
}
flatSelect: {init=[POJO{aid=
‘ID000-0’
, astyle=
‘STYLE000-0’
, aname=
‘NAME-0′
, logTime=
1563419728242
, energy=
529.00
, age=
0
, tt=
2019

07

18
, astatus=
’02’
, createTime=
null
, updateTime=
null
}],
end
=[POJO{aid=
‘ID000-0’
, astyle=
‘STYLE000-0’
, aname=
‘NAME-0′
, logTime=
1563419749259
, energy=
492.00
, age=
0
, tt=
2019

07

18
, astatus=
’00’
, createTime=
null
, updateTime=
null
}]}
timeout init:ID000-
1
3
> POJO{aid=
‘ID000-1’
, astyle=
‘STYLE000-2’
, aname=
‘NAME-1′
, logTime=
1563419728783
, energy=
348.00
, age=
26
, tt=
2019

07

18
, astatus=
’02’
, createTime=
null
, updateTime=
null
}
timeout
end
:
null
3
> POJO{aid=
‘ID000-2’
, astyle=
‘STYLE000-0’
, aname=
‘NAME-0′
, logTime=
1563419829639
, energy=
467.00
, age=
0
, tt=
2019

07

18
, astatus=
’03’
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
‘ID000-2’
, astyle=
‘STYLE000-0’
, aname=
‘NAME-0′
, logTime=
1563419841394
, energy=
107.00
, age=
0
, tt=
2019

07

18
, astatus=
’00’
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
‘ID000-3’
, astyle=
‘STYLE000-0’
, aname=
‘NAME-0′
, logTime=
1563419967721
, energy=
431.00
, age=
0
, tt=
2019

07

18
, astatus=
’02’
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
‘ID000-3’
, astyle=
‘STYLE000-2’
, aname=
‘NAME-0′
, logTime=
1563419979567
, energy=
32.00
, age=
26
, tt=
2019

07

18
, astatus=
’03’
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
‘ID000-3’
, astyle=
‘STYLE000-2’
, aname=
‘NAME-0′
, logTime=
1563419993612
, energy=
542.00
, age=
26
, tt=
2019

07

18
, astatus=
’01’
, createTime=
null
, updateTime=
null
}
flatSelect: {init=[POJO{aid=
‘ID000-3’
, astyle=
‘STYLE000-0’
, aname=
‘NAME-0′
, logTime=
1563419967721
, energy=
431.00
, age=
0
, tt=
2019

07

18
, astatus=
’02’
, createTime=
null
, updateTime=
null
}],
end
=[POJO{aid=
‘ID000-3’
, astyle=
‘STYLE000-2’
, aname=
‘NAME-0′
, logTime=
1563419993612
, energy=
542.00
, age=
26
, tt=
2019

07

18
, astatus=
’01’
, createTime=
null
, updateTime=
null
}]}
3
> POJO{aid=
‘ID000-4’
, astyle=
‘STYLE000-0’
, aname=
‘NAME-0′
, logTime=
1563420063760
, energy=
122.00
, age=
0
, tt=
2019

07

18
, astatus=
’02’
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
‘ID000-4’
, astyle=
‘STYLE000-0’
, aname=
‘NAME-0′
, logTime=
1563420078008
, energy=
275.00
, age=
0
, tt=
2019

07

18
, astatus=
’03’
, createTime=
null
, updateTime=
null
}
timeout init:ID000-
4
3
> POJO{aid=
‘ID000-4’
, astyle=
‘STYLE000-0’
, aname=
‘NAME-0′
, logTime=
1563420063760
, energy=
122.00
, age=
0
, tt=
2019

07

18
, astatus=
’02’
, createTime=
null
, updateTime=
null
}
timeout
end
:
null

總結(jié)

以上所述是小編給大家介紹的Apache FlinkCEP 實(shí)現(xiàn)超時狀態(tài)監(jiān)控的步驟,希望對大家有所幫助,如果大家有任何疑問歡迎給我留言,小編會及時回復(fù)大家的!

香港服務(wù)器選創(chuàng)新互聯(lián),2H2G首月10元開通。
創(chuàng)新互聯(lián)(www.cdcxhl.com)互聯(lián)網(wǎng)服務(wù)提供商,擁有超過10年的服務(wù)器租用、服務(wù)器托管、云服務(wù)器、虛擬主機(jī)、網(wǎng)站系統(tǒng)開發(fā)經(jīng)驗(yàn)。專業(yè)提供云主機(jī)、虛擬主機(jī)、域名注冊、VPS主機(jī)、云服務(wù)器、香港云服務(wù)器、免備案服務(wù)器等。


網(wǎng)站名稱:ApacheFlinkCEP實(shí)現(xiàn)超時狀態(tài)監(jiān)控的步驟詳解
分享路徑:http://m.5511xx.com/article/dhsoejp.html