文档文档

Kinesis 消费者输入插件

此服务输入插件以支持的 数据格式 之一,从 AWS Kinesis 数据流消费消息。

引入于: Telegraf v1.10.0 标签: messaging, iot 操作系统支持: all

服务输入

此插件是服务输入。普通插件收集由 interval 设置确定的指标。服务插件启动一个服务来监听并等待指标或事件发生。服务插件与普通插件的两个主要区别是:

  1. 全局或插件特定的 interval 设置可能不适用
  2. --test--test-wait--once 的 CLI 选项可能不会为此插件生成输出

全局配置选项

插件支持其他全局和插件配置设置,用于修改指标、标签和字段,创建别名以及配置插件顺序等任务。更多详情请参阅 CONFIGURATION.md

配置

# Configuration for the AWS Kinesis input.
[[inputs.kinesis_consumer]]
  ## Amazon REGION of kinesis endpoint.
  region = "ap-southeast-2"

  ## Amazon Credentials
  ## Credentials are loaded in the following order
  ## 1) Web identity provider credentials via STS if role_arn and web_identity_token_file are specified
  ## 2) Assumed credentials via STS if role_arn is specified
  ## 3) explicit credentials from 'access_key' and 'secret_key'
  ## 4) shared profile from 'profile'
  ## 5) environment variables
  ## 6) shared credentials file
  ## 7) EC2 Instance Profile
  # access_key = ""
  # secret_key = ""
  # token = ""
  # role_arn = ""
  # web_identity_token_file = ""
  # role_session_name = ""
  # profile = ""
  # shared_credential_file = ""

  ## Endpoint to make request against, the correct endpoint is automatically
  ## determined and this option should only be set if you wish to override the
  ## default.
  ##   ex: endpoint_url = "https://:8000"
  # endpoint_url = ""

  ## Kinesis StreamName must exist prior to starting telegraf.
  streamname = "StreamName"

  ## Shard iterator type
  ## Available options: 'TRIM_HORIZON' (first in non-expired) and 'LATEST'
  # shard_iterator_type = "TRIM_HORIZON"

  ## Interval for checking for new records
  ## Please consider limits for getting records documented here:
  ## https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
  # poll_interval = "250ms"

  ## Interval for scanning for new shards created when resharding
  ## If set to zero, shards are only scanned once on startup.
  # shard_update_interval = "30s"

  ## Max undelivered messages
  ## This plugin uses tracking metrics, which ensure messages are read to
  ## outputs before acknowledging them to the original broker to ensure data
  ## is not lost. This option sets the maximum messages to read from the
  ## broker that have not been written by an output.
  ##
  ## This value needs to be picked with awareness of the agent's
  ## metric_batch_size value as well. Setting max undelivered messages too high
  ## can result in a constant stream of data batches to the output. While
  ## setting it too low may never flush the broker's messages.
  # max_undelivered_messages = 1000

  ## Content encoding of the record data
  ## If you are processing a cloudwatch logs kinesis stream then set this to
  ## "gzip" as AWS compresses cloudwatch log data before it is sent to kinesis.
  # content_encoding = "identity"

  ## Data format of the records to consume
  ## See https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  # data_format = "influx"

  ## Optional: Configuration for DynamoDB backend to store positions in the stream
  # [inputs.kinesis_consumer.checkpoint_dynamodb]
  #   ## Unique name for this consumer
  #   app_name = "default"
  #   ## Table to store the sequence numbers in
  #   table_name = "default"
  #   ## Interval for persisting data to limit write operations
  #   # interval = "10s"

所需的 AWS IAM 权限

Kinesis

  • DescribeStream
  • GetRecords
  • GetShardIterator

DynamoDB

  • GetItem
  • PutItem

DynamoDB 检查点

DynamoDB 检查点将最后一个已处理的记录存储在 DynamoDB 中。要利用此功能,请创建一个具有以下字符串类型键的表

Partition key: namespace
Sort key: shard_id

Metrics

该插件接受任意输入,并根据 data_format 设置进行解析。没有预定义的指标格式。

示例输出

没有预定义的指标格式,因此输出取决于插件输入。


此页面是否有帮助?

感谢您的反馈!


InfluxDB 3.8 新特性

InfluxDB 3.8 和 InfluxDB 3 Explorer 1.6 的主要增强功能。

查看博客文章

InfluxDB 3.8 现已适用于 Core 和 Enterprise 版本,同时发布了 InfluxDB 3 Explorer UI 的 1.6 版本。本次发布着重于操作成熟度,以及如何更轻松地部署、管理和可靠地运行 InfluxDB。

更多信息,请查看

InfluxDB Docker 的 latest 标签将指向 InfluxDB 3 Core

在 **2026 年 2 月 3 日**,InfluxDB Docker 镜像的 latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。

如果使用 Docker 来安装和运行 InfluxDB,latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。例如,如果使用 Docker 运行 InfluxDB v2,请将 latest 版本标签替换为 Docker pull 命令中的特定版本标签 — 例如

docker pull influxdb:2