关于手工调整hashAgg算子下推到tikv节点的支持

在做非索引字段聚合时候优化器根据成本基本会选择hashAgg算子进行聚合运算,但是hashAgg是多阶段聚合的,可以在tikv存储节点上先进行聚合,然后再到tidb计算节点上二次聚合,当聚合的key字段重复值多的时候下推到tikv做聚合可以减少大量的数据到tidb节点从而减少计算耗时以及减少内存的使用。但是当key字段重复值很少时候如果下推到tikv上那么反而是做无用功浪费存储节点的CPU且聚合效果不好。也就是:
1、如果聚合的key字段重复度很低,那么不希望在tikv节点上做聚合,直接在tidb节点上做聚合。
2、如果聚合的key字段重复度很高,那么很希望在tikv节点上做一次聚合,减少tidb节点聚合压力,性能会大幅提升。
但是往往统计信息不一定及时(或者优化器自身评估问题,如很多时候多字段存在业务相关性,selectivity选择度偏离严重),导致错误的执行计划,尤其是第二种情况,本来可以在tikv做大量过滤这里变成了都要去tidb侧做,对于select col1,sum(col2_decimal),sum(col3_decimal),sum(col4_decimal)… from T group by col1;这种数据全部到tidb侧,加上decimal等数据类型的内存放大问题往往会导致tidb容易发生OOM问题,或者聚合计算非常慢。
因此希望可以提供session/global级别的参数可以选择hashAgg是否下压到存储节点,如果可以更希望在hint级别实现,这样控制力度更细,可以针对需要的hashAgg算子(一个语句中可能存在多个hashAgg,虽然一般情况下很少)指定hint来选择是否需要下推。

注意:tidb_opt_agg_push_down这个参数并不是将hashAgg下推到tikv节点,而是用来设置优化器是否执行聚合函数下推到 Join,Projection 和 UnionAll 之前的优化操作

这里举一个例子(不考虑分区表、多表等绕过形式的优化手段),对于一张历史大表(几十亿),需要按照日期O_ORDERDATE加载部分数据(几百万),而且新加载数据的状态O_ORDERSTATUS是未开始状态,加载完毕后立刻需要对这个新加载的数据做批次查询任务,涉及到聚合。因为新加载的数据量相对整体很少不一定触发自动统计信息搜集,所以可能会导致对新增量数据的评估存在问题。如果对全表做统计信息搜集那么可能执行时间很久,代价较大,甚至比后续批次时间都长。

拿TPCH100中的orders表进行演示,步骤如下:
1、生成新表orders_bak like orders,加载orders某日期(O_ORDERDATE)之前的大量数据。
2、为orders_bak添加联合索引(O_ORDERDATE、O_ORDERSTATUS)用于快速过滤新增数据。
3、对orders_bak表做统计信息搜集,当做历史全量数据。
4、对orders_bak表加载最新日期的数据(不做统计信息搜集)。
5、对orders_bak表中字段O_ORDERPRIORITY做key进行聚合sum(O_TOTALPRICE)操作。
6、观察执行计划情况,希望聚合进行下推到tikv,但是实际并不下推:
explain select O_ORDERPRIORITY,sum(O_TOTALPRICE) from orders_bak where O_ORDERDATE=‘1998-08-02’ and O_ORDERSTATUS=‘N’ group by O_ORDERPRIORITY;

mysql> show create table orders \G
*************************** 1. row ***************************
       Table: orders
Create Table: CREATE TABLE `orders` (
  `O_ORDERKEY` bigint(20) NOT NULL,
  `O_CUSTKEY` bigint(20) NOT NULL,
  `O_ORDERSTATUS` char(1) NOT NULL,
  `O_TOTALPRICE` decimal(15,2) NOT NULL,
  `O_ORDERDATE` date NOT NULL,
  `O_ORDERPRIORITY` char(15) NOT NULL,
  `O_CLERK` char(15) NOT NULL,
  `O_SHIPPRIORITY` bigint(20) NOT NULL,
  `O_COMMENT` varchar(79) NOT NULL,
  PRIMARY KEY (`O_ORDERKEY`) /*T![clustered_index] CLUSTERED */
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
1 row in set (0.00 sec)

mysql> select count(*) from orders;
+-----------+
| count(*)  |
+-----------+
| 150000000 |
+-----------+
1 row in set (1.83 sec)

mysql> select max(O_ORDERDATE),min(O_ORDERDATE) from orders;
+------------------+------------------+
| max(O_ORDERDATE) | min(O_ORDERDATE) |
+------------------+------------------+
| 1998-08-02       | 1992-01-01       |
+------------------+------------------+
1 row in set (1.55 sec)

mysql> select O_ORDERSTATUS,count(*) from orders group by O_ORDERSTATUS;
+---------------+----------+
| O_ORDERSTATUS | count(*) |
+---------------+----------+
| O             | 73086053 |
| F             | 73072502 |
| P             |  3841445 |
+---------------+----------+
3 rows in set (2.18 sec)

mysql> select O_ORDERPRIORITY,count(*) from orders group by O_ORDERPRIORITY;
+-----------------+----------+
| O_ORDERPRIORITY | count(*) |
+-----------------+----------+
| 4-NOT SPECIFIED | 30004093 |
| 1-URGENT        | 29995209 |
| 5-LOW           | 30002971 |
| 2-HIGH          | 29997467 |
| 3-MEDIUM        | 30000260 |
+-----------------+----------+
5 rows in set (2.69 sec)


创建orders_bak表并加载数据:

mysql> create table orders_bak like orders;
Query OK, 0 rows affected (0.12 sec)

--加载一百多万数据(因为历史数据太多执行时间较长,这里仅加载100多万数据作为历史数据)
--开启tiflash加速加载
mysql> set tidb_enable_tiflash_read_for_write_stmt=ON;
Query OK, 0 rows affected (0.00 sec)

mysql> explain insert into orders_bak select * from orders where O_ORDERDATE != '1998-08-02' limit 1000000;
+----------------------------------+------------+--------------+---------------+------------------------------------------------------------+
| id                               | estRows    | task         | access object | operator info                                              |
+----------------------------------+------------+--------------+---------------+------------------------------------------------------------+
| Insert_1                         | N/A        | root         |               | N/A                                                        |
| └─Limit_12                       | 1000000.00 | root         |               | offset:0, count:1000000                                    |
|   └─TableReader_23               | 1000000.00 | root         |               | MppVersion: 1, data:ExchangeSender_22                      |
|     └─ExchangeSender_22          | 1000000.00 | mpp[tiflash] |               | ExchangeType: PassThrough                                  |
|       └─Limit_21                 | 1000000.00 | mpp[tiflash] |               | offset:0, count:1000000                                    |
|         └─Selection_20           | 1000000.00 | mpp[tiflash] |               | ne(tpch100.orders.o_orderdate, 1998-08-02 00:00:00.000000) |
|           └─TableFullScan_19     | 1002486.09 | mpp[tiflash] | table:orders  | keep order:false                                           |
+----------------------------------+------------+--------------+---------------+------------------------------------------------------------+
7 rows in set (0.01 sec)

mysql> batch on orders.O_ORDERKEY limit 5000 insert into orders_bak select * from orders where O_ORDERDATE  like '1992-01-%';
+----------------+---------------+
| number of jobs | job status    |
+----------------+---------------+
|            387 | all succeeded |
+----------------+---------------+
1 row in set (1 min 37.83 sec)
Records: 2789  Duplicates: 0  Warnings: 0


mysql> alter table orders_bak add index (O_ORDERDATE,O_ORDERSTATUS);
Query OK, 0 rows affected (7.34 sec)

mysql> analyze table orders_bak;
Query OK, 0 rows affected, 1 warning (2.29 sec)


对orders_bak表添加增量数据,需要注意的是历史数据中并没有增量数据中相同的O_ORDERDATE日期数据和O_ORDERSTATUS状态数据,这里新增一个“未初始化状态值”N。
加载数据如下:

mysql> insert into orders_bak select O_ORDERKEY,O_CUSTKEY,'N',O_TOTALPRICE,O_ORDERDATE,O_ORDERPRIORITY,O_CLERK,O_SHIPPRIORITY,O_COMMENT from orders where O_ORDERDATE='1998-08-02';
Query OK, 62388 rows affected (7.58 sec)
Records: 62388  Duplicates: 0  Warnings: 0

SQL语句执行计划如下:

mysql> explain select O_SHIPPRIORITY,sum(O_TOTALPRICE),count(O_CUSTKEY) from orders_bak where O_ORDERDATE='1998-08-02' and O_ORDERSTATUS='N' group by O_SHIPPRIORITY;
+------------------------------------+---------+-----------+-----------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id                                 | estRows | task      | access object                                                   | operator info                                                                                                                                                                                                                                 |
+------------------------------------+---------+-----------+-----------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Projection_5                       | 1.00    | root      |                                                                 | tpch100.orders_bak.o_shippriority, Column#10, Column#11                                                                                                                                                                                       |
| └─HashAgg_9                        | 1.00    | root      |                                                                 | group by:tpch100.orders_bak.o_shippriority, funcs:sum(tpch100.orders_bak.o_totalprice)->Column#10, funcs:count(tpch100.orders_bak.o_custkey)->Column#11, funcs:firstrow(tpch100.orders_bak.o_shippriority)->tpch100.orders_bak.o_shippriority |
|   └─IndexLookUp_31                 | 0.00    | root      |                                                                 |                                                                                                                                                                                                                                               |
|     ├─IndexRangeScan_29(Build)     | 0.00    | cop[tikv] | table:orders_bak, index:O_ORDERDATE(O_ORDERDATE, O_ORDERSTATUS) | range:[1998-08-02 "N",1998-08-02 "N"], keep order:false                                                                                                                                                                                       |
|     └─TableRowIDScan_30(Probe)     | 0.00    | cop[tikv] | table:orders_bak                                                | keep order:false                                                                                                                                                                                                                              |
+------------------------------------+---------+-----------+-----------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
5 rows in set (0.01 sec)

mysql> explain analyze select O_SHIPPRIORITY,sum(O_TOTALPRICE),count(O_CUSTKEY) from orders_bak where O_ORDERDATE='1998-08-02' and O_ORDERSTATUS='N' group by O_SHIPPRIORITY;

| id                                 | estRows | actRows | task      | access object                                                   | execution info| operator info                                                                                                                                                                                                                                 | memory  | disk    |

| Projection_5                       | 1.00    | 1       | root      |                                                                 | time:361.1ms, loops:2, RRU:436.527075, WRU:0.000000, Concurrency:OFF                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                | tpch100.orders_bak.o_shippriority, Column#10, Column#11                                                                                                                                                                                       | 2.14 KB | N/A     |
| └─HashAgg_9                        | 1.00    | 1       | root      |                                                                 | time:361.1ms, loops| group by:tpch100.orders_bak.o_shippriority, funcs:sum(tpch100.orders_bak.o_totalprice)->Column#10, funcs:count(tpch100.orders_bak.o_custkey)->Column#11, funcs:firstrow(tpch100.orders_bak.o_shippriority)->tpch100.orders_bak.o_shippriority | 83.2 KB | 0 Bytes |
|   └─IndexLookUp_31                 | 0.00    | 62388   | root      |                                                                 | time:354.1ms, loops:62, index_task: {total_time: 94.3ms, fetch_handle: 94.2ms, build: 18.9µs, wait: 68.8µs}, table_task: {total_time: 897.2ms, num: 7, concurrency: 5}, next: {wait_index: 31.5ms, wait_table_lookup_build: 459.3µs, wait_table_lookup_resp: 318.5ms}                                                                                                                                                                                                                                                                                                                                                                                                               |                                                                                                                                                                                                                                               | 5.28 MB | N/A     |
|     ├─IndexRangeScan_29(Build)     | 0.00    | 62388   | cop[tikv] | table:orders_bak, index:O_ORDERDATE(O_ORDERDATE, O_ORDERSTATUS) | time:91.1ms, loops:64, cop_task: {num: 9, max: 26.2ms, min: 5.22ms, avg: 10.2ms, p95: 26.2ms, max_proc_keys: 23988, p95_proc_keys: 23988, tot_proc: 74.1ms, tot_wait: 688.7µs, rpc_num: 9, rpc_time: 91.9ms, copr_cache_hit_ratio: 0.00, build_task_duration: 30.2µs, max_distsql_concurrency: 1}, tikv_task:{proc max:15ms, min:4ms, avg: 8.22ms, p80:15ms, p95:15ms, iters:96, tasks:9}, scan_detail: {total_process_keys: 62388, total_process_keys_size: 3431340, total_keys: 62397, get_snapshot_time: 205.9µs, rocksdb: {delete_skipped_count: 337313, key_skipped_count: 399701, block: {cache_hit_count: 1891, read_count: 257, read_byte: 3.98 MB, read_time: 5.44ms}}}    | range:[1998-08-02 "N",1998-08-02 "N"], keep order:false                                                                                                                                                                                       | N/A     | N/A     |
|     └─TableRowIDScan_30(Probe)     | 0.00    | 62388   | cop[tikv] | table:orders_bak                                                | time:855ms, loops:71, cop_task: {num: 9, max: 278.6ms, min: 11ms, avg: 99.2ms, p95: 278.6ms, max_proc_keys: 18170, p95_proc_keys: 18170, tot_proc: 631.3ms, tot_wait: 3.71ms, rpc_num: 9, rpc_time: 892.1ms, copr_cache_hit_ratio: 0.00, build_task_duration: 1.68ms, max_distsql_concurrency: 2}, tikv_task:{proc max:260ms, min:8ms, avg: 88.2ms, p80:188ms, p95:260ms, iters:102, tasks:9}, scan_detail: {total_process_keys: 62388, total_process_keys_size: 9472269, total_keys: 62401, get_snapshot_time: 3.01ms, rocksdb: {delete_skipped_count: 24, key_skipped_count: 50, block: {cache_hit_count: 275613, read_count: 12, read_byte: 190.6 KB, read_time: 338.2µs}}}      | keep order:false                                                                                                                                                                                                                              | N/A     | N/A     |

5 rows in set (0.36 sec)

可以看到并没有对hashAgg进行下推,如果整个表数据量非常大,新增数据量几百上千万,那么在tidb层做hashAgg会有很大压力。一般为了避免hashAgg引发oom-kill,会设置并行度为1,让其落盘。但是非并行hashAgg因为算子效率执行相对较低,导致IndexLookUp_31可能会堆积较多数据,在实际使用中如果有大量的聚合字段(sum_col1,sum_col2_sum_col3…)可能会占用很大内存导致在IndexLookup算子引发oom-kill。

对整个表做统计信息搜集后,执行计划如下:

mysql> explain select O_SHIPPRIORITY,sum(O_TOTALPRICE),count(O_CUSTKEY) from orders_bak where O_ORDERDATE='1998-08-02' and O_ORDERSTATUS='N' group by O_SHIPPRIORITY;
+-------------------------------+------------+-----------+------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id                            | estRows    | task      | access object    | operator info                                                                                                                                                                                        |
+-------------------------------+------------+-----------+------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Projection_5                  | 1.00       | root      |                  | tpch100.orders_bak.o_shippriority, Column#10, Column#11                                                                                                                                              |
| └─HashAgg_11                  | 1.00       | root      |                  | group by:tpch100.orders_bak.o_shippriority, funcs:sum(Column#12)->Column#10, funcs:count(Column#13)->Column#11, funcs:firstrow(tpch100.orders_bak.o_shippriority)->tpch100.orders_bak.o_shippriority |
|   └─TableReader_12            | 1.00       | root      |                  | data:HashAgg_6                                                                                                                                                                                       |
|     └─HashAgg_6               | 1.00       | cop[tikv] |                  | group by:tpch100.orders_bak.o_shippriority, funcs:sum(tpch100.orders_bak.o_totalprice)->Column#12, funcs:count(tpch100.orders_bak.o_custkey)->Column#13                                              |
|       └─Selection_10          | 62544.33   | cop[tikv] |                  | eq(tpch100.orders_bak.o_orderdate, 1998-08-02 00:00:00.000000), eq(tpch100.orders_bak.o_orderstatus, "N")                                                                                            |
|         └─TableFullScan_9     | 1995177.00 | cop[tikv] | table:orders_bak | keep order:false                                                                                                                                                                                     |
+-------------------------------+------------+-----------+------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
6 rows in set (0.01 sec)

mysql> explain analyze  select O_SHIPPRIORITY,sum(O_TOTALPRICE),count(O_CUSTKEY) from orders_bak where O_ORDERDATE='1998-08-02' and O_ORDERSTATUS='N' group by O_SHIPPRIORITY;

| id                            | estRows    | actRows | task      | access object    | execution info                                                                                                                                                                                                                                                  | operator info                                                                                                                                                                                        | memory    | disk    |

| Projection_5                  | 1.00       | 1       | root      |                  | time:860.9µs, loops:2, RRU:0.751712, WRU:0.000000, Concurrency:OFF                                                                                                                                                                                              | tpch100.orders_bak.o_shippriority, Column#10, Column#11                                                                                                                                              | 3.92 KB   | N/A     |
| └─HashAgg_11                  | 1.00       | 1       | root      |                  | time:836.2µs, loops:2                                                                                                                                                                                                                                           | group by:tpch100.orders_bak.o_shippriority, funcs:sum(Column#12)->Column#10, funcs:count(Column#13)->Column#11, funcs:firstrow(tpch100.orders_bak.o_shippriority)->tpch100.orders_bak.o_shippriority | 14.6 KB   | 0 Bytes |
|   └─TableReader_12            | 1.00       | 3       | root      |                  | time:818.2µs, loops:2, cop_task: {num: 3, max: 741.4µs, min: 506µs, avg: 596.7µs, p95: 741.4µs, tot_proc: 5.14µs, tot_wait: 405.5µs, rpc_num: 3, rpc_time: 1.73ms, copr_cache_hit_ratio: 1.00, build_task_duration: 13.6µs, max_distsql_concurrency: 3}         | data:HashAgg_6                                                                                                                                                                                       | 476 Bytes | N/A     |
|     └─HashAgg_6               | 1.00       | 3       | cop[tikv] |                  | tikv_task:{proc max:661ms, min:452ms, avg: 530.3ms, p80:661ms, p95:661ms, iters:1950, tasks:3}, scan_detail: {get_snapshot_time: 74µs, rocksdb: {block: {}}}                                                                                                    | group by:tpch100.orders_bak.o_shippriority, funcs:sum(tpch100.orders_bak.o_totalprice)->Column#12, funcs:count(tpch100.orders_bak.o_custkey)->Column#13                                              | N/A       | N/A     |
|       └─Selection_10          | 62544.33   | 62388   | cop[tikv] |                  | tikv_task:{proc max:646ms, min:429ms, avg: 515ms, p80:646ms, p95:646ms, iters:1950, tasks:3}                                                                                                                                                                    | eq(tpch100.orders_bak.o_orderdate, 1998-08-02 00:00:00.000000), eq(tpch100.orders_bak.o_orderstatus, "N")                                                                                            | N/A       | N/A     |
|         └─TableFullScan_9     | 1995177.00 | 1995177 | cop[tikv] | table:orders_bak | tikv_task:{proc max:607ms, min:402ms, avg: 483.3ms, p80:607ms, p95:607ms, iters:1950, tasks:3}                                                                                                                                                                  | keep order:false                                                                                                                                                                                     | N/A       | N/A     |

6 rows in set (0.00 sec)

可以看在聚合key(O_SHIPPRIORITY)重复度较大情况下,hashAgg算子下推到tikv节点会减少数据传输、减少tidb侧内存压力、增加性能。
希望能提供手工hint方式(或者参数设置),根据业务特性对特定语句由用户判断是否需要对聚合进行下压。

1 个赞

目前 TiDB 提供了 agg_to_cop() Hint 指示优化器下推聚合函数,可以试下看能不能解决问题。
文档:https://docs.pingcap.com/zh/tidb/dev/optimizer-hints#agg_to_cop

感谢大佬,管用!

此话题已在最后回复的 60 天后被自动关闭。不再允许新回复。