将 Flux 任务与 Kapacitor 结合使用
使用 Kapacitor 1.6+ 来针对 InfluxDB 和其他数据源运行 Flux 任务。利用完整的 Flux 功能库在 Kapacitor 中构建强大的数据处理和监控任务。
开始之前
在开始使用 Flux 任务之前,请考虑以下事项:
- Kapacitor Flux 任务不能使用 Kapacitor 主题或事件处理程序。您只能使用 Flux 通知端点在 Flux 脚本中发送警报。
- Flux 任务由 Kapacitor 1.6+ 内置的 Flux 任务引擎进行调度和执行。此引擎与 Kapacitor TICKscript 任务引擎是分开的。
- Flux 任务通过 Flux 任务脚本中的 Flux
task选项进行配置。这包括任务名称和执行计划。
选择您正在使用的 InfluxDB 版本
在 InfluxDB 中设置 Flux 任务数据库
(可选但推荐)
当 Kapacitor 执行 Flux 任务时,它可以将有关任务执行(运行)的信息存储在 InfluxDB 数据库中。要存储这些数据,请执行以下操作:
在 InfluxDB 中创建一个新数据库来存储 Flux 任务运行和日志数据。
CREATE DATABASE kapacitorfluxtasks为防止磁盘上积累大量 Kapacitor Flux 任务日志数据,请更新默认的
autogen保留策略,设置一个有限的保留期,或者创建一个具有有限保留期的新保留策略 (RP)。
-- Syntax
ALTER RETENTION POLICY <rp-name> ON <db-name> DURATION <new-retention-duration>
-- Example
ALTER RETENTION POLICY autogen ON kapacitorfluxtasks DURATION 3d-- Syntax
CREATE RETENTION POLICY <rp-name> on <db-name> DURATION <retention-duration>
-- Example
CREATE RETENTION POLICY threedays on kapacitorfluxtasks DURATION 3d配置 Kapacitor Flux 任务
在您的 kapacitor.conf 文件的 [fluxtask] 部分下更新或添加以下设置:
- enabled:
true - task-run-influxdb: 要用于存储 Flux 任务数据的、您
kapacitor.conf中的 InfluxDB 配置名称。要禁用 Flux 任务日志记录,请设置为"none"。 - task-run-bucket: 用于存储 Flux 任务数据和日志的 InfluxDB 数据库。我们建议将其留空。默认情况下,数据写入
kapacitor_fluxtask_logs数据库。要指定另一个数据库来写入任务日志数据,请使用"db-name"命名约定(包括保留策略"db-name/rp"不受支持)。如果指定的数据库在 InfluxDB 中尚不存在,Kapacitor 会尝试创建该数据库。如果启用了身份验证,则需要CREATE DATABASE权限。有关更多信息,请参阅 InfluxDB 中的身份验证和授权。 - 提供以下之一:
- task-run-org: 留空字符串 (
"") - task-run-orgid: 留空字符串 (
"")
- task-run-org: 留空字符串 (
- task-run-measurement: 用于存储任务运行和日志数据的 InfluxDB measurement。默认为
"runs"。
Kapacitor Flux 任务配置示例
# ...
[fluxtask]
enabled = true
task-run-influxdb = "default"
task-run-bucket = "kapacitor_fluxtask_logs"
task-run-org = ""
task-run-orgid = ""
task-run-measurement = "runs"
# ...有关 Kapacitor [fluxtask] 配置选项的更多信息,请参阅 配置 Kapacitor。
创建 Flux 任务
创建 Flux 任务脚本。在脚本中包含 task 选项来配置 Kapacitor Flux 任务。有关编写 Flux 任务的更多信息,请参阅:
提供 InfluxDB 连接凭据
from()和to()函数需要您的 InfluxDB 主机和令牌。- host: InfluxDB URL。
- token: 如果InfluxDB 身份验证已启用,请使用
username:password语法。否则,请使用空字符串 ("") 作为您的令牌。
Bucket 名称语法
当使用 Flux 查询或写入 InfluxDB 1.x 时,使用
database-name/retention-policy-name模式来指定您的 bucket。example-task.flux
option task = { name: "example-task-name", every: 1h, offset: 10m } host = "https://:8086" token = "" from(bucket: "example-db/example-rp", host: host, token: token) |> range(start: -task.every) |> filter(fn: (r) => r._measurement == "example-measurement") |> aggregateWindow(every: 10m, fn: mean) |> to(bucket: "example-db/example-rp-downsampled", host: host, token: token)使用
kapacitor flux task create命令将您的 Flux 脚本添加为 Kapacitor Flux 任务。kapacitor flux task create --file /path/to/example-task.flux
有关创建 Kapacitor Flux 任务的更多详细信息,请参阅 创建 Kapacitor Flux 任务。
考虑使用 InfluxDB 任务
如果您正在使用 InfluxDB Cloud 或 InfluxDB OSS 2.x,请考虑使用 原生 InfluxDB 任务进行数据处理。
为 InfluxDB Cloud 或 2.x 设置 Kapacitor
配置 Kapacitor 以连接到 InfluxDB Cloud 或 InfluxDB OSS 2.x。有关详细说明,请参阅以下内容:
为 InfluxDB Cloud 或 2.x 配置 Kapacitor Flux 任务
在您的 kapacitor.conf 文件的 [fluxtask] 部分下更新或添加以下设置:
- enabled:
true - task-run-influxdb: 要用于存储 Flux 任务数据的、您
kapacitor.conf中的 InfluxDB 配置名称。要禁用 Flux 任务日志记录,请设置为"none"。 - task-run-bucket: 用于存储 Flux 任务数据和日志的 InfluxDB bucket。我们建议将其留空。默认情况下,数据写入
kapacitor_fluxtask_logsbucket。要指定另一个 bucket 来写入任务日志数据,请使用 _tasks 系统 bucket 或 创建新 bucket。如果指定的 bucket 在 InfluxDB 中尚不存在,Kapacitor 会尝试使用POST /api/v2/buckets来创建它,在这种情况下,您的 API 令牌必须具有在 InfluxDB 中创建 bucket 的权限。有关更多信息,请参阅 管理 API 令牌。 - 提供以下之一:
- task-run-org: InfluxDB 组织名称。
- task-run-orgid: InfluxDB 组织 ID。
- task-run-measurement: 用于存储任务运行和日志数据的 InfluxDB measurement。默认为
"runs"。
# ...
[fluxtask]
enabled = true
task-run-influxdb = "InfluxDB"
task-run-bucket = "kapacitor_fluxtask_logs"
task-run-org = "example-org"
task-run-measurement = "runs"
# ...创建 Flux 任务
创建 Flux 任务脚本。在脚本中包含 task 选项来配置 Kapacitor Flux 任务。有关编写 Flux 任务的更多信息,请参阅:
提供 InfluxDB 连接凭据
from()](/flux/v0/stdlib/influxdata/influxdb/from/) 和to()函数需要您的 InfluxDB 主机和令牌。- host: InfluxDB URL。
- token: 如果InfluxDB 身份验证已启用,请使用
username:password语法。否则,请使用空字符串 ("") 作为您的令牌。
example-task.flux
option task = { name: "example-task-name", every: 1h, offset: 10m } host = "https://:8086" token = "" from(bucket: "example-bucket", host: host, token: token) |> range(start: -task.every) |> filter(fn: (r) => r._measurement == "example-measurement") |> aggregateWindow(every: 10m, fn: mean) |> to(bucket: "example-bucket-downsampled", host: host, token: token)使用
kapacitor flux task create命令将您的 Flux 脚本添加为 Kapacitor Flux 任务。kapacitor flux task create --file /path/to/example-task.flux
有关创建 Kapacitor Flux 任务的更多详细信息,请参阅 创建 Kapacitor Flux 任务。
此页面是否有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一员!我们欢迎并鼓励您对 Kapacitor 和本文档提供反馈和错误报告。要获取支持,请使用以下资源: