Kafka 事件处理器
Apache Kafka 是一个分布式流处理平台,用于构建实时数据管道和流处理应用程序。配置 Kapacitor 将警报消息发送到 Kafka 集群。
配置
Kafka 事件处理器的配置以及默认的 选项 值在 kapacitor.conf 中设置。下面是一个配置示例
[[kafka]]
enabled = true
id = "localhost"
brokers = []
timeout = "10s"
batch-size = 100
batch-timeout = "1s"
use-ssl = false
ssl-ca = ""
ssl-cert = ""
ssl-key = ""
insecure-skip-verify = false
# Optional SASL configuration
sasl-username = "xxxxx"
sasl-password = "xxxxxxxx"
sasl-extensions = {}
sasl-mechanism = ""
sasl-version = ""
# Use if sasl-mechanism is GSSAPI. GSSAPI is for organizations using Kerberos.
sasl-gssapi-service-name = ""
sasl-gssapi-auth-type = "KRB5_USER_AUTH"
sasl-gssapi-disable-pafxfast = false
sasl-gssapi-kerberos-config-path = "/"
sasl-gssapi-key-tab-path = ""
sasl-gssapi-realm = "realm"
# Options if sasl-mechanism is OAUTHBEARER
sasl-oauth-service = "auth0"
sasl-oauth-client-id = "xxxxxxx"
sasl-oauth-client-secret = "xxxxxxxx"
sasl-oauth-token-url = "dedicated-auth0-token-url"
sasl-oauth-token-expiry-margin = "10s"
sasl-oauth-scopes = ""
sasl-oauth-tenant-id = ""
[kafka.sasl-oauth-parameters]
audience = "development"
sasl-access-token = ""可以通过 TOML 中的多个 [[kafka]] 部分来配置多个 Kafka 客户端。id 作为每个配置的 Kafka 客户端的唯一标识符。
enabled
设置为 true 以启用 Kafka 事件处理器。
id
Kafka 集群的唯一标识符。
brokers
使用 host:port 格式的 Kafka 代理地址列表。
timeout
与 Kafka 代理的网络操作超时时间。如果设置为 0,则使用默认值 10 秒。
batch-size
在发送到 Kafka 之前批量处理的消息数量。如果设置为 0,则使用默认值 100。
batch-timeout
在刷新不完整批次之前等待的最大时间。如果设置为 0,则使用默认值 1 秒。
use-ssl
启用 SSL 通信。必须设置为 true 才能使其他 SSL 选项生效。
ssl-ca
证书颁发机构文件的路径。
ssl-cert
主机证书文件的路径。
ssl-key
证书私钥文件的路径。
insecure-skip-verify
使用 SSL 但跳过链和主机验证。(如果使用自签名证书,则必需。)
(可选) SASL 配置
sasl-username
用于 SASL 身份验证的用户名。
sasl-password
用于 SASL 身份验证的密码。
sasl-extensions
任意键值字符串对,作为 TOML 表传递
sasl-mechanism
SASL 机制类型。选项有
GSSAPIOAUTHBEARERPLAINSCRAM-SHA-256SCRAM-SHA-512
sasl-version
SASL 协议版本。
sasl-gssapi-service-name
GSSAPI 的服务名称。
sasl-gssapi-auth-type
GSSAPI 的授权类型。
sasl-gssapi-disable-pafxfast
设置为 true 或 false。
sasl-gssapi-kerberos-config-path
Kerberos 配置文件路径。
sasl-gssapi-key-tab-path
Kerberos 密钥表路径。
sasl-gssapi-realm
默认 Kerberos 领域。
如果 sasl-mechanism 是 OAUTHBEARER,则为选项
sasl-oauth-service
使用 SASL/OAUTH 进行身份验证时使用的服务名称。其中一个选项是
""(空) 或customauth0azuread
sasl-oauth-client-id
使用 SASL/OAUTH 进行身份验证时使用的客户端 ID。
sasl-oauth-client-secret
使用 SASL/OAUTH 进行身份验证时使用的客户端密钥。
sasl-oauth-token-url
当 sasl-oauth-service 为 custom 或 auth0 时使用的令牌 URL。否则留空。
sasl-oauth-token-expiry-margin
令牌过期时间的边际。
sasl-oauth-scopes
使用 SASL/OAUTH 进行身份验证时使用的可选范围。
sasl-oauth-tenant-id
AzureAD 服务的租户 ID。
[kafka.sasl-oauth-parameters]
SASL/OAUTH 的可选键/值参数。例如 AUTH0 的 audience
sasl-access-token
静态 OAUTH 令牌。使用此项代替其他 OAUTH 参数。
选项
Kafka 事件处理器的以下选项可以在 处理程序文件 中设置,或在使用 TICKscript 中的 .kafka() 时设置。
| 名称 | 类型 | 描述 |
|---|---|---|
| 集群 | string | Kafka 集群的名称。 |
| topic | string | Kafka 主题。在 TICKscript 中,这是使用 .kafkaTopic() 设置的。 |
| template | string | 消息模板。 |
| disablePartitionById | boolean | 禁用按消息 ID 分区 Kafka 消息。 |
| partitionAlgorithm | string | 用于将消息 ID 分配给 Kafka 分区的算法 (crc32 (默认), murmur2, 或 fnv-1a)。 |
Kafka 消息分区
在 **Kapacitor 1.6+** 中,具有相同 ID 的消息将被发送到同一个 Kafka 分区。在此之前,消息会被发送到数据量最少的 Kafka 分区,而不管消息 ID。没有 ID 的消息会随机分布在各个分区之间。这使得 Kapacitor 的消息 ID 概念与 Kafka 的消息键概念保持一致。
要恢复到之前的行为,请使用 disablePartitionById 选项。
当按 ID 分区时,使用 partitionHashAlgorithm 指定用于将消息 ID 分配给 Kafka 分区的方法。Kapacitor 支持以下分区算法
- crc32: (默认) 与
librdkafka和confluent-kafka-go一致 - murmur2: 与标准的 Java 分区逻辑一致
- fnv-1a: 与 Shopify 的
sarama项目一致
示例:处理程序文件
id: kafka-event-handler
topic: kapacitor-topic-name
kind: kafka
options:
cluster: kafka-cluster
topic: kafka-topic-name
template: kafka-template-name
disablePartitionById: false
partitionAlgorithm: crc32示例:TICKscript
|alert()
// ...
.kafka()
.cluster('kafka-cluster')
.kafkaTopic('kafka-topic-name')
.template('kafka-template-name')
.disablePartitionById(FALSE)
.partitionAlgorithm('crc32')使用 Kafka 事件处理器
在 kapacitor.conf 中启用 Kafka 事件处理器后,在 TICKscript 中使用 .kafka() 属性将警报发送到 Kafka 集群,或者定义一个订阅主题并将已发布的警报发送到 Kafka 的 Kafka 处理程序。
下面的示例使用 kapacitor.conf 中定义的以下 Kafka 配置
kapacitor.conf 中的 Kafka 设置
[[kafka]]
enabled = true
id = "infra-monitoring"
brokers = ["123.45.67.89:9092", "123.45.67.90:9092"]
timeout = "10s"
batch-size = 100
batch-timeout = "1s"
use-ssl = true
ssl-ca = "/etc/ssl/certs/ca.crt"
ssl-cert = "/etc/ssl/certs/cert.crt"
ssl-key = "/etc/ssl/certs/cert-key.key"
insecure-skip-verify = true从 TICKscript 发送警报到 Kafka 集群
下面的 TICKscript 使用 .kafka() 事件处理器,在空闲 CPU 使用率低于 10% 时发送消息:“嘿,检查你的 CPU”。它将消息发布到 kapacitor.conf 中定义的 infra-monitoring Kafka 集群的 cpu-alerts 主题。
kafka-cpu-alert.tick
stream
|from()
.measurement('cpu')
|alert()
.crit(lambda: "usage_idle" < 10)
.message('Hey, check your CPU')
.kafka()
.kafkaTopic('cpu-alerts')从已定义的处理程序发送警报到 Kafka 集群
以下设置将警报发送到 cpu 主题,消息为:“嘿,检查你的 CPU”。添加了一个 Kafka 处理程序,该处理程序订阅 cpu 主题,并将所有警报消息发布到与 kapacitor.conf 中定义的 infra-monitoring Kafka 集群关联的 cpu-alerts 主题。
创建一个将警报消息发布到主题的 TICKscript。下面的 TICKscript 在 CPU 空闲使用率低于 10% 时(或 CPU 使用率高于 90%)向 cpu 主题发送警报消息。
cpu_alert.tick
stream
|from()
.measurement('cpu')
|alert()
.crit(lambda: "usage_idle" < 10)
.message('Hey, check your CPU')
.topic('cpu')添加并启用 TICKscript
kapacitor define cpu_alert -tick cpu_alert.tick
kapacitor enable cpu_alert创建一个订阅 cpu 主题的处理程序文件,并使用 Kafka 事件处理器将警报发送到 Kafka 中的 cpu-alerts 主题。
kafka_cpu_handler.yaml
id: kafka-cpu-alert
topic: cpu
kind: kafka
options:
topic: 'cpu-alerts'添加处理器
kapacitor define-topic-handler kafka_cpu_handler.yaml将 SASL 与 Kapacitor 结合使用
要使用除 SSL 之外的身份验证方法,请配置 Kapacitor 使用 SASL。一个示例是使用 Kapacitor 直接使用用户名/密码向 Kafka 进行身份验证。有多种配置选项可用,但最常见的用法是使用用户名和密码,如下例所示
[[kafka]]
enabled = true
id = "infra-monitoring"
brokers = ["123.45.67.89:9092", "123.45.67.90:9092"]
timeout = "10s"
batch-size = 100
batch-timeout = "1s"
use-ssl = true
ssl-ca = "/etc/ssl/certs/ca.crt"
ssl-cert = "/etc/ssl/certs/cert.crt"
ssl-key = "/etc/ssl/certs/cert-key.key"
insecure-skip-verify = true
sasl-username = "kafka"
sasl-password = "kafkapassword"此页面是否有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一员!我们欢迎并鼓励您对 Kapacitor 和本文档提供反馈和错误报告。要获取支持,请使用以下资源: