将 Flux 任务与 Kapacitor 结合使用
使用 Kapacitor 1.6+ 针对 InfluxDB 和其他数据源运行 Flux 任务。利用 Flux 功能的完整库,在 Kapacitor 中构建强大的数据处理和监控任务。
开始之前
在开始使用 Flux 任务之前,请考虑
- Kapacitor Flux 任务不能使用 Kapacitor 主题或事件处理程序。您只能使用 Flux 通知端点从 Flux 脚本中发送警报。
- Flux 任务由 Kapacitor 1.6+ 中内置的 Flux 任务引擎计划和执行。此引擎与 Kapacitor TICKscript 任务引擎是分开的。
- Flux 任务在 Flux 任务脚本中使用 Flux
task
选项进行配置。这包括任务名称和执行计划。
选择您正在使用的 InfluxDB 版本
在 InfluxDB 中设置 Flux 任务数据库
(可选但建议)
当 Kapacitor 执行 Flux 任务时,它可以将有关任务执行(运行)的信息存储在 InfluxDB 数据库中。要存储此数据,请执行以下操作
在 InfluxDB 中创建一个新数据库,以存储 Flux 任务运行和日志数据。
CREATE DATABASE kapacitorfluxtasks
为防止磁盘上出现大量 Kapacitor Flux 任务日志数据,请使用有限的保留期更新默认的
autogen
保留策略,或创建具有有限保留期的新保留策略 (RP)。
-- Syntax
ALTER RETENTION POLICY <rp-name> ON <db-name> DURATION <new-retention-duration>
-- Example
ALTER RETENTION POLICY autogen ON kapacitorfluxtasks DURATION 3d
-- Syntax
CREATE RETENTION POLICY <rp-name> on <db-name> DURATION <retention-duration>
-- Example
CREATE RETENTION POLICY threedays on kapacitorfluxtasks DURATION 3d
配置 Kapacitor Flux 任务
在您的 kapacitor.conf
中,更新或添加 [fluxtask]
下的以下设置
- enabled:
true
- task-run-influxdb:
kapacitor.conf
中的 InfluxDB 配置的名称,用于存储 Flux 任务数据。要禁用 Flux 任务日志记录,请设置为"none"
。 - task-run-bucket: 用于存储 Flux 任务数据和日志的 InfluxDB 数据库。我们建议将其留空。默认情况下,数据写入
kapacitor_fluxtask_logs
数据库。要指定另一个数据库来写入任务日志数据,请使用"db-name"
命名约定(不支持包括保留策略"db-name/rp"
)。如果指定的数据库在 InfluxDB 中尚不存在,则 Kapacitor 尝试创建该数据库。如果启用了身份验证,则需要CREATE DATABASE
的权限。有关更多信息,请参阅 InfluxDB 中的身份验证和授权。 - 提供以下其中一项
- task-run-org: 保留为空字符串 (
""
) - task-run-orgid: 保留为空字符串 (
""
)
- task-run-org: 保留为空字符串 (
- task-run-measurement: 用于存储任务运行和日志数据的 InfluxDB 指标。默认为
"runs"
。
Kapacitor Flux 任务配置示例
# ...
[fluxtask]
enabled = true
task-run-influxdb = "default"
task-run-bucket = "kapacitor_fluxtask_logs"
task-run-org = ""
task-run-orgid = ""
task-run-measurement = "runs"
# ...
有关 Kapacitor [fluxtask]
配置选项的更多信息,请参阅配置 Kapacitor。
创建 Flux 任务
创建 Flux 任务脚本。在您的脚本中包含 task 选项,以配置 Kapacitor Flux 任务。有关编写 Flux 任务的更多信息,请参阅:
提供 InfluxDB 连接凭据
from()
和to()
函数需要您的 InfluxDB host 和 token。- host: InfluxDB URL。
- token: 如果启用了 InfluxDB 身份验证,请使用
username:password
语法。否则,请为您的令牌使用空字符串 (""
)。
Bucket 名称语法
当使用 Flux 查询或写入 InfluxDB 1.x 时,请使用
database-name/retention-policy-name
模式来指定您的 bucket。example-task.flux
option task = { name: "example-task-name", every: 1h, offset: 10m } host = "http://localhost:8086" token = "" from(bucket: "example-db/example-rp", host: host, token: token) |> range(start: -task.every) |> filter(fn: (r) => r._measurement == "example-measurement") |> aggregateWindow(every: 10m, fn: mean) |> to(bucket: "example-db/example-rp-downsampled", host: host, token: token)
使用
kapacitor flux task create
命令将您的 Flux 脚本添加为 Kapacitor Flux 任务。kapacitor flux task create --file /path/to/example-task.flux
有关创建 Kapacitor Flux 任务的更多详细信息,请参阅创建 Kapacitor Flux 任务。
考虑使用 InfluxDB 任务
如果您正在使用 InfluxDB Cloud 或 InfluxDB OSS 2.x,请考虑使用 原生 InfluxDB 任务进行数据处理。
为 InfluxDB Cloud 或 2.x 设置 Kapacitor
配置 Kapacitor 以连接到 InfluxDB Cloud 或 InfluxDB OSS 2.x。有关详细说明,请参阅以下内容
为 InfluxDB Cloud 或 2.x 配置 Kapacitor Flux 任务
在您的 kapacitor.conf
中,更新或添加 [fluxtask]
下的以下设置
- enabled:
true
- task-run-influxdb:
kapacitor.conf
中的 InfluxDB 配置的名称,用于存储 Flux 任务数据。要禁用 Flux 任务日志记录,请设置为"none"
。 - task-run-bucket: 用于存储 Flux 任务数据和日志的 InfluxDB bucket。我们建议将其留空。默认情况下,数据写入
kapacitor_fluxtask_logs
bucket。要指定另一个 bucket 来写入任务日志数据,请使用 _tasks 系统 bucket 或创建新 bucket。如果指定的 bucket 在 InfluxDB 中尚不存在,则 Kapacitor 尝试使用POST /api/v2/buckets
创建它,在这种情况下,您的 API 令牌必须具有在 InfluxDB 中创建 bucket 的权限。有关更多信息,请参阅管理 API 令牌。 - 提供以下其中一项
- task-run-org: InfluxDB 组织名称。
- task-run-orgid: InfluxDB 组织 ID。
- task-run-measurement: 用于存储任务运行和日志数据的 InfluxDB 指标。默认为
"runs"
。
# ...
[fluxtask]
enabled = true
task-run-influxdb = "InfluxDB"
task-run-bucket = "kapacitor_fluxtask_logs"
task-run-org = "example-org"
task-run-measurement = "runs"
# ...
创建 Flux 任务
创建 Flux 任务脚本。在您的脚本中包含 task 选项,以配置 Kapacitor Flux 任务。有关编写 Flux 任务的更多信息,请参阅:
提供 InfluxDB 连接凭据
from()
](/flux/v0/stdlib/influxdata/influxdb/from/) 和to()
函数需要您的 InfluxDB host 和 token。- host: InfluxDB URL。
- token: 如果启用了 InfluxDB 身份验证,请使用
username:password
语法。否则,请为您的令牌使用空字符串 (""
)。
example-task.flux
option task = { name: "example-task-name", every: 1h, offset: 10m } host = "http://localhost:8086" token = "" from(bucket: "example-bucket", host: host, token: token) |> range(start: -task.every) |> filter(fn: (r) => r._measurement == "example-measurement") |> aggregateWindow(every: 10m, fn: mean) |> to(bucket: "example-bucket-downsampled", host: host, token: token)
使用
kapacitor flux task create
命令将您的 Flux 脚本添加为 Kapacitor Flux 任务。kapacitor flux task create --file /path/to/example-task.flux
有关创建 Kapacitor Flux 任务的更多详细信息,请参阅创建 Kapacitor Flux 任务。
此页是否对您有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一份子!我们欢迎并鼓励您提供关于 Kapacitor 和本文档的反馈和错误报告。要获得支持,请使用以下资源