您好,登录后才能下订单哦!
这篇文章主要讲解了“flink中窗口的作用是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“flink中窗口的作用是什么”吧!
窗口计算是流式计算中常用的数据计算方式之一,通过按照固定时间或长度将数据流切分成不同的窗口,再对数据进行相应的聚合操作,得到一定时间范围内的统计结果,例如统计最近5分钟内某网站的点击数,此时,点击数据在不断产生,通过5分钟窗口将数据限定在固定时间范围内,就可以对该范围内的有界数据执行聚合,得到最近5分钟的网站点击数。
代码接口规则
stream.keyBy(...) //keyed类型数据集 .window(...) //指定窗口分配器类型 [.trigger(...)] //指定触发器类型(可选) [.evictor(...)] //指定evictor(可选) [.allowedLateness(...)] //指定是否延迟处理数据(可选) [.sideOutputLateData(...)] //指定Output Lag(可选) .reduce/aggregate/fold/apply() //指定窗口计算函数 [.getSideOutput(...)] //根据Tag输出数据(可选)
算子
Windows Assigner:指定窗口类型,定义如何将数据流分配到一个或多个窗口
Windows Trigger:指定窗口触发的时机,定义窗口满足什么样的条件触发计算;
Evictor:用于数据剔除
Lateness:标记是否处理迟到数据,当迟到数据到达窗口中是否触发计算。
Output Tag:标记输出标签,然后通过getSideOutput将窗口中的数据根据标签输出。
Windows Function:定义窗口上数据处理的逻辑,例如对数据进行sum操作。
在运用窗口计算时,Flink根据上游数据集是否为KeyedStream类型(将数据集按Key分区),对应的Window Assigner会不同,
上游数据集为KeyedStream类型,则调用DataStream API的Windwo()方法指定Windows Assigner,数据将根据Key在不同的Task实例中并行分别计算,最后得出针对每个Key统计的结果。
如果是Non-Keyed类型,则调用WindowsAll()方法来指定Windows Assigner,所有数据都被窗口算子路由到一个Task中计算,并得到结果。
建议数据进行KeyedStream处理,这样启动并行计算,加速效率。
flink支持两种类型的窗口,一种基于时间,窗口大小由开始和结束时间戳约束,一种基于数量,根据固定数量定义窗口大小。
根据Windows Assigner数据分配方式的不同将Windows分为4大类:滚动窗口(Tumbling Windows)、滑动窗口(Sliding Windows)、会话窗口(Session Windows)和全局窗口(Global Windows)
滚动窗口根据固定时间或大小切分,且窗口与窗口间元素互不重叠,适合于固定时间大小和周期统计某一指标的窗口计算。
DataStream API提供了基于Event Time和Process Time两种时间类型的Tumbling窗口,对应的Assigner分别为TumblingEventTimeWindows和TumblingProcessTimeWindows,窗口大小童工of()指定,时间单位分别为Time.milliseconds(x)、Time.seconds(x)或Time.minutes(x),也可以是不同时间单位的组合。
如下实例,窗口时间按10S进行切分,窗口的时间是[1:00:00.000-1:00:09.999] 到[1:00:10.000-1:00:19.999]的等固定时间范围。
val inputStream:DataStream[T]= ... //定义Event Time Tumbling Windows val tumblingEventTimeWindows=inputStream.keyBy(_.id) //通过使用TumblingEventTimeWindows定义Event Time滚动窗口 .window(TumblingEventTimeWindows.of(Time.seconds(10))) .process(...) //定义窗口函数 //定义Process Time Tumbling Windows val tumblingProcessingTimeWindows = inputStream.keyBy(_.id) //通过TumblingProcessTimeWindows定义Evnet Time滚动窗口 .window(TumblingProcessTimeWindows.of(Times.seconds(10))) .process(...) //定义窗口函数
滑动窗口是一种常见的窗口类型,特点是在滚动窗口基础上增加了窗口滑动时间(Slide Time),且允许窗口数据发生重叠。这种窗口不像滚动窗口按照Windows Size向前移动,而是根据设定的Slide Time向前滑动。窗口之间的数据重叠大小根据Windows Size和Slide time决定,当Slide Time小于Windows Size便会发生窗口重叠,Slide Size大于WindowsSize会出现窗口不连续,数据可能不会再任何一个窗口内计算。
DataStream API针对Sliding Windows根据不同时间类型Assigner,包括基于Event Time的SlidingEventTimeWindows和基于Process Time的SlidingProcessingTimeWindows。
实例如下,指定Windows Size为1h,Slide Time为10m。
val inputStream:DataStream[T]= ... //定义Event Time Sliding Windows val slidingEventTimeWindows=inputStream.keyBy(_.id) //通过使用SlidingEventTimeWindows定义Event Time滚动窗口 .window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(10))) .process(...) //定义窗口函数 //定义Process Time Sliding Windows val slidingProcessTimeWindows = inputStream.keyBy(_.id) //通过SlidingProcessTimeWindows定义Evnet Time滚动窗口 .window(SlidingProcessTimeWindows.of(Time.hours(1),Time.minutes(10))) .process(...) //定义窗口函数
将某个时间段内活跃较高的数据聚合为一个窗口进行计算,窗口的触发条件为Session Gap,指规定时间内没有数据活跃接入,则任务窗口结束,触发窗口计算。
注意:如果数据一直不间断,会导致窗口始终不触发。
与滑动、滚动窗口不同,Session Windows不需要定义Windows Size和Slide Time,只需要定义session gap,规定不活跃数据的时间上线即可。
Session Windows比较适合非连续型数据处理或周期性产生数据的场景。DataStream API中可以创建基于Event Time和Process Time的Session Windows,对应的有Assigner分别为EventTimeSessionWindow和ProcessTimerSessionWindows。
实例代码如下:
val inputStream:DataStream[T]= ... //定义Event Time Session Windows val eventTimeSessionWindows=inputStream.keyBy(_.id) //通过使用EventTimeSessionWindows定义Event Time滚动窗口 .window(EventTimeSessionWindows.withGap(Time.milliseconds(10))) .process(...) //定义窗口函数 //定义Process Time Session Windows val processTimeSessionWindows = inputStream.keyBy(_.id) //通过ProcessTimeSessionWindows定义Evnet Time滚动窗口 .window(ProcessTimeSessionWindows.withGap(Time.milliseconds(10))) .process(...) //定义窗口函数
flink支持动态调整的Session Gap,需要实现SessionWindowTimeGapExtractor接口,并复写extract方法,完成Session Gap的抽取,然后将创建好的Session Gap抽取器传入ProcessiongTimeSessionWindows.withDynamicGap()方法即可。
val inputStream:DataStream[T]= ... //定义Event Time Session Windows val eventTimeSessionWindows=inputStream.keyBy(_.id) //通过使用EventTimeSessionWindows定义Event Time滚动窗口 .window(EventTimeSessionWindows.withDynamicGap( //实例化SessionWindowTimeGapExtractor接口 new SessionWindowTimeGapExtractor[String]{ override def extract(element:String):Long={ //动态指定并返回Session Gap } } )) .process(...) //定义窗口函数 //定义Process Time Session Windows val processTimeSessionWindows = inputStream.keyBy(_.id) //通过ProcessTimeSessionWindows定义Evnet Time滚动窗口 .window(ProcessTimeSessionWindows.withDynamicGap( //实例化SessionWindowTimeGapExtractor接口 new SessionWindowTimeGapExtractor[String]{ override def extract(element:String):Long={ //动态指定并返回Session Gap } } )) .process(...) //定义窗口函数
全局会话窗口将所有相同的key数据分配到单个窗口中计算,窗口没有起始和结束时间,窗口需要借助Triger触发计算,如果不指定,则不会触发计算。
使用全局窗口要非常谨慎,必须明确自己在整个窗口中统计出的结果是什么,并指定对应的触发器,同时指定相应的数据清理机制,否则数据将一直留在内存中。
val inputStream:DataStream[T]= ... val globalWindows = inputStream.keyBy(_.id) .window(GlobalWindows.create()) //通过GlobalWindows定义Global Windows .process()
flink定义的四种窗口,容易和时间窗口和事件窗口混淆,他们是不同维度的的窗口定义,需要特别注意下。
越长大越孤单,珍惜好身边人。
感谢各位的阅读,以上就是“flink中窗口的作用是什么”的内容了,经过本文的学习后,相信大家对flink中窗口的作用是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。