文档文档

基本转换插件

基本转换插件支持 InfluxDB 3 中时间序列数据的实时和计划转换。该插件可以转换字段和标签名称,在单位之间转换值,并应用自定义字符串替换以标准化或清理您的数据。该插件支持对历史数据进行计划的批量处理以及数据写入时的实时转换。

配置

必需参数

参数类型默认描述
measurementstring必需包含要转换数据的源测量
target_measurementstring必需转换后的数据的目标测量
target_databasestring当前数据库存储转换后数据的数据库
dry_runstring"false"当设置为 "true" 时,会记录转换过程而不进行写入

转换参数

参数类型默认描述
names_transformationsstringnone字段/标签名称转换规则。格式:'field1:"transform1 transform2".field2:"transform3"'
values_transformationsstringnone字段值转换规则。格式:'field1:"transform1".field2:"transform2"'
custom_replacementsstringnone自定义字符串替换。格式:'rule_name:"find=replace"'
custom_regexstringnone用于字段匹配的正则表达式。格式:'pattern_name:"temp%"'

数据选择参数

参数类型默认描述
windowstring必需 (仅计划任务)历史数据窗口。格式:<数字><单位> (例如,"30d", "1h")
included_fieldsstring所有字段要包含的字段的句点分隔列表 (例如,"temp.humidity")
excluded_fieldsstringnone要排除的字段的句点分隔列表
filtersstringnone查询过滤器。格式:'field:"operator value"'

TOML 配置

参数类型默认描述
config_file_pathstringnone相对于 PLUGIN_DIR 的 TOML 配置文件路径 (TOML 配置必需)

要使用 TOML 配置文件,请设置 PLUGIN_DIR 环境变量并在触发器参数中指定 config_file_path 这与启动 InfluxDB 3 时的 --plugin-dir 标志是附加的。

示例 TOML 配置

influxdb3 create trigger \
 --database mydb \
 --plugin-filename basic_transformation.py \
 --trigger-spec "every:1d" \
 --trigger-arguments config_file_path=basic_transformation_config_scheduler.toml \
 basic_transform_trigger

有关使用 TOML 配置文件,请参阅 influxdb3_plugins /README.md 中的“使用 TOML 配置文件”部分。

架构要求

该插件假定表架构已在数据库中定义,因为它依赖于此架构来检索处理所需的字段和标签名称。

需要现有架构

根据设计,如果架构不存在或不包含预期的列,则插件会返回错误。

安装步骤

  1. 启用处理引擎来启动 InfluxDB 3 核心(--plugin-dir /path/to/plugins

    influxdb3 serve \
      --node-id node0 \
      --object-store file \
      --data-dir ~/.influxdb3 \
      --plugin-dir ~/.plugins
  2. 安装必需的 Python 包

    • pint (用于单位转换)
    influxdb3 install package pint

触发器设置

计划转换

定期对历史数据运行转换

influxdb3 create trigger \
  --database mydb \
  --plugin-filename gh:influxdata/basic_transformation/basic_transformation.py \
  --trigger-spec "every:1h" \
  --trigger-arguments 'measurement=temperature,window=24h,target_measurement=temperature_normalized,names_transformations=temp:"snake",values_transformations=temp:"convert_degC_to_degF"' \
  hourly_temp_transform

实时转换

在写入时转换数据

influxdb3 create trigger \
  --database mydb \
  --plugin-filename gh:influxdata/basic_transformation/basic_transformation.py \
  --trigger-spec "all_tables" \
  --trigger-arguments 'measurement=sensor_data,target_measurement=sensor_data_clean,names_transformations=.*:"snake alnum_underscore_only"' \
  realtime_clean

示例用法

示例 1:温度单位转换

将温度读数从摄氏度转换为华氏度,同时标准化字段名称

# Create the trigger
influxdb3 create trigger \
  --database weather \
  --plugin-filename gh:influxdata/basic_transformation/basic_transformation.py \
  --trigger-spec "every:30m" \
  --trigger-arguments 'measurement=raw_temps,window=1h,target_measurement=temps_fahrenheit,names_transformations=Temperature:"snake",values_transformations=temperature:"convert_degC_to_degF"' \
  temp_converter

# Write test data
influxdb3 write \
  --database weather \
  "raw_temps,location=office Temperature=22.5"

# Query transformed data (after trigger runs)
influxdb3 query \
  --database weather \
  "SELECT * FROM temps_fahrenheit"

预期输出

location | temperature | time
---------|-------------|-----
office   | 72.5        | 2024-01-01T00:00:00Z

转换详情

  • 之前:Temperature=22.5 (摄氏度)
  • 之后:temperature=72.5 (华氏度,字段名称转换为 snake_case)

示例 2:字段名称标准化

清理和标准化来自各种传感器的字段名称

# Create trigger with multiple transformations
influxdb3 create trigger \
  --database sensors \
  --plugin-filename gh:influxdata/basic_transformation/basic_transformation.py \
  --trigger-spec "all_tables" \
  --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake alnum_underscore_only collapse_underscore trim_underscore"' \
  field_cleaner

# Write data with inconsistent field names
influxdb3 write \
  --database sensors \
  "raw_sensors,device=sensor1 \"Room Temperature\"=20.1,\"__Humidity_%\"=45.2"

# Query cleaned data
influxdb3 query \
  --database sensors \
  "SELECT * FROM clean_sensors"

预期输出

device  | room_temperature | humidity | time
--------|------------------|----------|-----
sensor1 | 20.1            | 45.2     | 2024-01-01T00:00:00Z

转换详情

  • 之前:"Room Temperature"=20.1, "__Humidity_%"=45.2
  • 之后:room_temperature=20.1, humidity=45.2 (字段名称已标准化)

示例 3:自定义字符串替换

替换字段值中的特定字符串

# Create trigger with custom replacements
influxdb3 create trigger \
  --database inventory \
  --plugin-filename gh:influxdata/basic_transformation/basic_transformation.py \
  --trigger-spec "every:1d" \
  --trigger-arguments 'measurement=products,window=7d,target_measurement=products_updated,values_transformations=status:"status_replace",custom_replacements=status_replace:"In Stock=available.Out of Stock=unavailable"' \
  status_updater

代码概述

文件

  • basic_transformation.py:包含计划任务和数据写入转换处理程序的主要插件代码
  • basic_transformation_config_data_writes.toml:用于数据写入触发器的示例 TOML 配置文件
  • basic_transformation_config_scheduler.toml:用于计划触发器的示例 TOML 配置文件

日志记录

日志存储在 _internal 数据库(或创建触发器的数据库)的 system.processing_engine_logs 表中。要查看日志

influxdb3 query --database _internal "SELECT * FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name'"

日志列

  • event_time:日志事件的时间戳
  • trigger_name:生成日志的触发器名称
  • log_level:严重性级别(INFO、WARN、ERROR)
  • log_text:描述操作或错误的邮件

主函数

process_scheduled_call(influxdb3_local, call_time, args)

处理计划的转换任务。查询指定窗口内的历史数据并应用转换。

关键操作

  1. 从参数解析配置
  2. 使用过滤器查询源测量
  3. 应用名称和值转换
  4. 将转换后的数据写入目标测量

process_writes(influxdb3_local, table_batches, args)

在数据写入期间处理实时转换。在写入之前处理传入的数据批次并应用转换。

关键操作

  1. 过滤相关的表批次
  2. 对每一行应用转换
  3. 立即写入目标测量

apply_transformations(value, transformations)

核心转换引擎,将一系列转换应用于值。

支持的转换

  • 字符串操作:lower, upper, snake
  • 空格处理:space_to_underscore, remove_space
  • 字符过滤:alnum_underscore_only
  • 下划线管理:collapse_underscore, trim_underscore
  • 单位转换:convert_<from>_to_<to>
  • 自定义替换:用户定义的字符串替换

故障排除

常见问题

问题:转换未应用

解决方案:检查字段名称是否完全匹配(区分大小写)。使用正则表达式进行灵活匹配

--trigger-arguments 'custom_regex=temp_fields:"temp%",values_transformations=temp_fields:"convert_degC_to_degF"'

问题:“Permission denied” 错误日志

解决方案:确保插件文件具有执行权限

chmod +x ~/.plugins/basic_transformation.py

问题:单位转换失败

解决方案:验证单位名称是否为有效的 pint 单位。常用单位

  • 温度:degC, degF, degK
  • 长度:meter, foot, inch
  • 时间:second, minute, hour

问题:目标测量中没有数据

解决方案:

  1. 检查 dry_run 是否设置为 "true"
  2. 验证源测量包含数据
  3. 检查日志是否有错误
    influxdb3 query \
      --database _internal \
      "SELECT * FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name'"

调试技巧

  1. 启用 dry run 来测试转换

    --trigger-arguments 'dry_run=true,...'
  2. 使用特定的时间窗口进行测试

    --trigger-arguments 'window=1h,...'
  3. 检查源数据中的字段名称

    influxdb3 query --database mydb "SHOW FIELD KEYS FROM measurement"

性能注意事项

  • 字段名称缓存可减少查询开销(1 小时缓存)
  • 计划任务的批量处理可提高吞吐量
  • 重试机制(3 次尝试)可处理瞬时写入失败
  • 使用过滤器仅处理相关数据

报告问题

有关插件问题,请参阅插件存储库的 issues 页面

查找 InfluxDB 3 Core 的支持

加入 InfluxDB Discord 服务器 是获取 InfluxDB 3 Core 和 InfluxDB 3 Enterprise 支持的最佳途径。对于其他 InfluxDB 版本,请参阅 支持和反馈 选项。


此页面是否有帮助?

感谢您的反馈!


InfluxDB 3.8 新特性

InfluxDB 3.8 和 InfluxDB 3 Explorer 1.6 的主要增强功能。

查看博客文章

InfluxDB 3.8 现已适用于 Core 和 Enterprise 版本,同时发布了 InfluxDB 3 Explorer UI 的 1.6 版本。本次发布着重于操作成熟度,以及如何更轻松地部署、管理和可靠地运行 InfluxDB。

更多信息,请查看

InfluxDB Docker 的 latest 标签将指向 InfluxDB 3 Core

在 **2026 年 2 月 3 日**,InfluxDB Docker 镜像的 latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。

如果使用 Docker 来安装和运行 InfluxDB,latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。例如,如果使用 Docker 运行 InfluxDB v2,请将 latest 版本标签替换为 Docker pull 命令中的特定版本标签 — 例如

docker pull influxdb:2