文档

开始使用 InfluxDB 任务

InfluxDB 任务 是一个计划的 Flux 脚本,它接收一系列输入数据,对其进行修改或分析,然后将修改后的数据写回 InfluxDB 或执行其他操作。

本文将介绍编写一个基本的 InfluxDB 任务,该任务对数据进行下采样并将数据存储在新的桶中。

任务的组件

每个 InfluxDB 任务都需要以下组件。它们的形式和顺序可能不同,但它们都是任务的基本组成部分。

跳转到完整示例任务脚本

定义任务选项

任务选项定义了任务的计划、名称和其他信息。以下示例显示了如何在 Flux 脚本中设置任务选项。

option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}

有关每个选项的详细信息,请参阅任务配置选项

请注意,InfluxDB 不能保证任务会在预定时间运行。有关任务服务级别协议(SLA)的详细信息,请参阅查看任务运行日志

InfluxDB UI 提供了一个表单来定义任务选项。

检索和过滤数据

一个最小化的 Flux 脚本使用以下函数从数据源检索指定数量的数据,然后根据时间或列值过滤数据

  1. from():从 InfluxDB 查询数据。
  2. range():定义返回数据的时间范围。
  3. filter():根据列值过滤数据。

以下示例 Flux 从 InfluxDB 桶中检索数据,然后根据 _measurementhost 列进行过滤。

from(bucket: "example-bucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")

有关从其他源检索数据的详细信息,请参阅Flux 输入函数

在您的 Flux 脚本中使用任务选项

InfluxDB 将选项存储在可引用的 task 选项记录中。以下示例 Flux 使用时间范围 -task.every

from(bucket: "example-bucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")

task.every 是点表示法,它引用 task 选项记录的 every 属性。every 被定义为 1h,因此 -task.every 等同于 -1h

在您的 Flux 脚本中使用任务选项来定义值可以使重用您的任务更容易。

处理或转换您的数据

任务定期自动运行脚本。脚本以某种方式处理或转换数据——例如:下采样、检测异常或发送通知。

考虑一个每小时运行一次的任务,它通过计算特定间隔的平均值对数据进行下采样。它使用 aggregateWindow() 将点分组到 5 分钟(5m)窗口中,并使用 mean() 计算每个窗口的平均值。

以下示例代码显示了具有任务选项的 Flux 脚本。

option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}

from(bucket: "example-bucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")
    |> aggregateWindow(every: 5m, fn: mean)

使用偏移量来处理潜在数据

使用 offset 任务选项来处理潜在数据(例如来自边缘设备的数据)。一个运行间隔为一小时(every: 1h)且偏移量为五分钟(offset: 5m)的任务在每小时后 5 分钟执行,但查询原始一小时间隔的数据。

请参阅常见任务,了解与 InfluxDB 常用的任务示例。

定义目的地

在大多数情况下,您希望在任务对其进行了转换后发送和存储数据。目的地可以是一个单独的InfluxDB度量或桶。

以下示例使用to()将转换后的数据写回到另一个InfluxDB桶

// ...
    |> to(bucket: "example-downsampled", org: "my-org")

要将数据写入InfluxDB,to()需要以下列

  • _time
  • _measurement
  • _field
  • _value

要写入其他目的地,请参阅Flux输出函数

完整的Flux任务脚本示例

以下示例Flux结合了本指南中描述的所有组件

// Task options
option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}

// Data source
from(bucket: "example-bucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")
    // Data processing
    |> aggregateWindow(every: 5m, fn: mean)
    // Data destination
    |> to(bucket: "example-downsampled")

要了解更多关于InfluxDB任务及其工作原理的信息,请观看以下视频


这个页面有帮助吗?

感谢您的反馈!


Flux的未来

Flux将进入维护模式。您可以继续像现在一样使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB v3增强功能和InfluxDB Clustered现已上市

新功能包括更快的查询性能和管理工具,推进了InfluxDB v3产品线。InfluxDB Clustered现已上市。

InfluxDB v3性能和功能

InfluxDB v3产品线在查询性能方面取得了重大提升,并提供了新的管理工具。这些增强包括用于监控InfluxDB集群健康的操作仪表板、InfluxDB Cloud Dedicated中的单点登录(SSO)支持和新的令牌和数据库管理API。

了解新的v3增强功能


InfluxDB Clustered上市

InfluxDB Clustered现已上市,为您提供了在自管理堆栈中使用InfluxDB v3的功能。

与我们谈谈InfluxDB Clustered