文档文档

优化写入 InfluxDB 3 Core

使用这些技巧来优化写入 InfluxDB 3 Core 数据时的性能和系统开销。

以下工具写入 InfluxDB 并默认采用大多数写入优化

批量写入

批量写入数据以最大限度地减少写入 InfluxDB 时的网络开销。

最佳批量大小为 10,000 行行协议或 10 MB,以先满足的阈值为准。

首次写入时,按查询优先级对标签进行排序

首次写入 InfluxDB 3 Core 中的表时,将确定存储中的物理列顺序,该顺序直接影响查询性能。在查询执行过程中,较早出现的列通常过滤和访问速度更快。

在首次将数据写入表时,请按查询优先级对标签进行排序。将最常查询的标签放在前面——您经常在 WHERE 子句或 JOIN 中使用的标签——然后是较少查询的标签。例如,如果大多数查询都按 region 过滤,然后按 host 过滤,请构建第一次写入,使 region 位于 host 之前。

列顺序在第一次写入时确定,之后无法更改。在第一次写入后添加的标签将按列排序顺序的最后添加。在规划模式时,请考虑您的查询工作负载,以确保最佳的长期性能。

使用尽可能粗粒度的时间精度

InfluxDB 3 Core 支持高达纳秒级的时间戳精度。但是,如果您的数据不是以纳秒级收集的,则无需以该精度写入。为了获得更好的性能,请为您的用例使用您可以达到的最粗粒度的时间戳精度。

默认情况下,在 CLI 和 HTTP API 写入请求中,InfluxDB 3 Core 使用时间戳幅度自动检测精度。

要指定写入请求中时间戳的精度,请传递 precision 选项。

有关更多信息,请参阅以下内容

使用 gzip 压缩

使用 gzip 压缩可以加快写入 InfluxDB 3 Core 的速度。基准测试表明,当数据被压缩时,速度最多可提高 5 倍。

在 Telegraf 中启用 gzip 压缩

telegraf.confinfluxdb_v2 输出插件配置中,将 content_encoding 选项设置为 gzip

[[outputs.influxdb_v2]]
  urls = ["https://:8181"]
  # ...
  content_encoding = "gzip"

在 InfluxDB 客户端库中启用 gzip 压缩

每个 InfluxDB 客户端库都提供了压缩写入请求的选项,或者默认强制执行压缩。启用压缩的方法因库而异。有关具体说明,请参阅 InfluxDB 客户端库文档

将 gzip 压缩与 InfluxDB API 结合使用

当使用 InfluxDB API /api/v2/write 端点写入数据时,请使用 gzip 压缩数据,并将 Content-Encoding 标头设置为 gzip——例如

echo "mem,host=host1 used_percent=23.43234543 1641024000
mem,host=host2 used_percent=26.81522361 1641027600
mem,host=host1 used_percent=22.52984738 1641031200
mem,host=host2 used_percent=27.18294630 1641034800" | gzip > system.gzip \

curl --request POST "https://:8181/api/v2/write?org=ignored&bucket=
DATABASE_NAME
"
\
--header "Authorization: Token
AUTH_TOKEN
"
\
--header "Content-Type: text/plain; charset=utf-8" \ --header "Content-Encoding: gzip" \ --data-binary @system.gzip

替换以下内容:

  • DATABASE_NAME:要写入数据到的数据库名称
  • AUTH_TOKEN:您的 InfluxDB 3 Core 令牌 将其存储在秘密存储或环境变量中,以避免暴露原始令牌字符串。

使用 NTP 同步主机

使用网络时间协议 (NTP) 同步主机之间的时间。如果行协议中未包含时间戳,InfluxDB 将使用其主机的本地时间 (UTC) 为每个点分配时间戳。如果主机的时钟未与 NTP 同步,则时间戳可能不准确。

一次请求写入多个数据点

要一次请求写入多行,每一行行协议必须用换行符 (\n) 分隔。

写入前预处理数据

在写入工作负载中预处理数据可以帮助您避免由于模式冲突或资源使用而导致的写入失败。例如,如果您有许多设备写入同一个表,并且某些设备对同一个字段使用不同的数据类型,那么您可能希望生成警报或转换字段数据以匹配您的模式,然后再将数据发送到 InfluxDB。

使用 Telegraf,您可以处理来自其他服务和文件的数据,然后将其写入 InfluxDB。除了使用 Telegraf 包含的插件处理数据外,您还可以使用 Execd 处理器插件来集成您自己的代码和外部应用程序。

以下示例展示了如何配置 Telegraf 代理插件来优化写入。这些示例使用 File 输入插件从文件中读取数据,并使用 InfluxDB v2 输出插件将数据写入数据库,但您可以使用任何输入和输出插件。

前提条件

如果尚未安装 Telegraf,请安装 Telegraf

过滤批量数据

使用 Telegraf 和指标过滤来在将数据写入 InfluxDB 之前对其进行过滤。

配置指标过滤器以保留或删除数据元素(在处理器和聚合器插件运行之前)。

  1. 输入以下命令创建一个 Telegraf 配置,该配置解析系统使用情况数据,删除指定的字段和标签,然后将数据写入 InfluxDB

    cat <<EOF >> ./telegraf.conf
      [[inputs.cpu]]
        # Remove the specified fields from points.
        fieldpass = ["usage_system", "usage_idle"]
        # Remove the specified tags from points.
        tagexclude = ["host"]
      [[outputs.influxdb_v2]]
        urls = ["https://:8181"]
        token = "
    AUTH_TOKEN
    "
    organization = "" bucket = "
    DATABASE_NAME
    "
    EOF

    替换以下内容:

    • DATABASE_NAME:要写入数据到的数据库名称
    • AUTH_TOKEN:您的 InfluxDB 3 Core 令牌 将其存储在秘密存储或环境变量中,以避免暴露原始令牌字符串。
  2. 要测试输入和处理器,请输入以下命令

    telegraf --test --config telegraf.conf

    输出与以下类似。对于每一行输入数据,过滤器会传递指标名称、标签、指定的字段和时间戳。

    > cpu,cpu=cpu0 usage_idle=100,usage_system=0 1702067201000000000
    ...
    > cpu,cpu=cpu-total usage_idle=99.80198019802448,usage_system=0.1980198019802045 1702067201000000000

强制转换数据类型以避免点被拒绝的错误

使用 Telegraf 和 Converter 处理器插件将字段数据类型转换为适合您的模式。

例如,如果您将Home sensor sample data中的示例数据写入数据库,然后尝试将以下批量数据写入同一个表

home,room=Kitchen temp=23.1,hum=36.6,co=22.1 1641063600
home,room=Living\ Room temp=22i,hum=36.4,co=17i 1641067200
home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641067200

InfluxDB 期望 co 包含整数值,并拒绝 co 浮点十进制 (22.1) 值的点。为避免错误,请配置 Telegraf 将字段转换为模式列中的数据类型。

以下示例将 temphumco 字段转换为适合示例数据模式

  1. 在您的终端中,输入以下命令创建示例数据文件

    cat <<EOF > ./home.lp
    home,room=Kitchen temp=23.1,hum=36.6,co=22.1 1641063600
    home,room=Living\ Room temp=22i,hum=36.4,co=17i 1641067200
    home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641067200
    EOF
  2. 输入以下命令创建一个 Telegraf 配置,该配置解析示例数据,将字段值转换为指定的数据类型,然后将数据写入 InfluxDB

    cat <<EOF > ./telegraf.conf
    [[inputs.file]]
      ## For each interval, parse data from files in the list.
      files = ["home.lp"]
      influx_timestamp_precision = "1s"
      precision = "1s"
      tagexclude = ["host"]
    [[processors.converter]]
      [processors.converter.fields]
        ## A data type and a list of fields to convert to the data type.
        float = ["temp", "hum"]
        integer = ["co"]
    [[outputs.influxdb_v2]]
      ## InfluxDB v2 API credentials and the database to write to.
      urls = ["https://:8181"]
      token = "
    AUTH_TOKEN
    "
    organization = "" bucket = "
    DATABASE_NAME
    "
    EOF

    替换以下内容:

    • DATABASE_NAME:要写入数据到的数据库名称
    • AUTH_TOKEN:您的 InfluxDB 3 Core 令牌 将其存储在秘密存储或环境变量中,以避免暴露原始令牌字符串。
  3. 要测试输入和处理器,请输入以下命令

    telegraf --test --config telegraf.conf

    Telegraf 将以下内容输出到 stdout,然后退出

    > home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000
    > home,room=Living\ Room co=17i,hum=36.4,temp=22 1641067200000000000
    > home,room=Kitchen co=26i,hum=36.5,temp=22.7 1641067200000000000

合并行以优化内存和带宽

使用 Telegraf 和 Merge 聚合器插件来合并具有相同测量、标签集和时间戳的点。

以下示例为两个系列(表、标签集和时间戳的组合)创建示例数据,然后合并每个系列中的点

  1. 在您的终端中,输入以下命令创建示例数据文件并计算最早时间戳与现在之间的秒数。该命令将计算值赋给一个 grace_duration 变量,您将在下一步中使用它。

    cat <<EOF > ./home.lp
    home,room=Kitchen temp=23.1 1641063600
    home,room=Kitchen hum=36.6 1641063600
    home,room=Kitchen co=22i 1641063600
    home,room=Living\ Room temp=22.7 1641063600
    home,room=Living\ Room hum=36.4 1641063600
    home,room=Living\ Room co=17i 1641063600
    EOF
    grace_duration="$(($(date +%s)-1641063000))s"
  2. 输入以下命令配置 Telegraf 来解析文件、合并点并将数据写入 InfluxDB——具体来说,配置设置了以下属性

    • influx_timestamp_precision:对于解析器,指定输入数据中的时间戳精度
    • 可选:aggregators.merge.grace 扩展了合并点的时间范围。为确保包含示例数据,配置使用了上一步计算的变量。
    cat <<EOF > ./telegraf.conf
    # Parse metrics from a file
    [[inputs.file]]
      ## A list of files to parse during each interval.
      files = ["home.lp"]
      ## The precision of timestamps in your data.
      influx_timestamp_precision = "1s"
      tagexclude = ["host"]
    # Merge separate metrics that share a series key
    [[aggregators.merge]]
      grace = "$grace_duration"
      ## If true, drops the original metric.
      drop_original = true
    # Writes metrics as line protocol to the InfluxDB v2 API
    [[outputs.influxdb_v2]]
      ## InfluxDB v2 API credentials and the database to write data to.
      urls = ["https://:8181"]
      token = "
    AUTH_TOKEN
    "
    organization = "" bucket = "
    DATABASE_NAME
    "
    EOF

    替换以下内容:

    • DATABASE_NAME:要写入数据到的数据库名称
    • AUTH_TOKEN:您的 InfluxDB 3 Core 令牌 将其存储在秘密存储或环境变量中,以避免暴露原始令牌字符串。
  3. 要测试输入和聚合器,请输入以下命令

    telegraf --test --config telegraf.conf

    Telegraf 将以下内容输出到 stdout,然后退出

    > home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000
    > home,room=Living\ Room co=17i,hum=36.4,temp=22.7 1641063600000000000

避免发送重复数据

使用 Telegraf 和 Dedup 处理器插件来过滤字段值与先前值完全重复的数据。对数据进行去重可以减少写入的载荷大小和资源使用。

以下示例展示了如何使用 Telegraf 删除重复字段值的点,然后将数据写入 InfluxDB

  1. 在您的终端中,输入以下命令创建示例数据文件并计算最早时间戳与现在之间的秒数。该命令将计算值赋给一个 dedup_duration 变量,您将在下一步中使用它。

    cat <<EOF > ./home.lp
    home,room=Kitchen temp=23.1,hum=36.6,co=22i 1641063600
    home,room=Living\ Room temp=22.5,hum=36.4,co=17i 1641063600
    home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641063605
    home,room=Living\ Room temp=22.5,hum=36.4,co=17i 1641063605
    home,room=Kitchen temp=23.1,hum=36.6,co=22i 1641063610
    home,room=Living\ Room temp=23.0,hum=36.4,co=17i 1641063610
    EOF
    dedup_duration="$(($(date +%s)-1641063000))s"
  2. 输入以下命令配置 Telegraf 来解析文件、删除重复的点并将数据写入 InfluxDB——具体来说,示例配置设置了以下内容

    • influx_timestamp_precision:对于解析器,指定输入数据中的时间戳精度
    • processors.dedup:配置 Dedup 处理器插件
    • 可选:processors.dedup.dedup_interval。在 dedup_interval现在范围内的点将被考虑删除。为确保包含示例数据,配置使用了上一步计算的变量。
    cat <<EOF > ./telegraf.conf
    # Parse metrics from a file
    [[inputs.file]]
      ## A list of files to parse during each interval.
      files = ["home.lp"]
      ## The precision of timestamps in your data.
      influx_timestamp_precision = "1s"
      tagexclude = ["host"]
    # Filter metrics that repeat previous field values
    [[processors.dedup]]
      ## Drops duplicates within the specified duration
      dedup_interval = "$dedup_duration"
    # Writes metrics as line protocol to the InfluxDB v2 API
    [[outputs.influxdb_v2]]
      ## InfluxDB v2 API credentials and the database to write data to.
      urls = ["https://:8181"]
      token = "
    AUTH_TOKEN
    "
    organization = "" bucket = "
    DATABASE_NAME
    "
    EOF

    替换以下内容:

    • DATABASE_NAME:要写入数据到的数据库名称
    • AUTH_TOKEN:您的 InfluxDB 3 Core 令牌 将其存储在秘密存储或环境变量中,以避免暴露原始令牌字符串。
  3. 要测试输入和处理器,请输入以下命令

    telegraf --test --config telegraf.conf

    Telegraf 将以下内容输出到 stdout,然后退出

    > home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000
    > home,room=Living\ Room co=17i,hum=36.4,temp=22.5 1641063600000000000
    > home,room=Kitchen co=26i,hum=36.5,temp=22.7 1641063605000000000
    > home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063610000000000
    > home,room=Living\ Room co=17i,hum=36.4,temp=23 1641063610000000000

运行自定义预处理代码

使用 Telegraf 和 Execd 处理器插件来执行 Telegraf 外部的代码,然后写入处理后的数据。Execd 插件期望在 stdin 中接收行协议数据,将数据传递给配置的可执行文件,然后将行协议输出到 stdout。

以下示例展示了如何使用 Telegraf 执行 Go 代码来处理指标,然后将输出写入 InfluxDB。Go multiplier.go 示例代码执行以下操作

  1. 从 Telegraf 导入 influx 解析器和序列化器插件。

  2. 将每一行数据解析为 Telegraf 指标。

  3. 如果指标包含 count 字段,则将字段值乘以 2;否则,将消息打印到 stderr 并退出。

  4. 在您的编辑器中,输入以下示例代码并将文件保存为 multiplier.go

    package main
    
    import (
        "fmt"
        "os"
    
        "github.com/influxdata/telegraf/plugins/parsers/influx"
        influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx"
    )
    
    func main() {
        parser := influx.NewStreamParser(os.Stdin)
        serializer := influxSerializer.Serializer{}
        if err := serializer.Init(); err != nil {
            fmt.Fprintf(os.Stderr, "serializer init failed: %v\n", err)
            os.Exit(1)
        }
    
        for {
            metric, err := parser.Next()
            if err != nil {
                if err == influx.EOF {
                    return // stream ended
                }
                if parseErr, isParseError := err.(*influx.ParseError); isParseError {
                    fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr)
                    os.Exit(1)
                }
                fmt.Fprintf(os.Stderr, "ERR %v\n", err)
                os.Exit(1)
            }
    
            c, found := metric.GetField("count")
            if !found {
                fmt.Fprintf(os.Stderr, "metric has no count field\n")
                os.Exit(1)
            }
            switch t := c.(type) {
            case float64:
                t *= 2
                metric.AddField("count", t)
            case int64:
                t *= 2
                metric.AddField("count", t)
            default:
                fmt.Fprintf(os.Stderr, "count is not an unknown type, it's a %T\n", c)
                os.Exit(1)
            }
            b, err := serializer.Serialize(metric)
            if err != nil {
                fmt.Fprintf(os.Stderr, "ERR %v\n", err)
                os.Exit(1)
            }
            fmt.Fprint(os.Stdout, string(b))
        }
    }
  5. 初始化模块并安装依赖项

    go mod init processlp
    go mod tidy
  6. 在您的终端中,输入以下命令创建示例数据文件

    cat <<EOF > ./home.lp
    home,room=Kitchen temp=23.1,count=1 1641063600
    home,room=Living\ Room temp=22.7,count=1 1641063600
    home,room=Kitchen temp=23.1 1641063601
    home,room=Living\ Room temp=22.7 1641063601
    EOF
  7. 输入以下命令配置 Telegraf 来解析文件、执行 Go 二进制文件并将数据写入——具体来说,示例配置设置了以下内容

    • influx_timestamp_precision:对于解析器,指定输入数据中的时间戳精度
    • processors.execd:配置 Execd 插件
    • processors.execd.command:设置 Execd 要运行的可执行文件和参数
    cat <<EOF > ./telegraf.conf
    # Parse metrics from a file
    [[inputs.file]]
      ## A list of files to parse during each interval.
      files = ["home.lp"]
      ## The precision of timestamps in your data.
      influx_timestamp_precision = "1s"
      tagexclude = ["host"]
    # Filter metrics that repeat previous field values
    [[processors.execd]]
      ## A list that contains the executable command and arguments to run as a daemon.
      command = ["go", "run", "multiplier.go"]
    # Writes metrics as line protocol to the InfluxDB v2 API
    [[outputs.influxdb_v2]]
      ## InfluxDB v2 API credentials and the database to write data to.
      urls = ["https://:8181"]
      token = "
    AUTH_TOKEN
    "
    organization = "" bucket = "
    DATABASE_NAME
    "
    EOF

    替换以下内容:

    • DATABASE_NAME:要写入数据到的数据库名称
    • AUTH_TOKEN:您的 InfluxDB 3 Core 令牌 将其存储在秘密存储或环境变量中,以避免暴露原始令牌字符串。
  8. 要测试输入和处理器,请输入以下命令

    telegraf --test --config telegraf.conf

    Telegraf 将以下内容输出到 stdout,然后退出

    > home,room=Kitchen count=2,temp=23.1 1641063600000000000
    > home,room=Living\ Room count=2,temp=22.7 1641063600000000000

此页面是否有帮助?

感谢您的反馈!


InfluxDB 3.8 新特性

InfluxDB 3.8 和 InfluxDB 3 Explorer 1.6 的主要增强功能。

查看博客文章

InfluxDB 3.8 现已适用于 Core 和 Enterprise 版本,同时发布了 InfluxDB 3 Explorer UI 的 1.6 版本。本次发布着重于操作成熟度,以及如何更轻松地部署、管理和可靠地运行 InfluxDB。

更多信息,请查看

InfluxDB Docker 的 latest 标签将指向 InfluxDB 3 Core

在 **2026 年 2 月 3 日**,InfluxDB Docker 镜像的 latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。

如果使用 Docker 来安装和运行 InfluxDB,latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。例如,如果使用 Docker 运行 InfluxDB v2,请将 latest 版本标签替换为 Docker pull 命令中的特定版本标签 — 例如

docker pull influxdb:2