怎么结合Spark讲一下Flink的runtime

发布时间:2021-12-16 21:18:53 作者:柒染
来源:亿速云 阅读:150

怎么结合Spark讲一下Flink的runtime,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

Flink运行时主要角色有两个:JobManager和TaskManager,无论是standalone集群,on yarn都是要启动这两个角色。有点类似于MRv1的架构了,JobManager主要是负责接受客户端的job,调度job,协调checkpoint等。TaskManager执行具体的Task。TaskManager为了对资源进行隔离和增加允许的task数,引入了slot的概念,这个slot对资源的隔离仅仅是对内存进行隔离,策略是均分,比如taskmanager的管理内存是3GB,假如有三个slot,那么每个slot就仅仅有1GB内存可用。

根据经验,taskslot数最佳默认值就是CPU核心数。使用超线程,每个task slot需要2个或更多硬件线程上下文。

Client这个角色主要是为job提交做些准备工作,比如构建jobgraph提交到jobmanager,提交完了可以立即退出,当然也可以用client来监控进度。 

Jobmanager和TaskManager之间通信类似于Spark 的早期版本,采用的是actor系统。

根据以上描述,绘制出运行架构图就是下图:

怎么结合Spark讲一下Flink的runtime

Task到底是什么玩意?

讲到这可以先回顾一下Spark了,主要三个概念:

1. Shuffle

Spark 任务job中shuffle个数决定着stage个数。

2. 分区

Spark 算子中RDD的分区数决定者stage任务的并行度。

3. 分区传递

复杂的入union,join等暂不提。简单的调用链如下:

rdd.map-->filter-->reducebykey-->map。

例子中假设rdd有6个分区,map到fliter的分区数传递是不变,filter到redcuebykey分区就变了,reducebykey的分区有个默认计算公式,星球里讲过了,假设我们在使用reducebykey的时候传入了一个分区数12。

分区数,map是6,filter也是6,reducebykey后面的map就是12。

怎么结合Spark讲一下Flink的runtime

override def getPartitions: Array[Partition] =firstParent[T].partitions

map这类转换完全继承了父RDD的分区器和分区数,默认无法人为设置并行度,只有在shuffle的时候,我们才可以传入并行度。

上述讲解主要是想带着大家搞明白,以下几个概念:

1.        Flink的并行度由什么决定的?

这个很简单,Flink每个算子都可以设置并行度,然后就是也可以设置全局并行度。

Api的设置

.map(new RollingAdditionMapper()).setParallelism(10)

全局配置在flink-conf.yaml文件中,parallelism.default,默认是1:

怎么结合Spark讲一下Flink的runtime

2.        Flink的task是什么?

按理说应该是每个算子的一个并行度实例就是一个subtask-在这里为了区分暂时叫做substask。那么,带来很多问题,由于flink的taskmanager运行task的时候是每个task采用一个单独的线程,这就会带来很多线程切换开销,进而影响吞吐量。

为了减轻这种情况,flink进行了优化,也即对subtask进行链式操作,链式操作结束之后得到的task,再作为一个调度执行单元,放到一个线程里执行。

如下图的,source/map 两个算子进行了链式;keyby/window/apply有进行了链式,sink单独的一个。

怎么结合Spark讲一下Flink的runtime 注释:图中假设是source/map的并行度都是2,keyby/window/apply的并行度也都是2,sink的是1,总共task有五个,最终需要五个线程。

按照到这一步的理解,画的执行图应该是这样的:

怎么结合Spark讲一下Flink的runtime

有些朋友该说了,据我观察实际上并不是这样的呀。。。

怎么结合Spark讲一下Flink的runtime这个是实际上是flink又一次优化。

默认情况下,flink允许如果任务是不同的task的时候,允许任务共享slot,当然,前提是必须在同一个job内部。

结果就是,每个slot可以执行job的一整个pipeline,如上图。这样做的好处主要有以下几点:

1.Flink 集群所需的taskslots数与job中最高的并行度一致。也就是说我们不需要再去计算一个程序总共会起多少个task了。

2.更容易获得更充分的资源利用。如果没有slot共享,那么非密集型操作source/flatmap就会占用同密集型操作 keyAggregation/sink 一样多的资源。如果有slot共享,将基线的2个并行度增加到6个,能充分利用slot资源,同时保证每个TaskManager能平均分配到重的subtasks,比如keyby/window/apply操作就会均分到申请的所有slot里,这样slot的负载就均衡了。

链式的原则,也即是什么情况下才会对task进行链式操作呢?简单梗概一下:

  1. 上下游的并行度一致

  2. 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)

  3. 上下游节点都在同一个 slot group 中(下面会解释 slot group)

  4. 下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)

  5. 上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)

  6. 两个节点间数据分区方式是 forward(参考理解数据流的分区)

  7. 用户没有禁用 chain

关于怎么结合Spark讲一下Flink的runtime问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注亿速云行业资讯频道了解更多相关知识。

推荐阅读:
  1. Redis Streams与Spark的完美结合
  2. 第97课:Spark Streaming 结合Spark SQL 案例

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

spark flink runtime

上一篇:spark 流式去重的示例分析

下一篇:python匿名函数怎么创建

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》