文档文档

开始使用 Kapacitor

使用 Kapacitor 来导入(流式或批量)时间序列数据,然后转换、分析和操作这些数据。要开始使用 Kapacitor,请使用 Telegraf 在本地计算机上收集系统指标并将其存储在 InfluxDB 中。然后,使用 Kapacitor 处理您的系统数据。

概述

Kapacitor 任务使用 TICKscript 语法定义要在数据集上执行的工作。Kapacitor 任务包括:

  • stream 任务。流任务会在 Kapacitor 中复制写入 InfluxDB 的数据。它会分担查询开销,并需要 Kapacitor 将数据存储在磁盘上。
  • batch 任务。批量任务会查询并处理指定间隔内的数据。

要开始,请执行以下操作:

  1. 如果尚未安装,请下载并安装 InfluxData 1.x TICK stack (OSS)
  2. 启动 InfluxDB 并启动 Telegraf。默认情况下,Telegraf 会开始将系统指标发送到 InfluxDB,并创建一个“telegraf”数据库。
  3. 启动 Kapacitor。

注意: 以下步骤中的示例命令是为 Linux 编写的。

启动 InfluxDB 并收集 Telegraf 数据

  1. 通过运行以下命令启动 InfluxDB:

    $ sudo systemctl start influxdb
  2. 在 Telegraf 配置文件(/etc/telegraf/telegraf.conf)中,配置 [[outputs.influxd]] 以指定如何连接到 InfluxDB 以及目标数据库。

    [[outputs.influxdb]]
    ## InfluxDB url is required and must be in the following form: http/udp "://" host [ ":" port]
    ## Multiple urls can be specified as part of the same cluster; only ONE url is written to each interval.
    ## InfluxDB url
    urls = ["https://:8086"]
    
    ## The target database for metrics is required (Telegraf creates if one doesn't exist).
    database = "telegraf"
  3. 运行以下命令启动 Telegraf:

    $ sudo systemctl start telegraf

    InfluxDB 和 Telegraf 现在正在 localhost 上运行。

  4. 一分钟后,运行以下命令使用 InfluxDB API 查询 Telegraf 数据:

    $ curl -G 'https://:8086/query?db=telegraf' --data-urlencode 'q=SELECT mean(usage_idle) FROM cpu'

    将显示类似以下内容的结果:

    {"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",91.82304336748372]]}]}]}

启动 Kapacitor

  1. 运行以下命令生成 Kapacitor 配置文件:

    kapacitord config > kapacitor.conf

    默认情况下,Kapacitor 配置文件保存在 /etc/kapacitor/kapacitor.conf 中。如果将文件保存到其他位置,请在启动 Kapacitor 时指定该位置。

    Kapacitor 配置是 TOML 文件。为 InfluxDB 配置的输入也可用于 Kapacitor。

  2. 启动 Kapacitor 服务

    $ sudo systemctl start kapacitor

    由于 InfluxDB 正在 https://:8086 上运行,Kapacitor 在启动过程中会找到它,并在 InfluxDB 上创建几个订阅。订阅会告知 InfluxDB 将数据发送到 Kapacitor。

  3. (可选)要查看日志数据,请运行以下命令:

    $ sudo tail -f -n 128 /var/log/kapacitor/kapacitor.log

    Kapacitor 在 HTTP 端口上监听并将数据发布到 InfluxDB。现在,InfluxDB 将 Telegraf 的数据流式传输到 Kapacitor。

执行任务

在 TICKscript 的开头,指定包含数据的数据库和保留策略:

dbrp "telegraf"."autogen"

// ...

当 Kapacitor 收到与指定匹配的数据库和保留策略的数据时,Kapacitor 会执行 TICKscript。

Kapacitor 支持基于数据库和保留策略执行任务(不包含其他条件)。

从流数据触发警报

触发警报是 Kapacitor 的常见用例。必须定义要警报的数据库和保留策略。

CPU 使用率示例警报
  1. 将以下 TICKscript 复制到名为 cpu_alert.tick 的文件中:

    dbrp "telegraf"."autogen"
    
    stream
        // Select the CPU measurement from the `telegraf` database.
        |from()
            .measurement('cpu')
        // Triggers a critical alert when the CPU idle usage drops below 70%
        |alert()
            .crit(lambda: int("usage_idle") <  70)
            // Write each alert to a file.
            .log('/tmp/alerts.log')
  2. 在命令行中,使用 kapacitor CLI 使用 cpu_alert.tick TICKscript 定义任务:

    kapacitor define cpu_alert -tick cpu_alert.tick

    如果 TICKscript 中未包含数据库和保留策略(例如 dbrp "telegraf"."autogen"),则在添加任务时,使用 kapacitor define 命令并加上 -dbrp 标志,后跟 "<DBNAME>"."<RETENTION_POLICY>" 来指定它们。

  3. (可选)使用 list 命令验证警报是否已创建:

    $ kapacitor list tasks
    ID        Type      Status    Executing Databases and Retention Policies
    cpu_alert stream    disabled  false     ["telegraf"."autogen"]
  4. (可选)使用 show 命令查看有关任务的详细信息:

    $ kapacitor show cpu_alert
    ID: cpu_alert
    Error:
    Template:
    Type: stream
    Status: disabled
    Executing: false
    ...
  5. 为确保日志文件和通信通道不被警报淹没,请测试任务

  6. 启用任务以开始处理实时数据流:

    kapacitor enable cpu_alert

    警报会实时写入日志。

  7. 运行 show 命令以验证任务是否正在接收数据并按预期运行:

    $ kapacitor show cpu_alert
    |from()
    // Information about the state of the task and any error it may have encountered.
    ID: cpu_alert
    Error:
    Type: stream
    Status: Enabled
    Executing: true
    Created: 04 May 16 21:01 MDT
    Modified: 04 May 16 21:04 MDT
    LastEnabled: 04 May 16 21:03 MDT
    Databases Retention Policies: [""."autogen"]
    
    // Displays the version of the TICKscript that Kapacitor has stored in its local database.
    TICKscript:
    stream
        // Select just the cpu me
            .measurement('cpu')
        |alert()
            .crit(lambda: "usage_idle" <  70)
            // Whenever we get an alert write it to a file.
            .log('/tmp/alerts.log')
    
    DOT:
    digraph asdf {
    graph [throughput="0.00 points/s"];
    
    stream0 [avg_exec_time_ns="0" ];
    stream0 -> from1 [processed="12"];
    
    from1 [avg_exec_time_ns="0" ];
    from1 -> alert2 [processed="12"];
    
    alert2 [alerts_triggered="0" avg_exec_time_ns="0" ];
    }

返回一个 graphviz dot 格式的树,显示 TICKscript 定义的数据处理管道,以及包含每个节点统计信息的键值关联数组条目,以及连接到下一个节点的边,其中也包含关联数组统计信息。processed 键表示沿着图的指定边传递的数据点数量。

在上面的示例中,stream0 节点(又名 TICKscript 中的 stream 变量)已将 12 个点发送到 from1 节点。from1 节点也已将 12 个点发送到 alert2 节点。由于 Telegraf 配置为发送 cpu 数据,所有 12 个点都匹配 from1 节点的数据库/度量标准,并被传递下去。

如有必要,请使用操作系统提供商提供的软件包在 Debian 或 RedHat 上安装 graphviz。graphviz 网站上提供的软件包不是最新的。

现在任务正在使用实时数据运行,这里有一个快速技巧,可以使用一个核心的 100% 来生成一些人工 CPU 活动:

while true; do i=0; done
测试任务

完成以下步骤,以确保日志文件和通信通道不会被警报淹没:

  1. 记录数据流

    kapacitor record stream -task cpu_alert -duration 60s

    如果出现连接错误,例如:getsockopt: connection refused (Linux) 或 connectex: No connection could be made... (Windows),请验证 Kapacitor 服务是否正在运行(参见安装和启动 Kapacitor)。如果 Kapacitor 正在运行,请检查主机防火墙设置,并确保端口 9092 可访问。此外,请检查 /var/log/kapacitor/kapacitor.log 中的消息。如果 /etc/kapacitor/kapacitor.conf 中的 http 或其他配置存在问题,该问题会出现在日志中。如果 Kapacitor 服务在另一台主机上运行,请在本地 shell 中将 KAPACITOR_URL 环境变量设置为远程机器上的 Kapacitor 端点。

  2. 检索返回的 ID 并将其分配给 bash 变量以供以后使用(实际返回的 UUID 会有所不同):

    rid=cd158f21-02e6-405c-8527-261ae6f26153
  3. 运行以下命令确认录制捕获了数据:

    kapacitor list recordings $rid

    输出应显示如下:

    ID                                      Type    Status    Size      Date
    cd158f21-02e6-405c-8527-261ae6f26153    stream  finished  2.2 kB    04 May 16 11:44 MDT

    如果大小大于几字节,则已捕获数据。如果 Kapacitor 未接收数据,请检查每个层:Telegraf → InfluxDB → Kapacitor。如果 Telegraf 无法与 InfluxDB 通信,则 Telegraf 会记录错误。如果 InfluxDB 无法将数据发送到 Kapacitor,则 InfluxDB 会记录有关 connection refused 的错误。针对 InfluxDB 运行查询 SHOW SUBSCRIPTIONS 以查找 InfluxDB 用于向 Kapacitor 发送数据的端点。

    在以下示例中,InfluxDB 必须在 localhost:8086 上运行:

    $ curl -G 'https://:8086/query?db=telegraf' --data-urlencode 'q=SHOW SUBSCRIPTIONS'
    
    {"results":[{"statement_id":0,"series":[{"name":"_internal","columns":["retention_policy","name","mode","destinations"],"values":[["monitor","kapacitor-ef3b3f9d-0997-4c0b-b1b6-5d0fb37fe509","ANY",["https://:9092"]]]},{"name":"telegraf","columns":["retention_policy","name","mode","destinations"],"values":[["autogen","kapacitor-ef3b3f9d-0997-4c0b-b1b6-5d0fb37fe509","ANY",["https://:9092"]]]}]}]}
  4. 使用 replay 以相同的方式测试特定任务的录制数据:

    kapacitor replay -recording $rid -task cpu_alert

    使用 -real-clock 标志通过时间戳之间的增量来设置重放时间。每个节点上的时间由它接收的数据点度量。

  5. 查看日志以获取警报

    sudo cat /tmp/alerts.log

    每行 JSON 代表一个警报,包括警报级别和触发警报的数据。

    如果主机很忙,记录警报可能需要一段时间。

  6. (可选)修改任务以使其非常敏感,以确保警报正常工作。在 TICKscript 中,将 lambda 函数 .crit(lambda: "usage_idle" < 70) 更改为 .crit(lambda: "usage_idle" < 100),然后仅使用 TASK_NAME-tick 参数运行 define 命令:

    kapacitor define cpu_alert -tick cpu_alert.tick

    在录制期间收到的每个数据点都会触发一个警报。

  7. 重放修改后的任务以验证结果。

    kapacitor replay -recording $rid -task cpu_alert

    一旦 alerts.log 中的结果确认任务正在工作,请将 usage_idle 阈值改回更合理的值,然后使用 6 步所示的 define 命令再次重新定义任务。

注意 - 单引号与双引号

TICKscripts 中的单引号和双引号具有非常不同的作用

请注意以下示例:

var data = stream
    |from()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('cpu')
        // NOTE: Double quotes on server1
        .where(lambda: "host" == "server1")

此搜索的结果将始终为空,因为在“server1”周围使用了双引号。这意味着 Kapacitor 将搜索一个字段“host”等于字段“server1”中值的一个系列。这可能不是预期。更有可能的是,意图是搜索一个 tag “host”具有‘server1’的系列,因此应使用单引号。双引号表示数据字段,单引号表示字符串值。为了匹配,上面的 tick script 应如下所示:

var data = stream
    |from()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('cpu')
        // NOTE: Single quotes on server1
        .where(lambda: "host" == 'server1')

扩展 TICKscripts

下面的 TICKscript 将计算移动平均值并将其与当前值进行比较。然后,如果值偏离平均值 3 个标准差以上,它将触发警报。将 cpu_alert.tick 脚本替换为下面的 TICKscript:

stream
    |from()
        .measurement('cpu')
    |alert()
        // Compare values to running mean and standard deviation
        .crit(lambda: sigma("usage_idle") > 3)
        .log('/tmp/alerts.log')

就这样,可以创建一个动态阈值,如果 CPU 使用率在白天下降或在晚上飙升,就会发出警报。尝试一下。使用 define 更新任务 TICKscript。

kapacitor define cpu_alert -tick cpu_alert.tick

注意: 如果任务已启用,使用 define 命令重新定义任务会自动重新加载 (reload) 该任务。要定义一个不重新加载的任务,请使用 -no-reload

现在跟踪警报日志:

sudo tail -f /tmp/alerts.log

目前应该还没有警报触发。接下来,启动一个 while 循环来添加一些负载:

while true; do i=0; done

一旦创建了足够的人工负载,警报触发器应该很快写入日志。让循环运行几分钟。取消循环后,应该会发出另一个警报,表明 CPU 使用率再次发生了变化。使用此技术,可以为 CPU 使用率的上升和下降边缘以及任何异常值生成警报。

实际用例

现在已经涵盖了基础知识,这是一个更实际的例子。一旦来自多个主机的指标流式传输到 Kapacitor,就可以做到:聚合和分组每个数据中心中每个运行服务的 CPU 使用率,然后根据 95th 百分位触发警报。除了仅仅将警报写入日志之外,Kapacitor 还可以与第三方实用程序集成:目前支持 Slack、PagerDuty、HipChat、VictorOps 等。警报还可以通过电子邮件发送,发布到自定义端点,或者触发自定义脚本的执行。还可以定义自定义消息格式,以便警报具有正确的上下文和含义。TICKscript 应如下面的示例所示。

示例 - 针对多个服务 CPU 的流式处理 TICKscript,并对 95th 百分位进行警报

stream
    |from()
        .measurement('cpu')
    // create a new field called 'used' which inverts the idle cpu.
    |eval(lambda: 100.0 - "usage_idle")
        .as('used')
    |groupBy('service', 'datacenter')
    |window()
        .period(1m)
        .every(1m)
    // calculate the 95th percentile of the used cpu.
    |percentile('used', 95.0)
    |eval(lambda: sigma("percentile"))
        .as('sigma')
        .keep('percentile', 'sigma')
    |alert()
        .id('{{ .Name }}/{{ index .Tags "service" }}/{{ index .Tags "datacenter"}}')
        .message('{{ .ID }} is {{ .Level }} cpu-95th:{{ index .Fields "percentile" }}')
        // Compare values to running mean and standard deviation
        .warn(lambda: "sigma" > 2.5)
        .crit(lambda: "sigma" > 3.0)
        .log('/tmp/alerts.log')

        // Post data to custom endpoint
        .post('https://alerthandler.example.com')

        // Execute custom alert handler script
        .exec('/bin/custom_alert_handler.sh')

        // Send alerts to slack
        .slack()
        .channel('#alerts')

        // Sends alerts to PagerDuty
        .pagerDuty()

        // Send alerts to VictorOps
        .victorOps()
        .routingKey('team_rocket')

像定义警报这样简单的事情可以快速扩展到更大的范围。通过上面的脚本,如果任何数据中心中的任何服务偏离了由历史 95th 百分位 CPU 使用率定义的正常行为超过 3 个标准差,就会触发警报,并且会在 1 分钟内完成!

有关警报工作原理的更多信息,请参阅AlertNode 文档。

从批量数据触发警报

除了处理流式数据外,Kapacitor 还可以定期查询 InfluxDB 并以批量方式处理数据。

虽然基于 CPU 使用率触发警报更适合流式场景,但通过遵循相同的用例,这里演示了 batch 任务的工作原理。

批量数据上的示例警报

此 TICKscript 的作用与之前的流任务大致相同,但作为批量任务:

dbrp "telegraf"."autogen"

batch
    |query('''
        SELECT mean(usage_idle)
        FROM "telegraf"."autogen"."cpu"
    ''')
        .period(5m)
        .every(5m)
        .groupBy(time(1m), 'cpu')
    |alert()
        .crit(lambda: "mean" < 70)
        .log('/tmp/batch_alerts.log')
  1. 将上面的脚本复制到 batch_cpu_alert.tick 文件中。

  2. 定义任务

    kapacitor define batch_cpu_alert -tick batch_cpu_alert.tick
  3. 验证其创建

    $ kapacitor list tasks
    ID              Type      Status    Executing Databases and Retention Policies
    batch_cpu_alert batch     disabled  false     ["telegraf"."autogen"]
    cpu_alert       stream    enabled   true      ["telegraf"."autogen"]
  4. 记录任务查询结果(注意,实际 UUID 不同):

    kapacitor record batch -task batch_cpu_alert -past 20m
    # Save the id again
    rid=b82d4034-7d5c-4d59-a252-16604f902832

    这会记录 batch_cpu_alert 任务中查询的最后 20 分钟的批量数据。在这种情况下,由于 period 为 5 分钟,因此会记录并保存最后 4 个批次。

  5. 以相同方式重放批量记录:

    kapacitor replay -recording $rid -task batch_cpu_alert
  6. 检查警报日志以确保警报按预期生成。上面基于 sigma 的警报也可以适应批量数据。尝试更新、测试和运行 Kapacitor 中的任务,以便熟悉它们。

使用 Kapacitor 加载任务

要使用 Kapacitor 加载任务,请将 TICKscript 保存在 kapacitor.conf 中指定的load目录中。TICKscripts 必须包含数据库和保留策略声明 dbrp

load 目录中的 TICKscripts 在 Kapacitor 启动时会自动加载,无需使用 kapacitor define 命令添加。

有关更多信息,请参阅Load Directory


此页面是否有帮助?

感谢您的反馈!


InfluxDB 3.8 新特性

InfluxDB 3.8 和 InfluxDB 3 Explorer 1.6 的主要增强功能。

查看博客文章

InfluxDB 3.8 现已适用于 Core 和 Enterprise 版本,同时发布了 InfluxDB 3 Explorer UI 的 1.6 版本。本次发布着重于操作成熟度,以及如何更轻松地部署、管理和可靠地运行 InfluxDB。

更多信息,请查看

InfluxDB Docker 的 latest 标签将指向 InfluxDB 3 Core

在 **2026 年 2 月 3 日**,InfluxDB Docker 镜像的 latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。

如果使用 Docker 来安装和运行 InfluxDB,latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。例如,如果使用 Docker 运行 InfluxDB v2,请将 latest 版本标签替换为 Docker pull 命令中的特定版本标签 — 例如

docker pull influxdb:2