文档文档

开始使用 InfluxDB 任务

InfluxDB 任务 是一个计划执行的 Flux 脚本,它接受一系列输入数据,对其进行某种修改或分析,然后将修改后的数据写回 InfluxDB 或执行其他操作。

本文将指导您编写一个基本的 InfluxDB 任务,该任务对数据进行降采样并将其存储在新存储桶中。

任务组件

每个 InfluxDB 任务都需要以下组件。它们的格式和顺序可能不同,但它们都是任务的基本部分。

跳转到完整的示例任务脚本

定义任务选项

任务选项定义了任务的计划、名称和其他信息。以下示例显示了如何在 Flux 脚本中设置任务选项

option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}

有关每个选项的详细信息,请参阅任务配置选项

请注意,InfluxDB 不保证任务将在计划的时间运行。有关任务服务级别协议(SLA)的详细信息,请参阅查看任务运行日志

InfluxDB UI 提供了一个表单来定义任务选项。

可调用脚本的任务选项

使用 InfluxDB Cloud API 创建引用并运行可调用脚本的任务。在创建或更新任务时,将任务选项作为请求体中的属性传递 - 例如

  {
   "name": "30-day-avg-temp",
   "description": "IoT Center 30d environment average.",
   "every": "1d",
   "offset": "0m"
   ...
  }

有关创建运行可调用脚本的任务的更多信息,请参阅创建引用脚本的任务

检索和过滤数据

一个最小的 Flux 脚本使用以下函数从数据源检索指定数量的数据,然后根据时间或列值过滤数据

  1. from(): 从 InfluxDB 查询数据 Cloud
  2. range(): 定义要返回数据的时间范围。
  3. filter(): 根据列值过滤数据。

以下示例Flux从InfluxDB存储桶检索数据,然后通过_measurementhost列进行筛选

from(bucket: "example-bucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")

要检索来自其他源的数据,请参阅Flux输入函数

在您的Flux脚本中使用任务选项

InfluxDB将选项存储在可引用于您的Flux脚本中的task选项记录中。以下示例Flux使用了时间范围-task.every

from(bucket: "example-bucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")

task.every是点表示法,引用了task选项记录中的every属性。every被定义为1h,因此-task.every等同于-1h

使用任务选项在您的Flux脚本中定义值可以使重用任务更容易。

处理或转换您的数据

任务定期自动运行脚本。脚本以某种方式处理或转换数据—例如:降采样、检测异常或发送通知。

考虑一个每小时运行一次并按设定间隔计算平均值的任务。它使用aggregateWindow()将点分组到5分钟(5m)窗口中,并使用mean()计算每个窗口的平均值。

以下示例代码显示了具有任务选项的Flux脚本

option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}

from(bucket: "example-bucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")
    |> aggregateWindow(every: 5m, fn: mean)

使用偏移量来考虑潜在的数据

使用offset任务选项来考虑可能存在延迟的数据(如边缘设备的数据)。一个每小时运行一次(every: 1h)并具有五分钟偏移量(offset: 5m)的任务在每小时后5分钟执行,但查询原始一小时间隔的数据。

有关常用任务的示例,请参阅常见任务

使用可调用脚本处理数据

InfluxDB Cloud中,您可以创建运行可调用脚本的任务。您可以使用可调用脚本来管理和重用组织中的脚本。您可以使用任务通过选项和参数来安排脚本运行。

以下示例POST /api/v2/scripts请求体定义了一个新的可调用脚本,其中包含之前示例中的Flux

{
   "name": "aggregate-intervals",
   "description": "Group points into 5 minute windows and calculate the average of each
   window.",
   "script": "from(bucket: "example-bucket")\
                |> range(start: -task.every)\
                |> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")\
                |> aggregateWindow(every: 5m, fn: mean)",
    "language": "flux"
}

请注意,脚本不包含任务选项。一旦创建了可调用脚本,您可以使用POST /api/v2/tasks来创建一个运行脚本的任务。以下示例请求体定义了一个具有脚本ID和选项的任务

{
   "every": "1h",
   "description": "Downsample host with 5 min precision.",
   "name": "downsample_5m_precision",
   "scriptID": "09b2136232083000"
}

要创建一个脚本和一个使用参数的任务,请参阅如何创建运行可调用脚本的任务

定义一个目的地

在大多数情况下,您希望在任务转换数据后发送和存储数据。目的地可以是另一个单独的InfluxDB度量或存储桶。

以下示例使用to()将转换后的数据写回另一个InfluxDB存储桶

// ...
    |> to(bucket: "example-downsampled", org: "my-org")

要写入InfluxDB,to需要以下列

  • _time
  • _measurement
  • _field
  • _value

要写入其他目的地,请参阅Flux输出函数

完整的示例Flux任务脚本

以下示例Flux结合了本指南中描述的所有组件

// Task options
option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}

// Data source
from(bucket: "example-bucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")
    // Data processing
    |> aggregateWindow(every: 5m, fn: mean)
    // Data destination
    |> to(bucket: "example-downsampled")

完整的示例任务与可调用脚本

以下示例代码显示了结合本指南中描述的组件的POST /api/v2/scripts请求体

{
   "name": "aggregate-intervals-and-export",
   "description": "Group points into 5 minute windows and calculate the average of each
   window.",
   "script": "from(bucket: "example-bucket")\
                |> range(start: -task.every)\
                |> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")\
                // Data processing\
                |> aggregateWindow(every: 5m, fn: mean)\
                // Data destination\
                |> to(bucket: "example-downsampled")",
    "language": "flux"
}

以下示例代码展示了如何通过POST /api/v2/tasks请求体来安排脚本

{
   "every": "1h",
   "description": "Downsample host with 5 min precision.",
   "name": "downsample_5m_precision",
   "scriptID": "SCRIPT_ID"
}

要了解有关InfluxDB任务及其工作方式,请观看以下视频


这个页面有帮助吗?

感谢您的反馈!


Flux的未来

Flux将进入维护模式。您可以在不更改代码的情况下继续像现在一样使用它。

阅读更多

InfluxDB v3增强功能和InfluxDB Clustered现已普遍可用

包括更快查询性能和管理工具在内的新功能推动了InfluxDB v3产品线的进步。InfluxDB Clustered现已普遍可用。

InfluxDB v3性能和功能

InfluxDB v3产品线在查询性能方面取得了显著提升,并提供了新的管理工具。这些增强包括用于监控InfluxDB集群健康状况的操作仪表板、InfluxDB Cloud Dedicated中的单一登录(SSO)支持以及用于令牌和数据库的新管理API。

了解新v3增强功能


InfluxDB Clustered普遍可用

InfluxDB Clustered现已普遍可用,为您在自管理堆栈中提供了InfluxDB v3的功能。

与我们讨论InfluxDB Clustered

InfluxDB Cloud由TSM提供支持