flink的Transformation数据处理方法是什么

发布时间:2021-12-31 15:25:51 作者:iii
来源:亿速云 阅读:103

本篇内容主要讲解“flink的Transformation数据处理方法是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“flink的Transformation数据处理方法是什么”吧!

Transformation 数据处理

SingleDataStream

Map

val dataStream = evn.formElements(("a",3),("d",4),("c",4),("c",5),("a",5))
//方法一
val mapStream:DataStream[(String,Int)] = dataStream.map(t => (t._1,t._2+1))
//方法二
val mapStream:DataStream[(String,Int)] = dataStream.map( new MapFunction[(String,Int),(String, Int)]{
  override def map(t: (String,Int)): (String,Int) ={
    (t._1, t._2+1)
  }
})

FlatMap

val dataStream:DataStream[String] = environment.fromCollections()
val resultStream[String] =dataStream.flatMap{str => str.split(" ")}

Filter

//通配符
val filter:DataStream[Int] = dataStream.filter{ _ %2 == 0}
//运算表达式
val filter:DataStream[Int] = dataStream.filter { x => x % 2 ==0}

KeyBy

val dataStream = env.fromElements((1,5),(2,2),(2,4),(1,3))
//指定第一个字段为分区key
val keyedStream: KeyedStream[(String,Int),Tuple]=dataSteam.keyBy(0)

Reduce

val dataStream = env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
//指定第一个字段为分区key
val keyedStream: KeyedStream[(String,Int),Tuple]=dataSteam.keyBy(0)
//实现一:滚动第二个字段进行reduce相加求和
val reduceStream = keyedStream.reduce{(t1,t2) => (t1._1, t1._2+t2._2)}
//实现二:实现ReduceFunction
val reduceStream1 = keyedStream.reduce(new ReduceFunction[(String, Int)] {
  override def reduce(t1: (String,Int), t2:(String,Int)):(String, int) = {
    (t1._1, t1._2+ t2._2)
  }
})

Aggregations

val dataStream = env.fromElements((1,5),(2,2),(2,4),(1,3))
//指定第一个字段为分区key
val keyedStream: KeyedStream[(String,Int),Tuple]=dataSteam.keyBy(0)
//对第二个字段进行sum统计
val sumStream: DataStream[(Int,Int)] = keyedStream.sum(1)
//输出统计结果
sumStream.print()
//统计计算指定key最小值
val minStream: DataStream[(Int,Int)] = keyedStream.min(1)
//统计计算指定key最大值
val maxStream: DataStream[(Int,Int)] = keyedStream.max(1)
//统计计算指定key最小值,返回最小值对应元素
val minByStream: DataStream[(Int,Int)] = keyedStream.minBy(1)
//统计计算指定key最大值,返回最大值对应元素
val maxByStream: DataStream[(Int,Int)] = keyedStream.maxBy(1)

MultiDataStream

Unio

//创建不同数据集
val dataStream1: DataStream [(String ,Int)]= env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
val dataStream2: DataStream [(String ,Int)]= env.fromElements(("d",1),("s",2),("a",4),("e",5),("a",6))
val dataStream3: DataStream [(String ,Int)]= env.fromElements(("a",2),("d",1),("s",2),("c",3),("b",1))
//合并两个数据集
val unionStream = dataStream1.union(dataStream2)
//合并多个数据集
val allUnionStream = dataStream1.union(dataStream2,dataStream3)

Connect,CoMap,CoflatMap

val dataStream1: DataStream [(String ,Int)]= env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
val dataStream2: DataStream [Int]= env.fromElements(1,2,4,5,6)
//连接两个数据集
val connectedStream :ConnectedStreams[(String, Int), Int] = dataStream1.connect(dataStream2)
val resultStream = connectedStream.map(new CoMapFunction[(String,Int),Int,(Int, String)]{
  //定义第一个数据集函数处理逻辑,输入值为第一个DataStream
  override def map1(in1: (String,Int)): (Int ,String) = {
    (int1._2 , in1._1)
  }
  //定义第二个数据集函数处理逻辑
  override def amp2(in2: Int):(Int,String) = {
    (int2,"default")
  }

})
val resultStream2 = connectedStream.flatMap(new CoFlatMapFunction[(String,Int), Int ,(String ,Int , Int)]{
  //定义共享变量
  var number=0
  //定义第一个数据集处理函数
  override def flatMap1(in1:(String ,Int ), collector : Collector[(String,Int ,Int)]): Unit = {
    collector.collect((in1._1,in1._2,number))
  }
  //定义第二个数据集处理函数
  override def flatMap2(in2: Int, collector : Collector[(String , Int ,Int)]):Unit = {
    number=in2
  }
})
//通过keyby函数根据指定的key连接两个数据集
val keyedConnect: ConnectedStreams[(String ,Int ), Int] = dataStream1.connect(dataStream2).keyBy(1,0)
//通过broadcast关联两个数据集
val broadcastConnect: BroadcastConnectedStream [(String, Int), Int] = dataStream1.connect(dataStream2.broadcast())

split

//创建数据集
val DataStream1: DataStream[(String, Int)] = env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
//合并连个DataStream数据集
val splitedStream : SplitStream[(String,Int)] = dataStream1.split(t => if(t._2 % 2 ==0 ) Seq("even") else Seq("odd"))

Select

//筛选出偶数数据集
val evenStream: DataStream[(String,Int)] = splitedStream.select("even")
//筛选出奇数数据集
val oddStream: DataStream[(String,Int)] = splitedStream.select("odd")
//筛选出偶数和奇数数据集
val allStream: DataStream[(String,Int)] = splitedStream.select("even","odd")

Iterate

//创建数据集,map处理为对数据分区根据默认并行度进行平衡
val DataStream = env.fromElements(3,1,2,1,5).map{ t:Int => t}

val iterated = dataStream.iterate((input: ConnectedStreams[Int , String]) => {
  //定义两个map处理数据集,第一个map反馈操作,第二个map将数据输出到下游
  val head= input.map(i => (i+1).toString, s => s) (head.filter( _ == "2"), head.filter (_ != "2"))
},1000)  //超过1000ms没有数据接入终止迭代

物理分区

随机分区(Random Partitioning)

val shuffleStream=dataStream.shuffle

平衡分区(Roundrobin Partitioning)

val shuffleStream= dataStream.rebalance();

Rescaling partitioning

//通过调用DataStream API中rescale()方法实现Rescaling Partitioning操作
val shuffleStream = dataStream.rescale();

广播操作

//通过DataStream API的broadcast() 方法实现广播分区
val shuffleStream= dataStream.broadcast()

自定义分区

Object customPartitioner extends Partitioner[String]{
  //获取随机数生成器
  val r=scala.util.Random
  override def partition(key: String, numPartitions: Int): Int ={
    //定义分区策略,key中如果包含a则放入0分区中,其他情况则根据Partitions num随机分区
    if(key.contains("flink")) 0 else r.nextInt(numPartitions)
  }
}
//通过数据集字段名称指定分区字段
dataStream.partitionCustom(customPartitioner,"filed_name");
//通过数据集字段索引指定分区字段
dataStream.partitionCustom(customPartitioner,0)

到此,相信大家对“flink的Transformation数据处理方法是什么”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

推荐阅读:
  1. 【Flink】Flink对于迟到数据的处理
  2. flink 读取hive的数据

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

flink transformation

上一篇:Mac清理优化工具CleanMyMac X 4.6.0的示例分析

下一篇:flink运行模式有哪些

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》