将 Flux 任务与 Kapacitor 结合使用
使用 Kapacitor 1.6+ 针对 InfluxDB 和其他数据源运行 Flux 任务。利用 Flux 功能的完整库,在 Kapacitor 中构建强大的数据处理和监控任务。
开始之前
在开始使用 Flux 任务之前,请考虑
- Kapacitor Flux 任务不能使用 Kapacitor 主题或事件处理程序。您只能使用 Flux 通知端点从 Flux 脚本中发送警报。
- Flux 任务由 Kapacitor 1.6+ 中内置的 Flux 任务引擎计划和执行。此引擎与 Kapacitor TICKscript 任务引擎是分开的。
- Flux 任务在 Flux 任务脚本中使用 Flux task选项进行配置。这包括任务名称和执行计划。
选择您正在使用的 InfluxDB 版本
在 InfluxDB 中设置 Flux 任务数据库
(可选但建议)
当 Kapacitor 执行 Flux 任务时,它可以将有关任务执行(运行)的信息存储在 InfluxDB 数据库中。要存储此数据,请执行以下操作
- 在 InfluxDB 中创建一个新数据库,以存储 Flux 任务运行和日志数据。 - CREATE DATABASE kapacitorfluxtasks
- 为防止磁盘上出现大量 Kapacitor Flux 任务日志数据,请使用有限的保留期更新默认的 - autogen保留策略,或创建具有有限保留期的新保留策略 (RP)。
-- Syntax
ALTER RETENTION POLICY <rp-name> ON <db-name> DURATION <new-retention-duration>
-- Example
ALTER RETENTION POLICY autogen ON kapacitorfluxtasks DURATION 3d
-- Syntax
CREATE RETENTION POLICY <rp-name> on <db-name> DURATION <retention-duration>
-- Example 
CREATE RETENTION POLICY threedays on kapacitorfluxtasks DURATION 3d
配置 Kapacitor Flux 任务
在您的 kapacitor.conf 中,更新或添加 [fluxtask] 下的以下设置
- enabled: true
- task-run-influxdb: kapacitor.conf中的 InfluxDB 配置的名称,用于存储 Flux 任务数据。要禁用 Flux 任务日志记录,请设置为"none"。
- task-run-bucket: 用于存储 Flux 任务数据和日志的 InfluxDB 数据库。我们建议将其留空。默认情况下,数据写入 kapacitor_fluxtask_logs数据库。要指定另一个数据库来写入任务日志数据,请使用"db-name"命名约定(不支持包括保留策略"db-name/rp")。如果指定的数据库在 InfluxDB 中尚不存在,则 Kapacitor 尝试创建该数据库。如果启用了身份验证,则需要CREATE DATABASE的权限。有关更多信息,请参阅 InfluxDB 中的身份验证和授权。
- 提供以下其中一项- task-run-org: 保留为空字符串 ("")
- task-run-orgid: 保留为空字符串 ("")
 
- task-run-org: 保留为空字符串 (
- task-run-measurement: 用于存储任务运行和日志数据的 InfluxDB 指标。默认为 "runs"。
Kapacitor Flux 任务配置示例
# ...
[fluxtask]
  enabled = true
  task-run-influxdb = "default"
  task-run-bucket = "kapacitor_fluxtask_logs"
  task-run-org = ""
  task-run-orgid = ""
  task-run-measurement = "runs"
# ...
有关 Kapacitor [fluxtask] 配置选项的更多信息,请参阅配置 Kapacitor。
创建 Flux 任务
- 创建 Flux 任务脚本。在您的脚本中包含 task 选项,以配置 Kapacitor Flux 任务。有关编写 Flux 任务的更多信息,请参阅: - 提供 InfluxDB 连接凭据- from()和- to()函数需要您的 InfluxDB host 和 token。- host: InfluxDB URL。
- token: 如果启用了 InfluxDB 身份验证,请使用 username:password语法。否则,请为您的令牌使用空字符串 ("")。
 - Bucket 名称语法- 当使用 Flux 查询或写入 InfluxDB 1.x 时,请使用 - database-name/retention-policy-name模式来指定您的 bucket。- example-task.flux- option task = { name: "example-task-name", every: 1h, offset: 10m } host = "https://:8086" token = "" from(bucket: "example-db/example-rp", host: host, token: token) |> range(start: -task.every) |> filter(fn: (r) => r._measurement == "example-measurement") |> aggregateWindow(every: 10m, fn: mean) |> to(bucket: "example-db/example-rp-downsampled", host: host, token: token)
- 使用 - kapacitor flux task create命令将您的 Flux 脚本添加为 Kapacitor Flux 任务。- kapacitor flux task create --file /path/to/example-task.flux
有关创建 Kapacitor Flux 任务的更多详细信息,请参阅创建 Kapacitor Flux 任务。
考虑使用 InfluxDB 任务
如果您正在使用 InfluxDB Cloud 或 InfluxDB OSS 2.x,请考虑使用 原生 InfluxDB 任务进行数据处理。
为 InfluxDB Cloud 或 2.x 设置 Kapacitor
配置 Kapacitor 以连接到 InfluxDB Cloud 或 InfluxDB OSS 2.x。有关详细说明,请参阅以下内容
为 InfluxDB Cloud 或 2.x 配置 Kapacitor Flux 任务
在您的 kapacitor.conf 中,更新或添加 [fluxtask] 下的以下设置
- enabled: true
- task-run-influxdb: kapacitor.conf中的 InfluxDB 配置的名称,用于存储 Flux 任务数据。要禁用 Flux 任务日志记录,请设置为"none"。
- task-run-bucket: 用于存储 Flux 任务数据和日志的 InfluxDB bucket。我们建议将其留空。默认情况下,数据写入 kapacitor_fluxtask_logsbucket。要指定另一个 bucket 来写入任务日志数据,请使用 _tasks 系统 bucket 或创建新 bucket。如果指定的 bucket 在 InfluxDB 中尚不存在,则 Kapacitor 尝试使用POST /api/v2/buckets创建它,在这种情况下,您的 API 令牌必须具有在 InfluxDB 中创建 bucket 的权限。有关更多信息,请参阅管理 API 令牌。
- 提供以下其中一项- task-run-org: InfluxDB 组织名称。
- task-run-orgid: InfluxDB 组织 ID。
 
- task-run-measurement: 用于存储任务运行和日志数据的 InfluxDB 指标。默认为 "runs"。
# ...
[fluxtask]
  enabled = true
  task-run-influxdb = "InfluxDB"
  task-run-bucket = "kapacitor_fluxtask_logs"
  task-run-org = "example-org"
  task-run-measurement = "runs"
# ...
创建 Flux 任务
- 创建 Flux 任务脚本。在您的脚本中包含 task 选项,以配置 Kapacitor Flux 任务。有关编写 Flux 任务的更多信息,请参阅: - 提供 InfluxDB 连接凭据- from()](/flux/v0/stdlib/influxdata/influxdb/from/) 和- to()函数需要您的 InfluxDB host 和 token。- host: InfluxDB URL。
- token: 如果启用了 InfluxDB 身份验证,请使用 username:password语法。否则,请为您的令牌使用空字符串 ("")。
 - example-task.flux- option task = { name: "example-task-name", every: 1h, offset: 10m } host = "https://:8086" token = "" from(bucket: "example-bucket", host: host, token: token) |> range(start: -task.every) |> filter(fn: (r) => r._measurement == "example-measurement") |> aggregateWindow(every: 10m, fn: mean) |> to(bucket: "example-bucket-downsampled", host: host, token: token)
- 使用 - kapacitor flux task create命令将您的 Flux 脚本添加为 Kapacitor Flux 任务。- kapacitor flux task create --file /path/to/example-task.flux
有关创建 Kapacitor Flux 任务的更多详细信息,请参阅创建 Kapacitor Flux 任务。
此页是否对您有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一份子!我们欢迎并鼓励您提供关于 Kapacitor 和本文档的反馈和错误报告。要获得支持,请使用以下资源
