优化 InfluxDB 的写入
使用这些技巧来优化性能和系统开销,以便将数据写入 InfluxDB。
以下工具写入 InfluxDB,并且默认情况下采用大多数写入优化
批量写入
批量写入数据,以最大限度地减少将数据写入 InfluxDB 时的网络开销。
最佳批处理大小为 10,000 行 Line Protocol 或 10 MB,以先达到的阈值为准。
按键对标签进行排序
在将数据点写入 InfluxDB 之前,按字典顺序按键对标签进行排序。验证排序结果是否与 Go bytes.Compare
函数的结果匹配。
# Line protocol example with unsorted tags
measurement,tagC=therefore,tagE=am,tagA=i,tagD=i,tagB=think fieldKey=fieldValue 1562020262
# Optimized line protocol example with tags sorted by key
measurement,tagA=i,tagB=think,tagC=therefore,tagD=i,tagE=am fieldKey=fieldValue 1562020262
尽可能使用最粗略的时间精度
默认情况下,InfluxDB 以纳秒精度写入数据。但是,如果您的数据不是以纳秒为单位收集的,则无需以该精度写入。为了获得更好的性能,请为时间戳使用尽可能粗略的精度。
在写入 InfluxDB时指定时间戳精度。
使用 gzip 压缩
使用 gzip 压缩来加速写入 InfluxDB。基准测试表明,当数据被压缩时,速度最多可提高 5 倍。
在 Telegraf 中启用 gzip 压缩
在您的 telegraf.conf
中的 influxdb_v2
输出插件配置中,将 content_encoding
选项设置为 gzip
[[outputs.influxdb_v2]]
urls = ["https://cluster-host.com"]
# ...
content_encoding = "gzip"
在 InfluxDB 客户端库中启用 gzip 压缩
每个InfluxDB 客户端库都提供了压缩写入请求的选项,或者默认强制压缩。启用压缩的方法对于每个库都不同。有关具体说明,请参阅InfluxDB 客户端库文档。
将 gzip 压缩与 InfluxDB API 结合使用
当使用 InfluxDB API /api/v2/write
端点写入数据时,使用 gzip
压缩数据并将 Content-Encoding
标头设置为 gzip
– 例如
替换以下内容
使用 NTP 同步主机
使用网络时间协议 (NTP) 同步主机之间的时间。如果 Line Protocol 中未包含时间戳,则 InfluxDB 使用其主机的本地时间(UTC)为每个点分配时间戳。如果主机的时钟未与 NTP 同步,则时间戳可能不准确。
在一个请求中写入多个数据点
要在一个请求中写入多行,则 Line Protocol 的每一行必须以换行符 (\n
) 分隔。
在写入之前预处理数据
在您的写入工作负载中预处理数据可以帮助您避免由于模式冲突或资源使用而导致的写入失败。 例如,如果您有许多设备写入同一个 measurement,并且某些设备对同一字段使用不同的数据类型,那么您可能需要生成警报或转换字段数据以适应您的模式,然后再将数据发送到 InfluxDB。
使用Telegraf,您可以处理来自其他服务和文件的数据,然后将其写入 InfluxDB。除了使用 Telegraf 包含的插件处理数据外,您还可以使用Execd 处理器插件来集成您自己的代码和外部应用程序。
以下示例显示了如何配置 Telegraf 代理和插件以优化写入。这些示例使用文件输入插件从文件读取数据,并使用InfluxDB v2 输出插件将数据写入存储桶,但您可以使用任何输入和输出插件。
先决条件
如果您尚未安装,请安装 Telegraf。
从批次中筛选数据
使用 Telegraf 和指标筛选在将数据写入 InfluxDB 之前筛选数据。
配置指标过滤器以保留或删除数据元素(在处理器和聚合器插件运行之前)。
输入以下命令以创建 Telegraf 配置,该配置解析系统使用情况数据,删除指定的字段和标签,然后将数据写入 InfluxDB
cat <<EOF >> ./telegraf.conf [[inputs.cpu]] # Remove the specified fields from points. fieldpass = ["usage_system", "usage_idle"] # Remove the specified tags from points. tagexclude = ["host"] [[outputs.influxdb_v2]] urls = ["https://cluster-host.com"] token = "
DATABASE_TOKEN" organization = "" bucket = "DATABASE_NAME" EOF替换以下内容
要测试输入和处理器,请输入以下命令
telegraf --test --config telegraf.conf
输出类似于以下内容。对于每一行输入数据,过滤器都会传递指标名称、标签、指定的字段和时间戳。
> cpu,cpu=cpu0 usage_idle=100,usage_system=0 1702067201000000000
...
> cpu,cpu=cpu-total usage_idle=99.80198019802448,usage_system=0.1980198019802045 1702067201000000000
强制数据类型以避免拒绝点错误
使用 Telegraf 和Converter 处理器插件来转换字段数据类型以适应您的模式。
例如,如果您将入门家庭传感器数据中的示例数据写入数据库,然后尝试将以下批次写入同一个 measurement
home,room=Kitchen temp=23.1,hum=36.6,co=22.1 1641063600
home,room=Living\ Room temp=22i,hum=36.4,co=17i 1641067200
home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641067200
InfluxDB 期望 co
包含一个整数值,并拒绝 co
浮点十进制 (22.1
) 值的点。为了避免错误,请配置 Telegraf 将字段转换为模式列中的数据类型。
以下示例转换 temp
、hum
和 co
字段以适应示例数据模式
在您的终端中,输入以下命令以创建示例数据文件
cat <<EOF > ./home.lp home,room=Kitchen temp=23.1,hum=36.6,co=22.1 1641063600 home,room=Living\ Room temp=22i,hum=36.4,co=17i 1641067200 home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641067200 EOF
输入以下命令以创建 Telegraf 配置,该配置解析示例数据,将字段值转换为指定的数据类型,然后将数据写入 InfluxDB
cat <<EOF > ./telegraf.conf [[inputs.file]] ## For each interval, parse data from files in the list. files = ["home.lp"] influx_timestamp_precision = "1s" precision = "1s" tagexclude = ["host"] [[processors.converter]] [processors.converter.fields] ## A data type and a list of fields to convert to the data type. float = ["temp", "hum"] integer = ["co"] [[outputs.influxdb_v2]] ## InfluxDB v2 API credentials and the database to write to. urls = ["https://cluster-host.com"] token = "
DATABASE_TOKEN" organization = "" bucket = "DATABASE_NAME" EOF替换以下内容
要测试输入和处理器,请输入以下命令
telegraf --test --config telegraf.conf
Telegraf 将以下内容输出到 stdout,然后退出
> home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000 > home,room=Living\ Room co=17i,hum=36.4,temp=22 1641067200000000000 > home,room=Kitchen co=26i,hum=36.5,temp=22.7 1641067200000000000
合并行以优化内存和带宽
使用 Telegraf 和Merge 聚合器插件来合并共享相同 measurement、标签集和时间戳的点。
以下示例为两个序列(measurement、标签集和时间戳的组合)创建示例数据,然后在每个序列中合并点
在您的终端中,输入以下命令以创建示例数据文件并计算最早时间戳和现在之间经过的秒数。该命令将计算的值分配给您将在下一步中使用的
grace_duration
变量。cat <<EOF > ./home.lp home,room=Kitchen temp=23.1 1641063600 home,room=Kitchen hum=36.6 1641063600 home,room=Kitchen co=22i 1641063600 home,room=Living\ Room temp=22.7 1641063600 home,room=Living\ Room hum=36.4 1641063600 home,room=Living\ Room co=17i 1641063600 EOF grace_duration="$(($(date +%s)-1641063000))s"
输入以下命令以配置 Telegraf 解析文件、合并点并将数据写入 InfluxDB – 具体而言,该配置设置了以下属性
influx_timestamp_precision
:对于解析器,指定输入数据中的时间戳精度- 可选:
aggregators.merge.grace
延长了合并点的时间。为了确保包含示例数据,配置使用了上一步中计算的变量。
cat <<EOF > ./telegraf.conf # Parse metrics from a file [[inputs.file]] ## A list of files to parse during each interval. files = ["home.lp"] ## The precision of timestamps in your data. influx_timestamp_precision = "1s" tagexclude = ["host"] # Merge separate metrics that share a series key [[aggregators.merge]] grace = "$grace_duration" ## If true, drops the original metric. drop_original = true # Writes metrics as line protocol to the InfluxDB v2 API [[outputs.influxdb_v2]] ## InfluxDB v2 API credentials and the database to write data to. urls = ["https://cluster-host.com"] token = "
DATABASE_TOKEN" organization = "" bucket = "DATABASE_NAME" EOF替换以下内容
要测试输入和聚合器,请输入以下命令
telegraf --test --config telegraf.conf
Telegraf 将以下内容输出到 stdout,然后退出
> home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000 > home,room=Living\ Room co=17i,hum=36.4,temp=22.7 1641063600000000000
避免发送重复数据
使用 Telegraf 和Dedup 处理器插件来筛选字段值与先前值完全重复的数据。 对数据进行重复数据删除可以减少您的写入负载大小和资源使用量。
以下示例显示了如何使用 Telegraf 删除重复字段值的点,然后将数据写入 InfluxDB
在您的终端中,输入以下命令以创建示例数据文件并计算最早时间戳和现在之间经过的秒数。该命令将计算的值分配给您将在下一步中使用的
dedup_duration
变量。cat <<EOF > ./home.lp home,room=Kitchen temp=23.1,hum=36.6,co=22i 1641063600 home,room=Living\ Room temp=22.5,hum=36.4,co=17i 1641063600 home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641063605 home,room=Living\ Room temp=22.5,hum=36.4,co=17i 1641063605 home,room=Kitchen temp=23.1,hum=36.6,co=22i 1641063610 home,room=Living\ Room temp=23.0,hum=36.4,co=17i 1641063610 EOF dedup_duration="$(($(date +%s)-1641063000))s"
输入以下命令以配置 Telegraf 解析文件、删除重复点并将数据写入 InfluxDB – 具体而言,示例配置设置了以下内容
influx_timestamp_precision
:对于解析器,指定输入数据中的时间戳精度processors.dedup
:配置 Dedup 处理器插件- 可选:
processors.dedup.dedup_interval
。将考虑删除dedup_interval
到现在范围内的点。为了确保包含示例数据,配置使用了上一步中计算的变量。
cat <<EOF > ./telegraf.conf # Parse metrics from a file [[inputs.file]] ## A list of files to parse during each interval. files = ["home.lp"] ## The precision of timestamps in your data. influx_timestamp_precision = "1s" tagexclude = ["host"] # Filter metrics that repeat previous field values [[processors.dedup]] ## Drops duplicates within the specified duration dedup_interval = "$dedup_duration" # Writes metrics as line protocol to the InfluxDB v2 API [[outputs.influxdb_v2]] ## InfluxDB v2 API credentials and the database to write data to. urls = ["https://cluster-host.com"] token = "
DATABASE_TOKEN" organization = "" bucket = "DATABASE_NAME" EOF替换以下内容
要测试输入和处理器,请输入以下命令
telegraf --test --config telegraf.conf
Telegraf 将以下内容输出到 stdout,然后退出
> home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000 > home,room=Living\ Room co=17i,hum=36.4,temp=22.5 1641063600000000000 > home,room=Kitchen co=26i,hum=36.5,temp=22.7 1641063605000000000 > home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063610000000000 > home,room=Living\ Room co=17i,hum=36.4,temp=23 1641063610000000000
运行自定义预处理代码
使用 Telegraf 和Execd 处理器插件来执行 Telegraf 外部的代码,然后写入处理后的数据。Execd 插件期望 stdin 中有 Line Protocol 数据,将数据传递到配置的可执行文件,然后将 Line Protocol 输出到 stdout。
以下示例显示了如何使用 Telegraf 执行 Go 代码来处理指标,然后将输出写入 InfluxDB。Go multiplier.go
示例代码执行以下操作
从 Telegraf 导入
influx
解析器和序列化器插件。将每行数据解析为 Telegraf 指标。
如果指标包含
count
字段,则将字段值乘以2
;否则,将消息打印到 stderr 并退出。在您的编辑器中,输入以下示例代码并将文件另存为
multiplier.go
package main import ( "fmt" "os" "github.com/influxdata/telegraf/plugins/parsers/influx" influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx" ) func main() { parser := influx.NewStreamParser(os.Stdin) serializer := influxSerializer.Serializer{} if err := serializer.Init(); err != nil { fmt.Fprintf(os.Stderr, "serializer init failed: %v\n", err) os.Exit(1) } for { metric, err := parser.Next() if err != nil { if err == influx.EOF { return // stream ended } if parseErr, isParseError := err.(*influx.ParseError); isParseError { fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr) os.Exit(1) } fmt.Fprintf(os.Stderr, "ERR %v\n", err) os.Exit(1) } c, found := metric.GetField("count") if !found { fmt.Fprintf(os.Stderr, "metric has no count field\n") os.Exit(1) } switch t := c.(type) { case float64: t *= 2 metric.AddField("count", t) case int64: t *= 2 metric.AddField("count", t) default: fmt.Fprintf(os.Stderr, "count is not an unknown type, it's a %T\n", c) os.Exit(1) } b, err := serializer.Serialize(metric) if err != nil { fmt.Fprintf(os.Stderr, "ERR %v\n", err) os.Exit(1) } fmt.Fprint(os.Stdout, string(b)) } }
初始化模块并安装依赖项
go mod init processlp go mod tidy
在您的终端中,输入以下命令以创建示例数据文件
cat <<EOF > ./home.lp home,room=Kitchen temp=23.1,count=1 1641063600 home,room=Living\ Room temp=22.7,count=1 1641063600 home,room=Kitchen temp=23.1 1641063601 home,room=Living\ Room temp=22.7 1641063601 EOF
输入以下命令以配置 Telegraf 解析文件、执行 Go 二进制文件并写入数据 – 具体而言,示例配置设置了以下内容
influx_timestamp_precision
:对于解析器,指定输入数据中的时间戳精度processors.execd
:配置 Execd 插件processors.execd.command
:设置 Execd 运行的可执行文件和参数
cat <<EOF > ./telegraf.conf # Parse metrics from a file [[inputs.file]] ## A list of files to parse during each interval. files = ["home.lp"] ## The precision of timestamps in your data. influx_timestamp_precision = "1s" tagexclude = ["host"] # Filter metrics that repeat previous field values [[processors.execd]] ## A list that contains the executable command and arguments to run as a daemon. command = ["go", "run", "multiplier.go"] # Writes metrics as line protocol to the InfluxDB v2 API [[outputs.influxdb_v2]] ## InfluxDB v2 API credentials and the database to write data to. urls = ["https://cluster-host.com"] token = "
DATABASE_TOKEN" organization = "" bucket = "DATABASE_NAME" EOF替换以下内容
要测试输入和处理器,请输入以下命令
telegraf --test --config telegraf.conf
Telegraf 将以下内容输出到 stdout,然后退出
> home,room=Kitchen count=2,temp=23.1 1641063600000000000 > home,room=Living\ Room count=2,temp=22.7 1641063600000000000
此页是否对您有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一份子!我们欢迎并鼓励您提供关于 InfluxDB Clustered 和本文档的反馈和错误报告。要寻求支持,请使用以下资源
拥有年度合同或支持合同的客户可以联系 InfluxData 支持。