文档文档

优化 InfluxDB 3 Core 的写入

使用以下技巧来优化向 InfluxDB 3 Core 写入数据时的性能和系统开销。

以下工具默认情况下写入 InfluxDB 并采用大多数写入优化

批量写入

批量写入数据,以最大限度地减少向 InfluxDB 写入数据时的网络开销。

最佳批处理大小为 10,000 行 Line Protocol 或 10 MB,以先达到的阈值为准。

按键对标签进行排序

在将数据点写入 InfluxDB 之前,请按字典顺序按键对标签进行排序。验证排序结果是否与 Go bytes.Compare 函数的结果匹配。

# Line protocol example with unsorted tags
measurement,tagC=therefore,tagE=am,tagA=i,tagD=i,tagB=think fieldKey=fieldValue 1562020262

# Optimized line protocol example with tags sorted by key
measurement,tagA=i,tagB=think,tagC=therefore,tagD=i,tagE=am fieldKey=fieldValue 1562020262

尽可能使用最粗略的时间精度

InfluxDB 3 Core 支持高达纳秒级的时间戳精度。但是,如果您的数据不是以纳秒为单位收集的,则无需以该精度写入。为了获得更好的性能,请为您的用例使用尽可能粗略的时间戳精度。

默认情况下,InfluxDB 3 Core 尝试通过识别哪个精度相对接近“现在”来自动检测 Line Protocol 中时间戳的精度。您还可以在写入请求中指定时间戳精度。InfluxDB 3 Core 支持以下时间戳精度

  • ns(纳秒)
  • us(微秒)
  • ms(毫秒)
  • s(秒)

使用 gzip 压缩

使用 gzip 压缩来加快写入 InfluxDB 3 Core 的速度。基准测试表明,数据压缩后速度最多可提高 5 倍。

在 Telegraf 中启用 gzip 压缩

在您的 telegraf.conf 中的 influxdb_v2 输出插件配置中,将 content_encoding 选项设置为 gzip

[[outputs.influxdb_v2]]
  urls = ["https://localhost:8181"]
  # ...
  content_encoding = "gzip"

在 InfluxDB 客户端库中启用 gzip 压缩

每个 InfluxDB 客户端库 都提供了压缩写入请求的选项,或者默认强制压缩。启用压缩的方法对于每个库都不同。有关具体说明,请参阅 InfluxDB 客户端库文档

将 gzip 压缩与 InfluxDB API 一起使用

当使用 InfluxDB API /api/v2/write 端点写入数据时,请使用 gzip 压缩数据,并将 Content-Encoding 标头设置为 gzip,例如

echo "mem,host=host1 used_percent=23.43234543 1641024000
mem,host=host2 used_percent=26.81522361 1641027600
mem,host=host1 used_percent=22.52984738 1641031200
mem,host=host2 used_percent=27.18294630 1641034800" | gzip > system.gzip \

curl --request POST "https://localhost:8181/api/v2/write?org=ignored&bucket=
DATABASE_NAME
"
\
--header "Authorization: Token
AUTH_TOKEN
"
\
--header "Content-Type: text/plain; charset=utf-8" \ --header "Content-Encoding: gzip" \ --data-binary @system.gzip

替换以下内容

  • DATABASE_NAME:要写入数据的数据库的名称

  • AUTH_TOKEN:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。

    在 Beta 阶段,InfluxDB 3 Core 不需要授权令牌。您可以省略 Authorization 标头,也可以提供任意令牌字符串。

使用 NTP 同步主机

使用网络时间协议 (NTP) 同步主机之间的时间。如果 Line Protocol 中未包含时间戳,则 InfluxDB 使用其主机的本地时间(UTC 时间)为每个点分配时间戳。如果主机的时钟未与 NTP 同步,则时间戳可能不准确。

在一个请求中写入多个数据点

要在一个请求中写入多行,Line Protocol 的每一行必须以新行 (\n) 分隔。

在写入之前预处理数据

在写入工作负载中预处理数据可以帮助您避免由于 Schema 冲突或资源使用而导致的写入失败。例如,如果您有许多设备写入同一张表,并且某些设备对同一字段使用不同的数据类型,那么您可能希望在将数据发送到 InfluxDB 之前生成警报或转换字段数据以适应您的 Schema。

使用 Telegraf,您可以处理来自其他服务和文件的数据,然后将其写入 InfluxDB。除了使用 Telegraf 包含的插件处理数据外,您还可以使用 Execd 处理器插件 来集成您自己的代码和外部应用程序。

以下示例演示了如何配置 Telegraf 代理插件以优化写入。这些示例使用 File 输入插件 从文件读取数据,并使用 InfluxDB v2 输出插件 将数据写入数据库,但您可以使用任何输入和输出插件。

先决条件

如果您尚未安装,请安装 Telegraf

从批处理中筛选数据

使用 Telegraf 和指标筛选在将数据写入 InfluxDB 之前对其进行筛选。

配置指标过滤器以保留或删除数据元素(在处理器和聚合器插件运行之前)。

  1. 输入以下命令以创建 Telegraf 配置,该配置解析系统使用情况数据,删除指定的字段和标签,然后将数据写入 InfluxDB

    cat <<EOF >> ./telegraf.conf
      [[inputs.cpu]]
        # Remove the specified fields from points.
        fieldpass = ["usage_system", "usage_idle"]
        # Remove the specified tags from points.
        tagexclude = ["host"]
      [[outputs.influxdb_v2]]
        urls = ["http://localhost:8181"]
        token = "
    AUTH_TOKEN
    "
    organization = "" bucket = "
    DATABASE_NAME
    "
    EOF

    替换以下内容

    • DATABASE_NAME:要写入数据的数据库的名称

    • AUTH_TOKEN:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。

      在 Beta 阶段,InfluxDB 3 Core 不需要授权令牌。对于 token 选项,请提供空令牌字符串或任意令牌字符串。

  2. 要测试输入和处理器,请输入以下命令

    telegraf --test --config telegraf.conf
    

    输出类似于以下内容。对于输入数据的每一行,过滤器都会传递指标名称、标签、指定的字段和时间戳。

    > cpu,cpu=cpu0 usage_idle=100,usage_system=0 1702067201000000000
    ...
    > cpu,cpu=cpu-total usage_idle=99.80198019802448,usage_system=0.1980198019802045 1702067201000000000
    

强制数据类型以避免拒绝点错误

使用 Telegraf 和 Converter 处理器插件 来转换字段数据类型以适应您的 Schema。

例如,如果您将 家庭传感器示例数据 中的示例数据写入数据库,然后尝试将以下批次写入同一张表

home,room=Kitchen temp=23.1,hum=36.6,co=22.1 1641063600
home,room=Living\ Room temp=22i,hum=36.4,co=17i 1641067200
home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641067200

InfluxDB 期望 co 包含整数值,并拒绝 co 浮点小数 (22.1) 值的点。为避免错误,请配置 Telegraf 以将字段转换为 Schema 列中的数据类型。

以下示例转换 temphumco 字段以适应 示例数据 Schema

  1. 在您的终端中,输入以下命令以创建示例数据文件

    cat <<EOF > ./home.lp
    home,room=Kitchen temp=23.1,hum=36.6,co=22.1 1641063600
    home,room=Living\ Room temp=22i,hum=36.4,co=17i 1641067200
    home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641067200
    EOF
    
  2. 输入以下命令以创建 Telegraf 配置,该配置解析示例数据,将字段值转换为指定的数据类型,然后将数据写入 InfluxDB

    cat <<EOF > ./telegraf.conf
    [[inputs.file]]
      ## For each interval, parse data from files in the list.
      files = ["home.lp"]
      influx_timestamp_precision = "1s"
      precision = "1s"
      tagexclude = ["host"]
    [[processors.converter]]
      [processors.converter.fields]
        ## A data type and a list of fields to convert to the data type.
        float = ["temp", "hum"]
        integer = ["co"]
    [[outputs.influxdb_v2]]
      ## InfluxDB v2 API credentials and the database to write to.
      urls = ["https://localhost:8181"]
      token = "
    AUTH_TOKEN
    "
    organization = "" bucket = "
    DATABASE_NAME
    "
    EOF

    替换以下内容

    • DATABASE_NAME:要写入数据的数据库的名称

    • AUTH_TOKEN:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。

      在 Beta 阶段,InfluxDB 3 Core 不需要授权令牌。对于 token 选项,请提供空令牌字符串或任意令牌字符串。

  3. 要测试输入和处理器,请输入以下命令

    telegraf --test --config telegraf.conf
    

    Telegraf 将以下内容输出到 stdout,然后退出

    > home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000
    > home,room=Living\ Room co=17i,hum=36.4,temp=22 1641067200000000000
    > home,room=Kitchen co=26i,hum=36.5,temp=22.7 1641067200000000000
    

合并行以优化内存和带宽

使用 Telegraf 和 Merge 聚合器插件 来合并共享相同 measurement、标签集和时间戳的点。

以下示例为两个序列(表、标签集和时间戳的组合)创建示例数据,然后合并每个序列中的点

  1. 在您的终端中,输入以下命令以创建示例数据文件并计算最早时间戳与现在之间相隔的秒数。该命令将计算出的值分配给您将在下一步中使用的 grace_duration 变量。

    cat <<EOF > ./home.lp
    home,room=Kitchen temp=23.1 1641063600
    home,room=Kitchen hum=36.6 1641063600
    home,room=Kitchen co=22i 1641063600
    home,room=Living\ Room temp=22.7 1641063600
    home,room=Living\ Room hum=36.4 1641063600
    home,room=Living\ Room co=17i 1641063600
    EOF
    grace_duration="$(($(date +%s)-1641063000))s"
    
  2. 输入以下命令以配置 Telegraf 来解析文件、合并点并将数据写入 InfluxDB,具体来说,该配置设置以下属性

    • influx_timestamp_precision:对于解析器,指定输入数据中的时间戳精度
    • 可选:aggregators.merge.grace 延长合并点的持续时间。为确保包含示例数据,配置使用上一步中计算出的变量。
    cat <<EOF > ./telegraf.conf
    # Parse metrics from a file
    [[inputs.file]]
      ## A list of files to parse during each interval.
      files = ["home.lp"]
      ## The precision of timestamps in your data.
      influx_timestamp_precision = "1s"
      tagexclude = ["host"]
    # Merge separate metrics that share a series key
    [[aggregators.merge]]
      grace = "$grace_duration"
      ## If true, drops the original metric.
      drop_original = true
    # Writes metrics as line protocol to the InfluxDB v2 API
    [[outputs.influxdb_v2]]
      ## InfluxDB v2 API credentials and the database to write data to.
      urls = ["https://localhost:8181"]
      token = "
    AUTH_TOKEN
    "
    organization = "" bucket = "
    DATABASE_NAME
    "
    EOF

    替换以下内容

    • DATABASE_NAME:要写入数据的数据库的名称

    • AUTH_TOKEN:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。

      在 Beta 阶段,InfluxDB 3 Core 不需要授权令牌。对于 token 选项,请提供空令牌字符串或任意令牌字符串。

  3. 要测试输入和聚合器,请输入以下命令

    telegraf --test --config telegraf.conf
    

    Telegraf 将以下内容输出到 stdout,然后退出

    > home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000
    > home,room=Living\ Room co=17i,hum=36.4,temp=22.7 1641063600000000000
    

避免发送重复数据

使用 Telegraf 和 Dedup 处理器插件 来筛选字段值与先前值完全重复的数据。对数据进行去重可以减少您的写入负载大小和资源使用量。

以下示例演示了如何使用 Telegraf 删除重复字段值的点,然后将数据写入 InfluxDB

  1. 在您的终端中,输入以下命令以创建示例数据文件并计算最早时间戳与现在之间相隔的秒数。该命令将计算出的值分配给您将在下一步中使用的 dedup_duration 变量。

    cat <<EOF > ./home.lp
    home,room=Kitchen temp=23.1,hum=36.6,co=22i 1641063600
    home,room=Living\ Room temp=22.5,hum=36.4,co=17i 1641063600
    home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641063605
    home,room=Living\ Room temp=22.5,hum=36.4,co=17i 1641063605
    home,room=Kitchen temp=23.1,hum=36.6,co=22i 1641063610
    home,room=Living\ Room temp=23.0,hum=36.4,co=17i 1641063610
    EOF
    dedup_duration="$(($(date +%s)-1641063000))s"
    
  2. 输入以下命令以配置 Telegraf 来解析文件、删除重复点并将数据写入 InfluxDB,具体来说,示例配置设置以下内容

    • influx_timestamp_precision:对于解析器,指定输入数据中的时间戳精度
    • processors.dedup:配置 Dedup 处理器插件
    • 可选:processors.dedup.dedup_interval。范围在 dedup_interval 到现在 之间的点将被考虑删除。为确保包含示例数据,配置使用上一步中计算出的变量。
    cat <<EOF > ./telegraf.conf
    # Parse metrics from a file
    [[inputs.file]]
      ## A list of files to parse during each interval.
      files = ["home.lp"]
      ## The precision of timestamps in your data.
      influx_timestamp_precision = "1s"
      tagexclude = ["host"]
    # Filter metrics that repeat previous field values
    [[processors.dedup]]
      ## Drops duplicates within the specified duration
      dedup_interval = "$dedup_duration"
    # Writes metrics as line protocol to the InfluxDB v2 API
    [[outputs.influxdb_v2]]
      ## InfluxDB v2 API credentials and the database to write data to.
      urls = ["https://localhost:8181"]
      token = "
    AUTH_TOKEN
    "
    organization = "" bucket = "
    DATABASE_NAME
    "
    EOF

    替换以下内容

    • DATABASE_NAME:要写入数据的数据库的名称

    • AUTH_TOKEN:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。

      在 Beta 阶段,InfluxDB 3 Core 不需要授权令牌。对于 token 选项,请提供空令牌字符串或任意令牌字符串。

  3. 要测试输入和处理器,请输入以下命令

    telegraf --test --config telegraf.conf
    

    Telegraf 将以下内容输出到 stdout,然后退出

    > home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000
    > home,room=Living\ Room co=17i,hum=36.4,temp=22.5 1641063600000000000
    > home,room=Kitchen co=26i,hum=36.5,temp=22.7 1641063605000000000
    > home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063610000000000
    > home,room=Living\ Room co=17i,hum=36.4,temp=23 1641063610000000000
    

运行自定义预处理代码

使用 Telegraf 和 Execd 处理器插件 来执行 Telegraf 外部的代码,然后写入处理后的数据。Execd 插件期望 stdin 中包含 Line Protocol 数据,将数据传递给配置的可执行文件,然后将 Line Protocol 输出到 stdout。

以下示例演示了如何使用 Telegraf 执行 Go 代码来处理指标,然后将输出写入 InfluxDB。Go multiplier.go 示例代码执行以下操作

  1. 从 Telegraf 导入 influx 解析器和序列化器插件。

  2. 将每行数据解析为 Telegraf 指标。

  3. 如果指标包含 count 字段,则将字段值乘以 2;否则,将消息打印到 stderr 并退出。

  4. 在您的编辑器中,输入以下示例代码并将文件另存为 multiplier.go

    package main
    
    import (
        "fmt"
        "os"
    
        "github.com/influxdata/telegraf/plugins/parsers/influx"
        influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx"
    )
    
    func main() {
        parser := influx.NewStreamParser(os.Stdin)
        serializer := influxSerializer.Serializer{}
        if err := serializer.Init(); err != nil {
            fmt.Fprintf(os.Stderr, "serializer init failed: %v\n", err)
            os.Exit(1)
        }
    
        for {
            metric, err := parser.Next()
            if err != nil {
                if err == influx.EOF {
                    return // stream ended
                }
                if parseErr, isParseError := err.(*influx.ParseError); isParseError {
                    fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr)
                    os.Exit(1)
                }
                fmt.Fprintf(os.Stderr, "ERR %v\n", err)
                os.Exit(1)
            }
    
            c, found := metric.GetField("count")
            if !found {
                fmt.Fprintf(os.Stderr, "metric has no count field\n")
                os.Exit(1)
            }
            switch t := c.(type) {
            case float64:
                t *= 2
                metric.AddField("count", t)
            case int64:
                t *= 2
                metric.AddField("count", t)
            default:
                fmt.Fprintf(os.Stderr, "count is not an unknown type, it's a %T\n", c)
                os.Exit(1)
            }
            b, err := serializer.Serialize(metric)
            if err != nil {
                fmt.Fprintf(os.Stderr, "ERR %v\n", err)
                os.Exit(1)
            }
            fmt.Fprint(os.Stdout, string(b))
        }
    }
    
  5. 初始化模块并安装依赖项

    go mod init processlp
    go mod tidy
    
  6. 在您的终端中,输入以下命令以创建示例数据文件

    cat <<EOF > ./home.lp
    home,room=Kitchen temp=23.1,count=1 1641063600
    home,room=Living\ Room temp=22.7,count=1 1641063600
    home,room=Kitchen temp=23.1 1641063601
    home,room=Living\ Room temp=22.7 1641063601
    EOF
    
  7. 输入以下命令以配置 Telegraf 来解析文件、执行 Go 二进制文件并将数据写入,具体来说,示例配置设置以下内容

    • influx_timestamp_precision:对于解析器,指定输入数据中的时间戳精度
    • processors.execd:配置 Execd 插件
    • processors.execd.command:设置 Execd 要运行的可执行文件和参数
    cat <<EOF > ./telegraf.conf
    # Parse metrics from a file
    [[inputs.file]]
      ## A list of files to parse during each interval.
      files = ["home.lp"]
      ## The precision of timestamps in your data.
      influx_timestamp_precision = "1s"
      tagexclude = ["host"]
    # Filter metrics that repeat previous field values
    [[processors.execd]]
      ## A list that contains the executable command and arguments to run as a daemon.
      command = ["go", "run", "multiplier.go"]
    # Writes metrics as line protocol to the InfluxDB v2 API
    [[outputs.influxdb_v2]]
      ## InfluxDB v2 API credentials and the database to write data to.
      urls = ["https://localhost:8181"]
      token = "
    AUTH_TOKEN
    "
    organization = "" bucket = "
    DATABASE_NAME
    "
    EOF

    替换以下内容

    • DATABASE_NAME:要写入数据的数据库的名称

    • AUTH_TOKEN:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。

      在 Beta 阶段,InfluxDB 3 Core 不需要授权令牌。对于 token 选项,请提供空令牌字符串或任意令牌字符串。

  8. 要测试输入和处理器,请输入以下命令

    telegraf --test --config telegraf.conf
    

    Telegraf 将以下内容输出到 stdout,然后退出

    > home,room=Kitchen count=2,temp=23.1 1641063600000000000
    > home,room=Living\ Room count=2,temp=22.7 1641063600000000000
    

此页面是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在一样使用它,而无需对代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速的最新数据引擎,可实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 以 Core 的基础为构建,增加了高可用性、只读副本、增强的安全性以及数据压缩,从而加快查询速度并优化存储。InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看