文档文档

优化 Flux 查询

优化您的 Flux 查询以减少其内存和计算 (CPU) 需求。

使用下推启动查询

下推 是将数据操作推送到底层数据源而不是在内存中操作数据的函数或函数组合。使用下推启动查询以提高查询性能。一旦非下推函数运行,Flux 会将数据拉入内存并在那里运行所有后续操作。

下推函数和函数组合

在查询 InfluxDB 2.7 或 InfluxDB Cloud 数据源时,大多数下推都受支持。如下表所示,在 InfluxDB 2.7 中不支持少数下推。

函数InfluxDB 2.7InfluxDB Cloud
count()
duplicate()
filter() *
fill()
first()
last()
max()
mean()
min()
range()
rename()
sum()
window()
函数组合
group() |> count()
group() |> first()
group() |> last()
group() |> max()
group() |> min()
group() |> sum()
sort() |> limit()
window() |> count()
window() |> first()
window() |> last()
window() |> max()
window() |> min()
window() |> sum()

* filter() 仅在所有参数值都是静态时才下推。请参阅 避免内联处理过滤器

在查询的开头使用下推函数和函数组合。一旦非下推函数运行,Flux 会将数据拉入内存并在那里运行所有后续操作。

正在使用的下推函数
from(bucket: "example-bucket")
    |> range(start: -1h)                       //
    |> filter(fn: (r) => r.sensor == "abc123") //
    |> group(columns: ["_field", "host"])      // Pushed to the data source
    |> aggregateWindow(every: 5m, fn: max)     //
    |> filter(fn: (r) => r._value >= 90.0)     //

    |> top(n: 10)                              // Run in memory

避免内联处理过滤器

避免使用数学运算或字符串操作内联定义数据过滤器。内联处理过滤器值会阻止 filter() 将其操作下推到底层数据源,因此先前函数返回的数据会加载到内存中。这通常会导致显着的性能损失。

例如,以下查询使用 仪表板变量 和字符串连接来定义要过滤的区域。由于 filter() 内联使用字符串连接,因此无法将其操作下推到底层数据源,并将从 range() 返回的所有数据加载到内存中。

from(bucket: "example-bucket")
    |> range(start: -1h)                      
    |> filter(fn: (r) => r.region == v.provider + v.region)

要动态设置过滤器并保持 filter() 函数的下推能力,请使用变量在 filter() 外部定义过滤器值

region = v.provider + v.region

from(bucket: "example-bucket")
    |> range(start: -1h)                      
    |> filter(fn: (r) => r.region == region)

避免过短的窗口持续时间

窗口化(基于时间间隔对数据进行分组)通常用于聚合和降采样数据。通过避免过短的窗口持续时间来提高性能。更多窗口需要更多计算能力来评估应将每行分配给哪个窗口。合理的窗口持续时间取决于查询的总时间范围。

谨慎使用“重型”函数

以下函数比其他函数使用更多的内存或 CPU。在使用它们之前,请考虑它们在数据处理中的必要性

我们正在不断优化 Flux,此列表可能无法代表其当前状态。

尽可能使用 set() 而不是 map()

set()experimental.set()map 都可以设置数据中的列值,但是 set 函数比 map() 具有性能优势。

使用以下准则来确定使用哪个

  • 如果将列值设置为预定义的静态值,请使用 set()experimental.set()
  • 如果使用现有行数据动态设置列值,请使用 map()

将列值设置为静态值

以下查询在功能上是相同的,但使用 set() 比使用 map() 性能更高。

data
    |> map(fn: (r) => ({ r with foo: "bar" }))

// Recommended
data
    |> set(key: "foo", value: "bar")

使用现有行数据动态设置列值

data
    |> map(fn: (r) => ({ r with foo: r.bar }))

平衡时间范围和数据精度

为确保查询性能,请平衡时间范围和数据精度。例如,如果您查询每秒存储的数据并请求六个月的数据,则结果将包括每个序列约 1550 万个点。根据 filter() (基数) 后返回的序列数,这可能会很快变成数十亿个点。Flux 必须将这些点存储在内存中才能生成响应。使用 下推 来优化内存中存储的点数。

要查询长时间段的数据,请创建一个任务来 降采样数据,然后查询降采样后的数据。

使用 Flux 性能分析器测量查询性能

使用 Flux Profiler 包 来测量查询性能并将性能指标附加到查询输出。以下 Flux 性能分析器可用

  • query:提供有关整个 Flux 脚本执行情况的统计信息。
  • operator:提供有关查询中每个操作的统计信息。

导入 profiler 包并使用 profile.enabledProfilers 选项启用性能分析器。

import "profiler"

option profiler.enabledProfilers = ["query", "operator"]

// Query to profile

有关 Flux 性能分析器的更多信息,请参阅 Flux Profiler 包


此页内容对您有帮助吗?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在一样使用它,而无需对代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速、近实时数据引擎,可实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 以 Core 的基础为构建,增加了高可用性、读取副本、增强的安全性以及数据压缩,从而实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看