文档文档

使用 Flux 转换数据

本页介绍的是早期版本的 InfluxDB OSS。InfluxDB OSS v2 是最新的稳定版本。请参阅等效的 InfluxDB v2 文档: 使用 Flux 转换数据

从 InfluxDB 查询数据时,您通常需要以某种方式转换数据。常见的例子包括将数据聚合为平均值、降采样数据等。

本指南演示了如何使用 Flux 函数来转换您的数据。它将引导您创建一个 Flux 脚本,该脚本将数据划分为时间窗口,计算每个窗口中 _value 的平均值,并将平均值输出为新表。

重要的是要理解您的数据“形状”如何在每个操作中发生变化。

查询数据

使用上一个从 InfluxDB 查询数据指南中构建的查询,但更新范围以从过去一小时拉取数据

from(bucket:"telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r._field == "usage_system" and
    r.cpu == "cpu-total"
  )

Flux 函数

Flux 提供了许多函数,用于执行特定的操作、转换和任务。您还可以在 Flux 查询中创建自定义函数函数在 Flux 函数文档中详细介绍。

从 InfluxDB 查询数据进行转换时,常用的一种函数是聚合函数。聚合函数接受表中一组 _value,对其进行聚合,并将其转换为新值。

此示例使用 mean() 函数 来平均时间窗口内的值。

以下示例逐步介绍了窗口化和聚合数据所需的步骤,但有一个 aggregateWindow() 辅助函数 可以为您完成此操作。了解过程中的步骤是很有好处的。

窗口化您的数据

Flux 的 window() 函数 根据时间值对记录进行分区。使用 every 参数定义每个窗口的时间持续时间。

日历月和年

every 支持所有有效的持续时间单位,包括日历月 (1mo)年 (1y)

对于此示例,以五分钟间隔 (5m) 窗口化数据。

from(bucket:"telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r._field == "usage_system" and
    r.cpu == "cpu-total"
  )
  |> window(every: 5m)

当数据被收集到时间窗口中时,每个窗口都作为自己的表输出。可视化时,每个表都会被分配一个唯一的颜色。

Windowed data tables

聚合窗口化数据

Flux 聚合函数接受每个表中的 _value 并以某种方式聚合它们。使用 mean() 函数 来平均每个表的 _value

from(bucket:"telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r._field == "usage_system" and
    r.cpu == "cpu-total"
  )
  |> window(every: 5m)
  |> mean()

当每个窗口中的行被聚合时,它们的输出表仅包含一个带有聚合值的单行。窗口化的表仍然是分开的,并且在可视化时,将显示为单个、不连接的点。

Windowed aggregate data

为您的聚合添加时间

当值被聚合时,结果表没有 _time 列,因为用于聚合的记录都具有不同的时间戳。聚合函数不会推断聚合值应使用哪个时间。因此,_time 列被删除。

下一个操作中需要 _time 列。要添加一个,请使用 duplicate() 函数_stop 列复制为每个窗口化表的 _time 列。

from(bucket:"telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r._field == "usage_system" and
    r.cpu == "cpu-total"
  )
  |> window(every: 5m)
  |> mean()
  |> duplicate(column: "_stop", as: "_time")

取消窗口化聚合表

window() 函数与 every: inf 参数一起使用,以将所有点收集到单个无限窗口中。

from(bucket:"telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r._field == "usage_system" and
    r.cpu == "cpu-total"
  )
  |> window(every: 5m)
  |> mean()
  |> duplicate(column: "_stop", as: "_time")
  |> window(every: inf)

一旦取消分组并组合到单个表中,聚合数据点将在您的可视化中显示为连接状态。

Unwindowed aggregate data

辅助函数

构建一个聚合数据的查询似乎需要大量编码,但是,经历这个过程有助于理解数据在通过每个函数时如何改变“形状”。

Flux 提供(并允许您创建)“辅助”函数,这些函数抽象了许多步骤。本指南中执行的相同操作可以使用 aggregateWindow() 函数 完成。

from(bucket:"telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r._field == "usage_system" and
    r.cpu == "cpu-total"
  )
  |> aggregateWindow(every: 5m, fn: mean)

恭喜!

您现在已经构建了一个 Flux 查询,该查询使用 Flux 函数来转换您的数据。使用 Flux 的原始函数和您自己的自定义函数,还有更多方法可以操作您的数据,但这很好地介绍了基本语法和查询结构。


要更深入地了解窗口化和聚合数据,并查看每次转换的示例数据输出,请查看窗口化和聚合数据指南。



此页面对您有帮助吗?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在这样使用它,而无需对代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开发布 Alpha 版

InfluxDB 3 开源版本现已可用于 Alpha 测试,根据 MIT 或 Apache 2 许可获得许可。

我们正在发布两个产品作为 Alpha 版本的一部分。

InfluxDB 3 Core 是我们新的开源产品。它是用于时间序列和事件数据的最新数据引擎。InfluxDB 3 Enterprise 是一个商业版本,它建立在 Core 的基础上,增加了历史查询功能、读取副本、高可用性、可扩展性和细粒度的安全性。

有关如何开始使用的更多信息,请查看