InfluxDB 到 Iceberg 插件
InfluxDB 到 Iceberg 插件支持将数据从 InfluxDB 3 传输到 Apache Iceberg 表。将时序数据传输到 Iceberg,用于长期存储、分析或与数据湖架构集成。该插件支持对历史数据进行计划的批量传输以及通过 HTTP API 进行的按需传输。
配置
调度器触发参数
必需参数
| 参数 | 类型 | 默认 | 描述 |
|---|---|---|---|
measurement | string | 必需 | 包含待传输数据的源测量 |
window | string | 必需 | 数据传输的时间窗口。格式:<数量><单位>(例如,"1h"、"30d") |
catalog_configs | string | 必需 | 包含 Iceberg catalog 配置的 Base64 编码的 JSON 字符串 |
可选参数
| 参数 | 类型 | 默认 | 描述 |
|---|---|---|---|
included_fields | string | 所有字段 | 要包含的字段的.分隔列表(例如,"usage_user.usage_idle") |
excluded_fields | string | none | 要排除的字段的.分隔列表 |
namespace | string | “default” | 目标表的 Iceberg 命名空间 |
table_name | string | 测量名称 | Iceberg 表名称 |
config_file_path | string | none | 相对于 PLUGIN_DIR 的 TOML 配置文件路径 |
HTTP 触发参数
请求正文结构
| 参数 | 类型 | 必需 | 描述 |
|---|---|---|---|
measurement | string | 是 | 包含待传输数据的源测量 |
catalog_configs | object | 是 | Iceberg catalog 配置字典。请参阅 PyIceberg catalog 文档 |
included_fields | array | 否 | 包含在复制中的字段名称列表 |
excluded_fields | array | 否 | 要从复制中排除的字段名称列表 |
namespace | string | 否 | 目标 Iceberg 命名空间(默认:“default”) |
table_name | string | 否 | 目标 Iceberg 表名称(默认:测量名称) |
batch_size | string | 否 | 处理的批量大小持续时间(默认:“1d”)。格式:<数量><单位> |
backfill_start | string | 否 | 带时区的 ISO 8601 日期时间,用于回填开始 |
backfill_end | string | 否 | 带时区的 ISO 8601 日期时间,用于回填结束 |
Schema 管理
- 从第一批数据自动创建 Iceberg 表 schema
- 将 pandas 数据类型映射到 Iceberg 类型
int64→IntegerTypefloat64→FloatTypedatetime64[us]→TimestampTypeobject→StringType
- 没有 null 值的字段被标记为
required time列被转换为datetime64[us]以实现 Iceberg 兼容性- 表创建格式为:
<namespace>.<table_name>
架构要求
该插件假定 Iceberg 表 schema 已在数据库中定义,因为它依赖于此 schema 来检索处理所需的字段和标签名称。
需要现有架构
根据设计,如果架构不存在或不包含预期的列,则插件会返回错误。
TOML 配置
| 参数 | 类型 | 默认 | 描述 |
|---|---|---|---|
config_file_path | string | none | 相对于 PLUGIN_DIR 的 TOML 配置文件路径 (TOML 配置必需) |
要使用 TOML 配置文件,请设置 PLUGIN_DIR 环境变量并在触发器参数中指定 config_file_path。 这与启动 InfluxDB 3 时的 --plugin-dir 标志是附加的。
示例 TOML 配置
influxdb_to_iceberg_config_scheduler.toml
有关使用 TOML 配置文件,请参阅 influxdb3_plugins /README.md 中的“使用 TOML 配置文件”部分。
安装步骤
启用处理引擎来启动 InfluxDB 3 核心(
--plugin-dir /path/to/plugins)安装必需的 Python 包
pandas(用于数据操作)pyarrow(用于 Parquet 支持)pyiceberg[catalog-options](用于 Iceberg 集成)
influxdb3 install package pandas influxdb3 install package pyarrow influxdb3 install package "pyiceberg[s3fs,hive,sql-sqlite]"注意:根据您的 catalog 类型包含适当的 PyIceberg 扩展
[s3fs]用于 S3 存储[hive]用于 Hive metastore[sql-sqlite]用于带 SQLite 的 SQL catalog- 请参阅 PyIceberg 文档 查看所有选项
触发器设置
计划数据传输
定期将数据从 InfluxDB 3 传输到 Iceberg
influxdb3 create trigger \
--database mydb \
--plugin-filename gh:influxdata/influxdb_to_iceberg/influxdb_to_iceberg.py \
--trigger-spec "every:1h" \
--trigger-arguments 'measurement=cpu,window=1h,catalog_configs="eyJ1cmkiOiAiaHR0cDovL25lc3NpZTo5MDAwIn0=",namespace=monitoring,table_name=cpu_metrics' \
hourly_iceberg_transferHTTP API 端点
创建按需传输端点
influxdb3 create trigger \
--database mydb \
--plugin-filename gh:influxdata/influxdb_to_iceberg/influxdb_to_iceberg.py \
--trigger-spec "request:replicate" \
iceberg_http_transfer启用触发器
influxdb3 enable trigger --database mydb iceberg_http_transfer端点在 /api/v3/engine/replicate 注册。
示例用法
示例 1:基本计划传输
每小时将 CPU 指标传输到 Iceberg
# Create trigger with base64-encoded catalog config
# Original JSON: {"uri": "http://nessie:9000"}
# Base64: eyJ1cmkiOiAiaHR0cDovL25lc3NpZTo5MDAwIn0=
influxdb3 create trigger \
--database metrics \
--plugin-filename gh:influxdata/influxdb_to_iceberg/influxdb_to_iceberg.py \
--trigger-spec "every:1h" \
--trigger-arguments 'measurement=cpu,window=24h,catalog_configs="eyJ1cmkiOiAiaHR0cDovL25lc3NpZTo5MDAwIn0="' \
cpu_to_iceberg
# Write test data
influxdb3 write \
--database metrics \
"cpu,host=server1 usage_user=45.2,usage_system=12.1"
# After trigger runs, data is available in Iceberg table "default.cpu"预期结果
- 创建 Iceberg 表
default.cpu,其 schema 与测量匹配 - 传输过去 24 小时的所有 CPU 数据
- 每次每小时运行都附加新数据
示例 2:带字段过滤的 HTTP 回填
回填历史数据中的特定字段
# Create and enable HTTP trigger
influxdb3 create trigger \
--database metrics \
--plugin-filename gh:influxdata/influxdb_to_iceberg/influxdb_to_iceberg.py \
--trigger-spec "request:replicate" \
iceberg_backfill
influxdb3 enable trigger --database metrics iceberg_backfill
# Request backfill via HTTP
curl -X POST https://:8181/api/v3/engine/replicate \
-H "Authorization: Bearer YOUR_TOKEN" \
-d '{
"measurement": "temperature",
"catalog_configs": {
"type": "sql",
"uri": "sqlite:///path/to/catalog.db"
},
"included_fields": ["temp_celsius", "humidity"],
"namespace": "weather",
"table_name": "temperature_history",
"batch_size": "12h",
"backfill_start": "2024-01-01T00:00:00+00:00",
"backfill_end": "2024-01-07T00:00:00+00:00"
}'预期结果
- 创建 Iceberg 表
weather.temperature_history - 仅传输
temp_celsius和humidity字段 - 在指定的周内以 12 小时为批次处理数据
- 返回回填操作的状态
示例 3:S3 支持的 Iceberg catalog
将数据传输到存储在 S3 中的 Iceberg 表
# Create catalog config JSON
cat > catalog_config.json << EOF
{
"type": "sql",
"uri": "sqlite:///iceberg/catalog.db",
"warehouse": "s3://my-bucket/iceberg-warehouse/",
"s3.endpoint": "http://minio:9000",
"s3.access-key-id": "minioadmin",
"s3.secret-access-key": "minioadmin",
"s3.path-style-access": true
}
EOF
# Encode to base64
CATALOG_CONFIG=$(base64 < catalog_config.json)
# Create trigger
influxdb3 create trigger \
--database metrics \
--plugin-filename gh:influxdata/influxdb_to_iceberg/influxdb_to_iceberg.py \
--trigger-spec "every:30m" \
--trigger-arguments "measurement=sensor_data,window=1h,catalog_configs=\"$CATALOG_CONFIG\",namespace=iot,table_name=sensors" \
s3_iceberg_transfer使用 TOML 配置文件
该插件支持使用 TOML 配置文件来指定所有插件参数。这对于复杂的配置或希望对插件设置进行版本控制很有用。
重要要求
要使用 TOML 配置文件,您必须在 InfluxDB 3 主机环境中设置 PLUGIN_DIR 环境变量。 这与启动 InfluxDB 3 时的 --plugin-dir 标志是附加要求。
--plugin-dir告诉 InfluxDB 3 在哪里查找插件 Python 文件PLUGIN_DIR环境变量告诉插件在哪里查找 TOML 配置文件
设置 TOML 配置
设置 PLUGIN_DIR 环境变量后启动 InfluxDB 3:
PLUGIN_DIR=~/.plugins influxdb3 serve --node-id node0 --object-store file --data-dir ~/.influxdb3 --plugin-dir ~/.plugins将示例 TOML 配置文件复制到您的插件目录:
cp influxdb_to_iceberg_config_scheduler.toml ~/.plugins/编辑 TOML 文件 以匹配您的需求
# Required parameters measurement = "cpu" window = "1h" # Optional parameters namespace = "monitoring" table_name = "cpu_metrics" # Iceberg catalog configuration [catalog_configs] type = "sql" uri = "http://nessie:9000" warehouse = "s3://iceberg-warehouse/"使用
config_file_path参数创建触发器:influxdb3 create trigger \ --database mydb \ --plugin-filename influxdb_to_iceberg.py \ --trigger-spec "every:1h" \ --trigger-arguments config_file_path=influxdb_to_iceberg_config_scheduler.toml \ iceberg_toml_trigger
代码概述
文件
influxdb_to_iceberg.py:包含计划和 HTTP 触发器处理程序的 मुख्य 插件代码influxdb_to_iceberg_config_scheduler.toml:计划触发器的示例 TOML 配置文件
日志记录
日志存储在 _internal 数据库(或创建触发器的数据库)的 system.processing_engine_logs 表中。要查看日志
influxdb3 query --database _internal "SELECT * FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name'"日志列
- event_time:日志事件的时间戳
- trigger_name:生成日志的触发器名称
- log_level:严重性级别(INFO、WARN、ERROR)
- log_text:描述操作或错误的邮件
主函数
process_scheduled_call(influxdb3_local, call_time, args)
处理计划数据传输。查询指定窗口内的数据并附加到 Iceberg 表。
关键操作
- 解析配置并解码 catalog 设置
- 查询源测量,可选字段过滤
- 如有需要,创建 Iceberg 表
- 将数据附加到 Iceberg 表
process_http_request(influxdb3_local, request_body, args)
通过 HTTP 处理按需数据传输。支持带有可配置批量大小的回填操作。
关键操作
- 验证请求正文参数
- 确定回填时间范围
- 分批处理数据
- 返回传输状态
故障排除
常见问题
问题:“无法解码 catalog_configs”错误
解决方案:确保 catalog 配置已正确 base64 编码
# Create JSON file
echo '{"uri": "http://nessie:9000"}' > config.json
# Encode to base64
base64 config.json问题:“创建 Iceberg 表失败”错误
解决方案:
- 验证 catalog 配置是否正确
- 检查 warehouse 路径权限
- 确保已安装必需的 PyIceberg 扩展
influxdb3 install package "pyiceberg[s3fs]"
问题:传输后 Iceberg 表中没有数据
解决方案:
- 检查源测量是否包含数据
influxdb3 query --database mydb "SELECT COUNT(*) FROM measurement" - 验证时间窗口是否包含数据
influxdb3 query --database mydb "SELECT MIN(time), MAX(time) FROM measurement" - 检查日志以获取错误
influxdb3 query --database _internal "SELECT * FROM system.processing_engine_logs WHERE log_level = 'ERROR'"
问题:“不支持 Schema 演进”错误
解决方案:该插件不处理 Schema 更改。如果字段更改
- 创建一个具有不同名称的新表
- 或手动更新 Iceberg 表 schema
调试技巧
测试 catalog 连接性:
from pyiceberg.catalog import load_catalog catalog = load_catalog("my_catalog", **catalog_configs) print(catalog.list_namespaces())验证字段名称:
influxdb3 query --database mydb "SHOW FIELD KEYS FROM measurement"使用较小的时间窗口进行初始测试
--trigger-arguments 'window=5m,...'
性能注意事项
- 文件大小:每次计划运行都会创建新的 Parquet 文件。使用适当的时间窗口大小来平衡文件数量和大小
- 批量处理:对于 HTTP 传输,根据可用内存调整
batch_size - 字段过滤:当只需要特定字段时,使用
included_fields来减少数据量 - Catalog 选择:SQL catalog(SQLite)更简单,但 REST catalog 的扩展性更好
报告问题
有关插件问题,请参阅插件存储库的 issues 页面。
查找 InfluxDB 3 Core 的支持
加入 InfluxDB Discord 服务器 是获取 InfluxDB 3 Core 和 InfluxDB 3 Enterprise 支持的最佳途径。对于其他 InfluxDB 版本,请参阅 支持和反馈 选项。
此页面是否有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一员!我们欢迎并鼓励您对 InfluxDB 3 Core 和本文档提供反馈和错误报告。要获得支持,请使用以下资源
具有年度合同或支持合同的客户可以 联系 InfluxData 支持。