优化 InfluxDB 写入
使用以下技巧在向 InfluxDB 写入数据时优化性能和系统开销。
以下工具写入 InfluxDB,并默认采用 大多数 写入优化
批量写入
将数据批量写入以最小化写入 InfluxDB 时的网络开销。
最佳批量大小为行协议的 10,000 行或 10 MB,以先达到的阈值为准。
按键排序标签
在将数据点写入 InfluxDB 之前,按字典顺序对标签进行按键排序。验证排序结果与 Go bytes.Compare
函数 的结果匹配。
# Line protocol example with unsorted tags
measurement,tagC=therefore,tagE=am,tagA=i,tagD=i,tagB=think fieldKey=fieldValue 1562020262
# Optimized line protocol example with tags sorted by key
measurement,tagA=i,tagB=think,tagC=therefore,tagD=i,tagE=am fieldKey=fieldValue 1562020262
使用可能的最粗时间精度
默认情况下,InfluxDB 以纳秒精度写入数据。但如果您的数据不是按纳秒收集的,就没有必要以那种精度写入。为了更好的性能,请使用可能的最粗精度的时间戳。
在向 InfluxDB 写入数据 时指定时间戳精度。
使用 gzip 压缩
使用 gzip 压缩以加快向 InfluxDB 的写入速度。基准测试显示,数据压缩后速度可提高 5 倍。
在 Telegraf 中启用 gzip 压缩
在您的 telegraf.conf
中的 influxdb_v2
输出插件配置中,将 content_encoding
选项设置为 gzip
[[outputs.influxdb_v2]]
urls = ["https://cluster-host.com"]
# ...
content_encoding = "gzip"
在 InfluxDB 客户端库中启用 gzip 压缩
每个 InfluxDB 客户端库 都提供了压缩写入请求的选项或默认启用压缩。每个库启用压缩的方法不同。有关具体说明,请参阅InfluxDB 客户端库文档。
使用 InfluxDB API 进行 gzip 压缩
当使用 InfluxDB API /api/v2/write
端点写入数据时,使用 gzip
压缩数据,并将 Content-Encoding
标头设置为 gzip
–例如
替换以下内容
使用 NTP 同步主机
使用网络时间协议(NTP)在主机之间同步时间。如果行协议中未包含时间戳,InfluxDB 将使用其主机的本地时间(UTC)为每个点分配时间戳。如果主机的时钟未与 NTP 同步,则时间戳可能不准确。
在一个请求中写入多个数据点
要在单个请求中写入多行,每行行协议必须由换行符(\n
)分隔。
在写入前预处理数据
在您的写入工作中预处理数据可以帮助您避免由于模式冲突或资源使用而导致的写入失败。例如,如果您有多个设备写入相同的测量值,并且某些设备使用不同的数据类型来表示相同的字段,那么您可能希望在将数据发送到 InfluxDB 之前生成警报或转换字段数据以符合您的模式。
使用 Telegraf,您可以处理来自其他服务和文件的数据,然后将它们写入 InfluxDB。除了使用 Telegraf 内置的插件处理数据外,您还可以使用 Execd 处理器插件 来集成自己的代码和外部应用程序。
以下示例演示了如何配置 Telegraf 代理 和 插件 以优化写入。示例使用 文件输入插件 从文件中读取数据,并使用 InfluxDB v2 输出插件 将数据写入桶,但您可以使用任何输入和输出插件。
先决条件
如果您还没有安装,请安装 Telegraf。
从批次中过滤数据
使用 Telegraf 和度量过滤来在写入 InfluxDB 之前过滤数据。
配置 度量过滤器 以保留或删除数据元素(在处理器和聚合器插件运行之前)。
输入以下命令以创建一个Telegraf配置,该配置解析系统使用数据,移除指定的字段和标签,然后将数据写入InfluxDB
cat <<EOF >> ./telegraf.conf [[inputs.cpu]] # Remove the specified fields from points. fieldpass = ["usage_system", "usage_idle"] # Remove the specified tags from points. tagexclude = ["host"] [[outputs.influxdb_v2]] urls = ["https://cluster-host.com"] token = "
DATABASE_TOKEN" organization = "" bucket = "DATABASE_NAME" EOF替换以下内容
要测试输入和处理器,请输入以下命令
telegraf --test --config telegraf.conf
输出类似于以下内容。对于输入数据的每一行,过滤器会传递指标名称、标签、指定的字段和时间戳。
> cpu,cpu=cpu0 usage_idle=100,usage_system=0 1702067201000000000
...
> cpu,cpu=cpu-total usage_idle=99.80198019802448,usage_system=0.1980198019802045 1702067201000000000
强制数据类型以避免拒绝的点错误
使用Telegraf和转换器处理器插件将字段数据类型转换为符合您的模式。
例如,如果您将开始使用家庭传感器数据中的示例数据写入数据库,然后尝试将以下批次写入相同的度量
home,room=Kitchen temp=23.1,hum=36.6,co=22.1 1641063600
home,room=Living\ Room temp=22i,hum=36.4,co=17i 1641067200
home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641067200
InfluxDB期望co
包含整数值,并拒绝包含co
浮点小数(22.1
)值的点。为了避免错误,请配置Telegraf将字段转换为您的模式列中的数据类型。
以下示例将temp
、hum
和co
字段转换为符合示例数据模式
在您的终端中,输入以下命令以创建示例数据文件
cat <<EOF > ./home.lp home,room=Kitchen temp=23.1,hum=36.6,co=22.1 1641063600 home,room=Living\ Room temp=22i,hum=36.4,co=17i 1641067200 home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641067200 EOF
输入以下命令以创建一个Telegraf配置,该配置解析示例数据,将字段值转换为指定的数据类型,然后将数据写入InfluxDB
cat <<EOF > ./telegraf.conf [[inputs.file]] ## For each interval, parse data from files in the list. files = ["home.lp"] influx_timestamp_precision = "1s" precision = "1s" tagexclude = ["host"] [[processors.converter]] [processors.converter.fields] ## A data type and a list of fields to convert to the data type. float = ["temp", "hum"] integer = ["co"] [[outputs.influxdb_v2]] ## InfluxDB v2 API credentials and the database to write to. urls = ["https://cluster-host.com"] token = "
DATABASE_TOKEN" organization = "" bucket = "DATABASE_NAME" EOF替换以下内容
要测试输入和处理器,请输入以下命令
telegraf --test --config telegraf.conf
Telegraf将以下内容输出到stdout,然后退出
> home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000 > home,room=Living\ Room co=17i,hum=36.4,temp=22 1641067200000000000 > home,room=Kitchen co=26i,hum=36.5,temp=22.7 1641067200000000000
合并行以优化内存和带宽
使用Telegraf和合并聚合器插件合并具有相同度量、标签集和时间戳的点。
以下示例创建两个系列(度量、标签集和时间戳的组合)的样本数据,然后合并每个系列中的点
在您的终端中,输入以下命令以创建样本数据文件并计算最早时间戳和现在之间的秒数。该命令将计算值分配给
grace_duration
变量,您将在下一步中使用它。cat <<EOF > ./home.lp home,room=Kitchen temp=23.1 1641063600 home,room=Kitchen hum=36.6 1641063600 home,room=Kitchen co=22i 1641063600 home,room=Living\ Room temp=22.7 1641063600 home,room=Living\ Room hum=36.4 1641063600 home,room=Living\ Room co=17i 1641063600 EOF grace_duration="$(($(date +%s)-1641063000))s"
输入以下命令以配置Telegraf解析文件,合并点,并将数据写入InfluxDB - 特别是配置设置了以下属性
influx_timestamp_precision
:对于解析器,指定输入数据中的时间戳精度- 可选:
aggregators.merge.grace
扩展合并点的持续时间。为了确保样本数据被包含,配置使用上一步中计算出的变量。
cat <<EOF > ./telegraf.conf # Parse metrics from a file [[inputs.file]] ## A list of files to parse during each interval. files = ["home.lp"] ## The precision of timestamps in your data. influx_timestamp_precision = "1s" tagexclude = ["host"] # Merge separate metrics that share a series key [[aggregators.merge]] grace = "$grace_duration" ## If true, drops the original metric. drop_original = true # Writes metrics as line protocol to the InfluxDB v2 API [[outputs.influxdb_v2]] ## InfluxDB v2 API credentials and the database to write data to. urls = ["https://cluster-host.com"] token = "
DATABASE_TOKEN" organization = "" bucket = "DATABASE_NAME" EOF替换以下内容
要测试输入和聚合器,请输入以下命令
telegraf --test --config telegraf.conf
Telegraf将以下内容输出到stdout,然后退出
> home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000 > home,room=Living\ Room co=17i,hum=36.4,temp=22.7 1641063600000000000
避免发送重复数据
使用Telegraf和去重处理器插件过滤字段值与前一次值完全相同的重复数据。去重数据可以减少您的写入负载大小和资源使用。
以下示例显示了如何使用Telegraf删除重复字段值的点,并将数据写入InfluxDB
在您的终端中,输入以下命令以创建样本数据文件并计算最早时间戳和现在之间的秒数。该命令将计算值分配给
dedup_duration
变量,您将在下一步中使用它。cat <<EOF > ./home.lp home,room=Kitchen temp=23.1,hum=36.6,co=22i 1641063600 home,room=Living\ Room temp=22.5,hum=36.4,co=17i 1641063600 home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641063605 home,room=Living\ Room temp=22.5,hum=36.4,co=17i 1641063605 home,room=Kitchen temp=23.1,hum=36.6,co=22i 1641063610 home,room=Living\ Room temp=23.0,hum=36.4,co=17i 1641063610 EOF dedup_duration="$(($(date +%s)-1641063000))s"
输入以下命令以配置Telegraf解析文件,删除重复点,并将数据写入InfluxDB - 特别是以下示例配置设置了以下内容
influx_timestamp_precision
:对于解析器,指定输入数据中的时间戳精度processors.dedup
:配置去重处理器插件- 可选:
processors.dedup.dedup_interval
。范围在dedup_interval
到现在之间的点被视为删除。为了确保样本数据被包含,配置使用上一步中计算出的变量。
cat <<EOF > ./telegraf.conf # Parse metrics from a file [[inputs.file]] ## A list of files to parse during each interval. files = ["home.lp"] ## The precision of timestamps in your data. influx_timestamp_precision = "1s" tagexclude = ["host"] # Filter metrics that repeat previous field values [[processors.dedup]] ## Drops duplicates within the specified duration dedup_interval = "$dedup_duration" # Writes metrics as line protocol to the InfluxDB v2 API [[outputs.influxdb_v2]] ## InfluxDB v2 API credentials and the database to write data to. urls = ["https://cluster-host.com"] token = "
DATABASE_TOKEN" organization = "" bucket = "DATABASE_NAME" EOF替换以下内容
要测试输入和处理器,请输入以下命令
telegraf --test --config telegraf.conf
Telegraf将以下内容输出到stdout,然后退出
> home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000 > home,room=Living\ Room co=17i,hum=36.4,temp=22.5 1641063600000000000 > home,room=Kitchen co=26i,hum=36.5,temp=22.7 1641063605000000000 > home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063610000000000 > home,room=Living\ Room co=17i,hum=36.4,temp=23 1641063610000000000
运行自定义预处理代码
使用 Telegraf 和 Execd 处理器插件 来执行 Telegraf 外部的代码,然后写入处理后的数据。Execd 插件期望从 stdin 接收行协议数据,将数据传递给配置的可执行文件,然后将行协议输出到 stdout。
以下示例展示了如何使用 Telegraf 执行 Go 代码来处理指标,然后将输出写入 InfluxDB。Go 的 multiplier.go
示例代码执行以下操作
从 Telegraf 导入
influx
解析器和序列化器插件。将每行数据解析为 Telegraf 指标。
如果指标包含
count
字段,则将字段值乘以2
;否则,将消息打印到 stderr 并退出。在您的编辑器中输入以下示例代码,并将文件保存为
multiplier.go
package main import ( "fmt" "os" "github.com/influxdata/telegraf/plugins/parsers/influx" influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx" ) func main() { parser := influx.NewStreamParser(os.Stdin) serializer := influxSerializer.Serializer{} if err := serializer.Init(); err != nil { fmt.Fprintf(os.Stderr, "serializer init failed: %v\n", err) os.Exit(1) } for { metric, err := parser.Next() if err != nil { if err == influx.EOF { return // stream ended } if parseErr, isParseError := err.(*influx.ParseError); isParseError { fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr) os.Exit(1) } fmt.Fprintf(os.Stderr, "ERR %v\n", err) os.Exit(1) } c, found := metric.GetField("count") if !found { fmt.Fprintf(os.Stderr, "metric has no count field\n") os.Exit(1) } switch t := c.(type) { case float64: t *= 2 metric.AddField("count", t) case int64: t *= 2 metric.AddField("count", t) default: fmt.Fprintf(os.Stderr, "count is not an unknown type, it's a %T\n", c) os.Exit(1) } b, err := serializer.Serialize(metric) if err != nil { fmt.Fprintf(os.Stderr, "ERR %v\n", err) os.Exit(1) } fmt.Fprint(os.Stdout, string(b)) } }
初始化模块并安装依赖项
go mod init processlp go mod tidy
在您的终端中,输入以下命令以创建示例数据文件
cat <<EOF > ./home.lp home,room=Kitchen temp=23.1,count=1 1641063600 home,room=Living\ Room temp=22.7,count=1 1641063600 home,room=Kitchen temp=23.1 1641063601 home,room=Living\ Room temp=22.7 1641063601 EOF
输入以下命令以配置 Telegraf 解析文件,执行 Go 二进制文件,并写入数据 - 特别是,以下示例配置设置了以下内容
influx_timestamp_precision
:对于解析器,指定输入数据中的时间戳精度processors.execd
:配置 Execd 插件processors.execd.command
:为 Execd 设置要运行的可执行文件和参数
cat <<EOF > ./telegraf.conf # Parse metrics from a file [[inputs.file]] ## A list of files to parse during each interval. files = ["home.lp"] ## The precision of timestamps in your data. influx_timestamp_precision = "1s" tagexclude = ["host"] # Filter metrics that repeat previous field values [[processors.execd]] ## A list that contains the executable command and arguments to run as a daemon. command = ["go", "run", "multiplier.go"] # Writes metrics as line protocol to the InfluxDB v2 API [[outputs.influxdb_v2]] ## InfluxDB v2 API credentials and the database to write data to. urls = ["https://cluster-host.com"] token = "
DATABASE_TOKEN" organization = "" bucket = "DATABASE_NAME" EOF替换以下内容
要测试输入和处理器,请输入以下命令
telegraf --test --config telegraf.conf
Telegraf将以下内容输出到stdout,然后退出
> home,room=Kitchen count=2,temp=23.1 1641063600000000000 > home,room=Living\ Room count=2,temp=22.7 1641063600000000000
这个页面有帮助吗?
感谢您的反馈!