使用 Flux 转换数据
本页介绍的是早期版本的 InfluxDB OSS。InfluxDB OSS v2 是最新的稳定版本。请参阅等效的 InfluxDB v2 文档: 使用 Flux 转换数据。
当从 InfluxDB 查询数据时,您通常需要以某种方式转换数据。常见的例子包括将数据聚合为平均值、降采样数据等。
本指南演示了如何使用 Flux 函数来转换您的数据。它将引导您创建一个 Flux 脚本,该脚本将数据划分为时间窗口,计算每个窗口中 _value
的平均值,并将平均值输出为新表。
重要的是要理解您的数据“形状”如何在每个操作中发生变化。
查询数据
使用上一个从 InfluxDB 查询数据指南中构建的查询,但更新范围以从过去一小时拉取数据
from(bucket:"telegraf/autogen")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage_system" and
r.cpu == "cpu-total"
)
Flux 函数
Flux 提供了许多函数,用于执行特定的操作、转换和任务。您还可以在 Flux 查询中创建自定义函数。函数在 Flux 函数文档中详细介绍。
从 InfluxDB 查询数据进行转换时,常用的一种函数是聚合函数。聚合函数接受表中一组 _value
,对其进行聚合,并将其转换为新值。
此示例使用 mean()
函数 来平均时间窗口内的值。
以下示例逐步介绍了窗口化和聚合数据所需的步骤,但有一个
aggregateWindow()
辅助函数 可以为您完成此操作。了解过程中的步骤是很有好处的。
窗口化您的数据
Flux 的 window()
函数 根据时间值对记录进行分区。使用 every
参数定义每个窗口的时间持续时间。
日历月和年
every
支持所有有效的持续时间单位,包括日历月 (1mo
) 和年 (1y
)。
对于此示例,以五分钟间隔 (5m
) 窗口化数据。
from(bucket:"telegraf/autogen")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage_system" and
r.cpu == "cpu-total"
)
|> window(every: 5m)
当数据被收集到时间窗口中时,每个窗口都作为自己的表输出。可视化时,每个表都会被分配一个唯一的颜色。
聚合窗口化数据
Flux 聚合函数接受每个表中的 _value
并以某种方式聚合它们。使用 mean()
函数 来平均每个表的 _value
。
from(bucket:"telegraf/autogen")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage_system" and
r.cpu == "cpu-total"
)
|> window(every: 5m)
|> mean()
当每个窗口中的行被聚合时,它们的输出表仅包含一个带有聚合值的单行。窗口化的表仍然是分开的,并且在可视化时,将显示为单个、不连接的点。
为您的聚合添加时间
当值被聚合时,结果表没有 _time
列,因为用于聚合的记录都具有不同的时间戳。聚合函数不会推断聚合值应使用哪个时间。因此,_time
列被删除。
下一个操作中需要 _time
列。要添加一个,请使用 duplicate()
函数 将 _stop
列复制为每个窗口化表的 _time
列。
from(bucket:"telegraf/autogen")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage_system" and
r.cpu == "cpu-total"
)
|> window(every: 5m)
|> mean()
|> duplicate(column: "_stop", as: "_time")
取消窗口化聚合表
将 window()
函数与 every: inf
参数一起使用,以将所有点收集到单个无限窗口中。
from(bucket:"telegraf/autogen")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage_system" and
r.cpu == "cpu-total"
)
|> window(every: 5m)
|> mean()
|> duplicate(column: "_stop", as: "_time")
|> window(every: inf)
一旦取消分组并组合到单个表中,聚合数据点将在您的可视化中显示为连接状态。
辅助函数
构建一个聚合数据的查询似乎需要大量编码,但是,经历这个过程有助于理解数据在通过每个函数时如何改变“形状”。
Flux 提供(并允许您创建)“辅助”函数,这些函数抽象了许多步骤。本指南中执行的相同操作可以使用 aggregateWindow()
函数 完成。
from(bucket:"telegraf/autogen")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage_system" and
r.cpu == "cpu-total"
)
|> aggregateWindow(every: 5m, fn: mean)
恭喜!
您现在已经构建了一个 Flux 查询,该查询使用 Flux 函数来转换您的数据。使用 Flux 的原始函数和您自己的自定义函数,还有更多方法可以操作您的数据,但这很好地介绍了基本语法和查询结构。
要更深入地了解窗口化和聚合数据,并查看每次转换的示例数据输出,请查看窗口化和聚合数据指南。
此页面对您有帮助吗?
感谢您的反馈!