您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
本篇内容主要讲解“flink怎么使用Event_time处理实时数据”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“flink怎么使用Event_time处理实时数据”吧!
//flink中关于时间的三个概念 //event time:数据产生的时间 //processing time:处理数据的时间 即操作数据的之间 比如一个flink在scala中的map函数处理数据时 //ingest time:摄取数据时间,在一个streaming程序中 一个时间段收集数据的时间 //而evet time在处理实时数据时是比较有用的,例如在由于网络的繁忙的原因,某些数据未能按时到达,假设迟到了30min, //而我们定义的最大延迟不能超过十分钟,那么一些数据包含了超时的数据那么这些数据是不会在这次操作中处理的而是会 //丢弃掉
//kafka生产者代码 package kafka.partition.test; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; public class PartitionProducer { public static void main(String[] args) { Map<String,Object> props = new HashMap<>(); props.put("acks", "1"); props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("bootstrap.servers", "bigdata01:9092"); String topic = "event_time"; KafkaProducer<String, String> producer = new KafkaProducer<>(props); for(int i = 0 ; i <= 20;i++) { //flink的watermarkassginer里面定义的超时时间是5000毫秒 long mills = System.currentTimeMillis(); if(i%3==0) { //数据的event time放在字符串的开头 以空格分割 //kafka event_time topic的0分区超时4000毫秒 String line = (mills-4000)+" "+"partition-0--this is a big +" +i; ProducerRecord< String,String> record = new ProducerRecord<String, String>(topic, new Integer(0), null, i+"", line); producer.send(record); }else if(i%3==1) { //kafka event_time topic的1分区超时5000毫秒 String line = (mills-5000)+" "+"partition-1--this is a big +" +i; ProducerRecord< String,String> record = new ProducerRecord<String, String>(topic, new Integer(1), null, i+"", line); producer.send(record); }else if(i%3==2) { //kafka event_time topic的2分区超时8000毫秒 String line = (mills-8000)+" "+"partition-2--this is a big +" +i; ProducerRecord< String,String> record = new ProducerRecord<String, String>(topic, new Integer(2), null, i+"", line); producer.send(record); } } producer.close(); } }
//自定义的TimestampsAndWatermarks package flink.streaming import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.watermark.Watermark class CustomWaterMarks extends AssignerWithPeriodicWatermarks[String]{ //超时时间 val maxOutOrderness = 5000l //flink过一段时间便会调一次该函数获取水印 def getCurrentWatermark():Watermark ={ val sysMilssecons = System.currentTimeMillis() new Watermark(sysMilssecons-maxOutOrderness) } //每条记录多会调用 来获得even time 在生产的数据中 even_time放在字符串的第一个字段 用空格分割 def extractTimestamp(element: String,previousElementTimestamp: Long): Long = { ((element.split(" ")).head).toLong } }
package flink.streaming import java.util.Properties import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.sink.RichSinkFunction import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks object StreamWithEventTimeAndWaterMarks { def main(args: Array[String]): Unit = { val kafkaProps = new Properties() //kafka的一些属性 kafkaProps.setProperty("bootstrap.servers", "bigdata01:9092") //所在的消费组 kafkaProps.setProperty("group.id", "group2") //获取当前的执行环境 val evn = StreamExecutionEnvironment.getExecutionEnvironment //配制处理数据的时候使用event time evn.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //kafka的consumer,test1是要消费的topic val kafkaSource = new FlinkKafkaConsumer[String]("event_time",new SimpleStringSchema,kafkaProps) //添加自定义的 TimestampsAndWatermarks kafkaSource.assignTimestampsAndWatermarks(new CustomWaterMarks) //设置从最新的offset开始消费 //kafkaSource.setStartFromGroupOffsets() kafkaSource.setStartFromLatest() //自动提交offset kafkaSource.setCommitOffsetsOnCheckpoints(true) //flink的checkpoint的时间间隔 //evn.enableCheckpointing(2000) //添加consumer val stream = evn.addSource(kafkaSource) evn.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE) //stream.setParallelism(3) val text = stream.flatMap{ _.toLowerCase().split(" ").drop(1).filter { _.nonEmpty} } .map{(_,1)} .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1) .map(x=>{(x._1,(new Integer(x._2)))}) text.print() //启动执行 //text.addSink(new Ssinks()) evn.execute("kafkawd") } }
打印结果 partition-2中的数据因为超时没有出现 1> (big,14) 4> (is,14) 1> (+0,1) 2> (+1,1) 3> (partition-1--this,7) 4> (+15,1) 3> (+12,1) 1> (partition-0--this,7) 3> (+6,1) 1> (+16,1) 4> (+10,1) 2> (+18,1) 4> (+7,1) 3> (+3,1) 2> (+9,1) 3> (+19,1) 2> (+13,1) 3> (a,14) 2> (+4,1)
到此,相信大家对“flink怎么使用Event_time处理实时数据”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。