文档文档

Kafka 事件处理器

Apache Kafka 是一个分布式流式平台,专为构建实时数据管道和流式应用程序而设计。配置 Kapacitor 以将警报消息发送到 Kafka 集群。

配置

Kafka 事件处理器的配置以及默认选项值在您的 kapacitor.conf 中设置。以下是一个配置示例

[[kafka]]
  enabled = true
  id = "localhost"
  brokers = []
  timeout = "10s"
  batch-size = 100
  batch-timeout = "1s"
  use-ssl = false
  ssl-ca = ""
  ssl-cert = ""
  ssl-key = ""
  insecure-skip-verify = false
  # Optional SASL configuration
  sasl-username = "xxxxx"
  sasl-password = "xxxxxxxx"
  sasl-extensions = {}
  sasl-mechanism = ""
  sasl-version = ""
  # Use if sasl-mechanism is GSSAPI. GSSAPI is for organizations using Kerberos.
  sasl-gssapi-service-name = ""
  sasl-gssapi-auth-type = "KRB5_USER_AUTH"
  sasl-gssapi-disable-pafxfast = false
  sasl-gssapi-kerberos-config-path = "/"
  sasl-gssapi-key-tab-path = ""
  sasl-gssapi-realm = "realm"
  # Options if sasl-mechanism is OAUTHBEARER
  sasl-oauth-service = "auth0"
  sasl-oauth-client-id = "xxxxxxx"
  sasl-oauth-client-secret = "xxxxxxxx"
  sasl-oauth-token-url = "dedicated-auth0-token-url"
  sasl-oauth-token-expiry-margin = "10s"
  sasl-oauth-scopes = ""
  sasl-oauth-tenant-id = ""
  [kafka.sasl-oauth-parameters]
     audience = "development"
  sasl-access-token = ""

可以使用 TOML 中的多个 [[kafka]] 部分配置多个 Kafka 客户端。id 充当每个配置的 Kafka 客户端的唯一标识符。

enabled

设置为 true 以启用 Kafka 事件处理器。

id

Kafka 集群的唯一标识符。

brokers

使用 host:port 格式的 Kafka broker 地址列表。

timeout

与 Kafka broker 的网络操作超时时间。如果设置为 0,则使用默认值 10 秒。

batch-size

在发送到 Kafka 之前批处理的消息数。如果设置为 0,则使用默认值 100。

batch-timeout

在刷新不完整的批处理之前等待的最长时间。如果设置为 0,则使用默认值 1 秒。

use-ssl

启用 SSL 通信。对于其他 SSL 选项生效,必须为 true

ssl-ca

证书颁发机构文件的路径。

ssl-cert

主机证书文件的路径。

ssl-key

证书私钥文件的路径。

insecure-skip-verify

使用 SSL 但跳过链和主机验证。(如果使用自签名证书,则为必需。)

(可选)SASL 配置

sasl-username

用于 SASL 身份验证的用户名。

sasl-password

用于 SASL 身份验证的密码。

sasl-extensions

作为 TOML 表传递的任意键值字符串对

sasl-mechanism

SASL 机制类型。选项包括

  • GSSAPI
  • OAUTHBEARER
  • PLAIN
  • SCRAM-SHA-256
  • SCRAM-SHA-512

sasl-version

SASL 协议版本。

sasl-gssapi-service-name

GSSAPI 的服务名称。

sasl-gssapi-auth-type

GSSAPI 的授权类型。

sasl-gssapi-disable-pafxfast

设置为 truefalse

sasl-gssapi-kerberos-config-path

Kerberos 配置文件的路径。

sasl-gssapi-key-tab-path

Kerberos 密钥表文件的路径。

sasl-gssapi-realm

默认 Kerberos realm。

如果 sasl-mechanism 是 OAUTHBEARER 的选项

sasl-oauth-service

使用 SASL/OAUTH 进行身份验证时要使用的服务名称。选项之一

  • "" (空) 或 custom
  • auth0
  • azuread

sasl-oauth-client-id

使用 SASL/OAUTH 进行身份验证时要使用的客户端 ID。

sasl-oauth-client-secret

使用 SASL/OAUTH 进行身份验证时要使用的客户端密钥。

sasl-oauth-token-url

当 sasl-oauth-service 为 customauth0 时要使用的令牌 URL。否则留空。

sasl-oauth-token-expiry-margin

令牌过期时间的边距。

sasl-oauth-scopes

使用 SASL/OAUTH 进行身份验证时要使用的可选范围。

sasl-oauth-tenant-id

AzureAD 服务的租户 ID。

[kafka.sasl-oauth-parameters]

SASL/OAUTH 的可选键/值参数。例如,AUTH0 的 audience

sasl-access-token

静态 OAUTH 令牌。使用此选项代替其他 OAUTH 参数。

选项

以下 Kafka 事件处理器选项可以在处理器文件中设置,或者在使用 TICKscript 中的 .kafka() 时设置。

名称类型描述
clusterstringKafka 集群的名称。
topicstringKafka 主题。在 TICKscript 中,这使用 .kafkaTopic() 设置。
templatestring消息模板。
disablePartitionByIdboolean禁用按消息 ID 对 Kafka 消息进行分区。
partitionAlgorithmstring用于将消息 ID 分配给 Kafka 分区的算法 (crc32 (默认), murmur2, 或 fnv-1a)。

Kafka 消息分区

Kapacitor 1.6+ 中,具有相同 ID 的消息被发送到相同的 Kafka 分区。以前,消息被发送到数据量最少的 Kafka 分区,而与消息 ID 无关。没有 ID 的消息在分区之间随机分布。这使消息 ID 的 Kapacitor 概念与消息键的 Kafka 概念对齐。

要恢复到以前的行为,请使用 disablePartitionById 选项。

当按 ID 分区时,使用 partitionHashAlgorithm 指定用于将消息 ID 分配给 Kafka 分区的方法。Kapacitor 支持以下分区算法

  • crc32: (默认)librdkafkaconfluent-kafka-go 对齐
  • murmur2: 与规范的 Java 分区逻辑对齐
  • fnv-1a: 与 Shopify 的 sarama 项目对齐

示例:处理器文件

id: kafka-event-handler
topic: kapacitor-topic-name
kind: kafka
options:
  cluster: kafka-cluster
  topic: kafka-topic-name
  template: kafka-template-name
  disablePartitionById: false
  partitionAlgorithm: crc32

示例:TICKscript

|alert()
  // ...
  .kafka()
    .cluster('kafka-cluster')
    .kafkaTopic('kafka-topic-name')
    .template('kafka-template-name')
    .disablePartitionById(FALSE)
    .partitionAlgorithm('crc32')

使用 Kafka 事件处理器

在您的 kapacitor.conf 中启用 Kafka 事件处理器后,在您的 TICKscript 中使用 .kafka() 属性将警报发送到 Kafka 集群,或定义一个 Kafka 处理器,该处理器订阅一个主题并将发布的警报发送到 Kafka。

以下示例使用在 kapacitor.conf 中定义的以下 Kafka 配置

kapacitor.conf 中的 Kafka 设置

[[kafka]]
  enabled = true
  id = "infra-monitoring"
  brokers = ["123.45.67.89:9092", "123.45.67.90:9092"]
  timeout = "10s"
  batch-size = 100
  batch-timeout = "1s"
  use-ssl = true
  ssl-ca = "/etc/ssl/certs/ca.crt"
  ssl-cert = "/etc/ssl/certs/cert.crt"
  ssl-key = "/etc/ssl/certs/cert-key.key"
  insecure-skip-verify = true

从 TICKscript 向 Kafka 集群发送警报

以下 TICKscript 使用 .kafka() 事件处理器在空闲 CPU 使用率降至 10% 以下时发送消息“Hey, check your CPU”。它将消息发布到 kapacitor.conf 中定义的 infra-monitoring Kafka 集群中的 cpu-alerts 主题。

kafka-cpu-alert.tick

stream
  |from()
    .measurement('cpu')
  |alert()
    .crit(lambda: "usage_idle" < 10)
    .message('Hey, check your CPU')
    .kafka()
      .kafkaTopic('cpu-alerts')

从定义的处理器向 Kafka 集群发送警报

以下设置向 cpu 主题发送消息“Hey, check your CPU”的警报。添加了一个 Kafka 处理器,该处理器订阅 cpu 主题,并将所有警报消息发布到与 kapacitor.conf 中定义的 infra-monitoring Kafka 集群关联的 cpu-alerts 主题。

创建一个 TICKscript,将警报消息发布到主题。以下 TICKscript 在 CPU 空闲使用率降至 10% 以下(或 CPU 使用率高于 90%)时,向 cpu 主题发送警报消息。

cpu_alert.tick
stream
  |from()
    .measurement('cpu')
  |alert()
    .crit(lambda: "usage_idle" < 10)
    .message('Hey, check your CPU')
    .topic('cpu')

添加并启用 TICKscript

kapacitor define cpu_alert -tick cpu_alert.tick
kapacitor enable cpu_alert

创建一个处理器文件,该文件订阅 cpu 主题,并使用 Kafka 事件处理器将警报发送到 Kafka 中的 cpu-alerts 主题。

kafka_cpu_handler.yaml
id: kafka-cpu-alert
topic: cpu
kind: kafka
options:
  topic: 'cpu-alerts'

添加处理器

kapacitor define-topic-handler kafka_cpu_handler.yaml

将 SASL 与 Kapacitor 结合使用

要使用 SSL 以外的身份验证方法,请配置 Kapacitor 以使用 SASL。一个示例是使用 Kapacitor 直接针对 Kafka 使用用户名/密码进行身份验证。有多种配置选项可用,但最常见的用法是用户名和密码,如以下示例所示

[[kafka]]
  enabled = true
  id = "infra-monitoring"
  brokers = ["123.45.67.89:9092", "123.45.67.90:9092"]
  timeout = "10s"
  batch-size = 100
  batch-timeout = "1s"
  use-ssl = true
  ssl-ca = "/etc/ssl/certs/ca.crt"
  ssl-cert = "/etc/ssl/certs/cert.crt"
  ssl-key = "/etc/ssl/certs/cert-key.key"
  insecure-skip-verify = true
  sasl-username = "kafka"
  sasl-password = "kafkapassword"

此页是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在这样使用它,而无需对代码进行任何更改。

阅读更多

InfluxDB 3 开源现已公开发布 Alpha 版本

InfluxDB 3 开源版本现已可用于 alpha 测试,根据 MIT 或 Apache 2 许可获得许可。

我们正在发布两个产品作为 alpha 版本的一部分。

InfluxDB 3 Core 是我们新的开源产品。它是一个用于时间序列和事件数据的最新数据引擎。InfluxDB 3 Enterprise 是一个商业版本,它建立在 Core 的基础上,增加了历史查询功能、读取副本、高可用性、可扩展性和细粒度的安全性。

有关如何开始使用的更多信息,请查看