Apache Flink CEP的示例分析

发布时间:2021-12-16 14:46:17 作者:小新
来源:亿速云 阅读:248

这篇文章主要为大家展示了“Apache Flink CEP的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“Apache Flink CEP的示例分析”这篇文章吧。

1、Flink CEP 概念以及使用场景

1.什么是 CEP

CEP 的意思是复杂事件处理,例如:起床-->洗漱-->吃饭-->上班等一系列串联起来的事件流形成的模式称为 CEP。如果发现某一次起床后没有刷牙洗脸亦或是吃饭就直接上班,就可以把这种非正常的事件流匹配出来进行分析,看看今天是不是起晚了。

下图中列出了几个例子:

Apache Flink CEP的示例分析

2.Flink CEP 应用场景

3.Flink CEP 原理

Apache Flink CEP的示例分析

Flink CEP 内部是用 NFA(非确定有限自动机)来实现的,由点和边组成的一个状态图,以一个初始状态作为起点,经过一系列的中间状态,达到终态。点分为起始状态、中间状态、最终状态三种,边分为 take、ignore、proceed 三种。 

下面以一个打车的例子来展示状态是如何流转的,规则见下图所示。

Apache Flink CEP的示例分析

以乘客制定行程作为开始,匹配乘客的下单事件,如果这个订单超时还没有被司机接单的话,就把行程事件和下单事件作为结果集往下游输出。

假如消息到来顺序为:行程-->其他-->下单-->其他。

状态流转如下:

(1)开始时状态处于行程状态,即等待用户制定行程。 

Apache Flink CEP的示例分析

(2)当收到行程事件时,匹配行程状态的条件,把行程事件放到结果集中,通过 take 边将状态往下转移到下单状态。 

Apache Flink CEP的示例分析

(3)由于下单状态上有一条 ignore 边,所以可以忽略收到的其他事件,直到收到下单事件时将其匹配,放入结果集中,并且将当前状态往下转移到超时未接单状态。这时候结果集当中有两个事件:制定行程事件和下单事件。  

Apache Flink CEP的示例分析Apache Flink CEP的示例分析

(4)超时未接单状态时,如果来了一些其他事件,同样可以被 ignore 边忽略,直到超时事件的触发,将状态往下转移到最终状态,这时候整个模式匹配成功,最终将结果集中的制定行程事件和下单事件输出到下游。

Apache Flink CEP的示例分析

上面是一个匹配成功的例子,如果是不成功的例子会怎么样?

假如当状态处于超时未接单状态时,收到了一个接单事件,那么就不符合超时未被接单的触发条件,此时整个模式匹配失败,之前放入结果集中的行程事件和下单事件会被清理。

Apache Flink CEP的示例分析

当收到行程事件时,匹配行程状态的条件,把行程事件放到结果集中,通过 take 边将状态往下转移到下单状态。 

2、Flink CEP 程序开发

本节将详细介绍 Flink CEP 的程序结构以及 API。

1.Flink CEP 程序结构

主要分为两部分:定义事件模式和匹配结果处理。

官方示例如下:

DataStream<Event> input = ...
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getId() == 42;
            }
        }
    ).next("middle").subtype(SubEvent.class).where(
        new SimpleCondition<SubEvent>() {
            @Override
            public boolean filter(SubEvent subEvent) {
                return subEvent.getVolume() >= 10.0;
            }
        }
    ).followedBy("end").where(
         new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getName().equals("end");
            }
         }
    );

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Alert> result = patternStream.select(
    new PatternProcessFunction<Event, Alert>() {
        @Override
        public void select(
                Map<String, List<Event>> pattern,
                Context ctx,
                Collector<Alert> out) throws Exception {
            out.collect(createAlertFrom(pattern));
        }
    });

程序结构分为三部分:首先需要定义一个模式(Pattern),即第 2 行代码所示,接着把定义好的模式绑定在 DataStream 上(第 25 行),最后就可以在具有 CEP 功能的 DataStream 上将匹配的结果进行处理(第 27 行)。下面对关键部分做详细讲解:

2.Flink CEP 构成

Apache Flink CEP的示例分析

上图中,蓝色方框代表的是一个个单独的模式;浅黄色的椭圆代表的是这个模式上可以添加的属性,包括模式可以发生的循环次数,或者这个模式是贪婪的还是可选的;橘色的椭圆代表的是模式间的关系,定义了多个模式之间是怎么样串联起来的。通过定义模式,添加相应的属性,将多个模式串联起来三步,就可以构成了一个完整的 Flink CEP 程序。

2.1 定义模式

下面是示例代码:

pattern.next("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getId() == 42;
            }
        }
)

定义模式主要有如下 5 个部分组成:

2.2 模式的属性

接下来介绍一下怎样设置模式的属性。模式的属性主要分为循环属性和可选属性。

2.3 模式的有效期

由于模式的匹配事件存放在状态中进行管理,所以需要设置一个全局的有效期(within)。若不指定有效期,匹配事件会一直保存在状态中不会被清除。至于有效期能开多大,要依据具体使用场景和数据量来衡量,关键要看匹配的事件有多少,随着匹配的事件增多,新到达的消息遍历之前的匹配事件会增加 CPU、内存的消耗,并且随着状态变大,数据倾斜也会越来越严重。

2.4 模式间的联系

主要分为三种:严格连续性(next/notNext),宽松连续性(followedBy/notFollowedBy),和非确定宽松连续性(followedByAny)。

三种模式匹配的差别见下表所示:

模式&数据流严格连续性宽松连续性非确定宽松连续性

Pattern(A B) Streaming('a','c','b1','b2')

不匹配

匹配 输出:a,b1

匹配 输出:a,b1 a,b2

总结如下:

2.5 多模式组合

除了前面提到的模式定义和模式间的联系,还可以把相连的多个模式组合在一起看成一个模式组,类似于视图,可以在这个模式视图上进行相关操作。

Apache Flink CEP的示例分析

上图这个例子里面,首先匹配了一个登录事件,然后接下来匹配浏览,下单,购买这三个事件反复发生三次的用户。 

如果没有模式组的话,代码里面浏览,下单,购买要写三次。有了模式组,只需把浏览,下单,购买这三个事件当做一个模式组,把相应的属性加上 times(3) 就可以了。

2.6 处理结果

处理匹配的结果主要有四个接口:PatternFlatSelectFunction,PatternSelectFunction,PatternFlatTimeoutFunction 和 PatternTimeoutFunction。

从名字上可以看出,输出可以分为两类:select 和 flatSelect 指定输出一条还是多条,timeoutFunction 和不带 timeout 的 Function 指定可不可以对超时事件进行旁路输出。 

下图是输出的综合示例代码:

Apache Flink CEP的示例分析

2.7 状态存储优化

当一个事件到来时,如果这个事件同时符合多个输出的结果集,那么这个事件是如何保存的?

Flink CEP 通过 Dewey 计数法在多个结果集中共享同一个事件副本,以实现对事件副本进行资源共享。 

Apache Flink CEP的示例分析

3、Flink CEP 的扩展

本章主要介绍一些 Flink CEP 的扩展,讲述如何做到超时机制的精确管理,以及规则的动态加载与更新。

1.超时触发机制扩展

原生 Flink CEP 中超时触发的功能可以通过 within+outputtag 结合来实现,但是在复杂的场景下处理存在问题,如下图所示,在下单事件后还有一个预付款事件,想要得到下单并且预付款后超时未被接单的订单,该如何表示呢? 

Apache Flink CEP的示例分析

参照下单后超时未被接单的做法,把下单并且预付款后超时未被接单规则表示为下单.followedBy(预付款).followedBy(接单).within(time),那么这样实现会存在问题吗?

这种做法的计算结果是会存在脏数据的,因为这个规则不仅匹配到了下单并且预付款后超时未被接单的订单(想要的结果),同样还匹配到了只有下单行为后超时未被接单的订单(脏数据,没有预付款)。原因是因为超时 within 是控制在整个规则上,而不是某一个状态节点上,所以不论当前的状态是处在哪个状态节点,超时后都会被旁路输出。

那么就需要考虑能否通过时间来直接对状态转移做到精确的控制,而不是通过规则超时这种曲线救国的方式。于是乎,在通过消息触发状态的转移之外,需要增加通过时间触发状态的转移支持。要实现此功能,需要在原来的状态以及状态转移中,增加时间属性的概念。如下图所示,通过 wait 算子来得到 waiting 状态,然后在 waiting 状态上设置一个十秒的时间属性以定义一个十秒的时间窗口。

Apache Flink CEP的示例分析

wait 算子对应 NFA 中的 ignore 状态,将在没有到达时间窗口结束时间时自旋,在 ComputationState 中记录 wait 的开始时间,在 NFA 的 doProcess 中,将到来的数据与waiting 状态处理,如果到了 waiting 的结束时间,则进行状态转移。

Apache Flink CEP的示例分析

上图中红色方框中为 waiting 状态设置了两条 ignore 边:

  1. waitingStatus.addIgnore(lastSink,waitingCondition),waitingCondition 中的逻辑是获取当前的时间(支持事件时间),判断有没有超过设置的 waiting 阈值,如果超过就把状态向后转移。

  2. waitingStatus.addIgnore(waitingCondition),waitingCondition 中如果未达到设置的 waiting 阈值,就会自旋在当前的 waiting 状态不变。

2.规则动态注入

线上运行的 CEP 中肯定经常遇到规则变更的情况,如果每次变更时都将任务重启、重新发布是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景,如果规则窗口过长(一两个星期),状态过大,就会导致重启时间延长,期间就会造成一些想要处理的异常行为不能及时发现。

那么要怎么样做到规则的动态更新和加载呢?

Apache Flink CEP的示例分析

梳理一下整体架构,Flink CEP 是运行在 Flink Job 里的,而规则库是放在外部存储中的。首先,需要在运行的 Job 中能及时发现外部存储中规则的变化,即需要在 Job 中提供访问外部库的能力。其次,需要将规则库中变更的规则动态加载到 CEP 中,即把外部规则的描述解析成 Flink CEP 所能识别的 pattern 结构体。最后,把生成的 pattern 转化成 NFA,替换历史 NFA,这样对新到来的消息,就会使用新的规则进行匹配。

下图就是一个支持将外部规则动态注入、更新的接口。 

Apache Flink CEP的示例分析

这个接口里面主要实现了四个方法:

3.历史匹配结果清理

新规则动态加载到 Flink CEP 的 Job 中,替换掉原来的 NFA 之后,还需要对历史匹配的结果集进行清理。在 AbstractKeyedCEPPatternOperator 中实现刷新 NFA,注意,历史状态是否需要清理和业务相关:

  1. 修改的逻辑对规则中事件的匹配没有影响,保留历史结果集中的状态。

  2. 修改的逻辑影响到了之前匹配的部分,需要将之前匹配的结果集中的状态数据清除,防止错误的输出。

Apache Flink CEP的示例分析

以上是“Apache Flink CEP的示例分析”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!

推荐阅读:
  1. 从 Spark Streaming 到 Apache Flink : 实时数据流在爱奇艺的演进
  2. Apache Flink®生态所面临的机遇与挑战

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

apache flink cep

上一篇:机器学习使用场景有哪些

下一篇:Linux sftp命令的用法是怎样的

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》