基本转换插件
基本转换插件支持 InfluxDB 3 中时间序列数据的实时和计划转换。该插件可以转换字段和标签名称,在单位之间转换值,并应用自定义字符串替换以标准化或清理您的数据。该插件支持对历史数据进行计划的批量处理以及数据写入时的实时转换。
配置
必需参数
| 参数 | 类型 | 默认 | 描述 |
|---|---|---|---|
measurement | string | 必需 | 包含要转换数据的源测量 |
target_measurement | string | 必需 | 转换后的数据的目标测量 |
target_database | string | 当前数据库 | 存储转换后数据的数据库 |
dry_run | string | "false" | 当设置为 "true" 时,会记录转换过程而不进行写入 |
转换参数
| 参数 | 类型 | 默认 | 描述 |
|---|---|---|---|
names_transformations | string | none | 字段/标签名称转换规则。格式:'field1:"transform1 transform2".field2:"transform3"' |
values_transformations | string | none | 字段值转换规则。格式:'field1:"transform1".field2:"transform2"' |
custom_replacements | string | none | 自定义字符串替换。格式:'rule_name:"find=replace"' |
custom_regex | string | none | 用于字段匹配的正则表达式。格式:'pattern_name:"temp%"' |
数据选择参数
| 参数 | 类型 | 默认 | 描述 |
|---|---|---|---|
window | string | 必需 (仅计划任务) | 历史数据窗口。格式:<数字><单位> (例如,"30d", "1h") |
included_fields | string | 所有字段 | 要包含的字段的句点分隔列表 (例如,"temp.humidity") |
excluded_fields | string | none | 要排除的字段的句点分隔列表 |
filters | string | none | 查询过滤器。格式:'field:"operator value"' |
TOML 配置
| 参数 | 类型 | 默认 | 描述 |
|---|---|---|---|
config_file_path | string | none | 相对于 PLUGIN_DIR 的 TOML 配置文件路径 (TOML 配置必需) |
要使用 TOML 配置文件,请设置 PLUGIN_DIR 环境变量并在触发器参数中指定 config_file_path。 这与启动 InfluxDB 3 时的 --plugin-dir 标志是附加的。
示例 TOML 配置
- basic_transformation_config_scheduler.toml - 用于计划触发器
- basic_transformation_config_data_writes.toml - 用于数据写入触发器
influxdb3 create trigger \
--database mydb \
--plugin-filename basic_transformation.py \
--trigger-spec "every:1d" \
--trigger-arguments config_file_path=basic_transformation_config_scheduler.toml \
basic_transform_trigger有关使用 TOML 配置文件,请参阅 influxdb3_plugins /README.md 中的“使用 TOML 配置文件”部分。
架构要求
该插件假定表架构已在数据库中定义,因为它依赖于此架构来检索处理所需的字段和标签名称。
需要现有架构
根据设计,如果架构不存在或不包含预期的列,则插件会返回错误。
安装步骤
启用处理引擎来启动 InfluxDB 3 核心(
--plugin-dir /path/to/plugins)influxdb3 serve \ --node-id node0 \ --object-store file \ --data-dir ~/.influxdb3 \ --plugin-dir ~/.plugins安装必需的 Python 包
pint(用于单位转换)
influxdb3 install package pint
触发器设置
计划转换
定期对历史数据运行转换
influxdb3 create trigger \
--database mydb \
--plugin-filename gh:influxdata/basic_transformation/basic_transformation.py \
--trigger-spec "every:1h" \
--trigger-arguments 'measurement=temperature,window=24h,target_measurement=temperature_normalized,names_transformations=temp:"snake",values_transformations=temp:"convert_degC_to_degF"' \
hourly_temp_transform实时转换
在写入时转换数据
influxdb3 create trigger \
--database mydb \
--plugin-filename gh:influxdata/basic_transformation/basic_transformation.py \
--trigger-spec "all_tables" \
--trigger-arguments 'measurement=sensor_data,target_measurement=sensor_data_clean,names_transformations=.*:"snake alnum_underscore_only"' \
realtime_clean示例用法
示例 1:温度单位转换
将温度读数从摄氏度转换为华氏度,同时标准化字段名称
# Create the trigger
influxdb3 create trigger \
--database weather \
--plugin-filename gh:influxdata/basic_transformation/basic_transformation.py \
--trigger-spec "every:30m" \
--trigger-arguments 'measurement=raw_temps,window=1h,target_measurement=temps_fahrenheit,names_transformations=Temperature:"snake",values_transformations=temperature:"convert_degC_to_degF"' \
temp_converter
# Write test data
influxdb3 write \
--database weather \
"raw_temps,location=office Temperature=22.5"
# Query transformed data (after trigger runs)
influxdb3 query \
--database weather \
"SELECT * FROM temps_fahrenheit"预期输出
location | temperature | time
---------|-------------|-----
office | 72.5 | 2024-01-01T00:00:00Z转换详情
- 之前:
Temperature=22.5(摄氏度) - 之后:
temperature=72.5(华氏度,字段名称转换为 snake_case)
示例 2:字段名称标准化
清理和标准化来自各种传感器的字段名称
# Create trigger with multiple transformations
influxdb3 create trigger \
--database sensors \
--plugin-filename gh:influxdata/basic_transformation/basic_transformation.py \
--trigger-spec "all_tables" \
--trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake alnum_underscore_only collapse_underscore trim_underscore"' \
field_cleaner
# Write data with inconsistent field names
influxdb3 write \
--database sensors \
"raw_sensors,device=sensor1 \"Room Temperature\"=20.1,\"__Humidity_%\"=45.2"
# Query cleaned data
influxdb3 query \
--database sensors \
"SELECT * FROM clean_sensors"预期输出
device | room_temperature | humidity | time
--------|------------------|----------|-----
sensor1 | 20.1 | 45.2 | 2024-01-01T00:00:00Z转换详情
- 之前:
"Room Temperature"=20.1,"__Humidity_%"=45.2 - 之后:
room_temperature=20.1,humidity=45.2(字段名称已标准化)
示例 3:自定义字符串替换
替换字段值中的特定字符串
# Create trigger with custom replacements
influxdb3 create trigger \
--database inventory \
--plugin-filename gh:influxdata/basic_transformation/basic_transformation.py \
--trigger-spec "every:1d" \
--trigger-arguments 'measurement=products,window=7d,target_measurement=products_updated,values_transformations=status:"status_replace",custom_replacements=status_replace:"In Stock=available.Out of Stock=unavailable"' \
status_updater代码概述
文件
basic_transformation.py:包含计划任务和数据写入转换处理程序的主要插件代码basic_transformation_config_data_writes.toml:用于数据写入触发器的示例 TOML 配置文件basic_transformation_config_scheduler.toml:用于计划触发器的示例 TOML 配置文件
日志记录
日志存储在 _internal 数据库(或创建触发器的数据库)的 system.processing_engine_logs 表中。要查看日志
influxdb3 query --database _internal "SELECT * FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name'"日志列
- event_time:日志事件的时间戳
- trigger_name:生成日志的触发器名称
- log_level:严重性级别(INFO、WARN、ERROR)
- log_text:描述操作或错误的邮件
主函数
process_scheduled_call(influxdb3_local, call_time, args)
处理计划的转换任务。查询指定窗口内的历史数据并应用转换。
关键操作
- 从参数解析配置
- 使用过滤器查询源测量
- 应用名称和值转换
- 将转换后的数据写入目标测量
process_writes(influxdb3_local, table_batches, args)
在数据写入期间处理实时转换。在写入之前处理传入的数据批次并应用转换。
关键操作
- 过滤相关的表批次
- 对每一行应用转换
- 立即写入目标测量
apply_transformations(value, transformations)
核心转换引擎,将一系列转换应用于值。
支持的转换
- 字符串操作:
lower,upper,snake - 空格处理:
space_to_underscore,remove_space - 字符过滤:
alnum_underscore_only - 下划线管理:
collapse_underscore,trim_underscore - 单位转换:
convert_<from>_to_<to> - 自定义替换:用户定义的字符串替换
故障排除
常见问题
问题:转换未应用
解决方案:检查字段名称是否完全匹配(区分大小写)。使用正则表达式进行灵活匹配
--trigger-arguments 'custom_regex=temp_fields:"temp%",values_transformations=temp_fields:"convert_degC_to_degF"'问题:“Permission denied” 错误日志
解决方案:确保插件文件具有执行权限
chmod +x ~/.plugins/basic_transformation.py问题:单位转换失败
解决方案:验证单位名称是否为有效的 pint 单位。常用单位
- 温度:
degC,degF,degK - 长度:
meter,foot,inch - 时间:
second,minute,hour
问题:目标测量中没有数据
解决方案:
- 检查 dry_run 是否设置为 "true"
- 验证源测量包含数据
- 检查日志是否有错误
influxdb3 query \ --database _internal \ "SELECT * FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name'"
调试技巧
启用 dry run 来测试转换
--trigger-arguments 'dry_run=true,...'使用特定的时间窗口进行测试
--trigger-arguments 'window=1h,...'检查源数据中的字段名称
influxdb3 query --database mydb "SHOW FIELD KEYS FROM measurement"
性能注意事项
- 字段名称缓存可减少查询开销(1 小时缓存)
- 计划任务的批量处理可提高吞吐量
- 重试机制(3 次尝试)可处理瞬时写入失败
- 使用过滤器仅处理相关数据
报告问题
有关插件问题,请参阅插件存储库的 issues 页面。
查找 InfluxDB 3 Core 的支持
加入 InfluxDB Discord 服务器 是获取 InfluxDB 3 Core 和 InfluxDB 3 Enterprise 支持的最佳途径。对于其他 InfluxDB 版本,请参阅 支持和反馈 选项。
此页面是否有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一员!我们欢迎并鼓励您对 InfluxDB 3 Core 和本文档提供反馈和错误报告。要获得支持,请使用以下资源
具有年度合同或支持合同的客户可以 联系 InfluxData 支持。