文档文档

使用 Flux 转换数据

此页面记录了早期版本的 InfluxDB OSS。InfluxDB OSS v2 是最新的稳定版本。请参阅等效的 InfluxDB v2 文档: 使用 Flux 转换数据

从 InfluxDB 查询数据时,您通常需要以某种方式转换数据。常见的示例包括将数据聚合为平均值、下采样数据等。

本指南演示了如何使用 Flux 函数来转换您的数据。它将引导您创建一个 Flux 脚本,该脚本将数据划分为时间窗口,计算每个窗口中 _value 的平均值,并将平均值输出为新表。

重要的是要了解数据的“形状”如何通过每个操作发生变化。

查询数据

使用在上一个从 InfluxDB 查询数据指南中构建的查询,但更新范围以从最近一小时拉取数据

from(bucket:"telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r._field == "usage_system" and
    r.cpu == "cpu-total"
  )

Flux 函数

Flux 提供了许多执行特定操作、转换和任务的函数。您还可以在 Flux 查询中创建自定义函数函数在 Flux 函数文档中详细介绍。

从 InfluxDB 查询数据时,常用的一种函数是聚合函数。聚合函数获取表中的一组 _value,对其进行聚合,并将其转换为新值。

此示例使用 mean() 函数来计算时间窗口内值的平均值。

以下示例逐步介绍了窗口化和聚合数据所需的步骤,但有一个 aggregateWindow() 辅助函数可以为您完成此操作。了解此过程中的步骤是有好处的。

窗口化您的数据

Flux 的 window() 函数根据时间值对记录进行分区。使用 every 参数为每个窗口定义时间长度。

日历月和年

every 支持所有有效的持续时间单位,包括日历月 (1mo)年 (1y)

对于此示例,以五分钟间隔 (5m) 窗口化数据。

from(bucket:"telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r._field == "usage_system" and
    r.cpu == "cpu-total"
  )
  |> window(every: 5m)

当数据被收集到时间窗口中时,每个窗口都作为其自身的表输出。可视化时,每个表都会被分配一个唯一的颜色。

Windowed data tables

聚合窗口化数据

Flux 聚合函数获取每个表中的 _value,并以某种方式聚合它们。使用 mean() 函数来计算每个表的 _value 的平均值。

from(bucket:"telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r._field == "usage_system" and
    r.cpu == "cpu-total"
  )
  |> window(every: 5m)
  |> mean()

当每个窗口中的行被聚合时,它们的输出表仅包含一行,其中包含聚合值。窗口化表仍然是独立的,并且在可视化时,将显示为单个、未连接的点。

Windowed aggregate data

为您的聚合添加时间

当值被聚合时,结果表没有 _time 列,因为用于聚合的记录都具有不同的时间戳。聚合函数不会推断聚合值应使用哪个时间。因此,_time 列被删除。

下一步操作中需要 _time 列。要添加一个,请使用 duplicate() 函数_stop 列复制为每个窗口化表的 _time 列。

from(bucket:"telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r._field == "usage_system" and
    r.cpu == "cpu-total"
  )
  |> window(every: 5m)
  |> mean()
  |> duplicate(column: "_stop", as: "_time")

取消窗口化聚合表

使用 every: inf 参数的 window() 函数将所有点收集到单个无限窗口中。

from(bucket:"telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r._field == "usage_system" and
    r.cpu == "cpu-total"
  )
  |> window(every: 5m)
  |> mean()
  |> duplicate(column: "_stop", as: "_time")
  |> window(every: inf)

一旦取消分组并合并到单个表中,聚合数据点将在您的可视化中显示为已连接。

Unwindowed aggregate data

辅助函数

仅仅为了构建一个聚合数据的查询,这似乎需要大量的编码,但是,经历这个过程有助于理解数据在通过每个函数时如何改变“形状”。

Flux 提供(并允许您创建)“辅助”函数,这些函数抽象了许多这些步骤。使用 aggregateWindow() 函数可以完成本指南中执行的相同操作。

from(bucket:"telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r._field == "usage_system" and
    r.cpu == "cpu-total"
  )
  |> aggregateWindow(every: 5m, fn: mean)

恭喜!

您现在已经构建了一个 Flux 查询,该查询使用 Flux 函数来转换您的数据。还有许多其他方法可以使用 Flux 的原始函数和您自己的自定义函数来操作您的数据,但这对于基本语法和查询结构来说是一个很好的介绍。


要更深入地了解窗口化和聚合数据,并查看每次转换的示例数据输出,请查看窗口化和聚合数据指南。



此页对您有帮助吗?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在一样使用它,而无需对您的代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

启动快速。扩展更快。

获取更新

InfluxDB 3 Core 是一个开源、高速、最近数据引擎,可实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 构建在 Core 的基础上,增加了高可用性、读取副本、增强的安全性以及数据压缩,以实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层级可供非商业家庭或业余爱好者使用。

有关更多信息,请查看