文档文档

InfluxDB 到 Iceberg 插件

InfluxDB 到 Iceberg 插件支持将数据从 InfluxDB 3 传输到 Apache Iceberg 表。将时序数据传输到 Iceberg,用于长期存储、分析或与数据湖架构集成。该插件支持对历史数据进行计划的批量传输以及通过 HTTP API 进行的按需传输。

配置

调度器触发参数

必需参数

参数类型默认描述
measurementstring必需包含待传输数据的源测量
windowstring必需数据传输的时间窗口。格式:<数量><单位>(例如,"1h""30d"
catalog_configsstring必需包含 Iceberg catalog 配置的 Base64 编码的 JSON 字符串

可选参数

参数类型默认描述
included_fieldsstring所有字段要包含的字段的.分隔列表(例如,"usage_user.usage_idle"
excluded_fieldsstringnone要排除的字段的.分隔列表
namespacestring“default”目标表的 Iceberg 命名空间
table_namestring测量名称Iceberg 表名称
config_file_pathstringnone相对于 PLUGIN_DIR 的 TOML 配置文件路径

HTTP 触发参数

请求正文结构

参数类型必需描述
measurementstring包含待传输数据的源测量
catalog_configsobjectIceberg catalog 配置字典。请参阅 PyIceberg catalog 文档
included_fieldsarray包含在复制中的字段名称列表
excluded_fieldsarray要从复制中排除的字段名称列表
namespacestring目标 Iceberg 命名空间(默认:“default”)
table_namestring目标 Iceberg 表名称(默认:测量名称)
batch_sizestring处理的批量大小持续时间(默认:“1d”)。格式:<数量><单位>
backfill_startstring带时区的 ISO 8601 日期时间,用于回填开始
backfill_endstring带时区的 ISO 8601 日期时间,用于回填结束

Schema 管理

  • 从第一批数据自动创建 Iceberg 表 schema
  • 将 pandas 数据类型映射到 Iceberg 类型
    • int64IntegerType
    • float64FloatType
    • datetime64[us]TimestampType
    • objectStringType
  • 没有 null 值的字段被标记为 required
  • time 列被转换为 datetime64[us] 以实现 Iceberg 兼容性
  • 表创建格式为:<namespace>.<table_name>

架构要求

该插件假定 Iceberg 表 schema 已在数据库中定义,因为它依赖于此 schema 来检索处理所需的字段和标签名称。

需要现有架构

根据设计,如果架构不存在或不包含预期的列,则插件会返回错误。

TOML 配置

参数类型默认描述
config_file_pathstringnone相对于 PLUGIN_DIR 的 TOML 配置文件路径 (TOML 配置必需)

要使用 TOML 配置文件,请设置 PLUGIN_DIR 环境变量并在触发器参数中指定 config_file_path 这与启动 InfluxDB 3 时的 --plugin-dir 标志是附加的。

示例 TOML 配置

influxdb_to_iceberg_config_scheduler.toml

有关使用 TOML 配置文件,请参阅 influxdb3_plugins /README.md 中的“使用 TOML 配置文件”部分。

安装步骤

  1. 启用处理引擎来启动 InfluxDB 3 核心(--plugin-dir /path/to/plugins

  2. 安装必需的 Python 包

    • pandas(用于数据操作)
    • pyarrow(用于 Parquet 支持)
    • pyiceberg[catalog-options](用于 Iceberg 集成)
    influxdb3 install package pandas
    influxdb3 install package pyarrow
    influxdb3 install package "pyiceberg[s3fs,hive,sql-sqlite]"

    注意:根据您的 catalog 类型包含适当的 PyIceberg 扩展

    • [s3fs] 用于 S3 存储
    • [hive] 用于 Hive metastore
    • [sql-sqlite] 用于带 SQLite 的 SQL catalog
    • 请参阅 PyIceberg 文档 查看所有选项

触发器设置

计划数据传输

定期将数据从 InfluxDB 3 传输到 Iceberg

influxdb3 create trigger \
  --database mydb \
  --plugin-filename gh:influxdata/influxdb_to_iceberg/influxdb_to_iceberg.py \
  --trigger-spec "every:1h" \
  --trigger-arguments 'measurement=cpu,window=1h,catalog_configs="eyJ1cmkiOiAiaHR0cDovL25lc3NpZTo5MDAwIn0=",namespace=monitoring,table_name=cpu_metrics' \
  hourly_iceberg_transfer

HTTP API 端点

创建按需传输端点

influxdb3 create trigger \
  --database mydb \
  --plugin-filename gh:influxdata/influxdb_to_iceberg/influxdb_to_iceberg.py \
  --trigger-spec "request:replicate" \
  iceberg_http_transfer

启用触发器

influxdb3 enable trigger --database mydb iceberg_http_transfer

端点在 /api/v3/engine/replicate 注册。

示例用法

示例 1:基本计划传输

每小时将 CPU 指标传输到 Iceberg

# Create trigger with base64-encoded catalog config
# Original JSON: {"uri": "http://nessie:9000"}
# Base64: eyJ1cmkiOiAiaHR0cDovL25lc3NpZTo5MDAwIn0=
influxdb3 create trigger \
  --database metrics \
  --plugin-filename gh:influxdata/influxdb_to_iceberg/influxdb_to_iceberg.py \
  --trigger-spec "every:1h" \
  --trigger-arguments 'measurement=cpu,window=24h,catalog_configs="eyJ1cmkiOiAiaHR0cDovL25lc3NpZTo5MDAwIn0="' \
  cpu_to_iceberg

# Write test data
influxdb3 write \
  --database metrics \
  "cpu,host=server1 usage_user=45.2,usage_system=12.1"

# After trigger runs, data is available in Iceberg table "default.cpu"

预期结果

  • 创建 Iceberg 表 default.cpu,其 schema 与测量匹配
  • 传输过去 24 小时的所有 CPU 数据
  • 每次每小时运行都附加新数据

示例 2:带字段过滤的 HTTP 回填

回填历史数据中的特定字段

# Create and enable HTTP trigger
influxdb3 create trigger \
  --database metrics \
  --plugin-filename gh:influxdata/influxdb_to_iceberg/influxdb_to_iceberg.py \
  --trigger-spec "request:replicate" \
  iceberg_backfill

influxdb3 enable trigger --database metrics iceberg_backfill

# Request backfill via HTTP
curl -X POST https://:8181/api/v3/engine/replicate \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -d '{
    "measurement": "temperature",
    "catalog_configs": {
      "type": "sql",
      "uri": "sqlite:///path/to/catalog.db"
    },
    "included_fields": ["temp_celsius", "humidity"],
    "namespace": "weather",
    "table_name": "temperature_history",
    "batch_size": "12h",
    "backfill_start": "2024-01-01T00:00:00+00:00",
    "backfill_end": "2024-01-07T00:00:00+00:00"
  }'

预期结果

  • 创建 Iceberg 表 weather.temperature_history
  • 仅传输 temp_celsiushumidity 字段
  • 在指定的周内以 12 小时为批次处理数据
  • 返回回填操作的状态

示例 3:S3 支持的 Iceberg catalog

将数据传输到存储在 S3 中的 Iceberg 表

# Create catalog config JSON
cat > catalog_config.json << EOF
{
  "type": "sql",
  "uri": "sqlite:///iceberg/catalog.db",
  "warehouse": "s3://my-bucket/iceberg-warehouse/",
  "s3.endpoint": "http://minio:9000",
  "s3.access-key-id": "minioadmin",
  "s3.secret-access-key": "minioadmin",
  "s3.path-style-access": true
}
EOF

# Encode to base64
CATALOG_CONFIG=$(base64 < catalog_config.json)

# Create trigger
influxdb3 create trigger \
  --database metrics \
  --plugin-filename gh:influxdata/influxdb_to_iceberg/influxdb_to_iceberg.py \
  --trigger-spec "every:30m" \
  --trigger-arguments "measurement=sensor_data,window=1h,catalog_configs=\"$CATALOG_CONFIG\",namespace=iot,table_name=sensors" \
  s3_iceberg_transfer

使用 TOML 配置文件

该插件支持使用 TOML 配置文件来指定所有插件参数。这对于复杂的配置或希望对插件设置进行版本控制很有用。

重要要求

要使用 TOML 配置文件,您必须在 InfluxDB 3 主机环境中设置 PLUGIN_DIR 环境变量。 这与启动 InfluxDB 3 时的 --plugin-dir 标志是附加要求。

  • --plugin-dir 告诉 InfluxDB 3 在哪里查找插件 Python 文件
  • PLUGIN_DIR 环境变量告诉插件在哪里查找 TOML 配置文件

设置 TOML 配置

  1. 设置 PLUGIN_DIR 环境变量后启动 InfluxDB 3:

    PLUGIN_DIR=~/.plugins influxdb3 serve --node-id node0 --object-store file --data-dir ~/.influxdb3 --plugin-dir ~/.plugins
  2. 将示例 TOML 配置文件复制到您的插件目录:

    cp influxdb_to_iceberg_config_scheduler.toml ~/.plugins/
  3. 编辑 TOML 文件 以匹配您的需求

    # Required parameters
    measurement = "cpu"
    window = "1h"
    
    # Optional parameters
    namespace = "monitoring"
    table_name = "cpu_metrics"
    
    # Iceberg catalog configuration
    [catalog_configs]
    type = "sql"
    uri = "http://nessie:9000"
    warehouse = "s3://iceberg-warehouse/"
  4. 使用 config_file_path 参数创建触发器:

    influxdb3 create trigger \
      --database mydb \
      --plugin-filename influxdb_to_iceberg.py \
      --trigger-spec "every:1h" \
      --trigger-arguments config_file_path=influxdb_to_iceberg_config_scheduler.toml \
      iceberg_toml_trigger

代码概述

文件

  • influxdb_to_iceberg.py:包含计划和 HTTP 触发器处理程序的 मुख्य 插件代码
  • influxdb_to_iceberg_config_scheduler.toml:计划触发器的示例 TOML 配置文件

日志记录

日志存储在 _internal 数据库(或创建触发器的数据库)的 system.processing_engine_logs 表中。要查看日志

influxdb3 query --database _internal "SELECT * FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name'"

日志列

  • event_time:日志事件的时间戳
  • trigger_name:生成日志的触发器名称
  • log_level:严重性级别(INFO、WARN、ERROR)
  • log_text:描述操作或错误的邮件

主函数

process_scheduled_call(influxdb3_local, call_time, args)

处理计划数据传输。查询指定窗口内的数据并附加到 Iceberg 表。

关键操作

  1. 解析配置并解码 catalog 设置
  2. 查询源测量,可选字段过滤
  3. 如有需要,创建 Iceberg 表
  4. 将数据附加到 Iceberg 表

process_http_request(influxdb3_local, request_body, args)

通过 HTTP 处理按需数据传输。支持带有可配置批量大小的回填操作。

关键操作

  1. 验证请求正文参数
  2. 确定回填时间范围
  3. 分批处理数据
  4. 返回传输状态

故障排除

常见问题

问题:“无法解码 catalog_configs”错误

解决方案:确保 catalog 配置已正确 base64 编码

# Create JSON file
echo '{"uri": "http://nessie:9000"}' > config.json
# Encode to base64
base64 config.json

问题:“创建 Iceberg 表失败”错误

解决方案:

  1. 验证 catalog 配置是否正确
  2. 检查 warehouse 路径权限
  3. 确保已安装必需的 PyIceberg 扩展
    influxdb3 install package "pyiceberg[s3fs]"

问题:传输后 Iceberg 表中没有数据

解决方案:

  1. 检查源测量是否包含数据
    influxdb3 query --database mydb "SELECT COUNT(*) FROM measurement"
  2. 验证时间窗口是否包含数据
    influxdb3 query --database mydb "SELECT MIN(time), MAX(time) FROM measurement"
  3. 检查日志以获取错误
    influxdb3 query --database _internal "SELECT * FROM system.processing_engine_logs WHERE log_level = 'ERROR'"

问题:“不支持 Schema 演进”错误

解决方案:该插件不处理 Schema 更改。如果字段更改

  1. 创建一个具有不同名称的新表
  2. 或手动更新 Iceberg 表 schema

调试技巧

  1. 测试 catalog 连接性:

    from pyiceberg.catalog import load_catalog
    catalog = load_catalog("my_catalog", **catalog_configs)
    print(catalog.list_namespaces())
  2. 验证字段名称:

    influxdb3 query --database mydb "SHOW FIELD KEYS FROM measurement"
  3. 使用较小的时间窗口进行初始测试

    --trigger-arguments 'window=5m,...'

性能注意事项

  • 文件大小:每次计划运行都会创建新的 Parquet 文件。使用适当的时间窗口大小来平衡文件数量和大小
  • 批量处理:对于 HTTP 传输,根据可用内存调整 batch_size
  • 字段过滤:当只需要特定字段时,使用 included_fields 来减少数据量
  • Catalog 选择:SQL catalog(SQLite)更简单,但 REST catalog 的扩展性更好

报告问题

有关插件问题,请参阅插件存储库的 issues 页面

查找 InfluxDB 3 Core 的支持

加入 InfluxDB Discord 服务器 是获取 InfluxDB 3 Core 和 InfluxDB 3 Enterprise 支持的最佳途径。对于其他 InfluxDB 版本,请参阅 支持和反馈 选项。


此页面是否有帮助?

感谢您的反馈!


InfluxDB 3.8 新特性

InfluxDB 3.8 和 InfluxDB 3 Explorer 1.6 的主要增强功能。

查看博客文章

InfluxDB 3.8 现已适用于 Core 和 Enterprise 版本,同时发布了 InfluxDB 3 Explorer UI 的 1.6 版本。本次发布着重于操作成熟度,以及如何更轻松地部署、管理和可靠地运行 InfluxDB。

更多信息,请查看

InfluxDB Docker 的 latest 标签将指向 InfluxDB 3 Core

在 **2026 年 2 月 3 日**,InfluxDB Docker 镜像的 latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。

如果使用 Docker 来安装和运行 InfluxDB,latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。例如,如果使用 Docker 运行 InfluxDB v2,请将 latest 版本标签替换为 Docker pull 命令中的特定版本标签 — 例如

docker pull influxdb:2