InfluxDB 任务入门
InfluxDB 任务 是计划的 Flux 脚本,它接受输入数据流,以某种方式修改或分析它,然后将修改后的数据写回 InfluxDB 或执行其他操作。
本文档介绍了如何编写一个基本的 InfluxDB 任务,该任务对数据进行降采样并将其存储在一个新的存储桶中。
任务的组成部分
每个 InfluxDB 任务都需要以下组件。它们的形式和顺序可能有所不同,但它们都是任务的重要组成部分。
定义任务选项
任务选项定义了任务的计划、名称和其他信息。以下示例展示了如何在 Flux 脚本中设置任务选项
option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}
有关每个选项的详细信息,请参阅 任务配置选项。
请注意,InfluxDB 不保证任务将在计划的时间运行。有关任务服务级别协议 (SLA) 的详细信息,请参阅 查看任务运行日志。
InfluxDB UI 提供了一个用于定义任务选项的表单。
检索和过滤数据
最小的 Flux 脚本使用以下函数从数据源检索指定数量的数据,然后根据时间和列值过滤数据
以下示例 Flux 从 InfluxDB 存储桶检索数据,然后按 _measurement
和 host
列进行过滤
from(bucket: "example-bucket")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")
要从其他来源检索数据,请参阅 Flux 输入函数。
在您的 Flux 脚本中使用任务选项
InfluxDB 将选项存储在您可以在 Flux 脚本中引用的 task
选项记录中。以下示例 Flux 使用时间范围 -task.every
from(bucket: "example-bucket")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")
task.every
是点表示法,它引用 task
选项记录的 every
属性。every
定义为 1h
,因此 -task.every
等于 -1h
。
使用任务选项在您的 Flux 脚本中定义值可以使重用您的任务更容易。
处理或转换您的数据
任务按固定的时间间隔自动运行脚本。脚本以某种方式处理或转换数据——例如:降采样、检测异常或发送通知。
考虑一个每小时运行一次的任务,并通过计算设定时间间隔的平均值来对数据进行降采样。它使用 aggregateWindow()
将数据点分组到 5 分钟 (5m
) 的窗口中,并使用 mean()
计算每个窗口的平均值。
以下示例代码显示了带有任务选项的 Flux 脚本
option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}
from(bucket: "example-bucket")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")
|> aggregateWindow(every: 5m, fn: mean)
使用偏移量来处理延迟数据
使用 offset
任务选项来处理潜在的延迟数据(例如来自边缘设备的数据)。以一小时间隔运行的任务 (every: 1h
) 和五分钟的偏移量 (offset: 5m
) 在每小时过后的 5 分钟执行,但查询来自原始一小时间隔的数据。
有关 InfluxDB 常用任务的示例,请参阅 常用任务。
定义目标
在大多数情况下,您需要发送和存储任务转换后的数据。目标可以是单独的 InfluxDB 测量或存储桶。
以下示例使用 to()
将转换后的数据写回另一个 InfluxDB 存储桶
// ...
|> to(bucket: "example-downsampled", org: "my-org")
要将数据写入 InfluxDB,to()
需要以下列
_time
_measurement
_field
_value
要将数据写入其他目标,请参阅 Flux 输出函数。
完整的示例 Flux 任务脚本
以下示例 Flux 结合了本指南中描述的所有组件
// Task options
option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}
// Data source
from(bucket: "example-bucket")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")
// Data processing
|> aggregateWindow(every: 5m, fn: mean)
// Data destination
|> to(bucket: "example-downsampled")
要了解有关 InfluxDB 任务及其工作原理的更多信息,请观看以下视频
此页面是否对您有帮助?
感谢您的反馈!