文档文档

Kafka 事件处理器

Apache Kafka 是一个分布式流处理平台,专为构建实时数据管道和流处理应用程序而设计。配置 Kapacitor 以将告警消息发送到 Kafka 集群。

配置

Kafka 事件处理器的配置以及默认选项值在您的 kapacitor.conf 中设置。以下是一个配置示例

[[kafka]]
  enabled = true
  id = "localhost"
  brokers = []
  timeout = "10s"
  batch-size = 100
  batch-timeout = "1s"
  use-ssl = false
  ssl-ca = ""
  ssl-cert = ""
  ssl-key = ""
  insecure-skip-verify = false
  # Optional SASL configuration
  sasl-username = "xxxxx"
  sasl-password = "xxxxxxxx"
  sasl-extensions = {}
  sasl-mechanism = ""
  sasl-version = ""
  # Use if sasl-mechanism is GSSAPI. GSSAPI is for organizations using Kerberos.
  sasl-gssapi-service-name = ""
  sasl-gssapi-auth-type = "KRB5_USER_AUTH"
  sasl-gssapi-disable-pafxfast = false
  sasl-gssapi-kerberos-config-path = "/"
  sasl-gssapi-key-tab-path = ""
  sasl-gssapi-realm = "realm"
  # Options if sasl-mechanism is OAUTHBEARER
  sasl-oauth-service = "auth0"
  sasl-oauth-client-id = "xxxxxxx"
  sasl-oauth-client-secret = "xxxxxxxx"
  sasl-oauth-token-url = "dedicated-auth0-token-url"
  sasl-oauth-token-expiry-margin = "10s"
  sasl-oauth-scopes = ""
  sasl-oauth-tenant-id = ""
  [kafka.sasl-oauth-parameters]
     audience = "development"
  sasl-access-token = ""

可以使用 TOML 中的多个 [[kafka]] 部分配置多个 Kafka 客户端。id 充当每个配置的 Kafka 客户端的唯一标识符。

enabled

设置为 true 以启用 Kafka 事件处理器。

id

Kafka 集群的唯一标识符。

brokers

使用 host:port 格式的 Kafka broker 地址列表。

timeout

与 Kafka broker 进行网络操作的超时时间。如果设置为 0,则使用 10 秒的默认值。

batch-size

发送到 Kafka 之前批处理的消息数量。如果设置为 0,则使用 100 的默认值。

batch-timeout

在刷新不完整批处理之前等待的最长时间。如果设置为 0,则使用 1 秒的默认值。

use-ssl

启用 SSL 通信。对于其他 SSL 选项生效,必须为 true

ssl-ca

证书颁发机构文件的路径。

ssl-cert

主机证书文件的路径。

ssl-key

证书私钥文件的路径。

insecure-skip-verify

使用 SSL 但跳过链和主机验证。(如果使用自签名证书,则为必需。)

(可选) SASL 配置

sasl-username

用于 SASL 身份验证的用户名。

sasl-password

用于 SASL 身份验证的密码。

sasl-extensions

作为 TOML 表传递的任意键值字符串对

sasl-mechanism

SASL 机制类型。选项有

  • GSSAPI
  • OAUTHBEARER
  • PLAIN
  • SCRAM-SHA-256
  • SCRAM-SHA-512

sasl-version

SASL 协议版本。

sasl-gssapi-service-name

GSSAPI 的服务名称。

sasl-gssapi-auth-type

GSSAPI 的授权类型。

sasl-gssapi-disable-pafxfast

设置为 truefalse

sasl-gssapi-kerberos-config-path

Kerberos 配置文件的路径。

sasl-gssapi-key-tab-path

Kerberos 密钥表路径。

sasl-gssapi-realm

默认 Kerberos realm。

如果 sasl-mechanism 是 OAUTHBEARER 的选项

sasl-oauth-service

使用 SASL/OAUTH 进行身份验证时使用的服务名称。选项之一

  • "" (空) 或 custom
  • auth0
  • azuread

sasl-oauth-client-id

使用 SASL/OAUTH 进行身份验证时使用的客户端 ID。

sasl-oauth-client-secret

使用 SASL/OAUTH 进行身份验证时使用的客户端密钥。

sasl-oauth-token-url

当 sasl-oauth-service 为 customauth0 时使用的令牌 URL。否则留空。

sasl-oauth-token-expiry-margin

令牌过期时间的边距。

sasl-oauth-scopes

使用 SASL/OAUTH 进行身份验证时使用的可选范围。

sasl-oauth-tenant-id

AzureAD 服务的租户 ID。

[kafka.sasl-oauth-parameters]

SASL/OAUTH 的可选键/值参数。例如 AUTH0 的 audience

sasl-access-token

静态 OAUTH 令牌。使用此项代替其他 OAUTH 参数。

选项

以下 Kafka 事件处理器选项可以在处理程序文件中设置,或者在使用 TICKscript 中的 .kafka() 时设置。

名称类型描述
clusterstringKafka 集群的名称。
topicstringKafka 主题。在 TICKscript 中,这使用 .kafkaTopic() 设置。
templatestring消息模板。
disablePartitionByIdboolean禁用按消息 ID 对 Kafka 消息进行分区。
partitionAlgorithmstring用于将消息 ID 分配给 Kafka 分区的算法 (crc32 (默认)murmur2fnv-1a)。

Kafka 消息分区

Kapacitor 1.6+ 中,具有相同 ID 的消息被发送到相同的 Kafka 分区。以前,消息被发送到数据量最少的 Kafka 分区,而与消息 ID 无关。没有 ID 的消息在分区之间随机分布。这使消息 ID 的 Kapacitor 概念与消息键的 Kafka 概念对齐。

要恢复到以前的行为,请使用 disablePartitionById 选项。

当按 ID 分区时,使用 partitionHashAlgorithm 指定用于将消息 ID 分配给 Kafka 分区的方法。Kapacitor 支持以下分区算法

  • crc32(默认)librdkafkaconfluent-kafka-go 对齐
  • murmur2:与规范的 Java 分区逻辑对齐
  • fnv-1a:与 Shopify 的 sarama 项目对齐

示例:处理程序文件

id: kafka-event-handler
topic: kapacitor-topic-name
kind: kafka
options:
  cluster: kafka-cluster
  topic: kafka-topic-name
  template: kafka-template-name
  disablePartitionById: false
  partitionAlgorithm: crc32

示例:TICKscript

|alert()
  // ...
  .kafka()
    .cluster('kafka-cluster')
    .kafkaTopic('kafka-topic-name')
    .template('kafka-template-name')
    .disablePartitionById(FALSE)
    .partitionAlgorithm('crc32')

使用 Kafka 事件处理器

在您的 kapacitor.conf 中启用 Kafka 事件处理器后,在您的 TICKscript 中使用 .kafka() 属性将告警发送到 Kafka 集群,或定义订阅主题并将发布的告警发送到 Kafka 的 Kafka 处理程序。

以下示例使用在 kapacitor.conf 中定义的以下 Kafka 配置

kapacitor.conf 中的 Kafka 设置

[[kafka]]
  enabled = true
  id = "infra-monitoring"
  brokers = ["123.45.67.89:9092", "123.45.67.90:9092"]
  timeout = "10s"
  batch-size = 100
  batch-timeout = "1s"
  use-ssl = true
  ssl-ca = "/etc/ssl/certs/ca.crt"
  ssl-cert = "/etc/ssl/certs/cert.crt"
  ssl-key = "/etc/ssl/certs/cert-key.key"
  insecure-skip-verify = true

从 TICKscript 将告警发送到 Kafka 集群

以下 TICKscript 使用 .kafka() 事件处理器在空闲 CPU 使用率降至 10% 以下时发送消息 “Hey, check your CPU”。它将消息发布到 kapacitor.conf 中定义的 infra-monitoring Kafka 集群中的 cpu-alerts 主题。

kafka-cpu-alert.tick

stream
  |from()
    .measurement('cpu')
  |alert()
    .crit(lambda: "usage_idle" < 10)
    .message('Hey, check your CPU')
    .kafka()
      .kafkaTopic('cpu-alerts')

从定义的处理程序将告警发送到 Kafka 集群

以下设置向 cpu 主题发送消息 “Hey, check your CPU” 的告警。添加了一个 Kafka 处理程序,该处理程序订阅 cpu 主题并将所有告警消息发布到与 kapacitor.conf 中定义的 infra-monitoring Kafka 集群关联的 cpu-alerts 主题。

创建一个将告警消息发布到主题的 TICKscript。以下 TICKscript 在 CPU 空闲使用率降至 10% 以下(或 CPU 使用率高于 90%)时向 cpu 主题发送告警消息。

cpu_alert.tick
stream
  |from()
    .measurement('cpu')
  |alert()
    .crit(lambda: "usage_idle" < 10)
    .message('Hey, check your CPU')
    .topic('cpu')

添加并启用 TICKscript

kapacitor define cpu_alert -tick cpu_alert.tick
kapacitor enable cpu_alert

创建一个处理程序文件,该文件订阅 cpu 主题并使用 Kafka 事件处理器将告警发送到 Kafka 中的 cpu-alerts 主题。

kafka_cpu_handler.yaml
id: kafka-cpu-alert
topic: cpu
kind: kafka
options:
  topic: 'cpu-alerts'

添加处理程序

kapacitor define-topic-handler kafka_cpu_handler.yaml

将 SASL 与 Kapacitor 一起使用

要使用 SSL 以外的身份验证方法,请配置 Kapacitor 以使用 SASL。一个示例是使用 Kapacitor 直接针对 Kafka 使用用户名/密码进行身份验证。有多个配置选项可用,但最常见的用法是用户名和密码,如下例所示

[[kafka]]
  enabled = true
  id = "infra-monitoring"
  brokers = ["123.45.67.89:9092", "123.45.67.90:9092"]
  timeout = "10s"
  batch-size = 100
  batch-timeout = "1s"
  use-ssl = true
  ssl-ca = "/etc/ssl/certs/ca.crt"
  ssl-cert = "/etc/ssl/certs/cert.crt"
  ssl-key = "/etc/ssl/certs/cert-key.key"
  insecure-skip-verify = true
  sasl-username = "kafka"
  sasl-password = "kafkapassword"

此页是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在一样使用它,而无需对代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速、最近数据引擎,可实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 以 Core 的基础为构建,增加了高可用性、读取副本、增强的安全性以及数据压缩,以实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看