数据库中执行器层到底在做什么?

2019-10-06

应届生招骋,很多同学都不知道数据库执行器在做什么事情。贵司的 SQL Engine Team 又期望招到有经验的候选人,这无疑缩小了招骋范围。所以这篇文章,我想讲一讲这个话题:数据库中执行器层到底在做什么。为扩大的候选人来一点指引。

我从面试题开始切入,解释为什么这样出题,以及这几道面试题和数据库执行器有什么关联。

两数组找公共元素

这道题大意是,给两个数组,需要找到两个数组中共同的元素。题目相当的简单真诚,没有套路。每个人应该都能至少想到答案。

最基本的解法,两层循环,第一个数组里面的每个元素,去跟第二个数组的每个元素比较。这是一个 O(N2) 的算法。

有没有更好一点的呢?对第一个数组建一个哈希表,然后用第二个数组去过一遍这个哈希表,如果元素在哈希表中存在,就是公共的。第一遍的建表和第二遍的扫描,都是 O(N) 可以完成的。这个算法的缺点是,由于要创建哈希表,内存消耗会大一些。

第三种解法,我们可以分别对两个数组做排序,排序是 O(NlogN) 的。然后对两个排序好的数据,对比着扫描一遍,元素相同的是我们要的,不相同的时候把数组中较小的那个元素的游标往前移,扫一遍是 O(N) 的开销,所以整体是 O(NlogN)。

好啦,这很简单,对吧!那它和数据库执行器有啥关联呢? 其实这道题就是 Join 算子的实现。最基础的 inner join 的三种实现方式:

  • 第一种暴力对比的,叫 nest loop join
  • 第二种用哈希表的,叫 hash join
  • 第三种要求输入有序的,叫 sort merge join

join 是数据库中基础的基础,也是重中之重。

这道题扩展一下。面试官可以继续提问,假设这两个数组不是内存数组,而是两个文件呢?要求找两个文件中共同的元素。其实解法也是一样的,nest loop join / hash join / sort merge join。两个文件等价于数据库里面两个表。

求两文件的公共元素,等价于

select id from t1, t2 where t1.id = t2.id

nest loop join 的特点在于,慢!毕竟是 O(N2) 的,而每次访问还有网络和读盘开销。hash join 的特点在于,耗内存。一般用小表建表。如果内存装不下,就用不了 hash join 了。最后 sort merge join 特点,就是有排序那一步的限制条件。

这道题再继续扩展一下。面试官会继续问:两个文件找公共元素,假设在多核的单机环境下,如何做优化?如何才能充分发挥多核性能?

其实核心是围绕 hash join 拆分,hash 比较好拆一些。在第二步的 hash 探查阶段,是明显可以并行的,数据读出来分发到多核,并行去查 hash 表。另外,如果我们还可以拆分任务,比如有多少 16 个核我们先把数据空间划分 16 个 bin,两个文件就会打散到各自的核上面处理。但是这样划分会受到数据分布的影响,如果数据分布不均匀,就会有的核要处理的数据量比较多。 文件的话,还有读盘受 IO 影响,计算受 CPU 影响,如何让阶段不要互相 block ,让整个系统资源利用起来,提升吞吐?绑核,CPU 亲和性,调度都可以提问,还有缓存,NUMA 等等。

再继续扩展,就是两超大文件,有一个集群,如何利用多机,实现一个分布式的找公共元素?这里面就更复杂了,会考虑机器 down 掉,中间结果的缓存,错误的恢复,任务划分的均匀,比如最慢的一个拖慢整个的任务完成时间。

一道很开放的面试题,一层一层的递进,可以问到很深。当然,如果这些全部能答得很好的,那肯定不是应届生水平,本文的废话也不用读了,直接甩简历过来就好。

top N URL

这道题是某同事特别喜欢出的一道题。题目大概是这样子,100G 的大文件,里面存储的全部是 URL 记录。要求找出里面出现频率最高的前 100 条记录。充分利用多核,然后内存限制要求只能使用 1G 以内。

100G 大文件是显然无法 load 到内存的,所以要拆小。拆有几种拆法,顺序拆分,变成一批一批的。或者按 hash 拆分。不少候选人都是按 hash 拆的,hash 拆完每个文件就小了,然后可以 load 到内存处理了。再一个一个的加载,去重,计数。变成 url => count 的记录,最后进行汇总。如果前面按 hash 拆了,同一条 url 不会落在多个文件里面。可以分别对每个文件求 top N 了再最后汇总再做一步 top N。

考虑程序健壮性是很重要的,有两种极端情况必须考虑:

  • 100G 文件里面全部都是同一条 URL
  • 100G 文件里面每一条 URL 各不相同

如果全部是同一条 URL,那么使用 hash 拆分的方式,100G 拆完还是 100G 文件,这个会导致多核并没有使用上,退化成单核处理。如果每条 URL 各不相同,那么汇总的时候,如果一步到位, url => 1 的记录数会非常多,会把内存撑爆。需要结果落盘了再次处理。

我们不妨看一看,这个题目如果写一条 SQL 来处理,该怎么做?比如表是这样子:

create table t (url varchar);

去重,求 count 可以用 group by 操作,这样可以求出 url 和对就的出现次数:

select url, count(*) as cnt from t group by url;

如果我们要求 top N,可以用 order by limit,于是整个 SQL 可以写成:

select * from (select url, count(*) as cnt from t group by url) as tmp order by cnt limit 100;

group by 之后是可以直接带 order by limit,所以简单写成:

select url, count(*) as cnt from t group by url order by cnt limit 100;

重点来了,看一看查询计划是什么样子:

mysql> explain select url, count(*) as cnt from t group by url order by cnt limit 100;
+--------------------------|----------|------|-------------------------------------------------------+
| id                       | count    | task | operator info                                              |
+--------------------------|----------|------|-------------------------------------------------------+
| Projection_7             | 100.00   | root | Column#1, Column#3                                         |
| └─TopN_10                | 100.00   | root | Column#3:asc, offset:0, count:100                          |
|   └─HashAgg_16           | 8000.00  | root | group by:Column#1, funcs:count(1), firstrow(Column#1)      |
|     └─TableReader_21     | 10000.00 | root | data:TableScan_20                                          |
|       └─TableScan_20     | 10000.00 | cop  | table:t, range:[-inf,+inf], keep order:false, stats:pseudo |
+--------------------------|----------|------|-------------------------------------------------------+
5 rows in set (0.00 sec)

执行器执行这个查询计划会做什么呢?先把数据全读上来(TableReader),然后做聚合(HashAgg),再对结果求 TopN。它涉及了 TableReader,HashAgg,TopN 几个主要的执行器。其中 HashAgg 是指 hash aggregate,做的事情就是将输入的 URL 分组并计算 count(1) 和 firstrow。

回到 100G 文件的题目。其实我们可以拆成两步:第一步是统计各 URL 出现次数;第二步是计算 TopN。实现第一步的 URL 出现次数,就是 Aggregate 操作,Aggregate 两种基本的的实现方式:

  • 第一种是 hash aggregate
  • 第二种是 streaming aggregate

hash aggregate 就是创建一个 hash 表,去重计数。streaming aggregate 是先排序了扫一遍有序结果,去重计数。

明白为什么要出这首题了吧,实现这道题,跟实现执行器层的某些算子,是一样的。这样出题是希望候选人理解执行器内部实现机制!

map reduce

上面那道题实现并没有讲完,因为我们其实只讲到了单机的实现,如果在 1G 内存限制就暴了。我们把第一步提取出来,可以出另外一道题:单词频率统计。

这就是 map reduce 的一个经典例子了,100G 的 URL 文件,求每一条 URL 出现的频次。

我们先把输入文件进行处理,在执行 map 之前,先把数据切分了丢到各机器上面去。在各个机器上,将输入处理 key-value 形式,(url, 1)。

然后 shuffle 过程,按一定的规则分组(比如取头几个单词,比如取一个哈希码,比如按 range 范围),特定的 key 会落到特定的 partition 上面。只要这里的划分是合理的,内存用量是可控的。然后对 partition 的结果进行处理,每个 partition 的结果处理成 (url, n) 的形式。

接下来就到了 reduce 阶段,

把节点到 partition 映射关系反过来,就知道每个节点要拉哪些 partition 的数据。然后 reduce 节点执行一个 merge 操作就可以吐出最后的 (url, n) 了。

其实 map reduce 的关键点是 shuffle,如何打散数据,让数据无依赖的并行计算。利用多核,或者分布式的时候利用多机,都是需要理解这些东西的。

一个分布式数据库的执行器层的基础知识就这些了。简单来说,就是计算和分布。计算需要实现执行器层的各种算子,Join,聚合函数,排序,TopN 等等。然后各种算子通过一个叫火山模型的方式变成可组合的。分布就是如何拆分数据和计算任务,让整个集群资源都以参与进来,跟 map-reduce 是差不多的。

希望读者能有一些启发,写得不好,不要扔鸡蛋...扔简历吧(mkl at pingcap dot com)~

executordatabase