您好,登录后才能下订单哦!
这篇文章将为大家详细讲解有关Apache Flink如何设置并行度,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
在使用Apache Flink对数据进行处理时候,通常需要设置并行度。并行度是Apache Flink中一个非常重要的概念。设置合理的并行度能够加快数据的处理效率,不合理的并行度会造成效率降低甚至是任务出错。
Apache Flink程序包含多个任务(source,transformations/operators,sink)。这些任务使用几个并行实例所进行执行,这些并行的实例称之为并行度。
Apache Flink支持在不同的级别设置并行度。配置文件、env级别、算子级别。
配置文件默认
在我们提交一个Job的时候如果没有考虑并行度的话,那么Flink会使用默认配置文件中的并行度。我们可以通过命令查看Flink配置文件的并行度。
$ cat flink-conf.yaml |grep "parallelism.default"
parallelism.default: 1
例如当前获取到的并行度为1。也就是说当你不设置并行度的时候它就会使用配置文件默认的并行度 1。
2. env级别
env的级别就是Environment级别。也就是通过Execution Environment来设置整体的Job并行度。
val env = Stream...
env.setParallelism(5)
客户端级别
如果在执行Job时候,发现代码中没有设置并行度而又不修改配置文件的话,可以通过Client来设置Job的并行度。
./bin/flink run -p 5 ../wordCount-java*.jar
-p即设置WordCount的Job并行度为5。4. 算子级别
我们在编写Flink项目时,可能对于不同的Operator设置不同的并行度,例如为了实现读取Kafka的最高效读取需要参考Kafka的partition的数量对并行度进行设置,在Sink时需要对于Sink的介质设置不同的并行度。这样就会存在一个Job需要有多个并行度。这样就需要用到算子级别的并行度设置
val env = Stream...
val text = ...
text.keyBy(XXX)
.flatMap(XXX).setParallelism(5) //计算时设置为5
.addSink(XXXXX).setParallelism(1) //写入数据库时候设置为1
并行度的高级别会覆盖低级别的配置。例如在算子中设置的策略会覆盖配置文件中的parallelism。
从优先级上来看: 算子级别 > env级别 > Client级别 > 系统默认级别
在实际的使用中,我们需要设置合理的并行度来保证数据的高效处理,在一般情况下例如source,Sink等可能会需要不同的并行度来保证数据的快速读取与写入负载等。
Apache Flink的并行度设置并不是说越大越好、数据处理的效率就越高。而是需要设置合理的并行度。那么何谓合理呢?
Apache Flink的 并行度取决于每个TaskManager上的slot数量而决定的。Flink的JobManager把任务分成子任务提交给slot进行执行。相同的slot共享相同的JVM资源,同时对Flink提供维护的心跳等信息。
slot是指TaskManagere的并发执行能力,通常来说TaskManager有多少核CPU也就会有多少个slot。这样来看,我们设置的并行度其实是与TaskManager所有Slot数量有关的。
关于“Apache Flink如何设置并行度”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。