• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    Apache FlinkCEP 实现超时状态监控的步骤详解

     

    CEP - Complex Event Processing复杂事件处理。

    订单下单后超过一定时间还未进行支付确认。

    打车订单生成后超过一定时间没有确认上车。

    外卖超过预定送达时间一定时限还没有确认送达。

    Apache FlinkCEP API

    CEPTimeoutEventJob

    FlinkCEP源码简析

    DataStream和PatternStream

    DataStream 一般由相同类型事件或元素组成,一个DataStream可以通过一系列的转换操作如Filter、Map等转换为另一个DataStream。

    PatternStream 是对CEP模式匹配的流的抽象,把DataStream和Pattern组合在一块,然后对外提供select和flatSelect等方法。PatternStream并不是DataStream,它提供方法把匹配的模式序列和与其相关联的事件组成的映射(就是Map<模式名称,List<事件>>)发出去,发到SingleOutputStreamOperator里面,SingleOutputStreamOperator是DataStream。

    CEPOperatorUtils工具类里的方法和变量使用了「PatternStream」来命名,比如:

    public
     
    static
     <IN, OUT> 
    SingleOutputStreamOperator
    <OUT> createPatternStream(...){...}
    public
    
    static
     <IN, OUT1, OUT2> 
    SingleOutputStreamOperator
    <OUT1> createTimeoutPatternStream(...){...}
    
    final
     
    SingleOutputStreamOperator
    <OUT> patternStream;

    SingleOutputStreamOperator

    @Public
    
    public
     
    class
     
    SingleOutputStreamOperator
    <T> 
    extends
     
    DataStream
    <T> {...}

    PatternStream的构造方法:

    PatternStream
    (
    final
     
    DataStream
    <T> inputStream, 
    final
     
    Pattern
    <T, ?> pattern) {
    
      
    this
    .inputStream = inputStream;
    
      
    this
    .pattern = pattern;
    
      
    this
    .comparator = 
    null
    ;
    
    }
    
    
    
    PatternStream
    (
    final
     
    DataStream
    <T> inputStream, 
    final
     
    Pattern
    <T, ?> pattern, 
    final
     
    EventComparator
    <T> comparator) {
    
      
    this
    .inputStream = inputStream;
    
      
    this
    .pattern = pattern;
    
      
    this
    .comparator = comparator;
    
    }

    Pattern、Quantifier和EventComparator

    Pattern是模式定义的Base Class,Builder模式,定义好的模式会被NFACompiler用来生成NFA。

    如果想要自己实现类似next和followedBy这种方法,比如timeEnd,对Pattern进行扩展重写应该是可行的。

    public
    class
    Pattern
    <T, F 
    extends
     T> {
    /** 模式名称 */
    private
    final
    String
     name;
    /** 前面一个模式 */
    private
    final
    Pattern
    <T, ? 
    extends
     T> previous;
    /** 一个事件如果要被当前模式匹配到,必须满足的约束条件 */
    private
    IterativeCondition
    <F> condition;
    /** 时间窗口长度,在时间长度内进行模式匹配 */
    private
    Time
     windowTime;
    /** 模式量词,意思是一个模式匹配几个事件等 默认是匹配到一个 */
    private
    Quantifier
     quantifier = 
    Quantifier
    .one(
    ConsumingStrategy
    .STRICT);
    /** 停止将事件收集到循环状态时,事件必须满足的条件 */
    private
    IterativeCondition
    <F> untilCondition;
    /**
       * 适用于{@code times}模式,用来维护模式里事件可以连续发生的次数
       */
    private
    Times
     times;
    // 匹配到事件之后的跳过策略
    private
    final
    AfterMatchSkipStrategy
     afterMatchSkipStrategy;
      ...
    }

    Quantifier是用来描述具体模式行为的,主要有三大类:

    Single-单一匹配、Looping-循环匹配、Times-一定次数或者次数范围内都能匹配到。

    每一个模式Pattern可以是optional可选的(单一匹配或循环匹配),并可以设置ConsumingStrategy。

    循环和次数也有一个额外的内部ConsumingStrategy,用在模式中接收的事件之间。

    public
    class
    Quantifier
     {
      ...
    /**
       * 5个属性,可以组合,但并非所有的组合都是有效的
       */
    public
    enum
    QuantifierProperty
     {
        SINGLE,
        LOOPING,
        TIMES,
        OPTIONAL,
        GREEDY
      }
    /**
       * 描述在此模式中匹配哪些事件的策略
       */
    public
    enum
    ConsumingStrategy
     {
        STRICT,
        SKIP_TILL_NEXT,
        SKIP_TILL_ANY,
        NOT_FOLLOW,
        NOT_NEXT
      }
    /**
       * 描述当前模式里事件可以连续发生的次数;举个例子,模式条件无非就是boolean,满足true条件的事件连续出现times次,或者一个次数范围,比如2~4次,2次,3次,4次都会被当前模式匹配出来,因此同一个事件会被重复匹配到
       */
    public
    static
    class
    Times
     {
    private
    final
    int
     from;
    private
    final
    int
     to;
    private
    Times
    (
    int
     from, 
    int
     to) {
    Preconditions
    .checkArgument(from > 
    0
    , 
    "The from should be a positive number greater than 0."
    );
    Preconditions
    .checkArgument(to >= from, 
    "The to should be a number greater than or equal to from: "
     + from + 
    "."
    );
    this
    .from = from;
    this
    .to = to;
        }
    public
    int
     getFrom() {
    return
     from;
        }
    public
    int
     getTo() {
    return
     to;
        }
    // 次数范围
    public
    static
    Times
     of(
    int
     from, 
    int
     to) {
    return
    new
    Times
    (from, to);
        }
    // 指定具体次数
    public
    static
    Times
     of(
    int
     times) {
    return
    new
    Times
    (times, times);
        }
    @Override
    public
    boolean
     equals(
    Object
     o) {
    if
     (
    this
     == o) {
    return
    true
    ;
          }
    if
     (o == 
    null
     || getClass() != o.getClass()) {
    return
    false
    ;
          }
    Times
     times = (
    Times
    ) o;
    return
     from == times.from &&
            to == times.to;
        }
    @Override
    public
    int
     hashCode() {
    return
    Objects
    .hash(from, to);
        }
      }
      ...
    }

    EventComparator,自定义事件比较器,实现EventComparator接口。

    public
     
    interface
     
    EventComparator
    <T> 
    extends
     
    Comparator
    <T>, 
    Serializable
     {
    long
     serialVersionUID = 
    1L
    ;
    }

    NFACompiler和NFA

    NFACompiler提供将Pattern编译成NFA或者NFAFactory的方法,使用NFAFactory可以创建多个NFA。

    public
    class
    NFACompiler
     {
      ...
    /**
       * NFAFactory 创建NFA的接口
       *
       * @param <T> Type of the input events which are processed by the NFA
       */
    public
    interface
    NFAFactory
    <T> 
    extends
    Serializable
     {
        NFA<T> createNFA();
      }
      
    /**
       * NFAFactory的具体实现NFAFactoryImpl
       *
       * <p>The implementation takes the input type serializer, the window time and the set of
       * states and their transitions to be able to create an NFA from them.
       *
       * @param <T> Type of the input events which are processed by the NFA
       */
    private
    static
    class
    NFAFactoryImpl
    <T> 
    implements
    NFAFactory
    <T> {
        
    private
    static
    final
    long
     serialVersionUID = 
    8939783698296714379L
    ;
        
    private
    final
    long
     windowTime;
    private
    final
    Collection
    <
    State
    <T>> states;
    private
    final
    boolean
     timeoutHandling;
        
    private
    NFAFactoryImpl
    (
    long
     windowTime,
    Collection
    <
    State
    <T>> states,
    boolean
     timeoutHandling) {
          
    this
    .windowTime = windowTime;
    this
    .states = states;
    this
    .timeoutHandling = timeoutHandling;
        }
        
    @Override
    public
     NFA<T> createNFA() {
    // 一个NFA由状态集合、时间窗口的长度和是否处理超时组成
    return
    new
     NFA<>(states, windowTime, timeoutHandling);
        }
      }
    }

    NFA:Non-deterministic finite automaton - 非确定的有限(状态)自动机。

    更多内容参见

    https://zh.wikipedia.org/wiki/非确定有限状态自动机

    public
    class
     NFA<T> {
    /**
       * NFACompiler返回的所有有效的NFA状态集合
       * These are directly derived from the user-specified pattern.
       */
    private
    final
    Map
    <
    String
    , 
    State
    <T>> states;
      
    /**
       * Pattern.within(Time)指定的时间窗口长度
       */
    private
    final
    long
     windowTime;
      
    /**
       * 一个超时匹配的标记
       */
    private
    final
    boolean
     handleTimeout;
      ...
    }

     

    PatternSelectFunction和PatternFlatSelectFunction

    当一个包含被匹配到的事件的映射能够通过模式名称访问到的时候,PatternSelectFunction的select()方法会被调用。模式名称是由Pattern定义的时候指定的。select()方法恰好返回一个结果,如果需要返回多个结果,则可以实现PatternFlatSelectFunction。

    public
     
    interface
     
    PatternSelectFunction
    <IN, OUT> 
    extends
     
    Function
    , 
    Serializable
     {
    
    
    
      
    /**
    
       * 从给到的事件映射中生成一个结果。这些事件使用他们关联的模式名称作为唯一标识
    
       */
    
      OUT select(
    Map
    <
    String
    , 
    List
    <IN>> pattern) 
    throws
     
    Exception
    ;
    
    }

     

    PatternFlatSelectFunction,不是返回一个OUT,而是使用Collector 把匹配到的事件收集起来。

    public
    interface
    PatternFlatSelectFunction
    <IN, OUT> 
    extends
    Function
    , 
    Serializable
     {
      
    /**
       * 生成一个或多个结果
       */
    void
     flatSelect(
    Map
    <
    String
    , 
    List
    <IN>> pattern, 
    Collector
    <OUT> out) 
    throws
    Exception
    ;
    }

    SelectTimeoutCepOperator、PatternTimeoutFunction

    SelectTimeoutCepOperator是在CEPOperatorUtils中调用createTimeoutPatternStream()方法时创建出来。

    SelectTimeoutCepOperator中会被算子迭代调用的方法是processMatchedSequences()和processTimedOutSequences()。

    模板方法...对应到抽象类AbstractKeyedCEPPatternOperator中processEvent()方法和advanceTime()方法。

    还有FlatSelectTimeoutCepOperator和对应的PatternFlatTimeoutFunction。

    public
    class
    SelectTimeoutCepOperator
    <IN, OUT1, OUT2, KEY>
    extends
    AbstractKeyedCEPPatternOperator
    <IN, KEY, OUT1, 
    SelectTimeoutCepOperator
    .
    SelectWrapper
    <IN, OUT1, OUT2>> {
    private
    OutputTag
    <OUT2> timedOutOutputTag;
    public
    SelectTimeoutCepOperator
    (
    TypeSerializer
    <IN> inputSerializer,
    boolean
     isProcessingTime,
    NFACompiler
    .
    NFAFactory
    <IN> nfaFactory,
    final
    EventComparator
    <IN> comparator,
    AfterMatchSkipStrategy
     skipStrategy,
    // 参数命名混淆了flat...包括SelectWrapper类中的成员命名...
    PatternSelectFunction
    <IN, OUT1> flatSelectFunction,
    PatternTimeoutFunction
    <IN, OUT2> flatTimeoutFunction,
    OutputTag
    <OUT2> outputTag,
    OutputTag
    <IN> lateDataOutputTag) {
    super
    (
          inputSerializer,
          isProcessingTime,
          nfaFactory,
          comparator,
          skipStrategy,
    new
    SelectWrapper
    <>(flatSelectFunction, flatTimeoutFunction),
          lateDataOutputTag);
    this
    .timedOutOutputTag = outputTag;
      }
      ...
    }
    public
    interface
    PatternTimeoutFunction
    <IN, OUT> 
    extends
    Function
    , 
    Serializable
     {
      OUT timeout(
    Map
    <
    String
    , 
    List
    <IN>> pattern, 
    long
     timeoutTimestamp) 
    throws
    Exception
    ;
    }
    public
    interface
    PatternFlatTimeoutFunction
    <IN, OUT> 
    extends
    Function
    , 
    Serializable
     {
    void
     timeout(
    Map
    <
    String
    , 
    List
    <IN>> pattern, 
    long
     timeoutTimestamp, 
    Collector
    <OUT> out) 
    throws
    Exception
    ;
    }

     

    CEP和CEPOperatorUtils

    CEP是创建PatternStream的工具类,PatternStream只是DataStream和Pattern的组合。

    public
    class
     CEP {
      
    public
    static
     <T> 
    PatternStream
    <T> pattern(
    DataStream
    <T> input, 
    Pattern
    <T, ?> pattern) {
    return
    new
    PatternStream
    <>(input, pattern);
      }
      
    public
    static
     <T> 
    PatternStream
    <T> pattern(
    DataStream
    <T> input, 
    Pattern
    <T, ?> pattern, 
    EventComparator
    <T> comparator) {
    return
    new
    PatternStream
    <>(input, pattern, comparator);
      }
    }

     

    CEPOperatorUtils是在PatternStream的select()方法和flatSelect()方法被调用的时候,去创建SingleOutputStreamOperator(DataStream)。

    public
    class
    CEPOperatorUtils
     {
      ...
    private
    static
     <IN, OUT, K> 
    SingleOutputStreamOperator
    <OUT> createPatternStream(
    final
    DataStream
    <IN> inputStream,
    final
    Pattern
    <IN, ?> pattern,
    final
    TypeInformation
    <OUT> outTypeInfo,
    final
    boolean
     timeoutHandling,
    final
    EventComparator
    <IN> comparator,
    final
    OperatorBuilder
    <IN, OUT> operatorBuilder) {
    final
    TypeSerializer
    <IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
        
    // check whether we use processing time
    final
    boolean
     isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == 
    TimeCharacteristic
    .
    ProcessingTime
    ;
        
    // compile our pattern into a NFAFactory to instantiate NFAs later on
    final
    NFACompiler
    .
    NFAFactory
    <IN> nfaFactory = 
    NFACompiler
    .compileFactory(pattern, timeoutHandling);
        
    final
    SingleOutputStreamOperator
    <OUT> patternStream;
        
    if
     (inputStream 
    instanceof
    KeyedStream
    ) {
    KeyedStream
    <IN, K> keyedStream = (
    KeyedStream
    <IN, K>) inputStream;
          patternStream = keyedStream.transform(
            operatorBuilder.getKeyedOperatorName(),
            outTypeInfo,
            operatorBuilder.build(
              inputSerializer,
              isProcessingTime,
              nfaFactory,
              comparator,
              pattern.getAfterMatchSkipStrategy()));
        } 
    else
     {
    KeySelector
    <IN, 
    Byte
    > keySelector = 
    new
    NullByteKeySelector
    <>();
          patternStream = inputStream.keyBy(keySelector).transform(
            operatorBuilder.getOperatorName(),
            outTypeInfo,
            operatorBuilder.build(
              inputSerializer,
              isProcessingTime,
              nfaFactory,
              comparator,
              pattern.getAfterMatchSkipStrategy()
            )).forceNonParallel();
        }
        
    return
     patternStream;
      }
      ...
    }

    FlinkCEP实现步骤

    1. IN: DataSource -> DataStream -> Transformations -> DataStream
    2. Pattern: Pattern.begin.where.next.where...times...
    3. PatternStream: CEP.pattern(DataStream, Pattern)
    4. DataStream: PatternStream.select(PatternSelectFunction) PatternStream.flatSelect(PatternSelectFunction)
    5. OUT: DataStream -> Transformations -> DataStream -> DataSink

    FlinkCEP匹配超时实现步骤

    TimeoutCEP的流需要keyBy,即KeyedStream,如果inputStream不是KeyedStream,会new一个0字节的Key(上面CEPOperatorUtils源码里有提到)。

    KeySelector
    <IN, 
    Byte
    > keySelector = 
    new
     
    NullByteKeySelector
    <>();

    Pattern最后调用within设置窗口时间。 如果是对主键进行分组,一个时间窗口内最多只会匹配出一个超时事件,使用PatternStream.select(...)就可以了。

    1. IN: DataSource -> DataStream -> Transformations -> DataStream -> keyBy -> KeyedStream
    2. Pattern: Pattern.begin.where.next.where...within(Time windowTime)
    3. PatternStream: CEP.pattern(KeyedStream, Pattern)
    4. OutputTag: new OutputTag(...)
    5. SingleOutputStreamOperator: PatternStream.flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)
    6. DataStream: SingleOutputStreamOperator.getSideOutput(OutputTag)
    7. OUT: DataStream -> Transformations -> DataStream -> DataSink

    FlinkCEP超时不足

    和Flink窗口聚合类似,如果使用事件时间和依赖事件生成的水印向前推进,需要后续的事件到达,才会触发窗口进行计算和输出结果。

    FlinkCEP超时完整demo

    public
    class
    CEPTimeoutEventJob
     {
    private
    static
    final
    String
     LOCAL_KAFKA_BROKER = 
    "localhost:9092"
    ;
    private
    static
    final
    String
     GROUP_ID = 
    CEPTimeoutEventJob
    .
    class
    .getSimpleName();
    private
    static
    final
    String
     GROUP_TOPIC = GROUP_ID;
      
    public
    static
    void
     main(
    String
    [] args) 
    throws
    Exception
     {
    // 参数
    ParameterTool
     params = 
    ParameterTool
    .fromArgs(args);
        
    StreamExecutionEnvironment
     env = 
    StreamExecutionEnvironment
    .getExecutionEnvironment();
    // 使用事件时间
        env.setStreamTimeCharacteristic(
    TimeCharacteristic
    .
    EventTime
    );
        env.enableCheckpointing(
    5000
    );
        env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig
    .
    ExternalizedCheckpointCleanup
    .RETAIN_ON_CANCELLATION);
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(
    RestartStrategies
    .fixedDelayRestart(
    5
    , 
    10000
    ));
        
    // 不使用POJO的时间
    final
    AssignerWithPeriodicWatermarks
     extractor = 
    new
    IngestionTimeExtractor
    <POJO>();
        
    // 与Kafka Topic的Partition保持一致
        env.setParallelism(
    3
    );
        
    Properties
     kafkaProps = 
    new
    Properties
    ();
        kafkaProps.setProperty(
    "bootstrap.servers"
    , LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty(
    "group.id"
    , GROUP_ID);
        
    // 接入Kafka的消息
    FlinkKafkaConsumer011
    <POJO> consumer = 
    new
    FlinkKafkaConsumer011
    <>(GROUP_TOPIC, 
    new
    POJOSchema
    (), kafkaProps);
    DataStream
    <POJO> pojoDataStream = env.addSource(consumer)
            .assignTimestampsAndWatermarks(extractor);
        pojoDataStream.print();
        
    // 根据主键aid分组 即对每一个POJO事件进行匹配检测【不同类型的POJO,可以采用不同的within时间】
    // 1.
    DataStream
    <POJO> keyedPojos = pojoDataStream
            .keyBy(
    "aid"
    );
        
    // 从初始化到终态-一个完整的POJO事件序列
    // 2.
    Pattern
    <POJO, POJO> completedPojo =
    Pattern
    .<POJO>begin(
    "init"
    )
                .where(
    new
    SimpleCondition
    <POJO>() {
    private
    static
    final
    long
     serialVersionUID = -
    6847788055093903603L
    ;
                  
    @Override
    public
    boolean
     filter(POJO pojo) 
    throws
    Exception
     {
    return
    "02"
    .equals(pojo.getAstatus());
                  }
                })
                .followedBy(
    "end"
    )
    //            .next("end")
                .where(
    new
    SimpleCondition
    <POJO>() {
    private
    static
    final
    long
     serialVersionUID = -
    2655089736460847552L
    ;
                  
    @Override
    public
    boolean
     filter(POJO pojo) 
    throws
    Exception
     {
    return
    "00"
    .equals(pojo.getAstatus()) || 
    "01"
    .equals(pojo.getAstatus());
                  }
                });
        
    // 找出1分钟内【便于测试】都没有到终态的事件aid
    // 如果针对不同类型有不同within时间,比如有的是超时1分钟,有的可能是超时1个小时 则生成多个PatternStream
    // 3.
    PatternStream
    <POJO> patternStream = CEP.pattern(keyedPojos, completedPojo.within(
    Time
    .minutes(
    1
    )));
        
    // 定义侧面输出timedout
    // 4.
    OutputTag
    <POJO> timedout = 
    new
    OutputTag
    <POJO>(
    "timedout"
    ) {
    private
    static
    final
    long
     serialVersionUID = 
    773503794597666247L
    ;
        };
        
    // OutputTag<L> timeoutOutputTag, PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction, PatternFlatSelectFunction<T, R> patternFlatSelectFunction
    // 5.
    SingleOutputStreamOperator
    <POJO> timeoutPojos = patternStream.flatSelect(
            timedout,
    new
    POJOTimedOut
    (),
    new
    FlatSelectNothing
    ()
        );
        
    // 打印输出超时的POJO
    // 6.7.
        timeoutPojos.getSideOutput(timedout).print();
        timeoutPojos.print();
        env.execute(
    CEPTimeoutEventJob
    .
    class
    .getSimpleName());
      }
      
    /**
       * 把超时的事件收集起来
       */
    public
    static
    class
    POJOTimedOut
    implements
    PatternFlatTimeoutFunction
    <POJO, POJO> {
    private
    static
    final
    long
     serialVersionUID = -
    4214641891396057732L
    ;
        
    @Override
    public
    void
     timeout(
    Map
    <
    String
    , 
    List
    <POJO>> map, 
    long
     l, 
    Collector
    <POJO> collector) 
    throws
    Exception
     {
    if
     (
    null
     != map.get(
    "init"
    )) {
    for
     (POJO pojoInit : map.get(
    "init"
    )) {
    System
    .out.println(
    "timeout init:"
     + pojoInit.getAid());
              collector.collect(pojoInit);
            }
          }
    // 因为end超时了,还没收到end,所以这里是拿不到end的
    System
    .out.println(
    "timeout end: "
     + map.get(
    "end"
    ));
        }
      }
      
    /**
       * 通常什么都不做,但也可以把所有匹配到的事件发往下游;如果是宽松临近,被忽略或穿透的事件就没办法选中发往下游了
       * 一分钟时间内走完init和end的数据
       *
       * @param <T>
       */
    public
    static
    class
    FlatSelectNothing
    <T> 
    implements
    PatternFlatSelectFunction
    <T, T> {
    private
    static
    final
    long
     serialVersionUID = -
    3029589950677623844L
    ;
        
    @Override
    public
    void
     flatSelect(
    Map
    <
    String
    , 
    List
    <T>> pattern, 
    Collector
    <T> collector) {
    System
    .out.println(
    "flatSelect: "
     + pattern);
        }
      }
    }

    测试结果(followedBy):

    3
    > POJO{aid=
    'ID000-0'
    , astyle=
    'STYLE000-0'
    , aname=
    'NAME-0'
    , logTime=
    1563419728242
    , energy=
    529.00
    , age=
    0
    , tt=
    2019
    -
    07
    -
    18
    , astatus=
    '02'
    , createTime=
    null
    , updateTime=
    null
    }
    3
    > POJO{aid=
    'ID000-1'
    , astyle=
    'STYLE000-2'
    , aname=
    'NAME-1'
    , logTime=
    1563419728783
    , energy=
    348.00
    , age=
    26
    , tt=
    2019
    -
    07
    -
    18
    , astatus=
    '02'
    , createTime=
    null
    , updateTime=
    null
    }
    3
    > POJO{aid=
    'ID000-0'
    , astyle=
    'STYLE000-0'
    , aname=
    'NAME-0'
    , logTime=
    1563419749259
    , energy=
    492.00
    , age=
    0
    , tt=
    2019
    -
    07
    -
    18
    , astatus=
    '00'
    , createTime=
    null
    , updateTime=
    null
    }
    flatSelect: {init=[POJO{aid=
    'ID000-0'
    , astyle=
    'STYLE000-0'
    , aname=
    'NAME-0'
    , logTime=
    1563419728242
    , energy=
    529.00
    , age=
    0
    , tt=
    2019
    -
    07
    -
    18
    , astatus=
    '02'
    , createTime=
    null
    , updateTime=
    null
    }], 
    end
    =[POJO{aid=
    'ID000-0'
    , astyle=
    'STYLE000-0'
    , aname=
    'NAME-0'
    , logTime=
    1563419749259
    , energy=
    492.00
    , age=
    0
    , tt=
    2019
    -
    07
    -
    18
    , astatus=
    '00'
    , createTime=
    null
    , updateTime=
    null
    }]}
    timeout init:ID000-
    1
    3
    > POJO{aid=
    'ID000-1'
    , astyle=
    'STYLE000-2'
    , aname=
    'NAME-1'
    , logTime=
    1563419728783
    , energy=
    348.00
    , age=
    26
    , tt=
    2019
    -
    07
    -
    18
    , astatus=
    '02'
    , createTime=
    null
    , updateTime=
    null
    }
    timeout 
    end
    : 
    null
    3
    > POJO{aid=
    'ID000-2'
    , astyle=
    'STYLE000-0'
    , aname=
    'NAME-0'
    , logTime=
    1563419829639
    , energy=
    467.00
    , age=
    0
    , tt=
    2019
    -
    07
    -
    18
    , astatus=
    '03'
    , createTime=
    null
    , updateTime=
    null
    }
    3
    > POJO{aid=
    'ID000-2'
    , astyle=
    'STYLE000-0'
    , aname=
    'NAME-0'
    , logTime=
    1563419841394
    , energy=
    107.00
    , age=
    0
    , tt=
    2019
    -
    07
    -
    18
    , astatus=
    '00'
    , createTime=
    null
    , updateTime=
    null
    }
    3
    > POJO{aid=
    'ID000-3'
    , astyle=
    'STYLE000-0'
    , aname=
    'NAME-0'
    , logTime=
    1563419967721
    , energy=
    431.00
    , age=
    0
    , tt=
    2019
    -
    07
    -
    18
    , astatus=
    '02'
    , createTime=
    null
    , updateTime=
    null
    }
    3
    > POJO{aid=
    'ID000-3'
    , astyle=
    'STYLE000-2'
    , aname=
    'NAME-0'
    , logTime=
    1563419979567
    , energy=
    32.00
    , age=
    26
    , tt=
    2019
    -
    07
    -
    18
    , astatus=
    '03'
    , createTime=
    null
    , updateTime=
    null
    }
    3
    > POJO{aid=
    'ID000-3'
    , astyle=
    'STYLE000-2'
    , aname=
    'NAME-0'
    , logTime=
    1563419993612
    , energy=
    542.00
    , age=
    26
    , tt=
    2019
    -
    07
    -
    18
    , astatus=
    '01'
    , createTime=
    null
    , updateTime=
    null
    }
    flatSelect: {init=[POJO{aid=
    'ID000-3'
    , astyle=
    'STYLE000-0'
    , aname=
    'NAME-0'
    , logTime=
    1563419967721
    , energy=
    431.00
    , age=
    0
    , tt=
    2019
    -
    07
    -
    18
    , astatus=
    '02'
    , createTime=
    null
    , updateTime=
    null
    }], 
    end
    =[POJO{aid=
    'ID000-3'
    , astyle=
    'STYLE000-2'
    , aname=
    'NAME-0'
    , logTime=
    1563419993612
    , energy=
    542.00
    , age=
    26
    , tt=
    2019
    -
    07
    -
    18
    , astatus=
    '01'
    , createTime=
    null
    , updateTime=
    null
    }]}
    3
    > POJO{aid=
    'ID000-4'
    , astyle=
    'STYLE000-0'
    , aname=
    'NAME-0'
    , logTime=
    1563420063760
    , energy=
    122.00
    , age=
    0
    , tt=
    2019
    -
    07
    -
    18
    , astatus=
    '02'
    , createTime=
    null
    , updateTime=
    null
    }
    3
    > POJO{aid=
    'ID000-4'
    , astyle=
    'STYLE000-0'
    , aname=
    'NAME-0'
    , logTime=
    1563420078008
    , energy=
    275.00
    , age=
    0
    , tt=
    2019
    -
    07
    -
    18
    , astatus=
    '03'
    , createTime=
    null
    , updateTime=
    null
    }
    timeout init:ID000-
    4
    3
    > POJO{aid=
    'ID000-4'
    , astyle=
    'STYLE000-0'
    , aname=
    'NAME-0'
    , logTime=
    1563420063760
    , energy=
    122.00
    , age=
    0
    , tt=
    2019
    -
    07
    -
    18
    , astatus=
    '02'
    , createTime=
    null
    , updateTime=
    null
    }
    timeout 
    end
    : 
    null

    总结

    以上所述是小编给大家介绍的Apache FlinkCEP 实现超时状态监控的步骤,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的!

    上一篇:nginx设置资源缓存实战详解
    下一篇:Linux下安装VMWare15.5的教程
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    Apache FlinkCEP 实现超时状态监控的步骤详解 Apache,FlinkCEP,实现,超时,