文档文档

开始处理数据

既然您已经了解了从 InfluxDB 查询数据的基本知识,让我们超越基本查询,开始处理查询到的数据。“处理”数据可能意味着转换、聚合、降采样或对数据发出警报。本教程涵盖以下数据处理用例

大多数数据处理操作都需要手动编辑 Flux 查询。如果您正在使用 InfluxDB 数据浏览器,请切换到 脚本编辑器 而不是使用 查询构建器

重映射或分配数据中的值

使用 map() 函数 迭代数据中的每一行,并更新该行中的值。map() 是 Flux 中最有用的函数之一,它将帮助您完成许多需要执行的数据处理操作。

了解更多关于 map() 如何工作的信息

from(bucket: "get-started")
    |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "hum")
    |> map(fn: (r) => ({r with _value: r._value / 100.0}))

Map 示例

执行数学运算

有条件地分配状态

数据警报

分组数据

使用 group() 函数 按特定列值重新分组数据,以便进一步处理。

from(bucket: "get-started")
    |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> group(columns: ["room", "_field"])

理解数据分组及其重要性很重要,但这对于本“入门”教程来说可能太多了。有关数据如何分组以及为什么重要的更多信息,请参阅 Flux 数据模型 文档。

默认情况下,from() 返回从 InfluxDB 查询的数据,这些数据按序列(measurement、标签和字段键)分组。返回的表流中的每个表代表一个组。每个表都包含相同的值,这些值用于对数据进行分组的列。当您聚合数据时,此分组非常重要。

Group 示例

按特定列分组数据

取消分组数据

聚合或选择特定数据

使用 Flux 聚合选择器 函数从 每个 输入表中返回聚合值或选定值。

from(bucket: "get-started")
    |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "co" or r._field == "hum" or r._field == "temp")
    |> mean()

按时间聚合

如果您想查询随时间变化的聚合值,这是一种 降采样 形式。

聚合函数

聚合函数 删除 组键 中的列,并为每个输入表返回一个单行,其中包含该表的聚合值。

聚合示例

计算每个房间的平均温度

计算所有房间的总体平均温度

计算每个房间在所有字段中报告的点数

分配新的聚合时间戳

_time 通常不是组键的一部分,并且在使用聚合函数时将被删除。要为聚合点分配新的时间戳,请将代表查询边界的 _start_stop 列复制为新的 _time 列。

from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "temp")
    |> mean()
    |> duplicate(column: "_stop", as: "_time")

选择器函数

选择器函数 从每个输入表返回一个或多个列,并保留所有列及其值。

选择器示例

返回每个房间的第一个温度

返回每个房间的最后一个温度

返回每个房间的最高温度

将数据透视到关系模式中

如果来自关系型 SQL 或类似 SQL 的查询语言(例如 InfluxQL),则 Flux 使用的数据模型与您习惯的模型不同。Flux 返回多个表,其中每个表包含不同的字段。“关系”模式将每个字段构造为每行中的一列。

使用 pivot() 函数 将数据透视到基于时间戳的“关系”模式中。

from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "co" or r._field == "hum" or r._field == "temp")
    |> filter(fn: (r) => r.room == "Kitchen")
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

查看输入和透视输出

数据降采样

数据降采样是一种策略,可以提高查询时的性能,并优化长期数据存储。简而言之,降采样减少了查询返回的点数,而不会丢失数据中的总体趋势。

有关降采样数据的更多信息,请参阅 降采样数据

降采样数据最常见的方法是按时间间隔或“窗口”。例如,您可能想要查询最后一小时的数据,并返回每五分钟窗口的平均值。

使用 aggregateWindow() 按指定的时间间隔降采样数据

  • 使用 every 参数指定每个窗口的持续时间。
  • 使用 fn 参数指定要应用于每个窗口的 聚合选择器 函数。
  • (可选) 使用 timeSrc 参数指定要使用哪个列值来为每个窗口创建新的聚合时间戳。默认值为 _stop
from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "temp")
    |> aggregateWindow(every: 2h, fn: mean)

查看输入和降采样输出

使用 InfluxDB 任务自动化处理

InfluxDB 任务是计划查询,可以执行上述任何数据处理操作。通常,任务然后使用 to() 函数 将处理后的结果写回 InfluxDB。

有关创建和配置任务的更多信息,请参阅 InfluxDB 任务入门

降采样任务示例

option task = {
    name: "Example task"
    every: 1d,
}

from(bucket: "get-started-downsampled")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "home")
    |> aggregateWindow(every: 2h, fn: mean)

此页是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在这样使用它,而无需对您的代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速、近实时数据引擎,可实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 构建于 Core 的基础之上,增加了高可用性、只读副本、增强的安全性以及数据压缩,从而实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层级可供非商业家庭或业余爱好者使用。

有关更多信息,请查看