文档文档

使用 Flux 转换数据

从 InfluxDB 查询数据时,你通常需要以某种方式转换这些数据。常见的例子包括聚合数据、数据降采样等。

本指南演示了如何使用 Flux 函数来转换你的数据。它将引导你创建一个 Flux 脚本,该脚本将数据分区到时间窗口中,计算每个窗口中 _value 的平均值,并将平均值作为新表输出。

如果你不熟悉 Flux 如何构造和操作数据,请参阅 Flux 数据模型

查询数据

使用之前 从 InfluxDB 查询数据 指南中构建的查询,但更新范围以从最近一小时拉取数据

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")

Flux 函数

Flux 提供了许多执行特定操作、转换和任务的函数。你也可以在你的 Flux 查询中创建自定义函数函数在 Flux 标准库 文档中详细介绍。

从 InfluxDB 查询数据时,常用的一种函数是聚合函数。聚合函数接受表中的一组 _value,对其进行聚合,并将其转换为新值。

此示例使用 mean() 函数来计算每个时间窗口内值的平均值。

以下示例将逐步介绍窗口化和聚合数据所需的步骤,但有一个 aggregateWindow() 辅助函数可以为你执行此操作。了解过程中的步骤是很有好处的。

窗口化你的数据

Flux 的 window() 函数基于时间值对记录进行分区。使用 every 参数定义每个窗口的持续时间。

日历月和年

every 支持所有 有效的持续时间单位,包括日历月 (1mo)年 (1y)

对于此示例,以五分钟间隔 (5m) 对数据进行窗口化。

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")
    |> window(every: 5m)

当数据被收集到时间窗口中时,每个窗口都作为自己的表输出。可视化时,每个表都被分配一个独特的颜色。

Windowed data tables

聚合窗口化数据

Flux 聚合函数接受每个表中的 _value 并以某种方式聚合它们。使用 mean() 函数来计算每个表的 _value 的平均值。

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")
    |> window(every: 5m)
    |> mean()

当每个窗口中的行被聚合时,它们的输出表仅包含一行,其中包含聚合值。窗口化表仍然是独立的,并且在可视化时,将显示为单个、不连接的点。

Windowed aggregate data

向你的聚合添加时间

当值被聚合时,结果表没有 _time 列,因为用于聚合的记录都具有不同的时间戳。聚合函数不会推断应该为聚合值使用什么时间。因此,_time 列被删除。

下一个操作中需要 _time 列。要添加一个,请使用 duplicate() 函数将每个窗口化表的 _stop 列复制为 _time 列。

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")
    |> window(every: 5m)
    |> mean()
    |> duplicate(column: "_stop", as: "_time")

取消窗口化聚合表

使用 window() 函数和 every: inf 参数将所有点收集到一个无限窗口中。

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")
    |> window(every: 5m)
    |> mean()
    |> duplicate(column: "_stop", as: "_time")
    |> window(every: inf)

一旦取消分组并组合成单个表,聚合数据点将在你的可视化中显示为连接的点。

Unwindowed aggregate data

辅助函数

仅仅为了构建一个聚合数据的查询,这似乎需要大量的编码,但是经历这个过程有助于理解数据在通过每个函数时如何改变“形状”。

Flux 提供了(并允许你创建)“辅助”函数,这些函数抽象了许多这些步骤。使用 aggregateWindow() 可以完成本指南中执行的相同操作。

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")
    |> aggregateWindow(every: 5m, fn: mean)

恭喜你!

你现在已经构建了一个 Flux 查询,该查询使用 Flux 函数来转换你的数据。还有许多其他方法可以使用 Flux 的原始函数和你自己的自定义函数来操作你的数据,但这对于基本语法和查询结构来说是一个很好的介绍。


要更深入地了解窗口化和聚合数据,以及每次转换的示例数据输出,请查看 窗口化和聚合数据 指南。



此页内容对您有帮助吗?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在这样使用它,而无需对您的代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速、最近数据引擎,可实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 构建在 Core 的基础上,增加了高可用性、读取副本、增强的安全性以及数据压缩,以实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看