查询计划
查询计划是InfluxDB v3 查询器 设计和执行以计算查询结果的步骤序列。查询器使用DataFusion和Arrow构建和执行查询计划,调用DataFusion和InfluxDB特定的操作符,从对象存储和收集器读取数据,并应用查询转换,如去重、过滤、聚合、合并、投影和排序,以计算最终结果。
与其他许多数据库一样,查询器 包含一个查询优化器。在解析传入查询后,查询器 构建一个 逻辑计划 - 查询所需的一系列高级步骤,如扫描、过滤和排序。接着,查询器 构建最佳 物理计划 以在最少时间内计算正确的结果。计划利用收集器的数据分区来并行化计划操作,并在执行计划之前剪枝不必要的数据。查询器还应用谓词和投影下沉等常见技术,以尽可能早地进一步剪枝数据。
显示语法
逻辑和物理查询计划以树语法表示(例如,在EXPLAIN
报告中)。
- 每个计划都表示为一个由节点组成的倒置树。
- 父节点等待其子节点的输出。
- 数据从树的最底层内部节点流向顶部的最外层根节点。
示例逻辑和物理计划
以下查询生成一个包含逻辑计划和物理计划的EXPLAIN
报告
EXPLAIN SELECT city, min_temp, time FROM h2o ORDER BY city ASC, time DESC;
输出如下
图1. EXPLAIN报告
| plan_type | plan |
+---------------+--------------------------------------------------------------------------+
| logical_plan | Sort: h2o.city ASC NULLS LAST, h2o.time DESC NULLS FIRST |
| | TableScan: h2o projection=[city, min_temp, time] |
| physical_plan | SortPreservingMergeExec: [city@0 ASC NULLS LAST,time@2 DESC] |
| | UnionExec |
| | SortExec: expr=[city@0 ASC NULLS LAST,time@2 DESC] |
| | ParquetExec: file_groups={...}, projection=[city, min_temp, time] |
| | SortExec: expr=[city@0 ASC NULLS LAST,time@2 DESC] |
| | ParquetExec: file_groups={...}, projection=[city, min_temp, time] |
| | |
EXPLAIN SELECT city, min_temp, time FROM h2o ORDER BY city ASC, time DESC;
的输出
图1物理计划中的叶子节点是并行的ParquetExec
节点
ParquetExec: file_groups={...}, projection=[city, min_temp, time]
...
ParquetExec: file_groups={...}, projection=[city, min_temp, time]
数据流
物理计划节点表示对ExecutionPlan
的特定实现,它接收输入流,应用过滤和排序表达式,然后将输出流发送给其父节点。
以下图表显示了图1物理计划中的数据流和ExecutionPlan
节点的顺序
SortPreservingMergeExec
UnionExec
SortExec
ParquetExec
SortExec
ParquetExec
InfluxDB集群包括以下计划表达式
逻辑计划
查询的逻辑计划
- 是一个高级计划,表达了查询的“意图”和计算结果所需的步骤。
- 需要关于数据模式的信息
- 与物理执行、集群配置、数据源(Ingestor或对象存储)或数据的组织或分区无关
- 显示为DataFusion
LogicalPlan
节点的树
LogicalPlan
节点
InfluxDB集群的逻辑计划树中的每个节点代表一个LogicalPlan
实现,该实现接收从查询中提取的准则并应用关系运算符和优化,将输入数据转换为输出表。
以下是一些在InfluxDB逻辑计划中使用的LogicalPlan
节点。
TableScan
Tablescan
通过引用或从上下文中检索表提供者中的行。
Projection
Projection
评估输入上的任意表达式列表;相当于带有表达式列表的SQL SELECT
语句。
Filter
Filter
过滤掉不满足指定表达式的输入行;相当于带有谓词表达式的SQL WHERE
子句。
Sort
Sort
根据排序表达式列表对输入进行排序;用于实现SQL ORDER BY
。
有关详细信息及LogicalPlan
实现的列表,请参阅DataFusion文档中的Enum datafusion::logical_expr::LogicalPlan
变体。
物理计划
查询的物理计划或执行计划
- 是从逻辑计划派生的优化计划,并包含查询执行的底层步骤。
- 考虑集群配置(例如,CPU和内存分配)和数据组织(例如:分区、文件数量以及文件是否重叠)
- 如果您在不同的配置的不同集群上使用相同的数据运行相同的查询,每个集群可能会为该查询生成不同的物理计划。
- 如果您在不同的时间在同一集群上运行相同的查询,物理计划可能会因查询时间的数据而异。
- 如果使用
ANALYZE
生成,将包括查询执行期间采样的运行时指标 - 显示为
ExecutionPlan
节点的树
ExecutionPlan
节点
InfluxDB集群的物理计划树中的每个节点代表对特定DataFusion ExecutionPlan
实现的调用,该实现接收输入数据、查询准则表达式和输出模式。
以下是一些InfluxDB物理计划中使用的ExecutionPlan
节点。
DeduplicateExec
InfluxDB的DeduplicateExec
接收一个按sort_key
排序的RecordBatch
输入流,并应用InfluxDB特定的去重逻辑。输出依赖于具有相同键的输入行的顺序。
EmptyExec
DataFusion的EmptyExec
是空关系的执行计划,表示该表不包含查询时间范围内的数据。
FilterExec
Filter
LogicalPlan
的执行计划。
DataFusion的FilterExec
评估一个布尔谓词,针对所有输入批次以确定包含在输出批次中的行。
ParquetExec
DataFusion的ParquetExec
扫描一个或多个Parquet分区。
ParquetExec
表达式
file_groups
一个file group是要扫描的文件列表。文件通过路径引用
1/1/b862a7e9b.../243db601-....parquet
1/1/b862a7e9b.../f5fb7c7d-....parquet
InfluxDB v3中,路径结构表示数据是如何组织的。
一个路径具有以下结构
<namespace_id>/<table_id>/<partition_hash_id>/<uuid_of_the_file>.parquet
1 / 1 /b862a7e9b329ee6a4.../243db601-f3f1-4....parquet
namespace_id
:正在查询的命名空间(数据库)table_id
:正在查询的表(测量值)partition_hash_id
:此文件所属的分区。您可以计数分区ID以找到查询读取的分区数量。uuid_of_the_file
:文件标识符。
ParquetExec
并行处理组并按顺序读取每个组中的文件。
projection
projection
列出了查询计划需要读取以执行查询的表列。参数名projection
指的是projection pushdown,即过滤列的操作。
考虑以下包含许多列的样本数据
h2o,state=CA,city=SF min_temp=68.4,max_temp=85.7,area=500u 600
table | state | city | min_temp | max_temp | area | time |
---|---|---|---|---|---|---|
h2o | CA | SF | 68.4 | 85.7 | 500u | 600 |
然而,以下SQL查询仅指定了三个列(city
、state
和time
)
SELECT city, count(1)
FROM h2o
WHERE time >= to_timestamp(200) AND time < to_timestamp(700)
AND state = 'MA'
GROUP BY city
ORDER BY city ASC;
在处理查询时,Querier指定了三个所需的列在投影中,并将投影“推下”到叶子节点–在查询执行期间尽可能早地剪枝未指定的列。
projection=[city, state, time]
output_ordering
output_ordering
指定了输出的排序顺序。如果输出应排序并且Querier知道顺序,则Querier指定output_ordering
。
在存储数据到Parquet文件时,InfluxDB对数据进行排序以提高存储压缩和查询效率,计划器会尽可能长时间地保留该顺序。通常,ParquetExec
接收到的output_ordering
值是存储数据的排序顺序(或排序顺序的子集)。
按照设计,RecordBatchesExec
数据是不排序的。
在以下示例中,查询计划器指定了输出排序顺序state ASC, city ASC, time ASC,
output_ordering=[state@2 ASC, city@1 ASC, time@3 ASC, __chunk_order@0 ASC]
predicate
predicate
是查询中指定的数据过滤器,用于在扫描Parquet文件时进行行过滤。
例如,给定以下SQL查询
SELECT city, count(1)
FROM h2o
WHERE time >= to_timestamp(200) AND time < to_timestamp(700)
AND state = 'MA'
GROUP BY city
ORDER BY city ASC;
谓词值是 WHERE
语句中的布尔表达式
predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA
剪枝谓词
pruning_predicate
是从 predicate
值生成的,用于从所选分区中剪枝数据和文件。
例如,以下是从 SQL 解析出的 predicate
predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA,
查询器创建了以下 pruning_predicate
pruning_predicate=time_max@0 >= 200 AND time_min@1 < 700 AND state_min@2 <= MA AND MA <= state_max@3
默认过滤器按 时间
筛选文件。
在生成物理计划之前,一个额外的 分区剪枝
步骤使用分区列上的谓词来剪枝分区。
ProjectionExec
DataFusion ProjectionExec
评估输入上的任意表达式列表;投影
逻辑计划
的执行计划。
RecordBatchesExec
InfluxDB RecordBatchesExec
实现从 InfluxDB v3 Ingester 中检索和扫描最近写入但尚未持久化的数据。
在生成计划时,查询器 将查询标准(如数据库名称、表和列)发送到 Ingester 以检索尚未持久化到 Parquet 文件的表。如果 Ingester 有符合标准的数据(块大小非零),则计划包括 RecordBatchesExec
。
RecordBatchesExec
属性
块
块
是来自 Ingester 的数据块数量。通常是一个(1
),但可能有多个。
projection
projection
指定要读取和输出的列列表。
列列表中的 __chunk_order
是 InfluxDB 生成的列,用于在去重时保持块和文件有序——例如
projection=[__chunk_order, city, state, time]
有关详细信息和其他 DataFusion 执行计划
实现,请参阅 DataFusion 文档中的 datafusion::datasource::physical_plan
结构实现者。
SortExec
排序
逻辑计划
的执行计划。
DataFusion SortExec
通过将数据溢出到磁盘来支持对大于内存管理器分配的内存的数据集进行排序。
SortPreservingMergeExec
DataFusion SortPreservingMergeExec
接收输入执行计划和排序表达式列表,如果输入计划的每个分区都按这些排序表达式排序,则输出一个按这些表达式排序的单个分区。
UnionExec
DataFusion UnionExec
是将具有相同模式的多个输入合并的 UNION ALL
执行计划。UnionExec
连接分区,不混合或复制分区内的数据或跨分区复制数据。
重叠数据与去重
重叠数据 指的是时间范围(由时间戳表示)相交的文件或批次。如果两个 块 的数据包含相同时间段的数据,则这两个数据块重叠。
重叠数据的示例
例如,以下块表示写入 InfluxDB 的行协议
// Chunk 4: stored parquet file
// - time range: 400-600
// - no duplicates in its own chunk
// - overlaps chunk 3
[
"h2o,state=CA,city=SF min_temp=68.4,max_temp=85.7,area=500u 600",
"h2o,state=CA,city=SJ min_temp=69.5,max_temp=89.2 600", // duplicates row 3 in chunk 5
"h2o,state=MA,city=Bedford max_temp=80.75,area=742u 400", // overlaps chunk 3
"h2o,state=MA,city=Boston min_temp=65.40,max_temp=82.67 400", // overlaps chunk 3
],
// Chunk 5: Ingester data
// - time range: 550-700
// - overlaps & duplicates data in chunk 4
[
"h2o,state=MA,city=Bedford max_temp=88.75,area=742u 600", // overlaps chunk 4
"h2o,state=CA,city=SF min_temp=68.4,max_temp=85.7,area=500u 650",
"h2o,state=CA,city=SJ min_temp=68.5,max_temp=90.0 600", // duplicates row 2 in chunk 4
"h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 700",
"h2o,state=MA,city=Boston min_temp=67.4 550", // overlaps chunk 4
]
如果在查询时间数据重叠,则查询器必须在查询计划中包含去重过程,该过程使用与Ingestor相同的列排序合并操作符。与使用排序合并操作符的摄取计划相比,查询计划更复杂,并确保在去重后数据流经计划。
由于去重中使用的排序合并操作具有非平凡的执行成本,InfluxDB v3试图避免去重的需要。由于InfluxDB组织数据的方式,Parquet文件永远不会包含其存储数据的重复项;只有重叠的数据可以包含重复项。在压缩过程中,压缩器对存储的数据进行排序以减少重叠并优化查询性能。对于没有重叠的数据,查询器不需要包含去重过程,查询计划可以进一步分配非重叠数据以进行并行处理。
DataFusion查询计划
有关InfluxDB v3中DataFusion查询计划和DataFusion API的更多信息,请参阅以下内容
这个页面有帮助吗?
感谢您的反馈!