
Kafka Output Plugin

This plugin writes metrics to a Kafka Broker, acting as a Kafka producer.

Introduced in: Telegraf v0.1.7 | Tags: messaging | OS support: all

Global configuration options

Plugins support additional global and plugin configuration settings for tasks such as modifying metrics, tags, and fields, creating aliases, and configuring plugin ordering. See CONFIGURATION.md for more details.

Startup error behavior options

In addition to the plugin-specific and global configuration settings, the plugin supports the startup_error_behavior setting for specifying how to behave when startup errors occur. The available values are listed below, followed by a configuration sketch:

  • error: Telegraf will stop and exit in case of startup errors. This is the default behavior.
  • ignore: Telegraf will ignore startup errors for this plugin, disable it, and continue processing all other plugins.
  • retry: Telegraf will retry starting the plugin on every gather or write cycle in case of startup errors. The plugin is disabled until startup succeeds.
  • probe: Telegraf will probe the plugin's functionality (if possible) and disable the plugin if probing fails. If the plugin does not support probing, Telegraf behaves as if ignore were set.
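
For example, a minimal sketch that keeps retrying this output when the brokers are not yet reachable at startup (broker address and topic are illustrative):

[[outputs.kafka]]
  ## Retry plugin startup on every gather/write cycle instead of exiting
  startup_error_behavior = "retry"
  brokers = ["localhost:9092"]
  topic = "telegraf"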

Secret-store support

This plugin supports secrets from secret-stores for the sasl_username, sasl_password, and sasl_access_token options. See the secret-store documentation for more details on how to use them.
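
For example, assuming a secret store configured with the ID mystore that holds kafka_username and kafka_password entries (both names are illustrative), the SASL credentials can reference the store instead of being written in plain text:

[[outputs.kafka]]
  brokers = ["localhost:9092"]
  topic = "telegraf"
  sasl_mechanism = "PLAIN"
  ## "mystore" is the ID of a configured secret store; the keys are
  ## resolved from that store at runtime (illustrative names)
  sasl_username = "@{mystore:kafka_username}"
  sasl_password = "@{mystore:kafka_password}"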

Configuration

# Configuration for the Kafka server to send metrics to
[[outputs.kafka]]
  ## URLs of kafka brokers
  ## The brokers listed here are used to connect to collect metadata about a
  ## cluster. However, once the initial metadata collect is completed, telegraf
  ## will communicate solely with the kafka leader and not all defined brokers.
  brokers = ["localhost:9092"]

  ## Kafka topic for producer messages
  topic = "telegraf"

  ## Tag value to be used as the topic. If not set or the tag does not exist,
  ## the 'topic' option is used.
  # topic_tag = ""

  ## If true, the 'topic_tag' will be removed from the metric.
  # exclude_topic_tag = false

  ## Optional Client id
  # client_id = "Telegraf"

  ## Set the minimal supported Kafka version.  Setting this enables the use of new
  ## Kafka features and APIs.  Of particular interest, lz4 compression
  ## requires at least version 0.10.0.0.
  ##   ex: version = "1.1.0"
  # version = ""

  ## The routing tag specifies a tagkey on the metric whose value is used as
  ## the message key.  The message key is used to determine which partition to
  ## send the message to.  This tag is preferred over the routing_key option.
  routing_tag = "host"

  ## The routing key is set as the message key and used to determine which
  ## partition to send the message to.  This value is only used when no
  ## routing_tag is set or as a fallback when the tag specified in routing tag
  ## is not found.
  ##
  ## If set to "random", a random value will be generated for each message.
  ##
  ## When unset, no message key is added and each message is routed to a random
  ## partition.
  ##
  ##   ex: routing_key = "random"
  ##       routing_key = "telegraf"
  # routing_key = ""

  ## Compression codec represents the various compression codecs recognized by
  ## Kafka in messages.
  ##  0 : None
  ##  1 : Gzip
  ##  2 : Snappy
  ##  3 : LZ4
  ##  4 : ZSTD
  # compression_codec = 0

  ## Idempotent Writes
  ## If enabled, exactly one copy of each message is written.
  # idempotent_writes = false

  ##  RequiredAcks is used in Produce Requests to tell the broker how many
  ##  replica acknowledgements it must see before responding
  ##   0 : the producer never waits for an acknowledgement from the broker.
  ##       This option provides the lowest latency but the weakest durability
  ##       guarantees (some data will be lost when a server fails).
  ##   1 : the producer gets an acknowledgement after the leader replica has
  ##       received the data. This option provides better durability as the
  ##       client waits until the server acknowledges the request as successful
  ##       (only messages that were written to the now-dead leader but not yet
  ##       replicated will be lost).
  ##   -1: the producer gets an acknowledgement after all in-sync replicas have
  ##       received the data. This option provides the best durability, we
  ##       guarantee that no messages will be lost as long as at least one in
  ##       sync replica remains.
  # required_acks = -1

  ## The maximum number of times to retry sending a metric before failing
  ## until the next flush.
  # max_retry = 3

  ## The maximum permitted size of a message. Should be set equal to or
  ## smaller than the broker's 'message.max.bytes'.
  # max_message_bytes = 1000000

  ## Producer timestamp
  ## This option sets the timestamp of the kafka producer message, choose from:
  ##   * metric: Uses the metric's timestamp
  ##   * now: Uses the time of write
  # producer_timestamp = "metric"

  ## Add metric name as specified kafka header if not empty
  # metric_name_header = ""

  ## Optional TLS Config
  # enable_tls = false
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Period between keep alive probes.
  ## Defaults to the OS configuration if not specified or zero.
  # keep_alive_period = "15s"

  ## Optional SOCKS5 proxy to use when connecting to brokers
  # socks5_enabled = true
  # socks5_address = "127.0.0.1:1080"
  # socks5_username = "alice"
  # socks5_password = "pass123"

  ## Optional SASL Config
  # sasl_username = ""
  # sasl_password = ""

  ## Optional SASL, one of:
  ##   OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, AWS-MSK-IAM
  # sasl_mechanism = ""

  ## Used if sasl_mechanism is GSSAPI
  # sasl_gssapi_service_name = ""
  ## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH
  # sasl_gssapi_auth_type = "KRB5_USER_AUTH"
  # sasl_gssapi_kerberos_config_path = "/"
  # sasl_gssapi_realm = "realm"
  # sasl_gssapi_key_tab_path = ""
  # sasl_gssapi_disable_pafxfast = false

  ## Access token used if sasl_mechanism is OAUTHBEARER
  # sasl_access_token = ""

  ## Used if sasl_mechanism is AWS-MSK-IAM
  # sasl_aws_msk_iam_region = ""
  ## for profile based auth
  # sasl_aws_msk_iam_profile = ""
  ## for role based auth
  # sasl_aws_msk_iam_role = ""
  # sasl_aws_msk_iam_session = ""

  ## Arbitrary key value string pairs to pass as a TOML table. For example:
  ## {logicalCluster = "cluster-042", poolId = "pool-027"}
  # sasl_extensions = {}

  ## SASL protocol version.  When connecting to Azure EventHub set to 0.
  # sasl_version = 1

  ## Disable Kafka metadata full fetch
  # metadata_full = false

  ## Maximum number of retries for metadata operations including
  ## connecting. Sets Sarama library's Metadata.Retry.Max config value. If 0 or
  ## unset, use the Sarama default of 3.
  # metadata_retry_max = 0

  ## Type of retry backoff. Valid options: "constant", "exponential"
  # metadata_retry_type = "constant"

  ## Amount of time to wait before retrying. When metadata_retry_type is
  ## "constant", each retry is delayed this amount. When "exponential", the
  ## first retry is delayed this amount, and subsequent delays are doubled. If 0
  ## or unset, use the Sarama default of 250 ms.
  # metadata_retry_backoff = 0

  ## Maximum amount of time to wait before retrying when metadata_retry_type is
  ## "exponential". Ignored for other retry types. If 0, there is no backoff
  ## limit.
  # metadata_retry_max_duration = 0

  ## Data format to output.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
  # data_format = "influx"

  ## NOTE: Due to the way TOML is parsed, tables must be at the END of the
  ## plugin definition, otherwise additional config options are read as part of
  ## the table

  ## Optional topic suffix configuration.
  ## If the section is omitted, no suffix is used.
  ## The following topic suffix methods are supported:
  ##   measurement - suffix equals separator + measurement name
  ##   tags        - suffix equals separator + specified tags' values
  ##                 interleaved with the separator

  ## Suffix equals to "_" + measurement name
  # [outputs.kafka.topic_suffix]
  #   method = "measurement"
  #   separator = "_"

  ## Suffix equals to "__" + measurement's "foo" tag value.
  ## If there's no such a tag, suffix equals to an empty string
  # [outputs.kafka.topic_suffix]
  #   method = "tags"
  #   keys = ["foo"]
  #   separator = "__"

  ## Suffix equals "_" + the measurement's "foo" and "bar"
  ## tag values, separated by "_". If such tags are missing,
  ## their values are treated as empty strings.
  # [outputs.kafka.topic_suffix]
  #   method = "tags"
  #   keys = ["foo", "bar"]
  #   separator = "_"
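
As a worked sketch of the measurement-based suffix (broker and topic are illustrative): with method = "measurement" and separator = "_", metrics named cpu and mem are produced to the topics telegraf_cpu and telegraf_mem respectively.

[[outputs.kafka]]
  brokers = ["localhost:9092"]
  topic = "telegraf"

  ## Metrics named "cpu" are sent to topic "telegraf_cpu",
  ## metrics named "mem" to "telegraf_mem".
  [outputs.kafka.topic_suffix]
    method = "measurement"
    separator = "_"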

max_retry

This option controls the number of retries before a failure notification is displayed for each message when no acknowledgement is received from the broker. When this setting is greater than 0, message latency can be reduced, but duplicate messages can occur in cases of transient errors, and broker load can increase during downtime.

This option is similar to the retries option of the Java Kafka Producer.
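
For instance, a sketch that favors durability over latency by waiting for all in-sync replicas and allowing a few more retries (the values shown are illustrative, not recommendations):

[[outputs.kafka]]
  brokers = ["localhost:9092"]
  topic = "telegraf"
  ## Wait for all in-sync replicas to acknowledge each message
  required_acks = -1
  ## Retry each unacknowledged message up to 5 times before failing it
  ## until the next flush
  max_retry = 5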

