Kapacitor 入门
使用 Kapacitor 导入(流式或批量)时间序列数据,然后对数据进行转换、分析和操作。要开始使用 Kapacitor,请使用 Telegraf 收集本地计算机上的系统指标,并将它们存储在 InfluxDB 中。然后,使用 Kapacitor 处理您的系统数据。
- 概述
- 启动 InfluxDB 并收集 Telegraf 数据
- 启动 Kapacitor
- Kapacitor 任务
概述
Kapacitor 任务定义了使用 TICKscript 语法对一组数据执行的工作。Kapacitor 任务包括
stream
任务。流任务在 Kapacitor 中复制写入 InfluxDB 的数据。减少查询开销,并要求 Kapacitor 将数据存储在磁盘上。batch
任务。批量任务查询和处理指定间隔的数据。
要开始使用,请执行以下操作
- 如果您尚未执行,请下载并安装 InfluxData 1.x TICK 堆栈 (OSS)。
- 启动 InfluxDB 并启动 Telegraf。默认情况下,Telegraf 开始将系统指标发送到 InfluxDB,并创建一个“telegraf”数据库。
- 启动 Kapacitor。
注意: 以下过程中的示例命令是为 Linux 编写的。
启动 InfluxDB 并收集 Telegraf 数据
通过运行以下命令启动 InfluxDB
$ sudo systemctl start influxdb
在 Telegraf 配置文件 (
/etc/telegraf/telegraf.conf
) 中,配置[[outputs.influxd]]
以指定如何连接到 InfluxDB 以及目标数据库。[[outputs.influxdb]] ## InfluxDB url is required and must be in the following form: http/udp "://" host [ ":" port] ## Multiple urls can be specified as part of the same cluster; only ONE url is written to each interval. ## InfluxDB url urls = ["http://localhost:8086"] ## The target database for metrics is required (Telegraf creates if one doesn't exist). database = "telegraf"
运行以下命令以启动 Telegraf
$ sudo systemctl start telegraf
InfluxDB 和 Telegraf 现在正在 localhost 上运行。
一分钟后,运行以下命令以使用 InfluxDB API 查询 Telegraf 数据
$ curl -G 'http://localhost:8086/query?db=telegraf' --data-urlencode 'q=SELECT mean(usage_idle) FROM cpu'
将显示类似于以下内容的结果
{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",91.82304336748372]]}]}]}
启动 Kapacitor
运行以下命令以生成 Kapacitor 配置文件
kapacitord config > kapacitor.conf
默认情况下,Kapacitor 配置文件保存在
/etc/kapacitor/kapacitor.conf
中。如果您将文件保存到另一个位置,请在启动 Kapacitor 时指定该位置。Kapacitor 配置是一个 TOML 文件。为 InfluxDB 配置的输入也适用于 Kapacitor。
启动 Kapacitor 服务
$ sudo systemctl start kapacitor
由于 InfluxDB 在
http://localhost:8086
上运行,Kapacitor 在启动时会找到它,并在 InfluxDB 上创建多个订阅。订阅告诉 InfluxDB 将数据发送到 Kapacitor。(可选)要查看日志数据,请运行以下命令
$ sudo tail -f -n 128 /var/log/kapacitor/kapacitor.log
Kapacitor 监听 HTTP 端口并将数据发布到 InfluxDB。现在,InfluxDB 将来自 Telegraf 的数据流式传输到 Kapacitor。
执行任务
在 TICKscript 的开头,指定包含数据的数据库和保留策略
dbrp "telegraf"."autogen"
// ...
当 Kapacitor 从数据库和保留策略接收到与指定的数据库和保留策略匹配的数据时,Kapacitor 会执行 TICKscript。
Kapacitor 支持基于数据库和保留策略(没有其他条件)执行任务。
从流数据触发警报
触发警报是 Kapacitor 的常见用例。必须定义要发出警报的数据库和保留策略。
CPU 使用率警报示例
将以下 TICKscript 复制到名为
cpu_alert.tick
的文件中dbrp "telegraf"."autogen" stream // Select the CPU measurement from the `telegraf` database. |from() .measurement('cpu') // Triggers a critical alert when the CPU idle usage drops below 70% |alert() .crit(lambda: int("usage_idle") < 70) // Write each alert to a file. .log('/tmp/alerts.log')
在命令行中,使用
kapacitor
CLI 使用cpu_alert.tick
TICKscript 定义任务kapacitor define cpu_alert -tick cpu_alert.tick
如果数据库和保留策略未包含在 TICKscript 中(例如,
dbrp "telegraf"."autogen"
),请在使用kapacitor define
命令时使用-dbrp
标志,后跟"<DBNAME>"."<RETENTION_POLICY>"
以在添加任务时指定它们。(可选)使用
list
命令验证是否已创建警报$ kapacitor list tasks ID Type Status Executing Databases and Retention Policies cpu_alert stream disabled false ["telegraf"."autogen"]
(可选)使用
show
命令查看有关任务的详细信息$ kapacitor show cpu_alert ID: cpu_alert Error: Template: Type: stream Status: disabled Executing: false ...
为了确保日志文件和通信通道不会被警报淹没,测试任务。
启用任务以开始处理实时数据流
kapacitor enable cpu_alert
警报将实时写入日志。
运行
show
命令以验证任务是否正在接收数据并按预期运行$ kapacitor show cpu_alert |from() // Information about the state of the task and any error it may have encountered. ID: cpu_alert Error: Type: stream Status: Enabled Executing: true Created: 04 May 16 21:01 MDT Modified: 04 May 16 21:04 MDT LastEnabled: 04 May 16 21:03 MDT Databases Retention Policies: [""."autogen"] // Displays the version of the TICKscript that Kapacitor has stored in its local database. TICKscript: stream // Select just the cpu me .measurement('cpu') |alert() .crit(lambda: "usage_idle" < 70) // Whenever we get an alert write it to a file. .log('/tmp/alerts.log') DOT: digraph asdf { graph [throughput="0.00 points/s"]; stream0 [avg_exec_time_ns="0" ]; stream0 -> from1 [processed="12"]; from1 [avg_exec_time_ns="0" ]; from1 -> alert2 [processed="12"]; alert2 [alerts_triggered="0" avg_exec_time_ns="0" ]; }
返回一个 graphviz dot 格式的树,该树显示了由 TICKscript 定义的数据处理管道以及包含有关每个节点的统计信息的键值关联数组条目,以及沿着边缘到下一个节点的链接,其中也包括关联数组统计信息。链接/边缘成员中的 processed 键指示已沿图形的指定边缘传递的数据点数。
在上面的示例中,stream0
节点(也称为 TICKscript 中的 stream
变量)已将 12 个点发送到 from1
节点。from1
节点也已将 12 个点发送到 alert2
节点。由于 Telegraf 配置为发送 cpu
数据,因此所有 12 个点都符合 from1
节点的数据库/指标条件并被传递。
如有必要,请使用 OS 提供商提供的软件包在 Debian 或 RedHat 上安装 graphviz。graphviz 站点上提供的软件包不是最新的。
既然任务正在使用实时数据运行,这里有一个快速技巧,可以使用一个核心的 100% 来生成一些人为的 CPU 活动
while true; do i=0; done
测试任务
完成以下步骤以确保日志文件和通信通道不会被警报淹没。
记录数据流
kapacitor record stream -task cpu_alert -duration 60s
如果出现连接错误,例如:
getsockopt: connection refused
(Linux) 或connectex: No connection could be made...
(Windows),请验证 Kapacitor 服务是否正在运行(请参阅安装和启动 Kapacitor)。如果 Kapacitor 正在运行,请检查主机机器的防火墙设置,并确保端口9092
可访问。此外,请检查/var/log/kapacitor/kapacitor.log
中的消息。如果/etc/kapacitor/kapacitor.conf
中的http
或其他配置存在问题,则该问题将显示在日志中。如果 Kapacitor 服务在另一台主机机器上运行,请在本地 shell 中设置KAPACITOR_URL
环境变量,指向远程计算机上的 Kapacitor 端点。检索返回的 ID 并将 ID 分配给 bash 变量以供以后使用(实际返回的 UUID 不同)
rid=cd158f21-02e6-405c-8527-261ae6f26153
通过运行以下命令确认记录捕获了一些数据
kapacitor list recordings $rid
输出应如下所示
ID Type Status Size Date cd158f21-02e6-405c-8527-261ae6f26153 stream finished 2.2 kB 04 May 16 11:44 MDT
如果大小超过几个字节,则已捕获数据。如果 Kapacitor 未接收到数据,请检查每个层:Telegraf → InfluxDB → Kapacitor。如果 Telegraf 无法与 InfluxDB 通信,则会记录错误。如果 InfluxDB 无法将数据发送到 Kapacitor,则会记录有关
connection refused
的错误。针对 InfluxDB 运行查询SHOW SUBSCRIPTIONS
以查找 InfluxDB 用于将数据发送到 Kapacitor 的端点。在以下示例中,InfluxDB 必须在
localhost:8086
上运行$ curl -G 'http://localhost:8086/query?db=telegraf' --data-urlencode 'q=SHOW SUBSCRIPTIONS' {"results":[{"statement_id":0,"series":[{"name":"_internal","columns":["retention_policy","name","mode","destinations"],"values":[["monitor","kapacitor-ef3b3f9d-0997-4c0b-b1b6-5d0fb37fe509","ANY",["http://localhost:9092"]]]},{"name":"telegraf","columns":["retention_policy","name","mode","destinations"],"values":[["autogen","kapacitor-ef3b3f9d-0997-4c0b-b1b6-5d0fb37fe509","ANY",["http://localhost:9092"]]]}]}]}
使用
replay
测试特定任务的记录数据kapacitor replay -recording $rid -task cpu_alert
使用标志
-real-clock
通过时间戳之间的增量设置重放时间。时间由每个节点接收的数据点进行测量。查看日志以查找警报
sudo cat /tmp/alerts.log
每个 JSON 行代表一个警报,并包括警报级别和触发警报的数据。
如果主机机器繁忙,则可能需要一段时间才能记录警报。
(可选)修改任务以使其对警报真正敏感,以确保警报工作正常。在 TICKscript 中,将 lambda 函数
.crit(lambda: "usage_idle" < 70)
更改为.crit(lambda: "usage_idle" < 100)
,并仅使用TASK_NAME
和-tick
参数运行define
命令kapacitor define cpu_alert -tick cpu_alert.tick
在记录期间接收的每个数据点都会触发警报。
重放修改后的任务以验证结果。
kapacitor replay -recording $rid -task cpu_alert
一旦
alerts.log
结果验证任务正在工作,请将usage_idle
阈值改回更合理的水平,并再次使用步骤 6 中所示的define
命令重新定义任务。
注意 - 单引号与双引号
TICKscript 中的单引号和双引号的作用非常不同
请注意以下示例
var data = stream
|from()
.database('telegraf')
.retentionPolicy('autogen')
.measurement('cpu')
// NOTE: Double quotes on server1
.where(lambda: "host" == "server1")
此搜索的结果将始终为空,因为在“server1”周围使用了双引号。这意味着 Kapacitor 将搜索字段“host”等于字段“server1”中保存的值的序列。这可能不是预期的结果。更可能的意图是搜索标签“host”具有值“server1”的序列,因此应使用单引号。双引号表示数据字段,单引号表示字符串值。要匹配值,上面的 tick 脚本应如下所示
var data = stream
|from()
.database('telegraf')
.retentionPolicy('autogen')
.measurement('cpu')
// NOTE: Single quotes on server1
.where(lambda: "host" == 'server1')
扩展 TICKscript
下面的 TICKscript 将计算运行平均值,并将当前值与其进行比较。如果值与平均值相差超过 3 个标准差,则会触发警报。将 cpu_alert.tick
脚本替换为下面的 TICKscript
stream
|from()
.measurement('cpu')
|alert()
// Compare values to running mean and standard deviation
.crit(lambda: sigma("usage_idle") > 3)
.log('/tmp/alerts.log')
就像这样,就可以创建一个动态阈值,并且,如果 CPU 使用率在白天下降或在夜间飙升,则会发出警报。试一试。使用 define
更新任务 TICKscript。
kapacitor define cpu_alert -tick cpu_alert.tick
注意: 如果任务已启用,则使用 define
命令重新定义任务会自动重新加载 (reload
) 任务。要在不重新加载的情况下定义任务,请使用 -no-reload
现在 tail 警报日志
sudo tail -f /tmp/alerts.log
此时不应有任何警报触发。接下来,启动一个 while 循环以增加一些负载
while true; do i=0; done
一旦创建了足够的人工负载,应该很快就会将警报触发器写入日志。让循环运行几分钟。取消循环后,应再次发出警报,指示 CPU 使用率再次发生变化。使用此技术,可以为 CPU 使用率的上升沿和下降沿以及任何异常值生成警报。
真实世界的示例
既然已经介绍了基础知识,这里有一个更真实的示例。一旦来自多个主机的指标流式传输到 Kapacitor,就可以执行以下操作:聚合和分组每个数据中心中运行的每个服务的 CPU 使用率,然后根据第 95 个百分位数触发警报。除了将警报写入日志之外,Kapacitor 还可以与第三方实用程序集成:目前支持 Slack、PagerDuty、HipChat、VictorOps 等。警报也可以通过电子邮件发送,发布到自定义端点,或者可以触发自定义脚本的执行。还可以定义自定义消息格式,以便警报具有正确的上下文和含义。此 TICKscript 的外观如下例所示。
示例 - 用于多个服务 CPU 流式传输和第 95 个百分位数警报的 TICKscript
stream
|from()
.measurement('cpu')
// create a new field called 'used' which inverts the idle cpu.
|eval(lambda: 100.0 - "usage_idle")
.as('used')
|groupBy('service', 'datacenter')
|window()
.period(1m)
.every(1m)
// calculate the 95th percentile of the used cpu.
|percentile('used', 95.0)
|eval(lambda: sigma("percentile"))
.as('sigma')
.keep('percentile', 'sigma')
|alert()
.id('{{ .Name }}/{{ index .Tags "service" }}/{{ index .Tags "datacenter"}}')
.message('{{ .ID }} is {{ .Level }} cpu-95th:{{ index .Fields "percentile" }}')
// Compare values to running mean and standard deviation
.warn(lambda: "sigma" > 2.5)
.crit(lambda: "sigma" > 3.0)
.log('/tmp/alerts.log')
// Post data to custom endpoint
.post('https://alerthandler.example.com')
// Execute custom alert handler script
.exec('/bin/custom_alert_handler.sh')
// Send alerts to slack
.slack()
.channel('#alerts')
// Sends alerts to PagerDuty
.pagerDuty()
// Send alerts to VictorOps
.victorOps()
.routingKey('team_rocket')
定义警报这么简单的事情可以快速扩展到更大的范围。使用上面的脚本,如果任何数据中心中的任何服务偏离正常行为(由 CPU 使用率的历史第 95 个百分位数定义)超过 3 个标准差,则会触发警报,并且将在 1 分钟内完成!
有关警报如何工作的更多信息,请参阅AlertNode 文档。
从批量数据触发警报
除了处理流中的数据外,Kapacitor 还可以定期查询 InfluxDB 并批量处理数据。
虽然基于 CPU 使用率触发警报更适合流式传输的情况,但此处通过遵循相同的用例演示了 batch
任务的基本工作原理。
批量数据警报示例
此 TICKscript 大致执行与早期流任务相同的操作,但作为批量任务
dbrp "telegraf"."autogen"
batch
|query('''
SELECT mean(usage_idle)
FROM "telegraf"."autogen"."cpu"
''')
.period(5m)
.every(5m)
.groupBy(time(1m), 'cpu')
|alert()
.crit(lambda: "mean" < 70)
.log('/tmp/batch_alerts.log')
将上面的脚本复制到文件
batch_cpu_alert.tick
中。定义任务
kapacitor define batch_cpu_alert -tick batch_cpu_alert.tick
验证其创建
$ kapacitor list tasks ID Type Status Executing Databases and Retention Policies batch_cpu_alert batch disabled false ["telegraf"."autogen"] cpu_alert stream enabled true ["telegraf"."autogen"]
记录任务中查询的结果(注意,实际 UUID 不同)
kapacitor record batch -task batch_cpu_alert -past 20m # Save the id again rid=b82d4034-7d5c-4d59-a252-16604f902832
这记录了使用
batch_cpu_alert
任务中的查询的最后 20 分钟的批次。在本例中,由于period
为 5 分钟,因此记录并保存了最后 4 个批次。以相同的方式重放批量记录
kapacitor replay -recording $rid -task batch_cpu_alert
检查警报日志以确保按预期生成警报。上面的基于
sigma
的警报也可以适用于处理批量数据。尝试一下,并熟悉在 Kapacitor 中更新、测试和运行任务。
使用 Kapacitor 加载任务
要使用 Kapacitor 加载任务,请将 TICKscript 保存在 kapacitor.conf
中指定的load 目录中。TICKscript 必须包含数据库和保留策略声明 dbrp
。
加载目录中的 TICKscript 在 Kapacitor 启动时自动加载,不需要使用 kapacitor define 命令添加。
有关更多信息,请参阅加载目录。
此页是否对您有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一份子!我们欢迎并鼓励您提供关于 Kapacitor 和此文档的反馈和错误报告。要查找支持,请使用以下资源