Hive Statistics介绍

Hive Metastore里除了保存元数据库表之外,还会保存statistics(统计信息)[1]。statistics记录了表或分区的大小、行数等信息,可以用于查询优化。Hive会在表、分区、字段级别记录各自的统计信息,主要包括:

例如我们新创建一张hive分区表,然后用hive插入一些数据,此时通过describe formatted语句可以看到表级别的statistics:

另外也可以指定partition查看某个分区的statistics:

如果开启了hive.stats.column.autogather,或者我们再运行一下ANALYZE TABLE xxx COMPUTE STATISTICS FOR COLUMNS,那么还会创建出字段级别的statistics,可以通过describe formatted [table_name] [column_name]来查看每个字段的statistics,包括min/max等信息。

可以看到,Hive会尽可能地在每次变更数据时更新statistics以保证其准确性。当然用户也可以为了提高性能来关掉开关。Hive statistics会用于CBO(hive.cbo.enable)查询计划优化[2],来选择出代价最低的查询计划。另外,如果开启了hive.compute.query.using.stats(默认值关闭),诸如count(1)、min、max的查询会直接走Hive statistics快速返回结果,而不进行文件扫描。

Spark如何使用Hive Statistics

通常来说,Spark用户也会使用Hive Metastore来管理库表元数据,Spark本身也兼容Hive表的读写。不过Spark对于Hive Statistics的使用与Hive有所不同,首先来说Spark定义了自己的一组Param前缀(spark.sql.statistics.XXXX),并不依赖于Hive本身的statistics字段,Spark使用的statistics包括:

Spark自己定义了一套statistics key,这也意味着Spark引擎写入的statistics不会被其他引擎使用和共享,另外可以看出Spark对于statistics的写入相对于Hive要谨慎很多,在默认的配置下,Spark不会在数据写入时有效地更新statistics,意味着Hive Metastore里的统计信息不再能准确反映表的实际情况。

Spark Statistics的写入

上述表格已经列出了各个statistics key是在何时更新的,这里我们再详述一下。

表级别statistics

针对常规的SQL,Spark主要通过org.apache.spark.sql.execution.command.CommandUtils#updateTableStats这个方法来更新写入后的statistics,代码如下。代码的内容比较简单,仅当autoSizeUpdateEnabled开关打开,才会触发tableSize的计算,计算tableSize的原理为:如果是非分区表,递归list表存储路径下的所有文件,将每个文件size求和;如果是分区表,则默认会listPartitions先拿到所有分区的路径,然后每个分区大小并行计算。最终仅会更新表级别的spark.sql.statistics.totalSize属性。

那这个方法什么时候会被触发调用呢,包括 _AlterTableAddPartition、CreateDataSourceTableAsSelect、AlterTableDropPartition、AlterTableSetLocation、LoadDataInto、TruncateTable_、以及两种Table的Insert(InsertIntoHadoopFsRelationCommand、InsertIntoHiveTable),都会调用CommandUtils.updateTableStats。也就是覆盖了所有数据更新或者表更新的场景,在开启开关后都会更新spark.sql.statistics.totalSize。
另外,要主动更新statistics还是要运行Analyze命令[3],对于 AnalyzeTable 命令,除了更新spark.sql.statistics.totalSize之外,也会根据参数noscan是否关闭来判断更新spark.sql.statistics.numRows,源码如下。

分区级别statistics

相比于表级别的statistics,分区级别statistics写入的场景更少了。即使你打开了spark.sql.statistics.size.autoUpdate.enabled这个开关,也不会在写入时自动更新任何分区级别statistics,因为这个开关只会写入表级别的statistics。包括当我们用spark新创建分区AlterTableAddPartition时,新分区的statistics也是空的。
不过除了Analyze命令以外,有一种情况,对于 ALTER TABLE … RECOVER PARTITIONS 语句,如果新创建了分区,则新分区会带上numFiles、totalSize这两个statistics。注意这里写入的statistics没有以spark.sql.statistics开头,而是写入了Hive格式的statistics,从代码注释看这样做的目的”This two fast stat could prevent Hive metastore to list the files again.”,也就是仅为了性能优化做的一次写入,毕竟这个命令在执行的时候已经扫描过一次分区目录了。
真正要更新spark的分区statistics,还是得运行 ANALYZE TABLE xxx PARTITION (xxxx = xxx) COMPUTE STATISTICS; 这个命令会更新分区statistics的两个key:spark.sql.statistics.totalSize、spark.sql.statistics.numRows。

字段级别statistics

Spark字段级别的statistics,只能通过 ANALYZE TABLE xxx COMPUTE STATISTICS FOR COLUMNS xxx; 语句来计算和写入。它记录了某个字段在整张表的min/max等统计信息,没有在分区维度进行统计。
Spark字段级别统计信息包括了avgLen、distinctCount、min、max、maxLen、nullCount以及histogram。前面几个统计值都好理解,histogram比较特殊,需要开启spark.sql.statistics.histogram.enabled之后再运行Analyze才可以生成。histogram将字段值的分布切分成n个百分比位(n=spark.sql.statistics.histogram.numBins,默认值254),每一段bin内部会统计min/max和distinct_count。
一份完整的Spark字段级统计信息如下图:

Spark可能也会更新Hive Stats

翻遍Spark源码,Spark更新的statistics都会以spark.sql.statistics开头,看似不会更新Hive格式的statistics。但实际情况下,Spark变更元数据时createTable/alterTable/addPartition会连接到Hive Metastore,此时在Hive Metastore内部依然会更新表和分区的statistics。这里更新的是Hive格式的stats:numFiles和totalSize。
那么问题就是Spark写入表的时候,是否会触发createTable/alterTable/addPartition这些ddl命令。类似于CTAS的语句,在创建元数据的同时进行插入肯定没问题会触发。如果只是普通的INSERT [OVERWRITE]命令,是否更新则要取决于Spark内部实现,Spark内部的插入有两种实现InsertIntoHadoopFsRelationCommand、InsertIntoHiveTable,分别对应Datasource表和Hive表。一个表的类型取决于Create table命令时候的语法[4]。另外如果是orc/parquet格式的Hive表,并且开启了spark.sql.hive.convertMetastoreParquet或spark.sql.hive.convertMetastoreOrc(默认是开启),则会做RelationConversion,实际上还是用InsertIntoHadoopFsRelationCommand来实现orc/parquet表的写入。如果是InsertIntoHiveTable命令,则会在写入非分区表时触发alterTable然后在Hive Metastore里计算statistics,同时也会更新transient_lastDdlTime。如果是InsertIntoHadoopFsRelationCommand命令,则不会触发alterTable,也不会计算statistics。

表类型 新增表/分区(CTAS、插入新分区)
是否更新Hive Stats
普通INSERT插入表
是否更新表Hive Stats
普通INSERT插入分区
是否更新分区Hive Stats
Datasource表
Hive表
(除了orc/parquet)
Hive表
(orc/parquet)

这里可以看出细节很多,也比较复杂,如果同一个表用不同引擎写入,或者用不同参数写入,可能行为都不一样。总体上看stats的写入在spark端来看是比较混乱的。

数据湖格式对statistics的使用

数据湖格式指的是delta、hudi、iceberg。数据湖格式通常不太依赖Hive Metastore的元数据,而是自己维护一份文件来记录表的元数据和统计信息。不同的格式对hive statistics的使用我做了一下测试,由于版本等原因,这里的结论仅做一个参考。

格式 写入数据是否更新statistics Analyze是否更新statistics
delta 会更新totalSize、numFiles Spark3不支持Analyze
Spark2的Analyze会更新spark.sql.statistics.numRows/spark.sql.statistics.totalSize
hudi 会更新totalSize、numFiles 会更新spark.sql.statistics.numRows 和 spark.sql.statistics.totalSize
iceberg 会更新totalSize、numFiles、numRows Spark3不支持Analyze
Spark2不支持spark-sql

总体来说,湖格式对Hive statistics的使用有几个特点

Spark Statistics的使用

在Spark内部实现里,会先从Hive Metastore里拿到的原始的statistics,构造出CatalogStatistics,随后在执行阶段会转换成Statistics
Spark在构造CatalogStatistics时,无论是表级别还是分区级别的statistics,都是先读取Hive格式的statistics,再读取spark.sql.statistics前缀的statistics,如果存在则Spark的statistics优先覆盖Hive的statistics。读取字段级别stats则没有读hive的stats,而是只取了spark.sql.statistics.colStats开头的表级别stats。
最终在Spark的每个LogicalPlan的节点都会有一个名为stats的Statistics属性,表示了计划执行到此处预估的数据量org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats#stats
那么这个plan里面的Statistics是怎么计算得来的,分几种情况:

Join策略选择

得到查询计划里面的Statistics后,最重要的用处就是判断一张表是否可以做Broadcast Join。因为Broadcast Join可以把小表广播出去,性能要远优于Sort Merge Join。判断逻辑就是stats里的表大小,是否小于spark.sql.autoBroadcastJoinThreshold(默认10M,一般建议调大点)。

如上所述,执行计划中的Statistics会从Hive Metastore获取,尽管在有些场景下会通过list文件来兜底,但是并不能覆盖所有场景。一份准确的表级别stats可以避免小表被Sort Merge Join。例如:

1
2
CREATE TABLE student (id INT, name STRING, age INT) STORED AS TEXTFILE;
EXPLAIN SELECT * FROM student t1 left join student t2 on t1.id=t2.id;


这是一个没有数据的空表,但仍然做了SortMergeJoin,如果有stats更新,才会走到BroadcastHashJoin。
不过Spark3开启AQE后,这个问题会好很多,AQE会自动Join策略调整DemoteBroadcastHashJoin,上述例子虽然explain的计划是SMJ但是执行过程中会变为BHJ。

cbo优化规则

cbo的开关(spark.sql.cbo.enabled)默认是关闭的,如果打开,一方面上述执行计划的Statistics会估算的更准确,另一方面也会增加一些执行计划的优化规则,主要就是CostBasedJoinReorder:
spark.sql.cbo.joinReorder.enabled开启后,会触发CostBasedJoinReorder这个Optimizer优化规则。这个规则会用来调整多个表连续Join时的Join顺序,通过这些表的stats和动态规划算法,优化成开销最小的Join顺序。
关于这个cbo规则的详细设计和算法,可以参考官方的设计文档 SPARK-16026[5]。

参考资料

[1] https://cwiki.apache.org/confluence/display/hive/statsdev
[2]https://cwiki.apache.org/confluence/display/Hive/Cost-based+optimization+in+Hive
[3]https://spark.apache.org/docs/3.3.0/sql-ref-syntax-aux-analyze-table.html#content
[4]https://spark.apache.org/docs/latest/sql-ref-syntax-ddl-create-table.html
[5] https://issues.apache.org/jira/browse/SPARK-16026