优化 InfluxDB 3 Core 的写入
使用以下技巧来优化向 InfluxDB 3 Core 写入数据时的性能和系统开销。
以下工具默认情况下写入 InfluxDB 并采用大多数写入优化
- Telegraf
- InfluxDB 客户端库
批量写入
批量写入数据,以最大限度地减少向 InfluxDB 写入数据时的网络开销。
最佳批处理大小为 10,000 行 Line Protocol 或 10 MB,以先达到的阈值为准。
按键对标签进行排序
在将数据点写入 InfluxDB 之前,请按字典顺序按键对标签进行排序。验证排序结果是否与 Go bytes.Compare
函数的结果匹配。
# Line protocol example with unsorted tags
measurement,tagC=therefore,tagE=am,tagA=i,tagD=i,tagB=think fieldKey=fieldValue 1562020262
# Optimized line protocol example with tags sorted by key
measurement,tagA=i,tagB=think,tagC=therefore,tagD=i,tagE=am fieldKey=fieldValue 1562020262
尽可能使用最粗略的时间精度
InfluxDB 3 Core 支持高达纳秒级的时间戳精度。但是,如果您的数据不是以纳秒为单位收集的,则无需以该精度写入。为了获得更好的性能,请为您的用例使用尽可能粗略的时间戳精度。
默认情况下,InfluxDB 3 Core 尝试通过识别哪个精度相对接近“现在”来自动检测 Line Protocol 中时间戳的精度。您还可以在写入请求中指定时间戳精度。InfluxDB 3 Core 支持以下时间戳精度
ns
(纳秒)us
(微秒)ms
(毫秒)s
(秒)
使用 gzip 压缩
使用 gzip 压缩来加快写入 InfluxDB 3 Core 的速度。基准测试表明,数据压缩后速度最多可提高 5 倍。
在 Telegraf 中启用 gzip 压缩
在您的 telegraf.conf
中的 influxdb_v2
输出插件配置中,将 content_encoding
选项设置为 gzip
[[outputs.influxdb_v2]]
urls = ["https://localhost:8181"]
# ...
content_encoding = "gzip"
在 InfluxDB 客户端库中启用 gzip 压缩
每个 InfluxDB 客户端库 都提供了压缩写入请求的选项,或者默认强制压缩。启用压缩的方法对于每个库都不同。有关具体说明,请参阅 InfluxDB 客户端库文档。
将 gzip 压缩与 InfluxDB API 一起使用
当使用 InfluxDB API /api/v2/write
端点写入数据时,请使用 gzip
压缩数据,并将 Content-Encoding
标头设置为 gzip
,例如
替换以下内容
DATABASE_NAME
:要写入数据的数据库的名称AUTH_TOKEN
:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。在 Beta 阶段,InfluxDB 3 Core 不需要授权令牌。您可以省略
Authorization
标头,也可以提供任意令牌字符串。
使用 NTP 同步主机
使用网络时间协议 (NTP) 同步主机之间的时间。如果 Line Protocol 中未包含时间戳,则 InfluxDB 使用其主机的本地时间(UTC 时间)为每个点分配时间戳。如果主机的时钟未与 NTP 同步,则时间戳可能不准确。
在一个请求中写入多个数据点
要在一个请求中写入多行,Line Protocol 的每一行必须以新行 (\n
) 分隔。
在写入之前预处理数据
在写入工作负载中预处理数据可以帮助您避免由于 Schema 冲突或资源使用而导致的写入失败。例如,如果您有许多设备写入同一张表,并且某些设备对同一字段使用不同的数据类型,那么您可能希望在将数据发送到 InfluxDB 之前生成警报或转换字段数据以适应您的 Schema。
使用 Telegraf,您可以处理来自其他服务和文件的数据,然后将其写入 InfluxDB。除了使用 Telegraf 包含的插件处理数据外,您还可以使用 Execd 处理器插件 来集成您自己的代码和外部应用程序。
以下示例演示了如何配置 Telegraf 代理和插件以优化写入。这些示例使用 File 输入插件 从文件读取数据,并使用 InfluxDB v2 输出插件 将数据写入数据库,但您可以使用任何输入和输出插件。
先决条件
如果您尚未安装,请安装 Telegraf。
从批处理中筛选数据
使用 Telegraf 和指标筛选在将数据写入 InfluxDB 之前对其进行筛选。
配置指标过滤器以保留或删除数据元素(在处理器和聚合器插件运行之前)。
输入以下命令以创建 Telegraf 配置,该配置解析系统使用情况数据,删除指定的字段和标签,然后将数据写入 InfluxDB
cat <<EOF >> ./telegraf.conf [[inputs.cpu]] # Remove the specified fields from points. fieldpass = ["usage_system", "usage_idle"] # Remove the specified tags from points. tagexclude = ["host"] [[outputs.influxdb_v2]] urls = ["http://localhost:8181"] token = "
AUTH_TOKEN" organization = "" bucket = "DATABASE_NAME" EOF替换以下内容
DATABASE_NAME
:要写入数据的数据库的名称AUTH_TOKEN
:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。在 Beta 阶段,InfluxDB 3 Core 不需要授权令牌。对于
token
选项,请提供空令牌字符串或任意令牌字符串。
要测试输入和处理器,请输入以下命令
telegraf --test --config telegraf.conf
输出类似于以下内容。对于输入数据的每一行,过滤器都会传递指标名称、标签、指定的字段和时间戳。
> cpu,cpu=cpu0 usage_idle=100,usage_system=0 1702067201000000000 ... > cpu,cpu=cpu-total usage_idle=99.80198019802448,usage_system=0.1980198019802045 1702067201000000000
强制数据类型以避免拒绝点错误
使用 Telegraf 和 Converter 处理器插件 来转换字段数据类型以适应您的 Schema。
例如,如果您将 家庭传感器示例数据 中的示例数据写入数据库,然后尝试将以下批次写入同一张表
InfluxDB 期望 co
包含整数值,并拒绝 co
浮点小数 (22.1
) 值的点。为避免错误,请配置 Telegraf 以将字段转换为 Schema 列中的数据类型。
以下示例转换 temp
、hum
和 co
字段以适应 示例数据 Schema
在您的终端中,输入以下命令以创建示例数据文件
cat <<EOF > ./home.lp home,room=Kitchen temp=23.1,hum=36.6,co=22.1 1641063600 home,room=Living\ Room temp=22i,hum=36.4,co=17i 1641067200 home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641067200 EOF
输入以下命令以创建 Telegraf 配置,该配置解析示例数据,将字段值转换为指定的数据类型,然后将数据写入 InfluxDB
cat <<EOF > ./telegraf.conf [[inputs.file]] ## For each interval, parse data from files in the list. files = ["home.lp"] influx_timestamp_precision = "1s" precision = "1s" tagexclude = ["host"] [[processors.converter]] [processors.converter.fields] ## A data type and a list of fields to convert to the data type. float = ["temp", "hum"] integer = ["co"] [[outputs.influxdb_v2]] ## InfluxDB v2 API credentials and the database to write to. urls = ["https://localhost:8181"] token = "
AUTH_TOKEN" organization = "" bucket = "DATABASE_NAME" EOF替换以下内容
DATABASE_NAME
:要写入数据的数据库的名称AUTH_TOKEN
:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。在 Beta 阶段,InfluxDB 3 Core 不需要授权令牌。对于
token
选项,请提供空令牌字符串或任意令牌字符串。
要测试输入和处理器,请输入以下命令
telegraf --test --config telegraf.conf
Telegraf 将以下内容输出到 stdout,然后退出
> home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000 > home,room=Living\ Room co=17i,hum=36.4,temp=22 1641067200000000000 > home,room=Kitchen co=26i,hum=36.5,temp=22.7 1641067200000000000
合并行以优化内存和带宽
使用 Telegraf 和 Merge 聚合器插件 来合并共享相同 measurement、标签集和时间戳的点。
以下示例为两个序列(表、标签集和时间戳的组合)创建示例数据,然后合并每个序列中的点
在您的终端中,输入以下命令以创建示例数据文件并计算最早时间戳与现在之间相隔的秒数。该命令将计算出的值分配给您将在下一步中使用的
grace_duration
变量。cat <<EOF > ./home.lp home,room=Kitchen temp=23.1 1641063600 home,room=Kitchen hum=36.6 1641063600 home,room=Kitchen co=22i 1641063600 home,room=Living\ Room temp=22.7 1641063600 home,room=Living\ Room hum=36.4 1641063600 home,room=Living\ Room co=17i 1641063600 EOF grace_duration="$(($(date +%s)-1641063000))s"
输入以下命令以配置 Telegraf 来解析文件、合并点并将数据写入 InfluxDB,具体来说,该配置设置以下属性
influx_timestamp_precision
:对于解析器,指定输入数据中的时间戳精度- 可选:
aggregators.merge.grace
延长合并点的持续时间。为确保包含示例数据,配置使用上一步中计算出的变量。
cat <<EOF > ./telegraf.conf # Parse metrics from a file [[inputs.file]] ## A list of files to parse during each interval. files = ["home.lp"] ## The precision of timestamps in your data. influx_timestamp_precision = "1s" tagexclude = ["host"] # Merge separate metrics that share a series key [[aggregators.merge]] grace = "$grace_duration" ## If true, drops the original metric. drop_original = true # Writes metrics as line protocol to the InfluxDB v2 API [[outputs.influxdb_v2]] ## InfluxDB v2 API credentials and the database to write data to. urls = ["https://localhost:8181"] token = "
AUTH_TOKEN" organization = "" bucket = "DATABASE_NAME" EOF替换以下内容
DATABASE_NAME
:要写入数据的数据库的名称AUTH_TOKEN
:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。在 Beta 阶段,InfluxDB 3 Core 不需要授权令牌。对于
token
选项,请提供空令牌字符串或任意令牌字符串。
要测试输入和聚合器,请输入以下命令
telegraf --test --config telegraf.conf
Telegraf 将以下内容输出到 stdout,然后退出
> home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000 > home,room=Living\ Room co=17i,hum=36.4,temp=22.7 1641063600000000000
避免发送重复数据
使用 Telegraf 和 Dedup 处理器插件 来筛选字段值与先前值完全重复的数据。对数据进行去重可以减少您的写入负载大小和资源使用量。
以下示例演示了如何使用 Telegraf 删除重复字段值的点,然后将数据写入 InfluxDB
在您的终端中,输入以下命令以创建示例数据文件并计算最早时间戳与现在之间相隔的秒数。该命令将计算出的值分配给您将在下一步中使用的
dedup_duration
变量。cat <<EOF > ./home.lp home,room=Kitchen temp=23.1,hum=36.6,co=22i 1641063600 home,room=Living\ Room temp=22.5,hum=36.4,co=17i 1641063600 home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641063605 home,room=Living\ Room temp=22.5,hum=36.4,co=17i 1641063605 home,room=Kitchen temp=23.1,hum=36.6,co=22i 1641063610 home,room=Living\ Room temp=23.0,hum=36.4,co=17i 1641063610 EOF dedup_duration="$(($(date +%s)-1641063000))s"
输入以下命令以配置 Telegraf 来解析文件、删除重复点并将数据写入 InfluxDB,具体来说,示例配置设置以下内容
influx_timestamp_precision
:对于解析器,指定输入数据中的时间戳精度processors.dedup
:配置 Dedup 处理器插件- 可选:
processors.dedup.dedup_interval
。范围在dedup_interval
到现在 之间的点将被考虑删除。为确保包含示例数据,配置使用上一步中计算出的变量。
cat <<EOF > ./telegraf.conf # Parse metrics from a file [[inputs.file]] ## A list of files to parse during each interval. files = ["home.lp"] ## The precision of timestamps in your data. influx_timestamp_precision = "1s" tagexclude = ["host"] # Filter metrics that repeat previous field values [[processors.dedup]] ## Drops duplicates within the specified duration dedup_interval = "$dedup_duration" # Writes metrics as line protocol to the InfluxDB v2 API [[outputs.influxdb_v2]] ## InfluxDB v2 API credentials and the database to write data to. urls = ["https://localhost:8181"] token = "
AUTH_TOKEN" organization = "" bucket = "DATABASE_NAME" EOF替换以下内容
DATABASE_NAME
:要写入数据的数据库的名称AUTH_TOKEN
:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。在 Beta 阶段,InfluxDB 3 Core 不需要授权令牌。对于
token
选项,请提供空令牌字符串或任意令牌字符串。
要测试输入和处理器,请输入以下命令
telegraf --test --config telegraf.conf
Telegraf 将以下内容输出到 stdout,然后退出
> home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000 > home,room=Living\ Room co=17i,hum=36.4,temp=22.5 1641063600000000000 > home,room=Kitchen co=26i,hum=36.5,temp=22.7 1641063605000000000 > home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063610000000000 > home,room=Living\ Room co=17i,hum=36.4,temp=23 1641063610000000000
运行自定义预处理代码
使用 Telegraf 和 Execd 处理器插件 来执行 Telegraf 外部的代码,然后写入处理后的数据。Execd 插件期望 stdin 中包含 Line Protocol 数据,将数据传递给配置的可执行文件,然后将 Line Protocol 输出到 stdout。
以下示例演示了如何使用 Telegraf 执行 Go 代码来处理指标,然后将输出写入 InfluxDB。Go multiplier.go
示例代码执行以下操作
从 Telegraf 导入
influx
解析器和序列化器插件。将每行数据解析为 Telegraf 指标。
如果指标包含
count
字段,则将字段值乘以2
;否则,将消息打印到 stderr 并退出。在您的编辑器中,输入以下示例代码并将文件另存为
multiplier.go
package main import ( "fmt" "os" "github.com/influxdata/telegraf/plugins/parsers/influx" influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx" ) func main() { parser := influx.NewStreamParser(os.Stdin) serializer := influxSerializer.Serializer{} if err := serializer.Init(); err != nil { fmt.Fprintf(os.Stderr, "serializer init failed: %v\n", err) os.Exit(1) } for { metric, err := parser.Next() if err != nil { if err == influx.EOF { return // stream ended } if parseErr, isParseError := err.(*influx.ParseError); isParseError { fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr) os.Exit(1) } fmt.Fprintf(os.Stderr, "ERR %v\n", err) os.Exit(1) } c, found := metric.GetField("count") if !found { fmt.Fprintf(os.Stderr, "metric has no count field\n") os.Exit(1) } switch t := c.(type) { case float64: t *= 2 metric.AddField("count", t) case int64: t *= 2 metric.AddField("count", t) default: fmt.Fprintf(os.Stderr, "count is not an unknown type, it's a %T\n", c) os.Exit(1) } b, err := serializer.Serialize(metric) if err != nil { fmt.Fprintf(os.Stderr, "ERR %v\n", err) os.Exit(1) } fmt.Fprint(os.Stdout, string(b)) } }
初始化模块并安装依赖项
go mod init processlp go mod tidy
在您的终端中,输入以下命令以创建示例数据文件
cat <<EOF > ./home.lp home,room=Kitchen temp=23.1,count=1 1641063600 home,room=Living\ Room temp=22.7,count=1 1641063600 home,room=Kitchen temp=23.1 1641063601 home,room=Living\ Room temp=22.7 1641063601 EOF
输入以下命令以配置 Telegraf 来解析文件、执行 Go 二进制文件并将数据写入,具体来说,示例配置设置以下内容
influx_timestamp_precision
:对于解析器,指定输入数据中的时间戳精度processors.execd
:配置 Execd 插件processors.execd.command
:设置 Execd 要运行的可执行文件和参数
cat <<EOF > ./telegraf.conf # Parse metrics from a file [[inputs.file]] ## A list of files to parse during each interval. files = ["home.lp"] ## The precision of timestamps in your data. influx_timestamp_precision = "1s" tagexclude = ["host"] # Filter metrics that repeat previous field values [[processors.execd]] ## A list that contains the executable command and arguments to run as a daemon. command = ["go", "run", "multiplier.go"] # Writes metrics as line protocol to the InfluxDB v2 API [[outputs.influxdb_v2]] ## InfluxDB v2 API credentials and the database to write data to. urls = ["https://localhost:8181"] token = "
AUTH_TOKEN" organization = "" bucket = "DATABASE_NAME" EOF替换以下内容
DATABASE_NAME
:要写入数据的数据库的名称AUTH_TOKEN
:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。在 Beta 阶段,InfluxDB 3 Core 不需要授权令牌。对于
token
选项,请提供空令牌字符串或任意令牌字符串。
要测试输入和处理器,请输入以下命令
telegraf --test --config telegraf.conf
Telegraf 将以下内容输出到 stdout,然后退出
> home,room=Kitchen count=2,temp=23.1 1641063600000000000 > home,room=Living\ Room count=2,temp=22.7 1641063600000000000
此页面是否对您有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一份子!我们欢迎并鼓励您对 InfluxDB 3 Core 和本文档提出反馈和错误报告。要寻求支持,请使用以下资源
拥有年度合同或支持合同的客户可以联系 InfluxData 支持。