文档文档

将 Flux 任务与 Kapacitor 结合使用

使用 Kapacitor 1.6+ 针对 InfluxDB 和其他数据源运行 Flux 任务。利用 Flux 功能的完整库,在 Kapacitor 中构建强大的数据处理和监控任务。

开始之前

在开始使用 Flux 任务之前,请考虑

  • Kapacitor Flux 任务不能使用 Kapacitor 主题或事件处理程序。您只能使用 Flux 通知端点从 Flux 脚本中发送警报。
  • Flux 任务由 Kapacitor 1.6+ 中内置的 Flux 任务引擎计划和执行。此引擎与 Kapacitor TICKscript 任务引擎是分开的。
  • Flux 任务在 Flux 任务脚本中使用 Flux task 选项进行配置。这包括任务名称和执行计划。

选择您正在使用的 InfluxDB 版本

  1. 在 InfluxDB 中设置 Flux 任务数据库
  2. 配置 Kapacitor Flux 任务
  3. 创建 Flux 任务

在 InfluxDB 中设置 Flux 任务数据库

(可选但建议)

当 Kapacitor 执行 Flux 任务时,它可以将有关任务执行(运行)的信息存储在 InfluxDB 数据库中。要存储此数据,请执行以下操作

  1. 在 InfluxDB 中创建一个新数据库,以存储 Flux 任务运行和日志数据。

    CREATE DATABASE kapacitorfluxtasks
    
  2. 为防止磁盘上出现大量 Kapacitor Flux 任务日志数据,请使用有限的保留期更新默认的 autogen 保留策略,或创建具有有限保留期的新保留策略 (RP)。

-- Syntax
ALTER RETENTION POLICY <rp-name> ON <db-name> DURATION <new-retention-duration>

-- Example
ALTER RETENTION POLICY autogen ON kapacitorfluxtasks DURATION 3d
-- Syntax
CREATE RETENTION POLICY <rp-name> on <db-name> DURATION <retention-duration>

-- Example 
CREATE RETENTION POLICY threedays on kapacitorfluxtasks DURATION 3d

配置 Kapacitor Flux 任务

在您的 kapacitor.conf 中,更新或添加 [fluxtask] 下的以下设置

  • enabled: true
  • task-run-influxdb: kapacitor.conf 中的 InfluxDB 配置的名称,用于存储 Flux 任务数据。要禁用 Flux 任务日志记录,请设置为 "none"
  • task-run-bucket: 用于存储 Flux 任务数据和日志的 InfluxDB 数据库。我们建议将其留空。默认情况下,数据写入 kapacitor_fluxtask_logs 数据库。要指定另一个数据库来写入任务日志数据,请使用 "db-name" 命名约定(不支持包括保留策略 "db-name/rp")。如果指定的数据库在 InfluxDB 中尚不存在,则 Kapacitor 尝试创建该数据库。如果启用了身份验证,则需要 CREATE DATABASE 的权限。有关更多信息,请参阅 InfluxDB 中的身份验证和授权
  • 提供以下其中一项
    • task-run-org: 保留为空字符串 ("")
    • task-run-orgid: 保留为空字符串 ("")
  • task-run-measurement: 用于存储任务运行和日志数据的 InfluxDB 指标。默认为 "runs"
Kapacitor Flux 任务配置示例
# ...

[fluxtask]
  enabled = true
  task-run-influxdb = "default"
  task-run-bucket = "kapacitor_fluxtask_logs"
  task-run-org = ""
  task-run-orgid = ""
  task-run-measurement = "runs"

# ...

有关 Kapacitor [fluxtask] 配置选项的更多信息,请参阅配置 Kapacitor

创建 Flux 任务

  1. 创建 Flux 任务脚本。在您的脚本中包含 task 选项,以配置 Kapacitor Flux 任务。有关编写 Flux 任务的更多信息,请参阅:

    提供 InfluxDB 连接凭据

    from()to() 函数需要您的 InfluxDB hosttoken

    • host: InfluxDB URL。
    • token: 如果启用了 InfluxDB 身份验证,请使用 username:password 语法。否则,请为您的令牌使用空字符串 ("")。

    Bucket 名称语法

    当使用 Flux 查询或写入 InfluxDB 1.x 时,请使用 database-name/retention-policy-name 模式来指定您的 bucket。

    example-task.flux
    option task = {
      name: "example-task-name",
      every: 1h,
      offset: 10m
    }
    
    host = "http://localhost:8086"
    token = ""
    
    from(bucket: "example-db/example-rp", host: host, token: token)
      |> range(start: -task.every)
      |> filter(fn: (r) => r._measurement == "example-measurement")
      |> aggregateWindow(every: 10m, fn: mean)
      |> to(bucket: "example-db/example-rp-downsampled", host: host, token: token)
    
  2. 使用 kapacitor flux task create 命令将您的 Flux 脚本添加为 Kapacitor Flux 任务。

    kapacitor flux task create --file /path/to/example-task.flux
    

有关创建 Kapacitor Flux 任务的更多详细信息,请参阅创建 Kapacitor Flux 任务

考虑使用 InfluxDB 任务

如果您正在使用 InfluxDB CloudInfluxDB OSS 2.x,请考虑使用 原生 InfluxDB 任务进行数据处理。

  1. 为 InfluxDB Cloud 或 2.x 设置 Kapacitor
  2. 为 InfluxDB Cloud 或 2.x 配置 Kapacitor Flux 任务
  3. 创建 Flux 任务

为 InfluxDB Cloud 或 2.x 设置 Kapacitor

配置 Kapacitor 以连接到 InfluxDB Cloud 或 InfluxDB OSS 2.x。有关详细说明,请参阅以下内容

为 InfluxDB Cloud 或 2.x 配置 Kapacitor Flux 任务

在您的 kapacitor.conf 中,更新或添加 [fluxtask] 下的以下设置

  • enabled: true
  • task-run-influxdb: kapacitor.conf 中的 InfluxDB 配置的名称,用于存储 Flux 任务数据。要禁用 Flux 任务日志记录,请设置为 "none"
  • task-run-bucket: 用于存储 Flux 任务数据和日志的 InfluxDB bucket。我们建议将其留空。默认情况下,数据写入 kapacitor_fluxtask_logs bucket。要指定另一个 bucket 来写入任务日志数据,请使用 _tasks 系统 bucket创建新 bucket。如果指定的 bucket 在 InfluxDB 中尚不存在,则 Kapacitor 尝试使用 POST /api/v2/buckets 创建它,在这种情况下,您的 API 令牌必须具有在 InfluxDB 中创建 bucket 的权限。有关更多信息,请参阅管理 API 令牌
  • 提供以下其中一项
    • task-run-org: InfluxDB 组织名称。
    • task-run-orgid: InfluxDB 组织 ID。
  • task-run-measurement: 用于存储任务运行和日志数据的 InfluxDB 指标。默认为 "runs"
# ...

[fluxtask]
  enabled = true
  task-run-influxdb = "InfluxDB"
  task-run-bucket = "kapacitor_fluxtask_logs"
  task-run-org = "example-org"
  task-run-measurement = "runs"

# ...

创建 Flux 任务

  1. 创建 Flux 任务脚本。在您的脚本中包含 task 选项,以配置 Kapacitor Flux 任务。有关编写 Flux 任务的更多信息,请参阅:

    提供 InfluxDB 连接凭据

    from()](/flux/v0/stdlib/influxdata/influxdb/from/) 和 to() 函数需要您的 InfluxDB hosttoken

    • host: InfluxDB URL。
    • token: 如果启用了 InfluxDB 身份验证,请使用 username:password 语法。否则,请为您的令牌使用空字符串 ("")。
    example-task.flux
    option task = {
      name: "example-task-name",
      every: 1h,
      offset: 10m
    }
    
    host = "http://localhost:8086"
    token = ""
    
    from(bucket: "example-bucket", host: host, token: token)
      |> range(start: -task.every)
      |> filter(fn: (r) => r._measurement == "example-measurement")
      |> aggregateWindow(every: 10m, fn: mean)
      |> to(bucket: "example-bucket-downsampled", host: host, token: token)
    
  2. 使用 kapacitor flux task create 命令将您的 Flux 脚本添加为 Kapacitor Flux 任务。

    kapacitor flux task create --file /path/to/example-task.flux
    

有关创建 Kapacitor Flux 任务的更多详细信息,请参阅创建 Kapacitor Flux 任务


此页是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在一样使用它,而无需对您的代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速、最近数据引擎,可实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 构建在 Core 的基础上,增加了高可用性、读取副本、增强的安全性以及数据压缩,以实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看