Kafka 事件处理器
Apache Kafka 是一个分布式流式平台,专为构建实时数据管道和流式应用程序而设计。配置 Kapacitor 以将警报消息发送到 Kafka 集群。
配置
Kafka 事件处理器的配置以及默认选项值在您的 kapacitor.conf
中设置。以下是一个配置示例
[[kafka]]
enabled = true
id = "localhost"
brokers = []
timeout = "10s"
batch-size = 100
batch-timeout = "1s"
use-ssl = false
ssl-ca = ""
ssl-cert = ""
ssl-key = ""
insecure-skip-verify = false
# Optional SASL configuration
sasl-username = "xxxxx"
sasl-password = "xxxxxxxx"
sasl-extensions = {}
sasl-mechanism = ""
sasl-version = ""
# Use if sasl-mechanism is GSSAPI. GSSAPI is for organizations using Kerberos.
sasl-gssapi-service-name = ""
sasl-gssapi-auth-type = "KRB5_USER_AUTH"
sasl-gssapi-disable-pafxfast = false
sasl-gssapi-kerberos-config-path = "/"
sasl-gssapi-key-tab-path = ""
sasl-gssapi-realm = "realm"
# Options if sasl-mechanism is OAUTHBEARER
sasl-oauth-service = "auth0"
sasl-oauth-client-id = "xxxxxxx"
sasl-oauth-client-secret = "xxxxxxxx"
sasl-oauth-token-url = "dedicated-auth0-token-url"
sasl-oauth-token-expiry-margin = "10s"
sasl-oauth-scopes = ""
sasl-oauth-tenant-id = ""
[kafka.sasl-oauth-parameters]
audience = "development"
sasl-access-token = ""
可以使用 TOML 中的多个 [[kafka]]
部分配置多个 Kafka 客户端。id
充当每个配置的 Kafka 客户端的唯一标识符。
enabled
设置为 true
以启用 Kafka 事件处理器。
id
Kafka 集群的唯一标识符。
brokers
使用 host:port
格式的 Kafka broker 地址列表。
timeout
与 Kafka broker 的网络操作超时时间。如果设置为 0,则使用默认值 10 秒。
batch-size
在发送到 Kafka 之前批处理的消息数。如果设置为 0,则使用默认值 100。
batch-timeout
在刷新不完整的批处理之前等待的最长时间。如果设置为 0,则使用默认值 1 秒。
use-ssl
启用 SSL 通信。对于其他 SSL 选项生效,必须为 true
。
ssl-ca
证书颁发机构文件的路径。
ssl-cert
主机证书文件的路径。
ssl-key
证书私钥文件的路径。
insecure-skip-verify
使用 SSL 但跳过链和主机验证。(如果使用自签名证书,则为必需。)
(可选)SASL 配置
sasl-username
用于 SASL 身份验证的用户名。
sasl-password
用于 SASL 身份验证的密码。
sasl-extensions
作为 TOML 表传递的任意键值字符串对
sasl-mechanism
SASL 机制类型。选项包括
GSSAPI
OAUTHBEARER
PLAIN
SCRAM-SHA-256
SCRAM-SHA-512
sasl-version
SASL 协议版本。
sasl-gssapi-service-name
GSSAPI 的服务名称。
sasl-gssapi-auth-type
GSSAPI 的授权类型。
sasl-gssapi-disable-pafxfast
设置为 true
或 false
。
sasl-gssapi-kerberos-config-path
Kerberos 配置文件的路径。
sasl-gssapi-key-tab-path
Kerberos 密钥表文件的路径。
sasl-gssapi-realm
默认 Kerberos realm。
如果 sasl-mechanism 是 OAUTHBEARER 的选项
sasl-oauth-service
使用 SASL/OAUTH 进行身份验证时要使用的服务名称。选项之一
""
(空) 或custom
auth0
azuread
sasl-oauth-client-id
使用 SASL/OAUTH 进行身份验证时要使用的客户端 ID。
sasl-oauth-client-secret
使用 SASL/OAUTH 进行身份验证时要使用的客户端密钥。
sasl-oauth-token-url
当 sasl-oauth-service 为 custom
或 auth0
时要使用的令牌 URL。否则留空。
sasl-oauth-token-expiry-margin
令牌过期时间的边距。
sasl-oauth-scopes
使用 SASL/OAUTH 进行身份验证时要使用的可选范围。
sasl-oauth-tenant-id
AzureAD 服务的租户 ID。
[kafka.sasl-oauth-parameters]
SASL/OAUTH 的可选键/值参数。例如,AUTH0 的 audience
sasl-access-token
静态 OAUTH 令牌。使用此选项代替其他 OAUTH 参数。
选项
以下 Kafka 事件处理器选项可以在处理器文件中设置,或者在使用 TICKscript 中的 .kafka()
时设置。
名称 | 类型 | 描述 |
---|---|---|
cluster | string | Kafka 集群的名称。 |
topic | string | Kafka 主题。在 TICKscript 中,这使用 .kafkaTopic() 设置。 |
template | string | 消息模板。 |
disablePartitionById | boolean | 禁用按消息 ID 对 Kafka 消息进行分区。 |
partitionAlgorithm | string | 用于将消息 ID 分配给 Kafka 分区的算法 (crc32 (默认), murmur2 , 或 fnv-1a )。 |
Kafka 消息分区
在 Kapacitor 1.6+ 中,具有相同 ID 的消息被发送到相同的 Kafka 分区。以前,消息被发送到数据量最少的 Kafka 分区,而与消息 ID 无关。没有 ID 的消息在分区之间随机分布。这使消息 ID 的 Kapacitor 概念与消息键的 Kafka 概念对齐。
要恢复到以前的行为,请使用 disablePartitionById 选项。
当按 ID 分区时,使用 partitionHashAlgorithm 指定用于将消息 ID 分配给 Kafka 分区的方法。Kapacitor 支持以下分区算法
- crc32: (默认) 与
librdkafka
和confluent-kafka-go
对齐 - murmur2: 与规范的 Java 分区逻辑对齐
- fnv-1a: 与 Shopify 的
sarama
项目对齐
示例:处理器文件
id: kafka-event-handler
topic: kapacitor-topic-name
kind: kafka
options:
cluster: kafka-cluster
topic: kafka-topic-name
template: kafka-template-name
disablePartitionById: false
partitionAlgorithm: crc32
示例:TICKscript
|alert()
// ...
.kafka()
.cluster('kafka-cluster')
.kafkaTopic('kafka-topic-name')
.template('kafka-template-name')
.disablePartitionById(FALSE)
.partitionAlgorithm('crc32')
使用 Kafka 事件处理器
在您的 kapacitor.conf
中启用 Kafka 事件处理器后,在您的 TICKscript 中使用 .kafka()
属性将警报发送到 Kafka 集群,或定义一个 Kafka 处理器,该处理器订阅一个主题并将发布的警报发送到 Kafka。
以下示例使用在 kapacitor.conf
中定义的以下 Kafka 配置
kapacitor.conf 中的 Kafka 设置
[[kafka]]
enabled = true
id = "infra-monitoring"
brokers = ["123.45.67.89:9092", "123.45.67.90:9092"]
timeout = "10s"
batch-size = 100
batch-timeout = "1s"
use-ssl = true
ssl-ca = "/etc/ssl/certs/ca.crt"
ssl-cert = "/etc/ssl/certs/cert.crt"
ssl-key = "/etc/ssl/certs/cert-key.key"
insecure-skip-verify = true
从 TICKscript 向 Kafka 集群发送警报
以下 TICKscript 使用 .kafka()
事件处理器在空闲 CPU 使用率降至 10% 以下时发送消息“Hey, check your CPU”。它将消息发布到 kapacitor.conf
中定义的 infra-monitoring
Kafka 集群中的 cpu-alerts
主题。
kafka-cpu-alert.tick
stream
|from()
.measurement('cpu')
|alert()
.crit(lambda: "usage_idle" < 10)
.message('Hey, check your CPU')
.kafka()
.kafkaTopic('cpu-alerts')
从定义的处理器向 Kafka 集群发送警报
以下设置向 cpu
主题发送消息“Hey, check your CPU”的警报。添加了一个 Kafka 处理器,该处理器订阅 cpu
主题,并将所有警报消息发布到与 kapacitor.conf
中定义的 infra-monitoring
Kafka 集群关联的 cpu-alerts
主题。
创建一个 TICKscript,将警报消息发布到主题。以下 TICKscript 在 CPU 空闲使用率降至 10% 以下(或 CPU 使用率高于 90%)时,向 cpu
主题发送警报消息。
cpu_alert.tick
stream
|from()
.measurement('cpu')
|alert()
.crit(lambda: "usage_idle" < 10)
.message('Hey, check your CPU')
.topic('cpu')
添加并启用 TICKscript
kapacitor define cpu_alert -tick cpu_alert.tick
kapacitor enable cpu_alert
创建一个处理器文件,该文件订阅 cpu
主题,并使用 Kafka 事件处理器将警报发送到 Kafka 中的 cpu-alerts
主题。
kafka_cpu_handler.yaml
id: kafka-cpu-alert
topic: cpu
kind: kafka
options:
topic: 'cpu-alerts'
添加处理器
kapacitor define-topic-handler kafka_cpu_handler.yaml
将 SASL 与 Kapacitor 结合使用
要使用 SSL 以外的身份验证方法,请配置 Kapacitor 以使用 SASL。一个示例是使用 Kapacitor 直接针对 Kafka 使用用户名/密码进行身份验证。有多种配置选项可用,但最常见的用法是用户名和密码,如以下示例所示
[[kafka]]
enabled = true
id = "infra-monitoring"
brokers = ["123.45.67.89:9092", "123.45.67.90:9092"]
timeout = "10s"
batch-size = 100
batch-timeout = "1s"
use-ssl = true
ssl-ca = "/etc/ssl/certs/ca.crt"
ssl-cert = "/etc/ssl/certs/cert.crt"
ssl-key = "/etc/ssl/certs/cert-key.key"
insecure-skip-verify = true
sasl-username = "kafka"
sasl-password = "kafkapassword"
此页是否对您有帮助?
感谢您的反馈!