文档

查询计划

查询计划是InfluxDB v3 查询器 设计和执行以计算查询结果的步骤序列。查询器使用DataFusion和Arrow构建和执行查询计划,调用DataFusion和InfluxDB特定的操作符,从对象存储收集器读取数据,并应用查询转换,如去重、过滤、聚合、合并、投影和排序,以计算最终结果。

与其他许多数据库一样,查询器 包含一个查询优化器。在解析传入查询后,查询器 构建一个 逻辑计划 - 查询所需的一系列高级步骤,如扫描、过滤和排序。接着,查询器 构建最佳 物理计划 以在最少时间内计算正确的结果。计划利用收集器的数据分区来并行化计划操作,并在执行计划之前剪枝不必要的数据。查询器还应用谓词和投影下沉等常见技术,以尽可能早地进一步剪枝数据。

显示语法

逻辑物理查询计划以树语法表示(例如,在EXPLAIN报告中)。

  • 每个计划都表示为一个由节点组成的倒置树。
  • 父节点等待其子节点的输出。
  • 数据从树的最底层内部节点流向顶部的最外层根节点

示例逻辑和物理计划

以下查询生成一个包含逻辑计划和物理计划的EXPLAIN报告

EXPLAIN SELECT city, min_temp, time FROM h2o ORDER BY city ASC, time DESC;

输出如下

图1. EXPLAIN报告

| plan_type     | plan                                                                     |
+---------------+--------------------------------------------------------------------------+
| logical_plan  | Sort: h2o.city ASC NULLS LAST, h2o.time DESC NULLS FIRST                 |
|               |   TableScan: h2o projection=[city, min_temp, time]                       |
| physical_plan | SortPreservingMergeExec: [city@0 ASC NULLS LAST,time@2 DESC]             |
|               |   UnionExec                                                              |
|               |     SortExec: expr=[city@0 ASC NULLS LAST,time@2 DESC]                   |
|               |       ParquetExec: file_groups={...}, projection=[city, min_temp, time]  |
|               |     SortExec: expr=[city@0 ASC NULLS LAST,time@2 DESC]                   |
|               |       ParquetExec: file_groups={...}, projection=[city, min_temp, time]  |
|               |                                                                          |

EXPLAIN SELECT city, min_temp, time FROM h2o ORDER BY city ASC, time DESC;的输出

图1物理计划中的叶子节点是并行的ParquetExec节点

      ParquetExec: file_groups={...}, projection=[city, min_temp, time]
...
      ParquetExec: file_groups={...}, projection=[city, min_temp, time]

数据流

物理计划节点表示对ExecutionPlan的特定实现,它接收输入流,应用过滤和排序表达式,然后将输出流发送给其父节点。

以下图表显示了图1物理计划中的数据流和ExecutionPlan节点的顺序

SortPreservingMergeExec
UnionExec
SortExec
ParquetExec
SortExec
ParquetExec

InfluxDB集群包括以下计划表达式

逻辑计划

查询的逻辑计划

  • 是一个高级计划,表达了查询的“意图”和计算结果所需的步骤。
  • 需要关于数据模式的信息
  • 与物理执行、集群配置、数据源(Ingestor或对象存储)或数据的组织或分区无关
  • 显示为DataFusion LogicalPlan节点的树

LogicalPlan节点

InfluxDB集群的逻辑计划树中的每个节点代表一个LogicalPlan实现,该实现接收从查询中提取的准则并应用关系运算符和优化,将输入数据转换为输出表。

以下是一些在InfluxDB逻辑计划中使用的LogicalPlan节点。

TableScan

Tablescan通过引用或从上下文中检索表提供者中的行。

Projection

Projection评估输入上的任意表达式列表;相当于带有表达式列表的SQL SELECT语句。

Filter

Filter过滤掉不满足指定表达式的输入行;相当于带有谓词表达式的SQL WHERE子句。

Sort

Sort根据排序表达式列表对输入进行排序;用于实现SQL ORDER BY

有关详细信息及LogicalPlan实现的列表,请参阅DataFusion文档中的Enum datafusion::logical_expr::LogicalPlan变体

物理计划

查询的物理计划或执行计划

  • 是从逻辑计划派生的优化计划,并包含查询执行的底层步骤。
  • 考虑集群配置(例如,CPU和内存分配)和数据组织(例如:分区、文件数量以及文件是否重叠)
    • 如果您在不同的配置的不同集群上使用相同的数据运行相同的查询,每个集群可能会为该查询生成不同的物理计划。
    • 如果您在不同的时间在同一集群上运行相同的查询,物理计划可能会因查询时间的数据而异。
  • 如果使用ANALYZE生成,将包括查询执行期间采样的运行时指标
  • 显示为ExecutionPlan节点的树

ExecutionPlan节点

InfluxDB集群的物理计划树中的每个节点代表对特定DataFusion ExecutionPlan实现的调用,该实现接收输入数据、查询准则表达式和输出模式。

以下是一些InfluxDB物理计划中使用的ExecutionPlan节点。

DeduplicateExec

InfluxDB的DeduplicateExec接收一个按sort_key排序的RecordBatch输入流,并应用InfluxDB特定的去重逻辑。输出依赖于具有相同键的输入行的顺序。

EmptyExec

DataFusion的EmptyExec是空关系的执行计划,表示该表不包含查询时间范围内的数据。

FilterExec

Filter LogicalPlan的执行计划。

DataFusion的FilterExec评估一个布尔谓词,针对所有输入批次以确定包含在输出批次中的行。

ParquetExec

DataFusion的ParquetExec扫描一个或多个Parquet分区。

ParquetExec表达式

file_groups

一个file group是要扫描的文件列表。文件通过路径引用

  • 1/1/b862a7e9b.../243db601-....parquet
  • 1/1/b862a7e9b.../f5fb7c7d-....parquet

InfluxDB v3中,路径结构表示数据是如何组织的。

一个路径具有以下结构

<namespace_id>/<table_id>/<partition_hash_id>/<uuid_of_the_file>.parquet
    1         /    1    /b862a7e9b329ee6a4.../243db601-f3f1-4....parquet
  • namespace_id:正在查询的命名空间(数据库)
  • table_id:正在查询的表(测量值)
  • partition_hash_id:此文件所属的分区。您可以计数分区ID以找到查询读取的分区数量。
  • uuid_of_the_file:文件标识符。

ParquetExec并行处理组并按顺序读取每个组中的文件。

projection

projection列出了查询计划需要读取以执行查询的表列。参数名projection指的是projection pushdown,即过滤列的操作。

考虑以下包含许多列的样本数据

h2o,state=CA,city=SF min_temp=68.4,max_temp=85.7,area=500u 600
tablestatecitymin_tempmax_tempareatime
h2oCASF68.485.7500u600

然而,以下SQL查询仅指定了三个列(citystatetime

SELECT city, count(1)
FROM h2o
WHERE time >= to_timestamp(200) AND time < to_timestamp(700)
  AND state = 'MA'
GROUP BY city
ORDER BY city ASC;

在处理查询时,Querier指定了三个所需的列在投影中,并将投影“推下”到叶子节点–在查询执行期间尽可能早地剪枝未指定的列。

projection=[city, state, time]
output_ordering

output_ordering指定了输出的排序顺序。如果输出应排序并且Querier知道顺序,则Querier指定output_ordering

在存储数据到Parquet文件时,InfluxDB对数据进行排序以提高存储压缩和查询效率,计划器会尽可能长时间地保留该顺序。通常,ParquetExec接收到的output_ordering值是存储数据的排序顺序(或排序顺序的子集)。

按照设计,RecordBatchesExec数据是不排序的。

在以下示例中,查询计划器指定了输出排序顺序state ASC, city ASC, time ASC,

output_ordering=[state@2 ASC, city@1 ASC, time@3 ASC, __chunk_order@0 ASC]
predicate

predicate是查询中指定的数据过滤器,用于在扫描Parquet文件时进行行过滤。

例如,给定以下SQL查询

SELECT city, count(1)
FROM h2o
WHERE time >= to_timestamp(200) AND time < to_timestamp(700)
  AND state = 'MA'
GROUP BY city
ORDER BY city ASC;

谓词值是 WHERE 语句中的布尔表达式

predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA
剪枝谓词

pruning_predicate 是从 predicate 值生成的,用于从所选分区中剪枝数据和文件。

例如,以下是从 SQL 解析出的 predicate

predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA,

查询器创建了以下 pruning_predicate

pruning_predicate=time_max@0 >= 200 AND time_min@1 < 700 AND state_min@2 <= MA AND MA <= state_max@3

默认过滤器按 时间 筛选文件。

在生成物理计划之前,一个额外的 分区剪枝 步骤使用分区列上的谓词来剪枝分区。

ProjectionExec

DataFusion ProjectionExec 评估输入上的任意表达式列表;投影 逻辑计划 的执行计划。

RecordBatchesExec

InfluxDB RecordBatchesExec 实现从 InfluxDB v3 Ingester 中检索和扫描最近写入但尚未持久化的数据。

在生成计划时,查询器 将查询标准(如数据库名称、表和列)发送到 Ingester 以检索尚未持久化到 Parquet 文件的表。如果 Ingester 有符合标准的数据(块大小非零),则计划包括 RecordBatchesExec

RecordBatchesExec 属性

是来自 Ingester 的数据块数量。通常是一个(1),但可能有多个。

projection

projection 指定要读取和输出的列列表。

列列表中的 __chunk_order 是 InfluxDB 生成的列,用于在去重时保持块和文件有序——例如

projection=[__chunk_order, city, state, time]

有关详细信息和其他 DataFusion 执行计划 实现,请参阅 DataFusion 文档中的 datafusion::datasource::physical_plan 结构实现者

SortExec

排序 逻辑计划 的执行计划。

DataFusion SortExec 通过将数据溢出到磁盘来支持对大于内存管理器分配的内存的数据集进行排序。

SortPreservingMergeExec

DataFusion SortPreservingMergeExec 接收输入执行计划和排序表达式列表,如果输入计划的每个分区都按这些排序表达式排序,则输出一个按这些表达式排序的单个分区。

UnionExec

DataFusion UnionExec 是将具有相同模式的多个输入合并的 UNION ALL 执行计划。UnionExec 连接分区,不混合或复制分区内的数据或跨分区复制数据。

重叠数据与去重

重叠数据 指的是时间范围(由时间戳表示)相交的文件或批次。如果两个 的数据包含相同时间段的数据,则这两个数据块重叠。

重叠数据的示例

例如,以下块表示写入 InfluxDB 的行协议

// Chunk 4: stored parquet file
// - time range: 400-600
// - no duplicates in its own chunk
// - overlaps chunk 3
[
 "h2o,state=CA,city=SF min_temp=68.4,max_temp=85.7,area=500u 600",
 "h2o,state=CA,city=SJ min_temp=69.5,max_temp=89.2 600",  // duplicates row 3 in chunk 5
 "h2o,state=MA,city=Bedford max_temp=80.75,area=742u 400", // overlaps chunk 3
 "h2o,state=MA,city=Boston min_temp=65.40,max_temp=82.67 400", // overlaps chunk 3
],

// Chunk 5: Ingester data
// - time range: 550-700
// - overlaps & duplicates data in chunk 4
[
"h2o,state=MA,city=Bedford max_temp=88.75,area=742u 600", // overlaps chunk 4
"h2o,state=CA,city=SF min_temp=68.4,max_temp=85.7,area=500u 650",
"h2o,state=CA,city=SJ min_temp=68.5,max_temp=90.0 600", // duplicates row 2 in chunk 4
"h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 700",
"h2o,state=MA,city=Boston min_temp=67.4 550", // overlaps chunk 4
]
  • 块4跨越时间范围400-600,表示保存在对象存储中的Parquet文件中的数据。
  • 块5跨越时间范围550-700,表示来自Ingestor尚未持久化的数据。
  • 这些块重叠在范围550-600

如果在查询时间数据重叠,则查询器必须在查询计划中包含去重过程,该过程使用与Ingestor相同的列排序合并操作符。与使用排序合并操作符的摄取计划相比,查询计划更复杂,并确保在去重后数据流经计划。

由于去重中使用的排序合并操作具有非平凡的执行成本,InfluxDB v3试图避免去重的需要。由于InfluxDB组织数据的方式,Parquet文件永远不会包含其存储数据的重复项;只有重叠的数据可以包含重复项。在压缩过程中,压缩器对存储的数据进行排序以减少重叠并优化查询性能。对于没有重叠的数据,查询器不需要包含去重过程,查询计划可以进一步分配非重叠数据以进行并行处理。

DataFusion查询计划

有关InfluxDB v3中DataFusion查询计划和DataFusion API的更多信息,请参阅以下内容


这个页面有帮助吗?

感谢您的反馈!


Flux的未来

Flux将进入维护模式。您可以继续像现在一样使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB v3增强功能和InfluxDB Clustered现在已公开发布

包括更快查询性能和管理工具在内的新功能推进了InfluxDB v3产品线。InfluxDB Clustered现在已公开发布。

InfluxDB v3 性能与特性

InfluxDB v3 产品线在查询性能方面取得了显著提升,并提供了新的管理工具。这些增强包括用于监控 InfluxDB 集群健康状态的运维仪表板、InfluxDB 云专享版中的单点登录(SSO)支持,以及用于令牌和数据库的新管理 API。

了解 v3 的新增强功能


InfluxDB 集群版正式发布

InfluxDB 集群版现已正式发布,您可以在自管理的堆栈中使用 InfluxDB v3 的强大功能。

与我们讨论 InfluxDB 集群版