文档文档

Kafka 事件处理器

Apache Kafka 是一个分布式流处理平台,用于构建实时数据管道和流处理应用程序。配置 Kapacitor 将警报消息发送到 Kafka 集群。

配置

Kafka 事件处理器的配置以及默认的 选项 值在 kapacitor.conf 中设置。下面是一个配置示例

[[kafka]]
  enabled = true
  id = "localhost"
  brokers = []
  timeout = "10s"
  batch-size = 100
  batch-timeout = "1s"
  use-ssl = false
  ssl-ca = ""
  ssl-cert = ""
  ssl-key = ""
  insecure-skip-verify = false
  # Optional SASL configuration
  sasl-username = "xxxxx"
  sasl-password = "xxxxxxxx"
  sasl-extensions = {}
  sasl-mechanism = ""
  sasl-version = ""
  # Use if sasl-mechanism is GSSAPI. GSSAPI is for organizations using Kerberos.
  sasl-gssapi-service-name = ""
  sasl-gssapi-auth-type = "KRB5_USER_AUTH"
  sasl-gssapi-disable-pafxfast = false
  sasl-gssapi-kerberos-config-path = "/"
  sasl-gssapi-key-tab-path = ""
  sasl-gssapi-realm = "realm"
  # Options if sasl-mechanism is OAUTHBEARER
  sasl-oauth-service = "auth0"
  sasl-oauth-client-id = "xxxxxxx"
  sasl-oauth-client-secret = "xxxxxxxx"
  sasl-oauth-token-url = "dedicated-auth0-token-url"
  sasl-oauth-token-expiry-margin = "10s"
  sasl-oauth-scopes = ""
  sasl-oauth-tenant-id = ""
  [kafka.sasl-oauth-parameters]
     audience = "development"
  sasl-access-token = ""

可以通过 TOML 中的多个 [[kafka]] 部分来配置多个 Kafka 客户端。id 作为每个配置的 Kafka 客户端的唯一标识符。

enabled

设置为 true 以启用 Kafka 事件处理器。

id

Kafka 集群的唯一标识符。

brokers

使用 host:port 格式的 Kafka 代理地址列表。

timeout

与 Kafka 代理的网络操作超时时间。如果设置为 0,则使用默认值 10 秒。

batch-size

在发送到 Kafka 之前批量处理的消息数量。如果设置为 0,则使用默认值 100。

batch-timeout

在刷新不完整批次之前等待的最大时间。如果设置为 0,则使用默认值 1 秒。

use-ssl

启用 SSL 通信。必须设置为 true 才能使其他 SSL 选项生效。

ssl-ca

证书颁发机构文件的路径。

ssl-cert

主机证书文件的路径。

ssl-key

证书私钥文件的路径。

insecure-skip-verify

使用 SSL 但跳过链和主机验证。(如果使用自签名证书,则必需。)

(可选) SASL 配置

sasl-username

用于 SASL 身份验证的用户名。

sasl-password

用于 SASL 身份验证的密码。

sasl-extensions

任意键值字符串对,作为 TOML 表传递

sasl-mechanism

SASL 机制类型。选项有

  • GSSAPI
  • OAUTHBEARER
  • PLAIN
  • SCRAM-SHA-256
  • SCRAM-SHA-512

sasl-version

SASL 协议版本。

sasl-gssapi-service-name

GSSAPI 的服务名称。

sasl-gssapi-auth-type

GSSAPI 的授权类型。

sasl-gssapi-disable-pafxfast

设置为 truefalse

sasl-gssapi-kerberos-config-path

Kerberos 配置文件路径。

sasl-gssapi-key-tab-path

Kerberos 密钥表路径。

sasl-gssapi-realm

默认 Kerberos 领域。

如果 sasl-mechanism 是 OAUTHBEARER,则为选项

sasl-oauth-service

使用 SASL/OAUTH 进行身份验证时使用的服务名称。其中一个选项是

  • "" (空) 或 custom
  • auth0
  • azuread

sasl-oauth-client-id

使用 SASL/OAUTH 进行身份验证时使用的客户端 ID。

sasl-oauth-client-secret

使用 SASL/OAUTH 进行身份验证时使用的客户端密钥。

sasl-oauth-token-url

当 sasl-oauth-service 为 customauth0 时使用的令牌 URL。否则留空。

sasl-oauth-token-expiry-margin

令牌过期时间的边际。

sasl-oauth-scopes

使用 SASL/OAUTH 进行身份验证时使用的可选范围。

sasl-oauth-tenant-id

AzureAD 服务的租户 ID。

[kafka.sasl-oauth-parameters]

SASL/OAUTH 的可选键/值参数。例如 AUTH0 的 audience

sasl-access-token

静态 OAUTH 令牌。使用此项代替其他 OAUTH 参数。

选项

Kafka 事件处理器的以下选项可以在 处理程序文件 中设置,或在使用 TICKscript 中的 .kafka() 时设置。

名称类型描述
集群stringKafka 集群的名称。
topicstringKafka 主题。在 TICKscript 中,这是使用 .kafkaTopic() 设置的。
templatestring消息模板。
disablePartitionByIdboolean禁用按消息 ID 分区 Kafka 消息。
partitionAlgorithmstring用于将消息 ID 分配给 Kafka 分区的算法 (crc32 (默认), murmur2, 或 fnv-1a)。

Kafka 消息分区

在 **Kapacitor 1.6+** 中,具有相同 ID 的消息将被发送到同一个 Kafka 分区。在此之前,消息会被发送到数据量最少的 Kafka 分区,而不管消息 ID。没有 ID 的消息会随机分布在各个分区之间。这使得 Kapacitor 的消息 ID 概念与 Kafka 的消息键概念保持一致。

要恢复到之前的行为,请使用 disablePartitionById 选项。

当按 ID 分区时,使用 partitionHashAlgorithm 指定用于将消息 ID 分配给 Kafka 分区的方法。Kapacitor 支持以下分区算法

  • crc32: (默认)librdkafkaconfluent-kafka-go 一致
  • murmur2: 与标准的 Java 分区逻辑一致
  • fnv-1a: 与 Shopify 的 sarama 项目一致

示例:处理程序文件

id: kafka-event-handler
topic: kapacitor-topic-name
kind: kafka
options:
  cluster: kafka-cluster
  topic: kafka-topic-name
  template: kafka-template-name
  disablePartitionById: false
  partitionAlgorithm: crc32

示例:TICKscript

|alert()
  // ...
  .kafka()
    .cluster('kafka-cluster')
    .kafkaTopic('kafka-topic-name')
    .template('kafka-template-name')
    .disablePartitionById(FALSE)
    .partitionAlgorithm('crc32')

使用 Kafka 事件处理器

kapacitor.conf 中启用 Kafka 事件处理器后,在 TICKscript 中使用 .kafka() 属性将警报发送到 Kafka 集群,或者定义一个订阅主题并将已发布的警报发送到 Kafka 的 Kafka 处理程序。

下面的示例使用 kapacitor.conf 中定义的以下 Kafka 配置

kapacitor.conf 中的 Kafka 设置

[[kafka]]
  enabled = true
  id = "infra-monitoring"
  brokers = ["123.45.67.89:9092", "123.45.67.90:9092"]
  timeout = "10s"
  batch-size = 100
  batch-timeout = "1s"
  use-ssl = true
  ssl-ca = "/etc/ssl/certs/ca.crt"
  ssl-cert = "/etc/ssl/certs/cert.crt"
  ssl-key = "/etc/ssl/certs/cert-key.key"
  insecure-skip-verify = true

从 TICKscript 发送警报到 Kafka 集群

下面的 TICKscript 使用 .kafka() 事件处理器,在空闲 CPU 使用率低于 10% 时发送消息:“嘿,检查你的 CPU”。它将消息发布到 kapacitor.conf 中定义的 infra-monitoring Kafka 集群的 cpu-alerts 主题。

kafka-cpu-alert.tick

stream
  |from()
    .measurement('cpu')
  |alert()
    .crit(lambda: "usage_idle" < 10)
    .message('Hey, check your CPU')
    .kafka()
      .kafkaTopic('cpu-alerts')

从已定义的处理程序发送警报到 Kafka 集群

以下设置将警报发送到 cpu 主题,消息为:“嘿,检查你的 CPU”。添加了一个 Kafka 处理程序,该处理程序订阅 cpu 主题,并将所有警报消息发布到与 kapacitor.conf 中定义的 infra-monitoring Kafka 集群关联的 cpu-alerts 主题。

创建一个将警报消息发布到主题的 TICKscript。下面的 TICKscript 在 CPU 空闲使用率低于 10% 时(或 CPU 使用率高于 90%)cpu 主题发送警报消息。

cpu_alert.tick
stream
  |from()
    .measurement('cpu')
  |alert()
    .crit(lambda: "usage_idle" < 10)
    .message('Hey, check your CPU')
    .topic('cpu')

添加并启用 TICKscript

kapacitor define cpu_alert -tick cpu_alert.tick
kapacitor enable cpu_alert

创建一个订阅 cpu 主题的处理程序文件,并使用 Kafka 事件处理器将警报发送到 Kafka 中的 cpu-alerts 主题。

kafka_cpu_handler.yaml
id: kafka-cpu-alert
topic: cpu
kind: kafka
options:
  topic: 'cpu-alerts'

添加处理器

kapacitor define-topic-handler kafka_cpu_handler.yaml

将 SASL 与 Kapacitor 结合使用

要使用除 SSL 之外的身份验证方法,请配置 Kapacitor 使用 SASL。一个示例是使用 Kapacitor 直接使用用户名/密码向 Kafka 进行身份验证。有多种配置选项可用,但最常见的用法是使用用户名和密码,如下例所示

[[kafka]]
  enabled = true
  id = "infra-monitoring"
  brokers = ["123.45.67.89:9092", "123.45.67.90:9092"]
  timeout = "10s"
  batch-size = 100
  batch-timeout = "1s"
  use-ssl = true
  ssl-ca = "/etc/ssl/certs/ca.crt"
  ssl-cert = "/etc/ssl/certs/cert.crt"
  ssl-key = "/etc/ssl/certs/cert-key.key"
  insecure-skip-verify = true
  sasl-username = "kafka"
  sasl-password = "kafkapassword"

此页面是否有帮助?

感谢您的反馈!


InfluxDB 3.8 新特性

InfluxDB 3.8 和 InfluxDB 3 Explorer 1.6 的主要增强功能。

查看博客文章

InfluxDB 3.8 现已适用于 Core 和 Enterprise 版本,同时发布了 InfluxDB 3 Explorer UI 的 1.6 版本。本次发布着重于操作成熟度,以及如何更轻松地部署、管理和可靠地运行 InfluxDB。

更多信息,请查看

InfluxDB Docker 的 latest 标签将指向 InfluxDB 3 Core

在 **2026 年 2 月 3 日**,InfluxDB Docker 镜像的 latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。

如果使用 Docker 来安装和运行 InfluxDB,latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。例如,如果使用 Docker 运行 InfluxDB v2,请将 latest 版本标签替换为 Docker pull 命令中的特定版本标签 — 例如

docker pull influxdb:2