文档文档

优化写入 InfluxDB 3 Core

InfluxDB 3 Core 处于公开 Alpha 阶段

InfluxDB 3 Core 处于公开 alpha 阶段,可用于测试和反馈,但不适用于生产环境。产品和本文档都在不断完善中。我们欢迎并鼓励您提供关于 alpha 体验的反馈,并邀请您加入我们的公共频道以获取更新和分享反馈。

Alpha 预期和建议

使用这些技巧来优化性能和系统开销,以便将数据写入 InfluxDB 3 Core。

以下工具写入 InfluxDB,默认情况下采用大多数写入优化

批量写入

批量写入数据,以最大限度地减少将数据写入 InfluxDB 时的网络开销。

最佳批次大小为 10,000 行 Line Protocol 或 10 MB,以先达到的阈值为准。

按键对标签进行排序

在将数据点写入 InfluxDB 之前,请按字典顺序按键对标签进行排序。验证排序结果是否与 Go bytes.Compare 函数的结果匹配。

# Line protocol example with unsorted tags
measurement,tagC=therefore,tagE=am,tagA=i,tagD=i,tagB=think fieldKey=fieldValue 1562020262

# Optimized line protocol example with tags sorted by key
measurement,tagA=i,tagB=think,tagC=therefore,tagD=i,tagE=am fieldKey=fieldValue 1562020262

尽可能使用最粗略的时间精度

InfluxDB 3 Core 支持高达纳秒的时间戳精度。但是,如果您的数据不是以纳秒为单位收集的,则无需以该精度写入。为了获得更好的性能,请针对您的用例使用尽可能粗略的时间戳精度。

默认情况下,InfluxDB 3 Core 尝试通过识别哪个精度相对接近“现在”来自动检测 Line Protocol 中时间戳的精度。您还可以在写入请求中指定时间戳精度。InfluxDB 3 Core 支持以下时间戳精度

  • ns (纳秒)
  • us (微秒)
  • ms (毫秒)
  • s (秒)

使用 gzip 压缩

使用 gzip 压缩来加速写入 InfluxDB 3 Core。基准测试表明,当数据被压缩时,速度最多可提高 5 倍。

在 Telegraf 中启用 gzip 压缩

telegraf.conf 中的 influxdb_v2 输出插件配置中,将 content_encoding 选项设置为 gzip

[[outputs.influxdb_v2]]
  urls = ["https://#:8181"]
  # ...
  content_encoding = "gzip"

在 InfluxDB 客户端库中启用 gzip 压缩

每个InfluxDB 客户端库都提供了压缩写入请求的选项,或者默认强制压缩。启用压缩的方法因库而异。有关具体说明,请参阅InfluxDB 客户端库文档

将 gzip 压缩与 InfluxDB API 一起使用

当使用 InfluxDB API /api/v2/write 端点写入数据时,请使用 gzip 压缩数据并将 Content-Encoding 标头设置为 gzip – 例如

echo "mem,host=host1 used_percent=23.43234543 1641024000
mem,host=host2 used_percent=26.81522361 1641027600
mem,host=host1 used_percent=22.52984738 1641031200
mem,host=host2 used_percent=27.18294630 1641034800" | gzip > system.gzip \

curl --request POST "https://#:8181/api/v2/write?org=ignored&bucket=
DATABASE_NAME
"
\
--header "Authorization: Token
AUTH_TOKEN
"
\
--header "Content-Type: text/plain; charset=utf-8" \ --header "Content-Encoding: gzip" \ --data-binary @system.gzip

替换以下内容

  • DATABASE_NAME:要将数据写入的数据库的名称

  • AUTH_TOKEN:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。

    在 alpha 阶段,InfluxDB 3 Core 不需要授权令牌。您可以省略 Authorization 标头,也可以提供任意令牌字符串。

使用 NTP 同步主机

使用网络时间协议 (NTP) 同步主机之间的时间。如果 Line Protocol 中未包含时间戳,则 InfluxDB 使用其主机的本地时间(UTC)为每个点分配时间戳。如果主机的时钟未与 NTP 同步,则时间戳可能不准确。

在一个请求中写入多个数据点

要在一个请求中写入多行,Line Protocol 的每一行都必须用换行符 (\n) 分隔。

在写入前预处理数据

在您的写入工作负载中预处理数据可以帮助您避免由于模式冲突或资源使用而导致的写入失败。例如,如果您有许多设备写入同一个表,并且某些设备对同一字段使用不同的数据类型,那么您可能希望在将数据发送到 InfluxDB 之前生成警报或转换字段数据以适应您的模式。

使用Telegraf,您可以处理来自其他服务和文件的数据,然后将其写入 InfluxDB。除了使用 Telegraf 包含的插件处理数据外,您还可以使用Execd 处理器插件来集成您自己的代码和外部应用程序。

以下示例显示了如何配置 Telegraf 代理插件以优化写入。这些示例使用文件输入插件从文件中读取数据,并使用InfluxDB v2 输出插件将数据写入数据库,但您可以使用任何输入和输出插件。

先决条件

如果您尚未安装,请安装 Telegraf

从批次中筛选数据

使用 Telegraf 和指标筛选来筛选数据,然后再将其写入 InfluxDB。

配置指标过滤器以保留或删除数据元素(在处理器和聚合器插件运行之前)。

  1. 输入以下命令以创建 Telegraf 配置,该配置解析系统使用情况数据,删除指定的字段和标签,然后将数据写入 InfluxDB

    cat <<EOF >> ./telegraf.conf
      [[inputs.cpu]]
        # Remove the specified fields from points.
        fieldpass = ["usage_system", "usage_idle"]
        # Remove the specified tags from points.
        tagexclude = ["host"]
      [[outputs.influxdb_v2]]
        urls = ["https://#:8181"]
        token = "
    AUTH_TOKEN
    "
    organization = "" bucket = "
    DATABASE_NAME
    "
    EOF

    替换以下内容

    • DATABASE_NAME:要将数据写入的数据库的名称

    • AUTH_TOKEN:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。

      在 alpha 阶段,InfluxDB 3 Core 不需要授权令牌。对于 token 选项,请提供一个空的或任意的令牌字符串。

  2. 要测试输入和处理器,请输入以下命令

    telegraf --test --config telegraf.conf
    

    输出类似于以下内容。对于输入数据的每一行,过滤器都会传递指标名称、标签、指定的字段和时间戳。

    > cpu,cpu=cpu0 usage_idle=100,usage_system=0 1702067201000000000
    ...
    > cpu,cpu=cpu-total usage_idle=99.80198019802448,usage_system=0.1980198019802045 1702067201000000000
    

强制数据类型以避免拒绝点错误

使用 Telegraf 和转换器处理器插件来转换字段数据类型以适应您的模式。

例如,如果您将家庭传感器示例数据中的示例数据写入数据库,然后尝试将以下批次写入同一表

home,room=Kitchen temp=23.1,hum=36.6,co=22.1 1641063600
home,room=Living\ Room temp=22i,hum=36.4,co=17i 1641067200
home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641067200

InfluxDB 期望 co 包含整数值,并拒绝包含 co 浮点十进制 (22.1) 值的点。为避免错误,请配置 Telegraf 以将字段转换为模式列中的数据类型。

以下示例转换 temphumco 字段以适应示例数据模式

  1. 在您的终端中,输入以下命令以创建示例数据文件

    cat <<EOF > ./home.lp
    home,room=Kitchen temp=23.1,hum=36.6,co=22.1 1641063600
    home,room=Living\ Room temp=22i,hum=36.4,co=17i 1641067200
    home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641067200
    EOF
    
  2. 输入以下命令以创建 Telegraf 配置,该配置解析示例数据,将字段值转换为指定的数据类型,然后将数据写入 InfluxDB

    cat <<EOF > ./telegraf.conf
    [[inputs.file]]
      ## For each interval, parse data from files in the list.
      files = ["home.lp"]
      influx_timestamp_precision = "1s"
      precision = "1s"
      tagexclude = ["host"]
    [[processors.converter]]
      [processors.converter.fields]
        ## A data type and a list of fields to convert to the data type.
        float = ["temp", "hum"]
        integer = ["co"]
    [[outputs.influxdb_v2]]
      ## InfluxDB v2 API credentials and the database to write to.
      urls = ["https://#:8181"]
      token = "
    AUTH_TOKEN
    "
    organization = "" bucket = "
    DATABASE_NAME
    "
    EOF

    替换以下内容

    • DATABASE_NAME:要将数据写入的数据库的名称

    • AUTH_TOKEN:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。

      在 alpha 阶段,InfluxDB 3 Core 不需要授权令牌。对于 token 选项,请提供一个空的或任意的令牌字符串。

  3. 要测试输入和处理器,请输入以下命令

    telegraf --test --config telegraf.conf
    

    Telegraf 将以下内容输出到 stdout,然后退出

    > home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000
    > home,room=Living\ Room co=17i,hum=36.4,temp=22 1641067200000000000
    > home,room=Kitchen co=26i,hum=36.5,temp=22.7 1641067200000000000
    

合并行以优化内存和带宽

使用 Telegraf 和合并聚合器插件来合并共享相同指标、标签集和时间戳的点。

以下示例为两个序列(表、标签集和时间戳的组合)创建示例数据,然后合并每个序列中的点

  1. 在您的终端中,输入以下命令以创建示例数据文件并计算最早时间戳与现在之间的时间秒数。该命令将计算出的值分配给您将在下一步中使用的 grace_duration 变量。

    cat <<EOF > ./home.lp
    home,room=Kitchen temp=23.1 1641063600
    home,room=Kitchen hum=36.6 1641063600
    home,room=Kitchen co=22i 1641063600
    home,room=Living\ Room temp=22.7 1641063600
    home,room=Living\ Room hum=36.4 1641063600
    home,room=Living\ Room co=17i 1641063600
    EOF
    grace_duration="$(($(date +%s)-1641063000))s"
    
  2. 输入以下命令以配置 Telegraf 解析文件、合并点并将数据写入 InfluxDB – 具体来说,该配置设置了以下属性

    • influx_timestamp_precision:对于解析器,指定输入数据中的时间戳精度
    • 可选:aggregators.merge.grace 延长合并点的持续时间。为了确保包含示例数据,该配置使用了上一步中计算出的变量。
    cat <<EOF > ./telegraf.conf
    # Parse metrics from a file
    [[inputs.file]]
      ## A list of files to parse during each interval.
      files = ["home.lp"]
      ## The precision of timestamps in your data.
      influx_timestamp_precision = "1s"
      tagexclude = ["host"]
    # Merge separate metrics that share a series key
    [[aggregators.merge]]
      grace = "$grace_duration"
      ## If true, drops the original metric.
      drop_original = true
    # Writes metrics as line protocol to the InfluxDB v2 API
    [[outputs.influxdb_v2]]
      ## InfluxDB v2 API credentials and the database to write data to.
      urls = ["https://#:8181"]
      token = "
    AUTH_TOKEN
    "
    organization = "" bucket = "
    DATABASE_NAME
    "
    EOF

    替换以下内容

    • DATABASE_NAME:要将数据写入的数据库的名称

    • AUTH_TOKEN:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。

      在 alpha 阶段,InfluxDB 3 Core 不需要授权令牌。对于 token 选项,请提供一个空的或任意的令牌字符串。

  3. 要测试输入和聚合器,请输入以下命令

    telegraf --test --config telegraf.conf
    

    Telegraf 将以下内容输出到 stdout,然后退出

    > home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000
    > home,room=Living\ Room co=17i,hum=36.4,temp=22.7 1641063600000000000
    

避免发送重复数据

使用 Telegraf 和Dedup 处理器插件来筛选字段值与先前值完全重复的数据。重复数据删除可以减少您的写入负载大小和资源使用。

以下示例显示了如何使用 Telegraf 删除重复字段值的点,然后将数据写入 InfluxDB

  1. 在您的终端中,输入以下命令以创建示例数据文件并计算最早时间戳与现在之间的时间秒数。该命令将计算出的值分配给您将在下一步中使用的 dedup_duration 变量。

    cat <<EOF > ./home.lp
    home,room=Kitchen temp=23.1,hum=36.6,co=22i 1641063600
    home,room=Living\ Room temp=22.5,hum=36.4,co=17i 1641063600
    home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641063605
    home,room=Living\ Room temp=22.5,hum=36.4,co=17i 1641063605
    home,room=Kitchen temp=23.1,hum=36.6,co=22i 1641063610
    home,room=Living\ Room temp=23.0,hum=36.4,co=17i 1641063610
    EOF
    dedup_duration="$(($(date +%s)-1641063000))s"
    
  2. 输入以下命令以配置 Telegraf 解析文件、删除重复点并将数据写入 InfluxDB – 具体来说,示例配置设置了以下内容

    • influx_timestamp_precision:对于解析器,指定输入数据中的时间戳精度
    • processors.dedup:配置 Dedup 处理器插件
    • 可选:processors.dedup.dedup_interval。范围 dedup_interval 到现在的点被考虑删除。为了确保包含示例数据,该配置使用了上一步中计算出的变量。
    cat <<EOF > ./telegraf.conf
    # Parse metrics from a file
    [[inputs.file]]
      ## A list of files to parse during each interval.
      files = ["home.lp"]
      ## The precision of timestamps in your data.
      influx_timestamp_precision = "1s"
      tagexclude = ["host"]
    # Filter metrics that repeat previous field values
    [[processors.dedup]]
      ## Drops duplicates within the specified duration
      dedup_interval = "$dedup_duration"
    # Writes metrics as line protocol to the InfluxDB v2 API
    [[outputs.influxdb_v2]]
      ## InfluxDB v2 API credentials and the database to write data to.
      urls = ["https://#:8181"]
      token = "
    AUTH_TOKEN
    "
    organization = "" bucket = "
    DATABASE_NAME
    "
    EOF

    替换以下内容

    • DATABASE_NAME:要将数据写入的数据库的名称

    • AUTH_TOKEN:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。

      在 alpha 阶段,InfluxDB 3 Core 不需要授权令牌。对于 token 选项,请提供一个空的或任意的令牌字符串。

  3. 要测试输入和处理器,请输入以下命令

    telegraf --test --config telegraf.conf
    

    Telegraf 将以下内容输出到 stdout,然后退出

    > home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000
    > home,room=Living\ Room co=17i,hum=36.4,temp=22.5 1641063600000000000
    > home,room=Kitchen co=26i,hum=36.5,temp=22.7 1641063605000000000
    > home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063610000000000
    > home,room=Living\ Room co=17i,hum=36.4,temp=23 1641063610000000000
    

运行自定义预处理代码

使用 Telegraf 和Execd 处理器插件来执行 Telegraf 外部的代码,然后写入处理后的数据。Execd 插件期望 stdin 中的 Line Protocol 数据,将数据传递到配置的可执行文件,然后将 Line Protocol 输出到 stdout。

以下示例显示了如何使用 Telegraf 执行 Go 代码以处理指标,然后将输出写入 InfluxDB。Go multiplier.go 示例代码执行以下操作

  1. 从 Telegraf 导入 influx 解析器和序列化器插件。

  2. 将每行数据解析为 Telegraf 指标。

  3. 如果指标包含 count 字段,则将字段值乘以 2;否则,将消息打印到 stderr 并退出。

  4. 在您的编辑器中,输入以下示例代码并将文件另存为 multiplier.go

    package main
    
    import (
        "fmt"
        "os"
    
        "github.com/influxdata/telegraf/plugins/parsers/influx"
        influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx"
    )
    
    func main() {
        parser := influx.NewStreamParser(os.Stdin)
        serializer := influxSerializer.Serializer{}
        if err := serializer.Init(); err != nil {
            fmt.Fprintf(os.Stderr, "serializer init failed: %v\n", err)
            os.Exit(1)
        }
    
        for {
            metric, err := parser.Next()
            if err != nil {
                if err == influx.EOF {
                    return // stream ended
                }
                if parseErr, isParseError := err.(*influx.ParseError); isParseError {
                    fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr)
                    os.Exit(1)
                }
                fmt.Fprintf(os.Stderr, "ERR %v\n", err)
                os.Exit(1)
            }
    
            c, found := metric.GetField("count")
            if !found {
                fmt.Fprintf(os.Stderr, "metric has no count field\n")
                os.Exit(1)
            }
            switch t := c.(type) {
            case float64:
                t *= 2
                metric.AddField("count", t)
            case int64:
                t *= 2
                metric.AddField("count", t)
            default:
                fmt.Fprintf(os.Stderr, "count is not an unknown type, it's a %T\n", c)
                os.Exit(1)
            }
            b, err := serializer.Serialize(metric)
            if err != nil {
                fmt.Fprintf(os.Stderr, "ERR %v\n", err)
                os.Exit(1)
            }
            fmt.Fprint(os.Stdout, string(b))
        }
    }
    
  5. 初始化模块并安装依赖项

    go mod init processlp
    go mod tidy
    
  6. 在您的终端中,输入以下命令以创建示例数据文件

    cat <<EOF > ./home.lp
    home,room=Kitchen temp=23.1,count=1 1641063600
    home,room=Living\ Room temp=22.7,count=1 1641063600
    home,room=Kitchen temp=23.1 1641063601
    home,room=Living\ Room temp=22.7 1641063601
    EOF
    
  7. 输入以下命令以配置 Telegraf 解析文件、执行 Go 二进制文件并写入数据 – 具体来说,示例配置设置了以下内容

    • influx_timestamp_precision:对于解析器,指定输入数据中的时间戳精度
    • processors.execd:配置 Execd 插件
    • processors.execd.command:设置 Execd 运行的可执行文件和参数
    cat <<EOF > ./telegraf.conf
    # Parse metrics from a file
    [[inputs.file]]
      ## A list of files to parse during each interval.
      files = ["home.lp"]
      ## The precision of timestamps in your data.
      influx_timestamp_precision = "1s"
      tagexclude = ["host"]
    # Filter metrics that repeat previous field values
    [[processors.execd]]
      ## A list that contains the executable command and arguments to run as a daemon.
      command = ["go", "run", "multiplier.go"]
    # Writes metrics as line protocol to the InfluxDB v2 API
    [[outputs.influxdb_v2]]
      ## InfluxDB v2 API credentials and the database to write data to.
      urls = ["https://#:8181"]
      token = "
    AUTH_TOKEN
    "
    organization = "" bucket = "
    DATABASE_NAME
    "
    EOF

    替换以下内容

    • DATABASE_NAME:要将数据写入的数据库的名称

    • AUTH_TOKEN:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。

      在 alpha 阶段,InfluxDB 3 Core 不需要授权令牌。对于 token 选项,请提供一个空的或任意的令牌字符串。

  8. 要测试输入和处理器,请输入以下命令

    telegraf --test --config telegraf.conf
    

    Telegraf 将以下内容输出到 stdout,然后退出

    > home,room=Kitchen count=2,temp=23.1 1641063600000000000
    > home,room=Living\ Room count=2,temp=22.7 1641063600000000000
    

此页是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 将进入维护模式。您可以继续像当前一样使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已发布公开 Alpha 版

InfluxDB 3 开源版本现已可用于 alpha 测试,根据 MIT 或 Apache 2 许可获得许可。

我们正在发布两个产品作为 alpha 版本的一部分。

InfluxDB 3 Core 是我们新的开源产品。它是一个用于时间序列和事件数据的最新数据引擎。InfluxDB 3 Enterprise 是一个商业版本,它建立在 Core 的基础上,增加了历史查询功能、只读副本、高可用性、可扩展性和细粒度的安全性。

有关如何开始使用的更多信息,请查看