文档说明

优化 InfluxDB 写入

使用以下技巧在向 InfluxDB 写入数据时优化性能和系统开销。

以下工具写入 InfluxDB,并默认采用 大多数 写入优化

批量写入

将数据批量写入以最小化写入 InfluxDB 时的网络开销。

最佳批量大小为行协议的 10,000 行或 10 MB,以先达到的阈值为准。

按键排序标签

在将数据点写入 InfluxDB 之前,按字典顺序对标签进行按键排序。验证排序结果与 Go bytes.Compare 函数 的结果匹配。

# Line protocol example with unsorted tags
measurement,tagC=therefore,tagE=am,tagA=i,tagD=i,tagB=think fieldKey=fieldValue 1562020262

# Optimized line protocol example with tags sorted by key
measurement,tagA=i,tagB=think,tagC=therefore,tagD=i,tagE=am fieldKey=fieldValue 1562020262

使用可能的最粗时间精度

默认情况下,InfluxDB 以纳秒精度写入数据。但如果您的数据不是按纳秒收集的,就没有必要以那种精度写入。为了更好的性能,请使用可能的最粗精度的时间戳。

在向 InfluxDB 写入数据 时指定时间戳精度。

使用 gzip 压缩

使用 gzip 压缩以加快向 InfluxDB 的写入速度。基准测试显示,数据压缩后速度可提高 5 倍。

在 Telegraf 中启用 gzip 压缩

在您的 telegraf.conf 中的 influxdb_v2 输出插件配置中,将 content_encoding 选项设置为 gzip

[[outputs.influxdb_v2]]
  urls = ["https://cluster-host.com"]
  # ...
  content_encoding = "gzip"

在 InfluxDB 客户端库中启用 gzip 压缩

每个 InfluxDB 客户端库 都提供了压缩写入请求的选项或默认启用压缩。每个库启用压缩的方法不同。有关具体说明,请参阅InfluxDB 客户端库文档

使用 InfluxDB API 进行 gzip 压缩

当使用 InfluxDB API /api/v2/write 端点写入数据时,使用 gzip 压缩数据,并将 Content-Encoding 标头设置为 gzip –例如

echo "mem,host=host1 used_percent=23.43234543 1641024000
mem,host=host2 used_percent=26.81522361 1641027600
mem,host=host1 used_percent=22.52984738 1641031200
mem,host=host2 used_percent=27.18294630 1641034800" | gzip > system.gzip \

curl --request POST "https://cluster-host.com/api/v2/write?org=ignored&bucket=
DATABASE_NAME
&precision=s"
\
--header "Authorization: Token
DATABASE_TOKEN
"
\
--header "Content-Type: text/plain; charset=utf-8" \ --header "Content-Encoding: gzip" \ --data-binary @system.gzip

替换以下内容

  • DATABASE_NAME:写入数据的 数据库名称
  • DATABASE_TOKEN:具有对指定数据库 写入 访问权限的 数据库令牌将其存储在密钥库或环境变量中,以避免暴露原始令牌字符串。

使用 NTP 同步主机

使用网络时间协议(NTP)在主机之间同步时间。如果行协议中未包含时间戳,InfluxDB 将使用其主机的本地时间(UTC)为每个点分配时间戳。如果主机的时钟未与 NTP 同步,则时间戳可能不准确。

在一个请求中写入多个数据点

要在单个请求中写入多行,每行行协议必须由换行符(\n)分隔。

在写入前预处理数据

在您的写入工作中预处理数据可以帮助您避免由于模式冲突或资源使用而导致的写入失败。例如,如果您有多个设备写入相同的测量值,并且某些设备使用不同的数据类型来表示相同的字段,那么您可能希望在将数据发送到 InfluxDB 之前生成警报或转换字段数据以符合您的模式。

使用 Telegraf,您可以处理来自其他服务和文件的数据,然后将它们写入 InfluxDB。除了使用 Telegraf 内置的插件处理数据外,您还可以使用 Execd 处理器插件 来集成自己的代码和外部应用程序。

以下示例演示了如何配置 Telegraf 代理插件 以优化写入。示例使用 文件输入插件 从文件中读取数据,并使用 InfluxDB v2 输出插件 将数据写入桶,但您可以使用任何输入和输出插件。

先决条件

如果您还没有安装,请安装 Telegraf

从批次中过滤数据

使用 Telegraf 和度量过滤来在写入 InfluxDB 之前过滤数据。

配置 度量过滤器 以保留或删除数据元素(在处理器和聚合器插件运行之前)。

  1. 输入以下命令以创建一个Telegraf配置,该配置解析系统使用数据,移除指定的字段和标签,然后将数据写入InfluxDB

    cat <<EOF >> ./telegraf.conf
      [[inputs.cpu]]
        # Remove the specified fields from points.
        fieldpass = ["usage_system", "usage_idle"]
        # Remove the specified tags from points.
        tagexclude = ["host"]
      [[outputs.influxdb_v2]]
        urls = ["https://cluster-host.com"]
        token = "
    DATABASE_TOKEN
    "
    organization = "" bucket = "
    DATABASE_NAME
    "
    EOF

    替换以下内容

    • DATABASE_NAME:写入数据的 数据库名称
    • DATABASE_TOKEN:具有对指定数据库 写入 访问权限的 数据库令牌将其存储在密钥库或环境变量中,以避免暴露原始令牌字符串。
  2. 要测试输入和处理器,请输入以下命令

telegraf --test --config telegraf.conf

输出类似于以下内容。对于输入数据的每一行,过滤器会传递指标名称、标签、指定的字段和时间戳。

> cpu,cpu=cpu0 usage_idle=100,usage_system=0 1702067201000000000
...
> cpu,cpu=cpu-total usage_idle=99.80198019802448,usage_system=0.1980198019802045 1702067201000000000

强制数据类型以避免拒绝的点错误

使用Telegraf和转换器处理器插件将字段数据类型转换为符合您的模式。

例如,如果您将开始使用家庭传感器数据中的示例数据写入数据库,然后尝试将以下批次写入相同的度量

home,room=Kitchen temp=23.1,hum=36.6,co=22.1 1641063600
home,room=Living\ Room temp=22i,hum=36.4,co=17i 1641067200
home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641067200

InfluxDB期望co包含整数值,并拒绝包含co浮点小数(22.1)值的点。为了避免错误,请配置Telegraf将字段转换为您的模式列中的数据类型。

以下示例将temphumco字段转换为符合示例数据模式

  1. 在您的终端中,输入以下命令以创建示例数据文件

    cat <<EOF > ./home.lp
    home,room=Kitchen temp=23.1,hum=36.6,co=22.1 1641063600
    home,room=Living\ Room temp=22i,hum=36.4,co=17i 1641067200
    home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641067200
    EOF
    
  2. 输入以下命令以创建一个Telegraf配置,该配置解析示例数据,将字段值转换为指定的数据类型,然后将数据写入InfluxDB

    cat <<EOF > ./telegraf.conf
    [[inputs.file]]
      ## For each interval, parse data from files in the list.
      files = ["home.lp"]
      influx_timestamp_precision = "1s"
      precision = "1s"
      tagexclude = ["host"]
    [[processors.converter]]
      [processors.converter.fields]
        ## A data type and a list of fields to convert to the data type.
        float = ["temp", "hum"]
        integer = ["co"]
    [[outputs.influxdb_v2]]
      ## InfluxDB v2 API credentials and the database to write to.
      urls = ["https://cluster-host.com"]
      token = "
    DATABASE_TOKEN
    "
    organization = "" bucket = "
    DATABASE_NAME
    "
    EOF

    替换以下内容

    • DATABASE_NAME:写入数据的 数据库名称
    • DATABASE_TOKEN:具有对指定数据库 写入 访问权限的 数据库令牌将其存储在密钥库或环境变量中,以避免暴露原始令牌字符串。
  3. 要测试输入和处理器,请输入以下命令

    telegraf --test --config telegraf.conf
    

    Telegraf将以下内容输出到stdout,然后退出

    > home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000
    > home,room=Living\ Room co=17i,hum=36.4,temp=22 1641067200000000000
    > home,room=Kitchen co=26i,hum=36.5,temp=22.7 1641067200000000000
    

合并行以优化内存和带宽

使用Telegraf和合并聚合器插件合并具有相同度量、标签集和时间戳的点。

以下示例创建两个系列(度量、标签集和时间戳的组合)的样本数据,然后合并每个系列中的点

  1. 在您的终端中,输入以下命令以创建样本数据文件并计算最早时间戳和现在之间的秒数。该命令将计算值分配给grace_duration变量,您将在下一步中使用它。

    cat <<EOF > ./home.lp
    home,room=Kitchen temp=23.1 1641063600
    home,room=Kitchen hum=36.6 1641063600
    home,room=Kitchen co=22i 1641063600
    home,room=Living\ Room temp=22.7 1641063600
    home,room=Living\ Room hum=36.4 1641063600
    home,room=Living\ Room co=17i 1641063600
    EOF
    grace_duration="$(($(date +%s)-1641063000))s"
    
  2. 输入以下命令以配置Telegraf解析文件,合并点,并将数据写入InfluxDB - 特别是配置设置了以下属性

    • influx_timestamp_precision:对于解析器,指定输入数据中的时间戳精度
    • 可选:aggregators.merge.grace扩展合并点的持续时间。为了确保样本数据被包含,配置使用上一步中计算出的变量。
    cat <<EOF > ./telegraf.conf
    # Parse metrics from a file
    [[inputs.file]]
      ## A list of files to parse during each interval.
      files = ["home.lp"]
      ## The precision of timestamps in your data.
      influx_timestamp_precision = "1s"
      tagexclude = ["host"]
    # Merge separate metrics that share a series key
    [[aggregators.merge]]
      grace = "$grace_duration"
      ## If true, drops the original metric.
      drop_original = true
    # Writes metrics as line protocol to the InfluxDB v2 API
    [[outputs.influxdb_v2]]
      ## InfluxDB v2 API credentials and the database to write data to.
      urls = ["https://cluster-host.com"]
      token = "
    DATABASE_TOKEN
    "
    organization = "" bucket = "
    DATABASE_NAME
    "
    EOF

    替换以下内容

    • DATABASE_NAME:写入数据的 数据库名称
    • DATABASE_TOKEN:具有对指定数据库 写入 访问权限的 数据库令牌将其存储在密钥库或环境变量中,以避免暴露原始令牌字符串。
  3. 要测试输入和聚合器,请输入以下命令

    telegraf --test --config telegraf.conf
    

    Telegraf将以下内容输出到stdout,然后退出

    > home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000
    > home,room=Living\ Room co=17i,hum=36.4,temp=22.7 1641063600000000000
    

避免发送重复数据

使用Telegraf和去重处理器插件过滤字段值与前一次值完全相同的重复数据。去重数据可以减少您的写入负载大小和资源使用。

以下示例显示了如何使用Telegraf删除重复字段值的点,并将数据写入InfluxDB

  1. 在您的终端中,输入以下命令以创建样本数据文件并计算最早时间戳和现在之间的秒数。该命令将计算值分配给dedup_duration变量,您将在下一步中使用它。

    cat <<EOF > ./home.lp
    home,room=Kitchen temp=23.1,hum=36.6,co=22i 1641063600
    home,room=Living\ Room temp=22.5,hum=36.4,co=17i 1641063600
    home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641063605
    home,room=Living\ Room temp=22.5,hum=36.4,co=17i 1641063605
    home,room=Kitchen temp=23.1,hum=36.6,co=22i 1641063610
    home,room=Living\ Room temp=23.0,hum=36.4,co=17i 1641063610
    EOF
    dedup_duration="$(($(date +%s)-1641063000))s"
    
  2. 输入以下命令以配置Telegraf解析文件,删除重复点,并将数据写入InfluxDB - 特别是以下示例配置设置了以下内容

    • influx_timestamp_precision:对于解析器,指定输入数据中的时间戳精度
    • processors.dedup:配置去重处理器插件
    • 可选:processors.dedup.dedup_interval。范围在dedup_interval现在之间的点被视为删除。为了确保样本数据被包含,配置使用上一步中计算出的变量。
    cat <<EOF > ./telegraf.conf
    # Parse metrics from a file
    [[inputs.file]]
      ## A list of files to parse during each interval.
      files = ["home.lp"]
      ## The precision of timestamps in your data.
      influx_timestamp_precision = "1s"
      tagexclude = ["host"]
    # Filter metrics that repeat previous field values
    [[processors.dedup]]
      ## Drops duplicates within the specified duration
      dedup_interval = "$dedup_duration"
    # Writes metrics as line protocol to the InfluxDB v2 API
    [[outputs.influxdb_v2]]
      ## InfluxDB v2 API credentials and the database to write data to.
      urls = ["https://cluster-host.com"]
      token = "
    DATABASE_TOKEN
    "
    organization = "" bucket = "
    DATABASE_NAME
    "
    EOF

    替换以下内容

    • DATABASE_NAME:写入数据的 数据库名称
    • DATABASE_TOKEN:具有对指定数据库 写入 访问权限的 数据库令牌将其存储在密钥库或环境变量中,以避免暴露原始令牌字符串。
  3. 要测试输入和处理器,请输入以下命令

    telegraf --test --config telegraf.conf
    

    Telegraf将以下内容输出到stdout,然后退出

    > home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000
    > home,room=Living\ Room co=17i,hum=36.4,temp=22.5 1641063600000000000
    > home,room=Kitchen co=26i,hum=36.5,temp=22.7 1641063605000000000
    > home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063610000000000
    > home,room=Living\ Room co=17i,hum=36.4,temp=23 1641063610000000000
    

运行自定义预处理代码

使用 Telegraf 和 Execd 处理器插件 来执行 Telegraf 外部的代码,然后写入处理后的数据。Execd 插件期望从 stdin 接收行协议数据,将数据传递给配置的可执行文件,然后将行协议输出到 stdout。

以下示例展示了如何使用 Telegraf 执行 Go 代码来处理指标,然后将输出写入 InfluxDB。Go 的 multiplier.go 示例代码执行以下操作

  1. 从 Telegraf 导入 influx 解析器和序列化器插件。

  2. 将每行数据解析为 Telegraf 指标。

  3. 如果指标包含 count 字段,则将字段值乘以 2;否则,将消息打印到 stderr 并退出。

  4. 在您的编辑器中输入以下示例代码,并将文件保存为 multiplier.go

    package main
    
    import (
        "fmt"
        "os"
    
        "github.com/influxdata/telegraf/plugins/parsers/influx"
        influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx"
    )
    
    func main() {
        parser := influx.NewStreamParser(os.Stdin)
        serializer := influxSerializer.Serializer{}
        if err := serializer.Init(); err != nil {
            fmt.Fprintf(os.Stderr, "serializer init failed: %v\n", err)
            os.Exit(1)
        }
    
        for {
            metric, err := parser.Next()
            if err != nil {
                if err == influx.EOF {
                    return // stream ended
                }
                if parseErr, isParseError := err.(*influx.ParseError); isParseError {
                    fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr)
                    os.Exit(1)
                }
                fmt.Fprintf(os.Stderr, "ERR %v\n", err)
                os.Exit(1)
            }
    
            c, found := metric.GetField("count")
            if !found {
                fmt.Fprintf(os.Stderr, "metric has no count field\n")
                os.Exit(1)
            }
            switch t := c.(type) {
            case float64:
                t *= 2
                metric.AddField("count", t)
            case int64:
                t *= 2
                metric.AddField("count", t)
            default:
                fmt.Fprintf(os.Stderr, "count is not an unknown type, it's a %T\n", c)
                os.Exit(1)
            }
            b, err := serializer.Serialize(metric)
            if err != nil {
                fmt.Fprintf(os.Stderr, "ERR %v\n", err)
                os.Exit(1)
            }
            fmt.Fprint(os.Stdout, string(b))
        }
    }
    
  5. 初始化模块并安装依赖项

    go mod init processlp
    go mod tidy
    
  6. 在您的终端中,输入以下命令以创建示例数据文件

    cat <<EOF > ./home.lp
    home,room=Kitchen temp=23.1,count=1 1641063600
    home,room=Living\ Room temp=22.7,count=1 1641063600
    home,room=Kitchen temp=23.1 1641063601
    home,room=Living\ Room temp=22.7 1641063601
    EOF
    
  7. 输入以下命令以配置 Telegraf 解析文件,执行 Go 二进制文件,并写入数据 - 特别是,以下示例配置设置了以下内容

    • influx_timestamp_precision:对于解析器,指定输入数据中的时间戳精度
    • processors.execd:配置 Execd 插件
    • processors.execd.command:为 Execd 设置要运行的可执行文件和参数
    cat <<EOF > ./telegraf.conf
    # Parse metrics from a file
    [[inputs.file]]
      ## A list of files to parse during each interval.
      files = ["home.lp"]
      ## The precision of timestamps in your data.
      influx_timestamp_precision = "1s"
      tagexclude = ["host"]
    # Filter metrics that repeat previous field values
    [[processors.execd]]
      ## A list that contains the executable command and arguments to run as a daemon.
      command = ["go", "run", "multiplier.go"]
    # Writes metrics as line protocol to the InfluxDB v2 API
    [[outputs.influxdb_v2]]
      ## InfluxDB v2 API credentials and the database to write data to.
      urls = ["https://cluster-host.com"]
      token = "
    DATABASE_TOKEN
    "
    organization = "" bucket = "
    DATABASE_NAME
    "
    EOF

    替换以下内容

    • DATABASE_NAME:写入数据的 数据库名称
    • DATABASE_TOKEN:具有对指定数据库 写入 访问权限的 数据库令牌将其存储在密钥库或环境变量中,以避免暴露原始令牌字符串。
  8. 要测试输入和处理器,请输入以下命令

    telegraf --test --config telegraf.conf
    

    Telegraf将以下内容输出到stdout,然后退出

    > home,room=Kitchen count=2,temp=23.1 1641063600000000000
    > home,room=Living\ Room count=2,temp=22.7 1641063600000000000
    

这个页面有帮助吗?

感谢您的反馈!


Flux 的未来

Flux 正在进入维护模式。您可以继续像现在一样使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB v3 增强功能和 InfluxDB 集群版现已正式发布

新功能包括更快的查询性能和管理工具,这些功能提升了 InfluxDB v3 产品线。InfluxDB 集群版现已正式发布。

InfluxDB v3 性能和功能

InfluxDB v3 产品线在查询性能方面取得了显著提升,并提供了新的管理工具。这些增强功能包括用于监控 InfluxDB 集群健康状况的操作仪表板、InfluxDB Cloud Dedicated 中的单点登录(SSO)支持以及新的令牌和数据库管理 API。

了解新的 v3 增强功能


InfluxDB 集群版正式发布

InfluxDB 集群版现已正式发布,并为您在自管理堆栈中提供了 InfluxDB v3 的功能。

与我们讨论 InfluxDB 集群