优化写入 InfluxDB 3 Core
InfluxDB 3 Core 处于公开 Alpha 阶段
InfluxDB 3 Core 处于公开 alpha 阶段,可用于测试和反馈,但不适用于生产环境。产品和本文档都在不断完善中。我们欢迎并鼓励您提供关于 alpha 体验的反馈,并邀请您加入我们的公共频道以获取更新和分享反馈。
使用这些技巧来优化性能和系统开销,以便将数据写入 InfluxDB 3 Core。
以下工具写入 InfluxDB,默认情况下采用大多数写入优化
- Telegraf
- InfluxDB 客户端库
批量写入
批量写入数据,以最大限度地减少将数据写入 InfluxDB 时的网络开销。
最佳批次大小为 10,000 行 Line Protocol 或 10 MB,以先达到的阈值为准。
按键对标签进行排序
在将数据点写入 InfluxDB 之前,请按字典顺序按键对标签进行排序。验证排序结果是否与 Go bytes.Compare
函数的结果匹配。
# Line protocol example with unsorted tags
measurement,tagC=therefore,tagE=am,tagA=i,tagD=i,tagB=think fieldKey=fieldValue 1562020262
# Optimized line protocol example with tags sorted by key
measurement,tagA=i,tagB=think,tagC=therefore,tagD=i,tagE=am fieldKey=fieldValue 1562020262
尽可能使用最粗略的时间精度
InfluxDB 3 Core 支持高达纳秒的时间戳精度。但是,如果您的数据不是以纳秒为单位收集的,则无需以该精度写入。为了获得更好的性能,请针对您的用例使用尽可能粗略的时间戳精度。
默认情况下,InfluxDB 3 Core 尝试通过识别哪个精度相对接近“现在”来自动检测 Line Protocol 中时间戳的精度。您还可以在写入请求中指定时间戳精度。InfluxDB 3 Core 支持以下时间戳精度
ns
(纳秒)us
(微秒)ms
(毫秒)s
(秒)
使用 gzip 压缩
使用 gzip 压缩来加速写入 InfluxDB 3 Core。基准测试表明,当数据被压缩时,速度最多可提高 5 倍。
在 Telegraf 中启用 gzip 压缩
在 telegraf.conf
中的 influxdb_v2
输出插件配置中,将 content_encoding
选项设置为 gzip
[[outputs.influxdb_v2]]
urls = ["https://#:8181"]
# ...
content_encoding = "gzip"
在 InfluxDB 客户端库中启用 gzip 压缩
每个InfluxDB 客户端库都提供了压缩写入请求的选项,或者默认强制压缩。启用压缩的方法因库而异。有关具体说明,请参阅InfluxDB 客户端库文档。
将 gzip 压缩与 InfluxDB API 一起使用
当使用 InfluxDB API /api/v2/write
端点写入数据时,请使用 gzip
压缩数据并将 Content-Encoding
标头设置为 gzip
– 例如
替换以下内容
DATABASE_NAME
:要将数据写入的数据库的名称AUTH_TOKEN
:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。在 alpha 阶段,InfluxDB 3 Core 不需要授权令牌。您可以省略
Authorization
标头,也可以提供任意令牌字符串。
使用 NTP 同步主机
使用网络时间协议 (NTP) 同步主机之间的时间。如果 Line Protocol 中未包含时间戳,则 InfluxDB 使用其主机的本地时间(UTC)为每个点分配时间戳。如果主机的时钟未与 NTP 同步,则时间戳可能不准确。
在一个请求中写入多个数据点
要在一个请求中写入多行,Line Protocol 的每一行都必须用换行符 (\n
) 分隔。
在写入前预处理数据
在您的写入工作负载中预处理数据可以帮助您避免由于模式冲突或资源使用而导致的写入失败。例如,如果您有许多设备写入同一个表,并且某些设备对同一字段使用不同的数据类型,那么您可能希望在将数据发送到 InfluxDB 之前生成警报或转换字段数据以适应您的模式。
使用Telegraf,您可以处理来自其他服务和文件的数据,然后将其写入 InfluxDB。除了使用 Telegraf 包含的插件处理数据外,您还可以使用Execd 处理器插件来集成您自己的代码和外部应用程序。
以下示例显示了如何配置 Telegraf 代理和插件以优化写入。这些示例使用文件输入插件从文件中读取数据,并使用InfluxDB v2 输出插件将数据写入数据库,但您可以使用任何输入和输出插件。
先决条件
如果您尚未安装,请安装 Telegraf。
从批次中筛选数据
使用 Telegraf 和指标筛选来筛选数据,然后再将其写入 InfluxDB。
配置指标过滤器以保留或删除数据元素(在处理器和聚合器插件运行之前)。
输入以下命令以创建 Telegraf 配置,该配置解析系统使用情况数据,删除指定的字段和标签,然后将数据写入 InfluxDB
cat <<EOF >> ./telegraf.conf [[inputs.cpu]] # Remove the specified fields from points. fieldpass = ["usage_system", "usage_idle"] # Remove the specified tags from points. tagexclude = ["host"] [[outputs.influxdb_v2]] urls = ["https://#:8181"] token = "
AUTH_TOKEN" organization = "" bucket = "DATABASE_NAME" EOF替换以下内容
DATABASE_NAME
:要将数据写入的数据库的名称AUTH_TOKEN
:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。在 alpha 阶段,InfluxDB 3 Core 不需要授权令牌。对于
token
选项,请提供一个空的或任意的令牌字符串。
要测试输入和处理器,请输入以下命令
telegraf --test --config telegraf.conf
输出类似于以下内容。对于输入数据的每一行,过滤器都会传递指标名称、标签、指定的字段和时间戳。
> cpu,cpu=cpu0 usage_idle=100,usage_system=0 1702067201000000000 ... > cpu,cpu=cpu-total usage_idle=99.80198019802448,usage_system=0.1980198019802045 1702067201000000000
强制数据类型以避免拒绝点错误
使用 Telegraf 和转换器处理器插件来转换字段数据类型以适应您的模式。
例如,如果您将家庭传感器示例数据中的示例数据写入数据库,然后尝试将以下批次写入同一表
InfluxDB 期望 co
包含整数值,并拒绝包含 co
浮点十进制 (22.1
) 值的点。为避免错误,请配置 Telegraf 以将字段转换为模式列中的数据类型。
以下示例转换 temp
、hum
和 co
字段以适应示例数据模式
在您的终端中,输入以下命令以创建示例数据文件
cat <<EOF > ./home.lp home,room=Kitchen temp=23.1,hum=36.6,co=22.1 1641063600 home,room=Living\ Room temp=22i,hum=36.4,co=17i 1641067200 home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641067200 EOF
输入以下命令以创建 Telegraf 配置,该配置解析示例数据,将字段值转换为指定的数据类型,然后将数据写入 InfluxDB
cat <<EOF > ./telegraf.conf [[inputs.file]] ## For each interval, parse data from files in the list. files = ["home.lp"] influx_timestamp_precision = "1s" precision = "1s" tagexclude = ["host"] [[processors.converter]] [processors.converter.fields] ## A data type and a list of fields to convert to the data type. float = ["temp", "hum"] integer = ["co"] [[outputs.influxdb_v2]] ## InfluxDB v2 API credentials and the database to write to. urls = ["https://#:8181"] token = "
AUTH_TOKEN" organization = "" bucket = "DATABASE_NAME" EOF替换以下内容
DATABASE_NAME
:要将数据写入的数据库的名称AUTH_TOKEN
:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。在 alpha 阶段,InfluxDB 3 Core 不需要授权令牌。对于
token
选项,请提供一个空的或任意的令牌字符串。
要测试输入和处理器,请输入以下命令
telegraf --test --config telegraf.conf
Telegraf 将以下内容输出到 stdout,然后退出
> home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000 > home,room=Living\ Room co=17i,hum=36.4,temp=22 1641067200000000000 > home,room=Kitchen co=26i,hum=36.5,temp=22.7 1641067200000000000
合并行以优化内存和带宽
使用 Telegraf 和合并聚合器插件来合并共享相同指标、标签集和时间戳的点。
以下示例为两个序列(表、标签集和时间戳的组合)创建示例数据,然后合并每个序列中的点
在您的终端中,输入以下命令以创建示例数据文件并计算最早时间戳与现在之间的时间秒数。该命令将计算出的值分配给您将在下一步中使用的
grace_duration
变量。cat <<EOF > ./home.lp home,room=Kitchen temp=23.1 1641063600 home,room=Kitchen hum=36.6 1641063600 home,room=Kitchen co=22i 1641063600 home,room=Living\ Room temp=22.7 1641063600 home,room=Living\ Room hum=36.4 1641063600 home,room=Living\ Room co=17i 1641063600 EOF grace_duration="$(($(date +%s)-1641063000))s"
输入以下命令以配置 Telegraf 解析文件、合并点并将数据写入 InfluxDB – 具体来说,该配置设置了以下属性
influx_timestamp_precision
:对于解析器,指定输入数据中的时间戳精度- 可选:
aggregators.merge.grace
延长合并点的持续时间。为了确保包含示例数据,该配置使用了上一步中计算出的变量。
cat <<EOF > ./telegraf.conf # Parse metrics from a file [[inputs.file]] ## A list of files to parse during each interval. files = ["home.lp"] ## The precision of timestamps in your data. influx_timestamp_precision = "1s" tagexclude = ["host"] # Merge separate metrics that share a series key [[aggregators.merge]] grace = "$grace_duration" ## If true, drops the original metric. drop_original = true # Writes metrics as line protocol to the InfluxDB v2 API [[outputs.influxdb_v2]] ## InfluxDB v2 API credentials and the database to write data to. urls = ["https://#:8181"] token = "
AUTH_TOKEN" organization = "" bucket = "DATABASE_NAME" EOF替换以下内容
DATABASE_NAME
:要将数据写入的数据库的名称AUTH_TOKEN
:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。在 alpha 阶段,InfluxDB 3 Core 不需要授权令牌。对于
token
选项,请提供一个空的或任意的令牌字符串。
要测试输入和聚合器,请输入以下命令
telegraf --test --config telegraf.conf
Telegraf 将以下内容输出到 stdout,然后退出
> home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000 > home,room=Living\ Room co=17i,hum=36.4,temp=22.7 1641063600000000000
避免发送重复数据
使用 Telegraf 和Dedup 处理器插件来筛选字段值与先前值完全重复的数据。重复数据删除可以减少您的写入负载大小和资源使用。
以下示例显示了如何使用 Telegraf 删除重复字段值的点,然后将数据写入 InfluxDB
在您的终端中,输入以下命令以创建示例数据文件并计算最早时间戳与现在之间的时间秒数。该命令将计算出的值分配给您将在下一步中使用的
dedup_duration
变量。cat <<EOF > ./home.lp home,room=Kitchen temp=23.1,hum=36.6,co=22i 1641063600 home,room=Living\ Room temp=22.5,hum=36.4,co=17i 1641063600 home,room=Kitchen temp=22.7,hum=36.5,co=26i 1641063605 home,room=Living\ Room temp=22.5,hum=36.4,co=17i 1641063605 home,room=Kitchen temp=23.1,hum=36.6,co=22i 1641063610 home,room=Living\ Room temp=23.0,hum=36.4,co=17i 1641063610 EOF dedup_duration="$(($(date +%s)-1641063000))s"
输入以下命令以配置 Telegraf 解析文件、删除重复点并将数据写入 InfluxDB – 具体来说,示例配置设置了以下内容
influx_timestamp_precision
:对于解析器,指定输入数据中的时间戳精度processors.dedup
:配置 Dedup 处理器插件- 可选:
processors.dedup.dedup_interval
。范围dedup_interval
到现在的点被考虑删除。为了确保包含示例数据,该配置使用了上一步中计算出的变量。
cat <<EOF > ./telegraf.conf # Parse metrics from a file [[inputs.file]] ## A list of files to parse during each interval. files = ["home.lp"] ## The precision of timestamps in your data. influx_timestamp_precision = "1s" tagexclude = ["host"] # Filter metrics that repeat previous field values [[processors.dedup]] ## Drops duplicates within the specified duration dedup_interval = "$dedup_duration" # Writes metrics as line protocol to the InfluxDB v2 API [[outputs.influxdb_v2]] ## InfluxDB v2 API credentials and the database to write data to. urls = ["https://#:8181"] token = "
AUTH_TOKEN" organization = "" bucket = "DATABASE_NAME" EOF替换以下内容
DATABASE_NAME
:要将数据写入的数据库的名称AUTH_TOKEN
:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。在 alpha 阶段,InfluxDB 3 Core 不需要授权令牌。对于
token
选项,请提供一个空的或任意的令牌字符串。
要测试输入和处理器,请输入以下命令
telegraf --test --config telegraf.conf
Telegraf 将以下内容输出到 stdout,然后退出
> home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063600000000000 > home,room=Living\ Room co=17i,hum=36.4,temp=22.5 1641063600000000000 > home,room=Kitchen co=26i,hum=36.5,temp=22.7 1641063605000000000 > home,room=Kitchen co=22i,hum=36.6,temp=23.1 1641063610000000000 > home,room=Living\ Room co=17i,hum=36.4,temp=23 1641063610000000000
运行自定义预处理代码
使用 Telegraf 和Execd 处理器插件来执行 Telegraf 外部的代码,然后写入处理后的数据。Execd 插件期望 stdin 中的 Line Protocol 数据,将数据传递到配置的可执行文件,然后将 Line Protocol 输出到 stdout。
以下示例显示了如何使用 Telegraf 执行 Go 代码以处理指标,然后将输出写入 InfluxDB。Go multiplier.go
示例代码执行以下操作
从 Telegraf 导入
influx
解析器和序列化器插件。将每行数据解析为 Telegraf 指标。
如果指标包含
count
字段,则将字段值乘以2
;否则,将消息打印到 stderr 并退出。在您的编辑器中,输入以下示例代码并将文件另存为
multiplier.go
package main import ( "fmt" "os" "github.com/influxdata/telegraf/plugins/parsers/influx" influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx" ) func main() { parser := influx.NewStreamParser(os.Stdin) serializer := influxSerializer.Serializer{} if err := serializer.Init(); err != nil { fmt.Fprintf(os.Stderr, "serializer init failed: %v\n", err) os.Exit(1) } for { metric, err := parser.Next() if err != nil { if err == influx.EOF { return // stream ended } if parseErr, isParseError := err.(*influx.ParseError); isParseError { fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr) os.Exit(1) } fmt.Fprintf(os.Stderr, "ERR %v\n", err) os.Exit(1) } c, found := metric.GetField("count") if !found { fmt.Fprintf(os.Stderr, "metric has no count field\n") os.Exit(1) } switch t := c.(type) { case float64: t *= 2 metric.AddField("count", t) case int64: t *= 2 metric.AddField("count", t) default: fmt.Fprintf(os.Stderr, "count is not an unknown type, it's a %T\n", c) os.Exit(1) } b, err := serializer.Serialize(metric) if err != nil { fmt.Fprintf(os.Stderr, "ERR %v\n", err) os.Exit(1) } fmt.Fprint(os.Stdout, string(b)) } }
初始化模块并安装依赖项
go mod init processlp go mod tidy
在您的终端中,输入以下命令以创建示例数据文件
cat <<EOF > ./home.lp home,room=Kitchen temp=23.1,count=1 1641063600 home,room=Living\ Room temp=22.7,count=1 1641063600 home,room=Kitchen temp=23.1 1641063601 home,room=Living\ Room temp=22.7 1641063601 EOF
输入以下命令以配置 Telegraf 解析文件、执行 Go 二进制文件并写入数据 – 具体来说,示例配置设置了以下内容
influx_timestamp_precision
:对于解析器,指定输入数据中的时间戳精度processors.execd
:配置 Execd 插件processors.execd.command
:设置 Execd 运行的可执行文件和参数
cat <<EOF > ./telegraf.conf # Parse metrics from a file [[inputs.file]] ## A list of files to parse during each interval. files = ["home.lp"] ## The precision of timestamps in your data. influx_timestamp_precision = "1s" tagexclude = ["host"] # Filter metrics that repeat previous field values [[processors.execd]] ## A list that contains the executable command and arguments to run as a daemon. command = ["go", "run", "multiplier.go"] # Writes metrics as line protocol to the InfluxDB v2 API [[outputs.influxdb_v2]] ## InfluxDB v2 API credentials and the database to write data to. urls = ["https://#:8181"] token = "
AUTH_TOKEN" organization = "" bucket = "DATABASE_NAME" EOF替换以下内容
DATABASE_NAME
:要将数据写入的数据库的名称AUTH_TOKEN
:您的 InfluxDB 3 Core 授权令牌。将其存储在密钥存储或环境变量中,以避免暴露原始令牌字符串。在 alpha 阶段,InfluxDB 3 Core 不需要授权令牌。对于
token
选项,请提供一个空的或任意的令牌字符串。
要测试输入和处理器,请输入以下命令
telegraf --test --config telegraf.conf
Telegraf 将以下内容输出到 stdout,然后退出
> home,room=Kitchen count=2,temp=23.1 1641063600000000000 > home,room=Living\ Room count=2,temp=22.7 1641063600000000000
此页是否对您有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一份子!我们欢迎并鼓励您提供关于 InfluxDB 3 Core 和本文档的反馈和错误报告。要获得支持,请使用以下资源
拥有年度合同或支持合同的客户可以联系 InfluxData 支持。