文档文档

InfluxDB 任务入门

InfluxDB 任务 是一个调度的 Flux 脚本,它接受输入数据流,以某种方式修改或分析它,然后将修改后的数据写回 InfluxDB 或执行其他操作。

本文档将引导您编写一个基本的 InfluxDB 任务,该任务对数据进行下采样并将其存储在新存储桶中。

任务的组成部分

每个 InfluxDB 任务都需要以下组成部分。它们的形式和顺序可能有所不同,但它们都是任务的基本组成部分。

跳到完整的示例任务脚本

定义任务选项

任务选项定义了任务的计划、名称和其他信息。以下示例显示了如何在 Flux 脚本中设置任务选项

option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}

有关每个选项的详细信息,请参阅任务配置选项

请注意,InfluxDB 不保证任务会按计划时间运行。有关任务服务级别协议 (SLA) 的详细信息,请参阅查看任务运行日志

InfluxDB UI 提供了一个用于定义任务选项的表单。

检索和过滤数据

最简化的 Flux 脚本使用以下函数从数据源检索指定量的数据,然后根据时间或列值过滤数据

  1. from():从 InfluxDB 查询数据。
  2. range():定义要从中返回数据的时间范围。
  3. filter():根据列值过滤数据。

以下示例 Flux 从 InfluxDB 存储桶检索数据,然后按 _measurementhost 列进行过滤

from(bucket: "example-bucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")

要从其他来源检索数据,请参阅Flux 输入函数

在您的 Flux 脚本中使用任务选项

InfluxDB 将选项存储在 task 选项记录中,您可以在 Flux 脚本中引用该记录。以下示例 Flux 使用时间范围 -task.every

from(bucket: "example-bucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")

task.every 是点表示法,它引用 task 选项记录的 every 属性。every 定义为 1h,因此 -task.every 等于 -1h

使用任务选项来定义 Flux 脚本中的值可以使重用任务更容易。

处理或转换您的数据

任务以固定的时间间隔自动运行脚本。脚本以某种方式处理或转换数据——例如:下采样、检测异常或发送通知。

考虑一个每小时运行一次并通过计算设定时间间隔的平均值来下采样数据的任务。它使用 aggregateWindow() 将点分组到 5 分钟 (5m) 的窗口中,并使用 mean() 计算每个窗口的平均值。

以下示例代码显示了带有任务选项的 Flux 脚本

option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}

from(bucket: "example-bucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")
    |> aggregateWindow(every: 5m, fn: mean)

使用偏移量来考虑潜在的延迟数据

使用 offset 任务选项来考虑潜在的延迟数据(例如来自边缘设备的数据)。以一小时间隔运行的任务 (every: 1h) 并偏移五分钟 (offset: 5m) 在小时后 5 分钟执行,但查询来自原始一小时间隔的数据。

有关 InfluxDB 常用的任务示例,请参阅常用任务

定义目标

在大多数情况下,您需要在任务转换数据后发送和存储数据。目标可以是单独的 InfluxDB 测量或存储桶。

下面的示例使用 to() 将转换后的数据写回另一个 InfluxDB 存储桶

// ...
    |> to(bucket: "example-downsampled", org: "my-org")

要将数据写入 InfluxDB,to() 需要以下列

  • _time
  • _measurement
  • _field
  • _value

要将数据写入其他目标,请参阅Flux 输出函数

Flux 任务脚本完整示例

以下示例 Flux 结合了本指南中描述的所有组件

// Task options
option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}

// Data source
from(bucket: "example-bucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")
    // Data processing
    |> aggregateWindow(every: 5m, fn: mean)
    // Data destination
    |> to(bucket: "example-downsampled")

要了解有关 InfluxDB 任务及其工作原理的更多信息,请观看以下视频


此页面对您有帮助吗?

感谢您的反馈!


Flux 的未来

Flux 将进入维护模式。您可以继续像现在一样使用它,而无需对代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速的最新数据引擎,可实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 构建于 Core 的基础上,增加了高可用性、读取副本、增强的安全性和数据压缩,以实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看