Spark生产作业容错能力的负面影响有哪些

发布时间:2021-12-16 14:05:31 作者:iii
来源:亿速云 阅读:109

这篇文章主要讲解了“Spark生产作业容错能力的负面影响有哪些”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Spark生产作业容错能力的负面影响有哪些”吧!

1. Spark TaskLocality

在 Spark 中数据本地性通过 TaskLocality 来表示,有如下几个级别,

从上到下数据本地性依次递减。

Spark 在执行前通过数据的分区信息进行计算 Task 的 Locality,Task 总是会被优先分配到它要计算的数据所在节点以尽可能地减少网络 IO。这个计算的过程通过 spark.locality.wait 默认为3s,控制这个计算的过程。

2. Spark 内部容错

原理这里不细讲,简而言之就是重试。Spark 规定了同一个 Job 中同一个 Stage 连续失败重试的上限(spark.stage.maxConsecutiveAttempts),默认为4,也规定了一个 Stage 中 同一个 Task 可以失败重试的次数(spark.task.maxFailures),默认为4。当其中任何一个阈值达到上限,Spark 都会使整个 Job 失败,停止可能的“无意义”的重试。

3. 数据本地性和容错的冲突

我们首先来看一个例子,如图所示,图为 Spark Stage 页面下 Task Page 的详细视图。

3.1 问题一:单个 Task 重试为什么失败?

结合硬件层面的排查,发现是 NodeManager 物理节点上挂在的 /mnt/dfs/4,出现硬件故障导致盘只读,ShuffleMapTask 在即将完成时,将index文件和data文件commit时,获取index的临时文件时候发生FileNotFoundException   

java.io.FileNotFoundException: /mnt/dfs/4/yarn/local/usercache/da_haitao/appcache/application_1568691584183_1953115/blockmgr-1b6553f2-a564-4b31-a4a6-031f21c9c30f/0a/shuffle_96_2685_0.index.82594412-1f46-465e-a067-2c5e386a978e (No such file or directory)    at java.io.FileOutputStream.open0(Native Method)    at java.io.FileOutputStream.open(FileOutputStream.java:270)    at java.io.FileOutputStream.<init>(FileOutputStream.java:213)    at java.io.FileOutputStream.<init>(FileOutputStream.java:162)    at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144)    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:245)    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:190)    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)    at org.apache.spark.scheduler.Task.run(Task.scala:109)

3.2 问题二:为什么该 Task 的4次重试都在同一个物理节点?

这是由于 Driver 在调度该 Task 的时候进行了数据本地性的运算,而且在  spark.locality.wait 默认为3s的时间约束内成功获得了NODE_LOCAL级别的数据本地性,故而都调度到了同一个  NodeManger 物理节点。

3.3 问题三:为什么总是“本地重试”,不是“异地重试”?
这个过程从逻辑上讲,其实已经不是“本地重试”,而恰恰是“异地重试”了。这我们可以从4次的重试的 Executor ID 上进行判断,第0、1和3次是在 ID 6上进行的,而第2次是在 ID 5上发生的。但由于ID 5和6都在同一个 NodeManger 节点,所以我们看起来像是“本地重试”。另一个原因就是上面所说的数据本地性的成功解析,所以这些 Task 的每次重试都高概率的来到这个节点。  
所有 Spark Task 级别的重试从逻辑上都应该属于“异地重试”,他们都需要通过 Driver 重新调度到新的 Executor 进行重试。我们所观测到的“本地”和“异地”是属于“现象”而非“本质”,影响这种现象的条件有比如下面几个(不一定全面):1. 数据本地性 2. Executor 由于 NodeLabel 限制,只在若干有限的物理机上分配 3. ResourceManager 调度时刚好把所有的 Executor 都分配到某个节点上。
3.4 问题5:为什么4次失败都操作同一个坏的盘?
该 NodeManger 实际上有/mnt/dfs/{0-11}, 一共12块盘,从物理检查上看,整个过程中也只有/mnt/dfs/4有异常告警,那为啥 Spark 这么傻?这么多好盘不用,专挑一块坏的盘死磕?
我们可以先看下出错的文件,我们包这个文件分成5个部分来看,  
  
  
  1. /mnt/dfs/4/yarn/local/2. usercache/da_haitao/appcache/application_1568691584183_1953115/ blockmgr-1b6553f2-a564-4b31-a4a6-031f21c9c30f/3. 0a/4. shuffle_96_2685_0.index5. .82594412-1f46-465e-a067-2c5e386a978e
基于这样的逻辑,对于某次Shuffle 过程的某个分区(Partition)的最终输出文件名其实是可以预测的也是固定的,比如我们这个 case 中,第96次shuffle的第2685分区的 index 文件的文件名即为shuffle_96_2685_0.index。
Spark 在写和读这个文件的时候,基于相同的定位逻辑(算法)来保证依赖关系,  
第一步确定根目录,Spark 通过文件名的hash绝对值与盘符数的模,作为索引却确定根目录  
  scala> math.abs("shuffle_96_2685_0.index".hashCode) % 12res0: Int = 6
而根目录的数组对于一个 Executor 的这个生命周期内而言是确定的,它是一个由简单随机算法将所有路径打散的一个固定数组。所以一旦文件名称确定,Executor 不换的话,根目录一定是确定的。所以都固定的去访问/mnt/dfs/4这个坏盘。  
但这只解释了一个 Executor 所被分配 Task 失败的原因,我们的 Task 还在不同的 executor 上进行过尝试。
3.5 问题5:为什么两个 Executor 上的重试都失败了?
其实这个问题只是概率的问题, Spark 用类似下面算法打乱所有LOCAL_DIRS的配置,如下面的的简单测试,这种碰撞的概率还是极高的,我们ID 5,6,的 Executor 下 DiskBlockManager 包含的 localDirs(6)应该都对应于 /mnt/dfs/4 这个坏盘。  
scala> def randomizeInPlace[T](arr: Array[Int], rand: java.util.Random = new java.util.Random): Array[Int] = {     |     for (i <- (arr.length - 1) to 1 by -1) {     |       val j = rand.nextInt(i + 1)     |       val tmp = arr(j)     |       arr(j) = arr(i)     |       arr(i) = tmp     |     }     |     arr     |   }randomizeInPlace: [T](arr: Array[Int], rand: java.util.Random)Array[Int]scala> randomizeInPlace(res11)res23: Array[Int] = Array(3, 2, 4, 1)
scala> randomizeInPlace(res11)res24: Array[Int] = Array(2, 3, 4, 1)
scala> randomizeInPlace(res11)res25: Array[Int] = Array(2, 1, 3, 4)
scala> randomizeInPlace(res11)res26: Array[Int] = Array(4, 2, 1, 3)
scala> randomizeInPlace(res11)res27: Array[Int] = Array(2, 3, 4, 1)

感谢各位的阅读,以上就是“Spark生产作业容错能力的负面影响有哪些”的内容了,经过本文的学习后,相信大家对Spark生产作业容错能力的负面影响有哪些这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

推荐阅读:
  1. igpu异构能力有哪些
  2. Spark作业的原理是什么

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

spark

上一篇:Unity3D 5.0+动态加载模型和对应Light Map方法的示例分析

下一篇:Linux sftp命令的用法是怎样的

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》