查询计划
查询计划是 InfluxDB 3 Querier 设计和执行的一系列步骤,用于计算查询结果。Querier 使用 DataFusion 和 Arrow 构建和执行查询计划,这些计划调用 DataFusion 和 InfluxDB 特定的运算符,这些运算符从 Object store 和 Ingester 读取数据,并应用查询转换,例如去重、过滤、聚合、合并、投影和排序,以计算最终结果。
与许多其他数据库一样,Querier 包含一个查询优化器。在解析传入的查询后,Querier 构建一个逻辑计划——一系列高级步骤,例如扫描、过滤和排序,这是查询所需的。在逻辑计划之后,Querier 然后构建最佳物理计划,以在最短的时间内计算出正确的结果。该计划利用 Ingester 的数据分区来并行化计划操作并在执行计划之前修剪不必要的数据。Querier 还应用谓词和投影下推的常用技术,以尽可能早地进一步修剪数据。
显示语法
逻辑和物理查询计划以树语法表示(例如,在 EXPLAIN
报告中)。
- 每个计划都表示为由节点组成的倒置树。
- 父节点等待其子节点的输出。
- 数据从树的底部最内层节点向上流到顶部的最外层根节点。
逻辑和物理计划示例
以下查询生成包含逻辑计划和物理计划的 EXPLAIN
报告
EXPLAIN SELECT city, min_temp, time FROM h2o ORDER BY city ASC, time DESC;
输出如下
图 1. EXPLAIN 报告
| plan_type | plan |
+---------------+--------------------------------------------------------------------------+
| logical_plan | Sort: h2o.city ASC NULLS LAST, h2o.time DESC NULLS FIRST |
| | TableScan: h2o projection=[city, min_temp, time] |
| physical_plan | SortPreservingMergeExec: [city@0 ASC NULLS LAST,time@2 DESC] |
| | UnionExec |
| | SortExec: expr=[city@0 ASC NULLS LAST,time@2 DESC] |
| | ParquetExec: file_groups={...}, projection=[city, min_temp, time] |
| | SortExec: expr=[city@0 ASC NULLS LAST,time@2 DESC] |
| | ParquetExec: file_groups={...}, projection=[city, min_temp, time] |
| | |
来自 EXPLAIN SELECT city, min_temp, time FROM h2o ORDER BY city ASC, time DESC;
的输出
图 1 物理计划中的叶节点是并行 ParquetExec
节点
ParquetExec: file_groups={...}, projection=[city, min_temp, time]
...
ParquetExec: file_groups={...}, projection=[city, min_temp, time]
数据流
物理计划节点表示 ExecutionPlan
的特定实现,它接收输入流,应用表达式进行过滤和排序,然后将其输出流产生到其父节点。
下图显示了 图 1 物理计划中的数据流和 ExecutionPlan
节点的顺序
SortPreservingMergeExec
UnionExec
SortExec
ParquetExec
SortExec
ParquetExec
InfluxDB Clustered 包括以下计划表达式
逻辑计划
查询的逻辑计划
- 是一个高级计划,表达了查询的“意图”以及计算结果所需的步骤。
- 需要有关数据模式的信息
- 独立于物理执行、集群配置、数据源(Ingester 或 Object store)或数据组织或分区方式
- 显示为 DataFusion
LogicalPlan
节点的树
LogicalPlan
节点
InfluxDB Clustered 逻辑计划树中的每个节点都表示 LogicalPlan
实现,它接收从查询中提取的条件,并应用关系运算符和优化来将输入数据转换为输出表。
以下是 InfluxDB 逻辑计划中使用的一些 LogicalPlan
节点。
TableScan
Tablescan
从表提供程序按引用或从上下文中检索行。
Projection
Projection
对输入评估任意表达式列表;等效于带有表达式列表的 SQL SELECT
语句。
Filter
Filter
从输入中过滤掉不满足指定表达式的行;等效于带有谓词表达式的 SQL WHERE
子句。
Sort
Sort
根据排序表达式列表对输入进行排序;用于实现 SQL ORDER BY
。
有关详细信息和 LogicalPlan
实现的列表,请参阅 DataFusion 文档中的 Enum datafusion::logical_expr::LogicalPlan
Variants。
物理计划
查询的物理计划或执行计划
- 是从逻辑计划派生的优化计划,其中包含查询执行的底层步骤。
- 考虑集群配置(例如,CPU 和内存分配)和数据组织(例如:分区、文件数以及文件是否重叠)——例如
- 如果您在具有不同配置的不同集群上使用相同的数据运行相同的查询,则每个集群可能会为该查询生成不同的物理计划。
- 如果您在同一集群上在不同时间运行相同的查询,则物理计划可能会每次都不同,具体取决于查询时的数据。
- 如果使用
ANALYZE
生成,则包括在查询执行期间采样的运行时指标 - 显示为
ExecutionPlan
节点的树
ExecutionPlan
节点
InfluxDB Clustered 物理计划中的每个节点都表示对 DataFusion ExecutionPlan
的特定实现的调用,该实现接收输入数据、查询条件表达式和输出模式。
以下是 InfluxDB 物理计划中使用的一些 ExecutionPlan
节点。
DeduplicateExec
InfluxDB DeduplicateExec
接收按 sort_key
排序的 RecordBatch
输入流,并应用 InfluxDB 特定的去重逻辑。输出取决于具有相同键的输入行的顺序。
EmptyExec
DataFusion EmptyExec
是空关系的执行计划,指示表不包含查询时间范围内的数据。
FilterExec
用于 Filter
LogicalPlan
的执行计划。
DataFusion FilterExec
针对所有输入批次评估布尔谓词,以确定要包含在输出批次中的行。
ParquetExec
DataFusion ParquetExec
扫描一个或多个 Parquet 分区。
ParquetExec
表达式
file_groups
文件组是要扫描的文件列表。文件通过路径引用
1/1/b862a7e9b.../243db601-....parquet
1/1/b862a7e9b.../f5fb7c7d-....parquet
在 InfluxDB 3 中,路径结构表示数据的组织方式。
路径具有以下结构
<namespace_id>/<table_id>/<partition_hash_id>/<uuid_of_the_file>.parquet
1 / 1 /b862a7e9b329ee6a4.../243db601-f3f1-4....parquet
namespace_id
:正在查询的命名空间(数据库)table_id
:正在查询的表(measurement)partition_hash_id
:此文件所属的分区。您可以计算分区 ID 以查找查询读取的分区数。uuid_of_the_file
:文件标识符。
ParquetExec
并行处理组,并按顺序读取每个组中的文件。
projection
projection
列出了查询计划需要读取以执行查询的表列。参数名称 projection
指的是投影下推,即过滤列的操作。
考虑以下包含多列的示例数据
h2o,state=CA,city=SF min_temp=68.4,max_temp=85.7,area=500u 600
表 | 州 | 城市 | 最低温度 | 最高温度 | 面积 | 时间 |
---|---|---|---|---|---|---|
h2o | CA | SF | 68.4 | 85.7 | 500u | 600 |
但是,以下 SQL 查询仅指定三列(city
、state
和 time
)
SELECT city, count(1)
FROM h2o
WHERE time >= to_timestamp(200) AND time < to_timestamp(700)
AND state = 'MA'
GROUP BY city
ORDER BY city ASC;
在处理查询时,Querier 在投影中指定了三个必需的列,并且投影“下推”到叶节点——未指定的列在查询执行期间尽早修剪。
projection=[city, state, time]
output_ordering
output_ordering
指定输出的排序顺序。如果输出应该排序,并且 Querier 知道顺序,则 Querier 指定 output_ordering
。
在将数据存储到 Parquet 文件时,InfluxDB 对数据进行排序以提高存储压缩率和查询效率,并且规划器尝试尽可能长时间地保留该顺序。通常,ParquetExec
接收的 output_ordering
值是存储数据的排序(或排序的子集)。
根据设计,RecordBatchesExec
数据未排序。
在以下示例中,查询规划器指定输出排序顺序 state ASC, city ASC, time ASC,
output_ordering=[state@2 ASC, city@1 ASC, time@3 ASC, __chunk_order@0 ASC]
predicate
predicate
是查询中指定的数据过滤器,用于在扫描 Parquet 文件时进行行过滤。
例如,给定以下 SQL 查询
SELECT city, count(1)
FROM h2o
WHERE time >= to_timestamp(200) AND time < to_timestamp(700)
AND state = 'MA'
GROUP BY city
ORDER BY city ASC;
predicate
值是 WHERE
语句中的布尔表达式
predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA
pruning predicate
pruning_predicate
是从 predicate
值创建的,用于从选定的分区中修剪数据和文件。
例如,给定从 SQL 解析的以下 predicate
predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA,
Querier 创建以下 pruning_predicate
pruning_predicate=time_max@0 >= 200 AND time_min@1 < 700 AND state_min@2 <= MA AND MA <= state_max@3
默认过滤器按 time
过滤文件。
在生成物理计划之前,额外的 partition pruning
步骤使用分区列上的谓词来修剪分区。
ProjectionExec
DataFusion ProjectionExec
对输入评估任意表达式列表;用于 Projection
LogicalPlan
的执行计划。
RecordBatchesExec
InfluxDB RecordBatchesExec
实现从 InfluxDB 3 Ingester 检索和扫描最近写入但尚未持久化的数据。
在生成计划时,Querier 将查询条件(例如,数据库、表和列)发送到 Ingester,以检索尚未持久化到 Parquet 文件的数据。如果 Ingester 具有满足条件的数据(块大小非零),则该计划包括 RecordBatchesExec
。
RecordBatchesExec
属性
chunks
chunks
是来自 Ingester 的数据块的数量。通常为一个 (1
),但可以有很多个。
projection
projection
指定要读取和输出的列的列表。
列列表中的 __chunk_order
是 InfluxDB 生成的列,用于保持块和文件的顺序以进行去重——例如
projection=[__chunk_order, city, state, time]
有关详细信息和其他 DataFusion ExecutionPlan
实现,请参阅 DataFusion 文档中的 Struct datafusion::datasource::physical_plan
implementors。
SortExec
用于 Sort
LogicalPlan
的执行计划。
DataFusion SortExec
支持对大于内存管理器分配的内存的数据集进行排序,方法是溢出到磁盘。
SortPreservingMergeExec
DataFusion SortPreservingMergeExec
接收输入执行计划和排序表达式列表,并且如果输入计划的每个分区都根据这些排序表达式进行排序,则生成根据这些表达式排序的单个分区。
UnionExec
DataFusion UnionExec
是 UNION ALL
执行计划,用于组合具有相同模式的多个输入。UnionExec
连接分区,并且不会在分区内或跨分区混合或复制数据。
重叠数据和去重
重叠数据是指时间范围(由时间戳表示)相交的文件或批次。如果两个数据块都包含同一时间段的数据,则它们重叠。
重叠数据示例
例如,以下数据块表示写入 InfluxDB 的 Line Protocol
// Chunk 4: stored parquet file
// - time range: 400-600
// - no duplicates in its own chunk
// - overlaps chunk 3
[
"h2o,state=CA,city=SF min_temp=68.4,max_temp=85.7,area=500u 600",
"h2o,state=CA,city=SJ min_temp=69.5,max_temp=89.2 600", // duplicates row 3 in chunk 5
"h2o,state=MA,city=Bedford max_temp=80.75,area=742u 400", // overlaps chunk 3
"h2o,state=MA,city=Boston min_temp=65.40,max_temp=82.67 400", // overlaps chunk 3
],
// Chunk 5: Ingester data
// - time range: 550-700
// - overlaps & duplicates data in chunk 4
[
"h2o,state=MA,city=Bedford max_temp=88.75,area=742u 600", // overlaps chunk 4
"h2o,state=CA,city=SF min_temp=68.4,max_temp=85.7,area=500u 650",
"h2o,state=CA,city=SJ min_temp=68.5,max_temp=90.0 600", // duplicates row 2 in chunk 4
"h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 700",
"h2o,state=MA,city=Boston min_temp=67.4 550", // overlaps chunk 4
]
Chunk 4
跨越400-600
的时间范围,表示持久化到 Object store 中 Parquet 文件的数据。Chunk 5
跨越550-700
的时间范围,表示来自 Ingester 的尚未持久化的数据。- 数据块重叠范围
550-600
。
如果数据在查询时重叠,则 Querier 必须在查询计划中包含去重过程,该过程使用与 Ingester 使用的相同的多列排序-合并运算符。与使用排序-合并运算符的摄取计划相比,查询计划更复杂,并确保数据在去重后流经计划。
由于去重中使用的排序-合并操作具有不可忽略的执行成本,因此 InfluxDB 3 尝试避免去重的需要。由于 InfluxDB 组织数据的方式,Parquet 文件永远不会包含其存储数据的重复项;只有重叠数据可能包含重复项。在压缩期间,Compactor 对存储的数据进行排序以减少重叠并优化查询性能。对于没有重叠的数据,Querier 不需要包含去重过程,并且查询计划可以进一步分配非重叠数据以进行并行处理。
DataFusion 查询计划
有关 DataFusion 查询计划和 InfluxDB 3 中使用的 DataFusion API 的更多信息,请参阅以下内容
- DataFusion 文档中的 Query Planning and Execution Overview。
- DataFusion 文档中的 Plan representations。
此页面是否对您有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一份子!我们欢迎并鼓励您对 InfluxDB Clustered 和本文档提供反馈和错误报告。要查找支持,请使用以下资源
拥有年度合同或支持合同的客户可以联系 InfluxData 支持。