在做非索引字段聚合时候优化器根据成本基本会选择hashAgg算子进行聚合运算,但是hashAgg是多阶段聚合的,可以在tikv存储节点上先进行聚合,然后再到tidb计算节点上二次聚合,当聚合的key字段重复值多的时候下推到tikv做聚合可以减少大量的数据到tidb节点从而减少计算耗时以及减少内存的使用。但是当key字段重复值很少时候如果下推到tikv上那么反而是做无用功浪费存储节点的CPU且聚合效果不好。也就是:
1、如果聚合的key字段重复度很低,那么不希望在tikv节点上做聚合,直接在tidb节点上做聚合。
2、如果聚合的key字段重复度很高,那么很希望在tikv节点上做一次聚合,减少tidb节点聚合压力,性能会大幅提升。
但是往往统计信息不一定及时(或者优化器自身评估问题,如很多时候多字段存在业务相关性,selectivity选择度偏离严重),导致错误的执行计划,尤其是第二种情况,本来可以在tikv做大量过滤这里变成了都要去tidb侧做,对于select col1,sum(col2_decimal),sum(col3_decimal),sum(col4_decimal)… from T group by col1;这种数据全部到tidb侧,加上decimal等数据类型的内存放大问题往往会导致tidb容易发生OOM问题,或者聚合计算非常慢。
因此希望可以提供session/global级别的参数可以选择hashAgg是否下压到存储节点,如果可以更希望在hint级别实现,这样控制力度更细,可以针对需要的hashAgg算子(一个语句中可能存在多个hashAgg,虽然一般情况下很少)指定hint来选择是否需要下推。
注意:
tidb_opt_agg_push_down
这个参数并不是将hashAgg下推到tikv节点,而是用来设置优化器是否执行聚合函数下推到 Join,Projection 和 UnionAll 之前的优化操作
这里举一个例子(不考虑分区表、多表等绕过形式的优化手段),对于一张历史大表(几十亿),需要按照日期O_ORDERDATE加载部分数据(几百万),而且新加载数据的状态O_ORDERSTATUS是未开始状态,加载完毕后立刻需要对这个新加载的数据做批次查询任务,涉及到聚合。因为新加载的数据量相对整体很少不一定触发自动统计信息搜集,所以可能会导致对新增量数据的评估存在问题。如果对全表做统计信息搜集那么可能执行时间很久,代价较大,甚至比后续批次时间都长。
拿TPCH100中的orders表进行演示,步骤如下:
1、生成新表orders_bak like orders,加载orders某日期(O_ORDERDATE)之前的大量数据。
2、为orders_bak添加联合索引(O_ORDERDATE、O_ORDERSTATUS)用于快速过滤新增数据。
3、对orders_bak表做统计信息搜集,当做历史全量数据。
4、对orders_bak表加载最新日期的数据(不做统计信息搜集)。
5、对orders_bak表中字段O_ORDERPRIORITY做key进行聚合sum(O_TOTALPRICE)操作。
6、观察执行计划情况,希望聚合进行下推到tikv,但是实际并不下推:
explain select O_ORDERPRIORITY,sum(O_TOTALPRICE) from orders_bak where O_ORDERDATE=‘1998-08-02’ and O_ORDERSTATUS=‘N’ group by O_ORDERPRIORITY;
mysql> show create table orders \G
*************************** 1. row ***************************
Table: orders
Create Table: CREATE TABLE `orders` (
`O_ORDERKEY` bigint(20) NOT NULL,
`O_CUSTKEY` bigint(20) NOT NULL,
`O_ORDERSTATUS` char(1) NOT NULL,
`O_TOTALPRICE` decimal(15,2) NOT NULL,
`O_ORDERDATE` date NOT NULL,
`O_ORDERPRIORITY` char(15) NOT NULL,
`O_CLERK` char(15) NOT NULL,
`O_SHIPPRIORITY` bigint(20) NOT NULL,
`O_COMMENT` varchar(79) NOT NULL,
PRIMARY KEY (`O_ORDERKEY`) /*T![clustered_index] CLUSTERED */
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
1 row in set (0.00 sec)
mysql> select count(*) from orders;
+-----------+
| count(*) |
+-----------+
| 150000000 |
+-----------+
1 row in set (1.83 sec)
mysql> select max(O_ORDERDATE),min(O_ORDERDATE) from orders;
+------------------+------------------+
| max(O_ORDERDATE) | min(O_ORDERDATE) |
+------------------+------------------+
| 1998-08-02 | 1992-01-01 |
+------------------+------------------+
1 row in set (1.55 sec)
mysql> select O_ORDERSTATUS,count(*) from orders group by O_ORDERSTATUS;
+---------------+----------+
| O_ORDERSTATUS | count(*) |
+---------------+----------+
| O | 73086053 |
| F | 73072502 |
| P | 3841445 |
+---------------+----------+
3 rows in set (2.18 sec)
mysql> select O_ORDERPRIORITY,count(*) from orders group by O_ORDERPRIORITY;
+-----------------+----------+
| O_ORDERPRIORITY | count(*) |
+-----------------+----------+
| 4-NOT SPECIFIED | 30004093 |
| 1-URGENT | 29995209 |
| 5-LOW | 30002971 |
| 2-HIGH | 29997467 |
| 3-MEDIUM | 30000260 |
+-----------------+----------+
5 rows in set (2.69 sec)
创建orders_bak表并加载数据:
mysql> create table orders_bak like orders;
Query OK, 0 rows affected (0.12 sec)
--加载一百多万数据(因为历史数据太多执行时间较长,这里仅加载100多万数据作为历史数据)
--开启tiflash加速加载
mysql> set tidb_enable_tiflash_read_for_write_stmt=ON;
Query OK, 0 rows affected (0.00 sec)
mysql> explain insert into orders_bak select * from orders where O_ORDERDATE != '1998-08-02' limit 1000000;
+----------------------------------+------------+--------------+---------------+------------------------------------------------------------+
| id | estRows | task | access object | operator info |
+----------------------------------+------------+--------------+---------------+------------------------------------------------------------+
| Insert_1 | N/A | root | | N/A |
| └─Limit_12 | 1000000.00 | root | | offset:0, count:1000000 |
| └─TableReader_23 | 1000000.00 | root | | MppVersion: 1, data:ExchangeSender_22 |
| └─ExchangeSender_22 | 1000000.00 | mpp[tiflash] | | ExchangeType: PassThrough |
| └─Limit_21 | 1000000.00 | mpp[tiflash] | | offset:0, count:1000000 |
| └─Selection_20 | 1000000.00 | mpp[tiflash] | | ne(tpch100.orders.o_orderdate, 1998-08-02 00:00:00.000000) |
| └─TableFullScan_19 | 1002486.09 | mpp[tiflash] | table:orders | keep order:false |
+----------------------------------+------------+--------------+---------------+------------------------------------------------------------+
7 rows in set (0.01 sec)
mysql> batch on orders.O_ORDERKEY limit 5000 insert into orders_bak select * from orders where O_ORDERDATE like '1992-01-%';
+----------------+---------------+
| number of jobs | job status |
+----------------+---------------+
| 387 | all succeeded |
+----------------+---------------+
1 row in set (1 min 37.83 sec)
Records: 2789 Duplicates: 0 Warnings: 0
mysql> alter table orders_bak add index (O_ORDERDATE,O_ORDERSTATUS);
Query OK, 0 rows affected (7.34 sec)
mysql> analyze table orders_bak;
Query OK, 0 rows affected, 1 warning (2.29 sec)
对orders_bak表添加增量数据,需要注意的是历史数据中并没有增量数据中相同的O_ORDERDATE日期数据和O_ORDERSTATUS状态数据,这里新增一个“未初始化状态值”N。
加载数据如下:
mysql> insert into orders_bak select O_ORDERKEY,O_CUSTKEY,'N',O_TOTALPRICE,O_ORDERDATE,O_ORDERPRIORITY,O_CLERK,O_SHIPPRIORITY,O_COMMENT from orders where O_ORDERDATE='1998-08-02';
Query OK, 62388 rows affected (7.58 sec)
Records: 62388 Duplicates: 0 Warnings: 0
SQL语句执行计划如下:
mysql> explain select O_SHIPPRIORITY,sum(O_TOTALPRICE),count(O_CUSTKEY) from orders_bak where O_ORDERDATE='1998-08-02' and O_ORDERSTATUS='N' group by O_SHIPPRIORITY;
+------------------------------------+---------+-----------+-----------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id | estRows | task | access object | operator info |
+------------------------------------+---------+-----------+-----------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Projection_5 | 1.00 | root | | tpch100.orders_bak.o_shippriority, Column#10, Column#11 |
| └─HashAgg_9 | 1.00 | root | | group by:tpch100.orders_bak.o_shippriority, funcs:sum(tpch100.orders_bak.o_totalprice)->Column#10, funcs:count(tpch100.orders_bak.o_custkey)->Column#11, funcs:firstrow(tpch100.orders_bak.o_shippriority)->tpch100.orders_bak.o_shippriority |
| └─IndexLookUp_31 | 0.00 | root | | |
| ├─IndexRangeScan_29(Build) | 0.00 | cop[tikv] | table:orders_bak, index:O_ORDERDATE(O_ORDERDATE, O_ORDERSTATUS) | range:[1998-08-02 "N",1998-08-02 "N"], keep order:false |
| └─TableRowIDScan_30(Probe) | 0.00 | cop[tikv] | table:orders_bak | keep order:false |
+------------------------------------+---------+-----------+-----------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
5 rows in set (0.01 sec)
mysql> explain analyze select O_SHIPPRIORITY,sum(O_TOTALPRICE),count(O_CUSTKEY) from orders_bak where O_ORDERDATE='1998-08-02' and O_ORDERSTATUS='N' group by O_SHIPPRIORITY;

| id | estRows | actRows | task | access object | execution info | operator info | memory | disk |

| Projection_5 | 1.00 | 1 | root | | time:361.1ms, loops:2, RRU:436.527075, WRU:0.000000, Concurrency:OFF | tpch100.orders_bak.o_shippriority, Column#10, Column#11 | 2.14 KB | N/A |
| └─HashAgg_9 | 1.00 | 1 | root | | time:361.1ms, loops:2 | group by:tpch100.orders_bak.o_shippriority, funcs:sum(tpch100.orders_bak.o_totalprice)->Column#10, funcs:count(tpch100.orders_bak.o_custkey)->Column#11, funcs:firstrow(tpch100.orders_bak.o_shippriority)->tpch100.orders_bak.o_shippriority | 83.2 KB | 0 Bytes |
| └─IndexLookUp_31 | 0.00 | 62388 | root | | time:354.1ms, loops:62, index_task: {total_time: 94.3ms, fetch_handle: 94.2ms, build: 18.9µs, wait: 68.8µs}, table_task: {total_time: 897.2ms, num: 7, concurrency: 5}, next: {wait_index: 31.5ms, wait_table_lookup_build: 459.3µs, wait_table_lookup_resp: 318.5ms} | | 5.28 MB | N/A |
| ├─IndexRangeScan_29(Build) | 0.00 | 62388 | cop[tikv] | table:orders_bak, index:O_ORDERDATE(O_ORDERDATE, O_ORDERSTATUS) | time:91.1ms, loops:64, cop_task: {num: 9, max: 26.2ms, min: 5.22ms, avg: 10.2ms, p95: 26.2ms, max_proc_keys: 23988, p95_proc_keys: 23988, tot_proc: 74.1ms, tot_wait: 688.7µs, rpc_num: 9, rpc_time: 91.9ms, copr_cache_hit_ratio: 0.00, build_task_duration: 30.2µs, max_distsql_concurrency: 1}, tikv_task:{proc max:15ms, min:4ms, avg: 8.22ms, p80:15ms, p95:15ms, iters:96, tasks:9}, scan_detail: {total_process_keys: 62388, total_process_keys_size: 3431340, total_keys: 62397, get_snapshot_time: 205.9µs, rocksdb: {delete_skipped_count: 337313, key_skipped_count: 399701, block: {cache_hit_count: 1891, read_count: 257, read_byte: 3.98 MB, read_time: 5.44ms}}} | range:[1998-08-02 "N",1998-08-02 "N"], keep order:false | N/A | N/A |
| └─TableRowIDScan_30(Probe) | 0.00 | 62388 | cop[tikv] | table:orders_bak | time:855ms, loops:71, cop_task: {num: 9, max: 278.6ms, min: 11ms, avg: 99.2ms, p95: 278.6ms, max_proc_keys: 18170, p95_proc_keys: 18170, tot_proc: 631.3ms, tot_wait: 3.71ms, rpc_num: 9, rpc_time: 892.1ms, copr_cache_hit_ratio: 0.00, build_task_duration: 1.68ms, max_distsql_concurrency: 2}, tikv_task:{proc max:260ms, min:8ms, avg: 88.2ms, p80:188ms, p95:260ms, iters:102, tasks:9}, scan_detail: {total_process_keys: 62388, total_process_keys_size: 9472269, total_keys: 62401, get_snapshot_time: 3.01ms, rocksdb: {delete_skipped_count: 24, key_skipped_count: 50, block: {cache_hit_count: 275613, read_count: 12, read_byte: 190.6 KB, read_time: 338.2µs}}} | keep order:false | N/A | N/A |

5 rows in set (0.36 sec)
可以看到并没有对hashAgg进行下推,如果整个表数据量非常大,新增数据量几百上千万,那么在tidb层做hashAgg会有很大压力。一般为了避免hashAgg引发oom-kill,会设置并行度为1,让其落盘。但是非并行hashAgg因为算子效率执行相对较低,导致IndexLookUp_31可能会堆积较多数据,在实际使用中如果有大量的聚合字段(sum_col1,sum_col2_sum_col3…)可能会占用很大内存导致在IndexLookup算子引发oom-kill。
对整个表做统计信息搜集后,执行计划如下:
mysql> explain select O_SHIPPRIORITY,sum(O_TOTALPRICE),count(O_CUSTKEY) from orders_bak where O_ORDERDATE='1998-08-02' and O_ORDERSTATUS='N' group by O_SHIPPRIORITY;
+-------------------------------+------------+-----------+------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id | estRows | task | access object | operator info |
+-------------------------------+------------+-----------+------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Projection_5 | 1.00 | root | | tpch100.orders_bak.o_shippriority, Column#10, Column#11 |
| └─HashAgg_11 | 1.00 | root | | group by:tpch100.orders_bak.o_shippriority, funcs:sum(Column#12)->Column#10, funcs:count(Column#13)->Column#11, funcs:firstrow(tpch100.orders_bak.o_shippriority)->tpch100.orders_bak.o_shippriority |
| └─TableReader_12 | 1.00 | root | | data:HashAgg_6 |
| └─HashAgg_6 | 1.00 | cop[tikv] | | group by:tpch100.orders_bak.o_shippriority, funcs:sum(tpch100.orders_bak.o_totalprice)->Column#12, funcs:count(tpch100.orders_bak.o_custkey)->Column#13 |
| └─Selection_10 | 62544.33 | cop[tikv] | | eq(tpch100.orders_bak.o_orderdate, 1998-08-02 00:00:00.000000), eq(tpch100.orders_bak.o_orderstatus, "N") |
| └─TableFullScan_9 | 1995177.00 | cop[tikv] | table:orders_bak | keep order:false |
+-------------------------------+------------+-----------+------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
6 rows in set (0.01 sec)
mysql> explain analyze select O_SHIPPRIORITY,sum(O_TOTALPRICE),count(O_CUSTKEY) from orders_bak where O_ORDERDATE='1998-08-02' and O_ORDERSTATUS='N' group by O_SHIPPRIORITY;

| id | estRows | actRows | task | access object | execution info | operator info | memory | disk |
+-------------------------------+------------+---------+-----------+------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+
| Projection_5 | 1.00 | 1 | root | | time:860.9µs, loops:2, RRU:0.751712, WRU:0.000000, Concurrency:OFF | tpch100.orders_bak.o_shippriority, Column#10, Column#11 | 3.92 KB | N/A |
| └─HashAgg_11 | 1.00 | 1 | root | | time:836.2µs, loops:2 | group by:tpch100.orders_bak.o_shippriority, funcs:sum(Column#12)->Column#10, funcs:count(Column#13)->Column#11, funcs:firstrow(tpch100.orders_bak.o_shippriority)->tpch100.orders_bak.o_shippriority | 14.6 KB | 0 Bytes |
| └─TableReader_12 | 1.00 | 3 | root | | time:818.2µs, loops:2, cop_task: {num: 3, max: 741.4µs, min: 506µs, avg: 596.7µs, p95: 741.4µs, tot_proc: 5.14µs, tot_wait: 405.5µs, rpc_num: 3, rpc_time: 1.73ms, copr_cache_hit_ratio: 1.00, build_task_duration: 13.6µs, max_distsql_concurrency: 3} | data:HashAgg_6 | 476 Bytes | N/A |
| └─HashAgg_6 | 1.00 | 3 | cop[tikv] | | tikv_task:{proc max:661ms, min:452ms, avg: 530.3ms, p80:661ms, p95:661ms, iters:1950, tasks:3}, scan_detail: {get_snapshot_time: 74µs, rocksdb: {block: {}}} | group by:tpch100.orders_bak.o_shippriority, funcs:sum(tpch100.orders_bak.o_totalprice)->Column#12, funcs:count(tpch100.orders_bak.o_custkey)->Column#13 | N/A | N/A |
| └─Selection_10 | 62544.33 | 62388 | cop[tikv] | | tikv_task:{proc max:646ms, min:429ms, avg: 515ms, p80:646ms, p95:646ms, iters:1950, tasks:3} | eq(tpch100.orders_bak.o_orderdate, 1998-08-02 00:00:00.000000), eq(tpch100.orders_bak.o_orderstatus, "N") | N/A | N/A |
| └─TableFullScan_9 | 1995177.00 | 1995177 | cop[tikv] | table:orders_bak | tikv_task:{proc max:607ms, min:402ms, avg: 483.3ms, p80:607ms, p95:607ms, iters:1950, tasks:3} | keep order:false | N/A | N/A |

6 rows in set (0.00 sec)
可以看在聚合key(O_SHIPPRIORITY)重复度较大情况下,hashAgg算子下推到tikv节点会减少数据传输、减少tidb侧内存压力、增加性能。
希望能提供手工hint方式(或者参数设置),根据业务特性对特定语句由用户判断是否需要对聚合进行下压。