Apache Doris Colocate Join原理是什么

发布时间:2022-10-27 16:24:49 作者:iii
来源:亿速云 阅读:129

这篇文章主要介绍“Apache Doris Colocate Join原理是什么”,在日常操作中,相信很多人在Apache Doris Colocate Join原理是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Apache Doris Colocate Join原理是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

What Colocate Join

我们都知道 Join 的常见连接类型分为以下几种:

Join 的常见算法实现包含以下几种:

分布式系统实现 Join 数据分布的常见策略有:

Colocate/Local Join 就是指多个节点 Join 时没有数据移动和网络传输,每个节点只在本地进行 Join,能够本地进行 Join 的前提是相同 Join Key 的数据分布在相同的节点。

Why Colocate Join

相比 Shuffle Join 和 Broadcast Join,Colocate Join 在查询时没有数据的网络传输,性能会更高。 在 Doris 的具体实现中,Colocate Join 相比 Shuffle Join 可以拥有更高的并发粒度,也可以显著提升 Join 的性能,这一点在后面会解释。

How Colocate Join

核心思路

对于 colocate tables,在任何情况下都要保证数据的本地性。 具体包括:

实现中最复杂是第 3 点: 处理 colocate tables 的 balance。

术语定义

Colocate Group

我们将一组具体相同 Colocate 属性的 Table 称为 Group,下图中 t1 和 t2 拥有相同的 Colocate Group。

Colocate Parent Table

我们将决定一个 Group 数据分布的 Table 称为 Parent Table,下图中 t1 是 Colocate Parent Table.

Colocate Child Table

我们将一个 Group 中除 Parent Table 之外的 Table 称为 Child Table,下图中 t2 是 Colocate Child Table.

Apache Doris Colocate Join原理是什么

Bucket Seq

如下图,如果一个表有 N 个 Partition, 则每个 Partition 的第 M 个 bucket 的 Bucket Seq 是 M。

Apache Doris Colocate Join原理是什么

1 数据导入时保证本地性

Doris 的分区方式如下所示,先根据分区字段 Range 分区,再根据指定的 Distributed Key Hash 分桶:

Apache Doris Colocate Join原理是什么

所以我们在数据导入时保证本地性的核心思想就是两次映射,对于 colocate tables,我们保证相同 Distributed Key 的数据映射到相同的 Bucket Seq,再保证相同 Bucket Seq 的 buckets 映射到相同的 BE。

Apache Doris Colocate Join原理是什么

具体来说,第一步:我们计算 Distributed Key 的 hash 值,并对 bucket num 取模,保证相同 Distributed Key 的数据映射到相同的 Bucket Seq。

Apache Doris Colocate Join原理是什么

第二步:将同一个 Colocate Group 下所有相同 Bucket Seq 的 Bucket 映射到相同的 BE,方法如下:

Apache Doris Colocate Join原理是什么

2 Colocate Join Query Plan

对 HashJoinFragment,由于 Join 的多张表有了数据本地性保证,所以可以去掉 Exchange Node,避免网络传输,将 ScanNode 直接设置为 Hash Join Node 的 Child。

Apache Doris Colocate Join原理是什么

3 Colocate Join Query Schedule

查询调度的目标: 一个 Colocate join 中所有 ScanNode 中所有 Bucket Seq 相同的 Buckets 被调度到同一个 BE。

查询调度的策略:第一个 ScanNode 的 Buckets 随机选择 BE,其余的 ScanNode 和第一个 ScanNode 保持一致。

4 Colocate Join At Bucket Seq Level

目前,Doris 的 Hash Join 是 Server 粒度的:

Apache Doris Colocate Join原理是什么

对于 colocate join,由于同一个 Colocate Group 下相同 Bucket Seq 的 Bucket 分布在相同的 BE,所以我们将 Join 的粒度从 Server 粒度降至 Bucket Seq 粒度:

Apache Doris Colocate Join原理是什么

5 Colocate Join Metadata Maintenance

对于 colocate join,我们需要维护以下几个核心元数据:

Apache Doris Colocate Join原理是什么

6 How to decide a query can colocate join

7 Colocate Join Support Balance

核心思路

新增一个 daemon 线程专门处理 colocate table 的 balance,并让正常的 balance 线程不处理 colocate table 的 balance。

何时 balance

有 BE 节点新增,删除,down 掉时。

balance 的粒度

正常 balance 的粒度是 bucket,但是对于 colocate table,我们必须保证同一个 colocate group 下所有 bucket 的数据本地性,所以我们 balance 的单位是 colocate group。

balance 对查询的影响

当一个 colocate group 正在 balance 时,colocate join 会退化为原始的 shuffle join 或 broadcast join。

balance 流程:

当有 BE 节点删除或长时间挂掉时,选择目标 BE 的策略:

和正常 balance 时的选择策略相同,考虑集群的整体负载,尽量选择负载较低的 BE。

当有 BE 节点新增时,选择目标 BE 的策略:

Colocate Join Performance

测试数据:

Table A,B,C 都有 10 天数据,1 天一个 partitions,每个 partition 有 570 万数据。

测试集群:

4 台低配物理机,每个 BE 24CPU,96MEM

测试 SQL:

SQL1:

select count(*)  
FROM A t1
INNER JOIN [shuffle] B t5
   ON ((t1.dt = t5.dt) AND (t1.id = t5.id))
INNER JOIN [shuffle] C t6
   ON ((t1.dt = t6.dt) AND (t1.id = t6.id))
where t1.dt in (xxx days);

SQL2:

select t1.dt, t1.id, t1.name, t1.second_id,t1.second_name,
t5.id, t5.weight_time,t5.list,
t6.ord_id, t6._id
FROM A t1
INNER JOIN B t5
   ON ((t1.dt = t5.dt) AND (t1.id = t5.id))
INNER JOIN C t6
   ON ((t1.dt = t6.dt) AND (t1.id = t6.id))
where t1.dt in (xxx days)
limit 10000;

Test Result for SQL1:

Apache Doris Colocate Join原理是什么

Test Result for SQL2:

Apache Doris Colocate Join原理是什么

可以看到,Colocate Join 相比 Shuffle Join 有明显的性能提升,而且随着集群规模越大,Join 的数据量越多,Colocate Join 的优势会更明显。

How To Use Colocate Join

社区最新代码已经支持 Colocate Join,只不过默认是关闭的,只需要在 FE 配置中设置 disable_colocate_join 为 false,即可开启 Colocate Join 功能。

具体使用时只需要在建表时增加 colocate_with 这个属性即可,colocate_with 的值可以设置成同一组 colocate 表中的任意一个,不过需要保证 colocate_with 属性中的表要先建立。

假如需要对 table t1 和 t2 进行 Colocate Join,可以按以下语句建表:

CREATE TABLE `t1` (
  `id` int(11) COMMENT "",
  `value` varchar(8) COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
"colocate_with" = "t1"
);


CREATE TABLE `t2` (
  `id` int(11) COMMENT "",
  `value` varchar(8) COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
"colocate_with" = "t1"
);

Colocate Join 目前限制

Colocate Join 适用场景

Colocate Join 十分适合几张表按照相同字段分桶,并高频根据相同字段 Join 的场景,比如电商的不少应用都按照商家 Id 分桶,并高频按照商家 Id 进行 Join。

Colocate Join FAQ

一句话总结,凡是不能进行 Colocate Join 的场景都会自动退化为原始的 Shuffle Join 或者 Broadcast Join

Q1: 支持多张表进行 Colocate Join 吗?

A: 支持

Q2: 支持 Colocate 表和正常表 Join 吗?

A: 支持

Q3: Colocate 表支持用非分桶的 Key 进行 Join 吗?

A: 支持:不符合 Colocate Join 条件的 Join 会使用 Shuffle Join 或 Broadcast Join

Q4: 如何确定 Join 是按照 Colocate Join 执行的?

A: explain 的结果中 Hash Join 的孩子节点如果直接是 OlapScanNode, 没有 Exchange Node,就说明是 Colocate Join

Q5: 如何修改 colocate_with 属性?

A: ALTER TABLE example_db.my_table set ("colocate_with"="target_table");

Q6: 如何禁用 colocate join?

A: set disable_colocate_join = true; 就可以禁用 Colocate Join,查询时就会使用 Shuffle Join 或 Broadcast Join

到此,关于“Apache Doris Colocate Join原理是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

推荐阅读:
  1. Spark SQL Join原理分析
  2. Mysql连接join查询的原理是什么

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

apache join

上一篇:PHP消息队列实现及运用的方法是什么

下一篇:Vue $nextTick能获取到最新Dom的原因是什么

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》