Spark SQL中Not in Subquery为何低效以及如何规避

发布时间:2021-12-17 09:47:58 作者:柒染
来源:亿速云 阅读:247

这期内容当中小编将会给大家带来有关Spark SQL中Not in Subquery为何低效以及如何规避,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

首先看个Not in Subquery的SQL:

// test_partition1 和 test_partition2为Hive外部分区表select * from test_partition1 t1 where t1.id not in (select id from test_partition2);
对应的完整的逻辑计划和物理计划为:
== Parsed Logical Plan =='Project [*]+- 'Filter NOT 't1.id IN (list#3 [])   :  +- 'Project ['id]   :     +- 'UnresolvedRelation `test_partition2`   +- 'SubqueryAlias `t1`      +- 'UnresolvedRelation `test_partition1`
== Analyzed Logical Plan ==id: string, name: string, dt: stringProject [id#4, name#5, dt#6]+- Filter NOT id#4 IN (list#3 [])   :  +- Project [id#7]   :     +- SubqueryAlias `default`.`test_partition2`   :        +- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9]   +- SubqueryAlias `t1`      +- SubqueryAlias `default`.`test_partition1`         +- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]
== Optimized Logical Plan ==Join LeftAnti, ((id#4 = id#7) || isnull((id#4 = id#7))):- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]+- Project [id#7]   +- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9]
== Physical Plan ==BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#4 = id#7) || isnull((id#4 = id#7))):- Scan hive default.test_partition1 [id#4, name#5, dt#6], HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]+- BroadcastExchange IdentityBroadcastMode   +- Scan hive default.test_partition2 [id#7], HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9]
通过上述逻辑计划和物理计划可以看出,Spark SQL在对not in subquery处理,从逻辑计划转换为物理计划时,会最终选择BroadcastNestedLoopJoin(对应到Spark源码中BroadcastNestedLoopJoinExec.scala)策略。

提起BroadcastNestedLoopJoin,不得不提Nested Loop Join,它在很多RDBMS中得到应用,比如mysql。它的工作方式是循环从一张表(outer table)中读取数据,然后访问另一张表(inner table,通常有索引),将outer表中的每一条数据与inner表中的数据进行join,类似一个嵌套的循环并且在循环的过程中进行数据的比对校验是否满足一定条件。

对于被连接的数据集较小的情况下,Nested Loop Join是个较好的选择。但是当数据集非常大时,从它的执行原理可知,效率会很低甚至可能影响整个服务的稳定性。

而Spark SQL中的BroadcastNestedLoopJoin就类似于Nested Loop Join,只不过加上了广播表(build table)而已。

BroadcastNestedLoopJoin是一个低效的物理执行计划,内部实现将子查询(select id from test_partition2)进行广播,然后test_partition1每一条记录通过loop遍历广播的数据去匹配是否满足一定条件。

private def leftExistenceJoin(   // 广播的数据    relation: Broadcast[Array[InternalRow]],    exists: Boolean): RDD[InternalRow] = {  assert(buildSide == BuildRight)  /* streamed对应物理计划中:  Scan hive default.test_partition1 [id#4, name#5, dt#6], HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]  */  streamed.execute().mapPartitionsInternal { streamedIter =>    val buildRows = relation.value    val joinedRow = new JoinedRow
  // 条件是否定义。此处为Some(((id#4 = id#7) || isnull((id#4 = id#7))))    if (condition.isDefined) {      streamedIter.filter(l =>        // exists主要是为了根据joinType来进一步条件判断数据的返回与否,此处joinType为LeftAnti        buildRows.exists(r => boundCondition(joinedRow(l, r))) == exists      )            // else    } else if (buildRows.nonEmpty == exists) {      streamedIter    } else {      Iterator.empty    }  }}

由于BroadcastNestedLoopJoin的低效率执行,可能导致长时间占用executor资源,影响集群性能。同时,因为子查询的结果集要进行广播,如果数据量特别大,对driver端也是一个严峻的考验,极有可能带来OOM的风险。因此,在实际生产中,要尽可能利用其他效率相对高的SQL来避免使用Not in Subquery。

虽然通过改写Not in Subquery的SQL,进行低效率的SQL到高效率的SQL过渡,能够避免上面所说的问题。但是这往往建立在我们发现任务执行慢甚至失败,然后排查任务中的SQL,发现"问题"SQL的前提下。那么如何在任务执行前,就"检查"出这样的SQL,从而进行提前预警呢?

这里笔者给出一个思路,就是解析Spark SQL计划,根据Spark SQL的join策略匹配条件等,来判断任务中是否使用了低效的Not in Subquery进行预警,然后通知业务方进行修改。同时,我们在实际完成数据的ETL处理等分析时,也要事前避免类似的低性能SQL。

上述就是小编为大家分享的Spark SQL中Not in Subquery为何低效以及如何规避了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。

推荐阅读:
  1. 急中生智~利用Spark core完成"ETL"!
  2. Spark SQL笔记整理(一):Spark SQL整体背景介绍

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

spark subquery spark sql

上一篇:Spark广播变量分析以及如何动态更新广播变量

下一篇:python匿名函数怎么创建

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》