FLINK SIDDHI ADDON 学习笔记

发布时间:2020-07-17 13:51:17 作者:zhanjia
来源:网络 阅读:16225

SIDDHI 是一款功能强大的CEP 引擎,具有自己的DSL,丰富的模式匹配功能和可扩展性, 感谢陈浩同学提供了SIDDHI和FLINK的整合功能 https://github.com/haoch/flink-siddhi 本文主要介绍了这个ADDON的一些实现思路

  1. 将FLINK STREAM 转化为 SIDDHI STREAM 定义

  用法: SiddhiCEP.registerStream(streamName, FlinkDataStream, fieldNames)

  通过 FlinkDataStream.getType 获得流对象的类型定义.registerStream方法会构造一个 SiddhiStreamSchema 对象,根据流对象的类型定义,自动获取每个field对应的数据类型存在内部的fieldTypes数组中.

  SiddhiStreamSchema 内部会创建一个Siddhi StreamDefinition对象, StreamDefinition的attribute的定义根据fieldNames + fieldTypes 来添加.SiddhiTypeFactory.getAttributeType 负责将Flink 的数据类型映射为Siddhi的数据类型, 并可自动生成一段Define Stream的定义(见 SiddhiStreamSchema.getStreamDefinitionExpression 方法) define stream [streamName] ([fieldName 1] [fieldType 1], ...[fieldName n] [fieldType n])

  SiddhiStreamSchema 包括一个StreamSerializer: 用于将DataStream 中的对象转化为 Siddhi Stream 的输入(Object Array):
    如果流对象是一个简单类型 Atomic Type 直接将流对象放到 ARRAY中
    如果流对象是Tuple 类型,直接将Tuple 中前N个值放到ARRAY中
    如果流对象是Pojo或者CaseClass类型,直接根据每个fieldName 获取Class对应的属性放到ARRAY中

  1. 串联FLINK STREAM 和 SIDDHI STREAM

  SiddhiStream: 抽象的Stream基类

  convertDataStream 将原始的FLINK流转化为Tuple类型的流,Tuple的第一个元素为StreamId, 第二个元素为原来流中的数据,支持普通Stream 和 KeyedStream

  ExecutionSiddhiStream: 构建 SiddhiOperatorContext 并调用SiddhiStreamFactory.createDataStream 创建了集成Siddhi的 DataStream. DataStream的类型为Tuple的子类.SiddhiTypeFactory.getTupleTypeInformation: 其核心思路是通过Siddhi输出Stream的StreamDefinition获得其Attribute的定义,再通过 TypeInfoParser.parse构造Flink Tuple 类型的定义

  ExecutableStream 根据Siddhi query 创建ExecutionSiddhiStream对象
  SingleSiddhiStream, UnionSiddhiStream: ExecutableStream 的子类,支持Fluent Style的链式调用. UnionSiddhiStream 调用了DataStream.union 方法

  SiddhiStreamFactory.createDataStream 通过 FLINK DataStream的transform方法使用了自定义的StreamOperator: SiddhiStreamOperator. 在 AbstractSiddhiOperator 的 setup 方法中创建SiddhiManager 和 SiddhiAppRuntime 并注册了 InputHandler 和 OutputCallback (StreamOutputHandler)

  SiddhiStreamOperator.processElement 需要处理两种场景:
    Flink TimeCharacteristic = ProcessingTime: 先调用StreamSerializer将数据转化为Object Array, 再直接调用InputHandler.send将数据发送给Siddhi处理
    Flink TimeCharacteristic = EventTime: 缓存接收到的StreamRecord 到内部的priorityQueue中,直到收到Watermark, 将priorityQueue中小于watermark的StreamRecord一次发送给Siddhi处理

  StreamOutputHandler:根据Output的TypeInfo将Siddhi Event 转化为 Flink StreamRecord. 再转发到SiddhiStreamOperator的Output

  1. CHECKPOINT

  SiddhiStreamOperator中保留了两种State信息,一种是priorityQueue中保存的由于watermark未发送给Siddhi的消息. 另一种是Siddhi本身的State, 通过SiddhiAppRuntime.snapshot() 获得

推荐阅读:
  1. 解决JS内存泄露之js对象和dom对象互相引用问题
  2. 简单谈谈axios中的get,post方法

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

siddhi flink %d

上一篇:MySQL触发器trigger的使用

下一篇:keras如何实现调用训练模型和去掉全连接层

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》