文档文档

降采样插件

Downsampler 插件支持 InfluxDB 3 中的按时间进行数据聚合和降采样。通过使用 avg、sum、min、max、derivative 或 median 等函数,在指定的时间间隔内聚合测量数据,从而减少数据量。该插件同时支持对历史数据进行计划性批量处理和通过 HTTP 请求进行按需降采样。每个降采样后的记录都包含有关压缩的原始数据点的元数据。

配置

必需参数

参数类型默认描述
source_measurementstring必需包含要降采样数据的源测量
target_measurementstring必需降采样数据的目标测量
windowstringrequired (仅限计划性)每个降采样作业的时间窗口。格式:<数字><单位>(例如,"1h""1d"

聚合参数

参数类型默认描述
intervalstring"10min"降采样的时间间隔。格式:<数字><单位>(例如,"10min""2h""1d"
calculationsstring“avg”聚合函数。单个函数或以点分隔的字段:聚合对
specific_fieldsstring所有字段要降采样的字段的点分隔列表(例如,"co.temperature"
excluded_fieldsstringnone要从降采样中排除的字段的点分隔列表

过滤参数

参数类型默认描述
tag_valuesstringnone标签过滤器。格式:tag:value1@value2@value3 表示多个值
offsetstring“0”应用于窗口的时间偏移量

高级参数

参数类型默认描述
target_databasestring“default”存储降采样数据的数据库
max_retriesinteger5写入操作的最大重试次数
batch_sizestring“30d”批量处理的时间间隔(仅限 HTTP 模式)

TOML 配置

参数类型默认描述
config_file_pathstringnone相对于 PLUGIN_DIR 的 TOML 配置文件路径 (TOML 配置必需)

要使用 TOML 配置文件,请设置 PLUGIN_DIR 环境变量并在触发器参数中指定 config_file_path 这与启动 InfluxDB 3 时的 --plugin-dir 标志是附加的。

示例 TOML 配置

downsampling_config_scheduler.toml

有关使用 TOML 配置文件,请参阅 influxdb3_plugins /README.md 中的“使用 TOML 配置文件”部分。

模式管理

每个降采样记录包含三个额外的元数据列

  • record_count—压缩到此单个降采样行中的原始点数
  • time_from—时间间隔内原始点中的最小时间戳
  • time_to—时间间隔内原始点中的最大时间戳

安装步骤

  1. 启用处理引擎来启动 InfluxDB 3 核心(--plugin-dir /path/to/plugins

    influxdb3 serve \
      --node-id node0 \
      --object-store file \
      --data-dir ~/.influxdb3 \
      --plugin-dir ~/.plugins
  2. 此插件无需额外的 Python 包。

触发器设置

计划性降采样

定期对历史数据运行降采样

influxdb3 create trigger \
  --database mydb \
  --plugin-filename gh:influxdata/downsampler/downsampler.py \
  --trigger-spec "every:1h" \
  --trigger-arguments 'source_measurement=cpu_metrics,target_measurement=cpu_hourly,interval=1h,window=6h,calculations=avg,specific_fields=usage_user.usage_system' \
  cpu_hourly_downsample

按需降采样

通过 HTTP 请求触发降采样

influxdb3 create trigger \
  --database mydb \
  --plugin-filename gh:influxdata/downsampler/downsampler.py \
  --trigger-spec "request:downsample" \
  downsample_api

示例用法

示例 1:CPU 指标每小时聚合

将 CPU 使用率数据从 1 分钟间隔降采样到每小时平均值

# Create the trigger
influxdb3 create trigger \
  --database system_metrics \
  --plugin-filename gh:influxdata/downsampler/downsampler.py \
  --trigger-spec "every:1h" \
  --trigger-arguments 'source_measurement=cpu,target_measurement=cpu_hourly,interval=1h,window=6h,calculations=avg,specific_fields=usage_user.usage_system.usage_idle' \
  cpu_hourly_downsample

# Write test data
influxdb3 write \
  --database system_metrics \
  "cpu,host=server1 usage_user=45.2,usage_system=12.1,usage_idle=42.7"

# Query downsampled data (after trigger runs)
influxdb3 query \
  --database system_metrics \
  "SELECT * FROM cpu_hourly WHERE time >= now() - 1d"

预期输出

host    | usage_user | usage_system | usage_idle | record_count | time_from           | time_to             | time
--------|------------|--------------|------------|--------------|---------------------|---------------------|-----
server1 | 44.8       | 11.9         | 43.3       | 60           | 2024-01-01T00:00:00Z| 2024-01-01T00:59:59Z| 2024-01-01T01:00:00Z

聚合详情

  • 之前:1 小时内的 60 个独立 CPU 测量
  • 之后:1 个聚合测量,包含平均值和元数据
  • 元数据显示原始记录数和时间范围

示例 2:具有不同函数的多个字段聚合

对不同字段应用不同的聚合函数

# Create trigger with field-specific aggregations
influxdb3 create trigger \
  --database sensors \
  --plugin-filename gh:influxdata/downsampler/downsampler.py \
  --trigger-spec "every:10min" \
  --trigger-arguments 'source_measurement=environment,target_measurement=environment_10min,interval=10min,window=30min,calculations=temperature:avg.humidity:avg.pressure:max' \
  env_multi_agg

# Write data with various sensor readings
influxdb3 write \
  --database sensors \
  "environment,location=office temperature=22.5,humidity=45.2,pressure=1013.25"

# Query aggregated data
influxdb3 query \
  --database sensors \
  "SELECT * FROM environment_10min WHERE time >= now() - 1h"

预期输出

location | temperature | humidity | pressure | record_count | time
---------|-------------|----------|----------|--------------|-----
office   | 22.3        | 44.8     | 1015.1   | 10           | 2024-01-01T00:10:00Z

示例 3:HTTP API 降采样与回填

使用 HTTP API 对历史数据进行按需降采样

# Send HTTP request for backfill downsampling
curl -X POST https://:8181/api/v3/engine/downsample \
  --header "Authorization: Bearer YOUR_TOKEN" \
  --data '{
    "source_measurement": "metrics",
    "target_measurement": "metrics_daily",
    "target_database": "analytics",
    "interval": "1d",
    "batch_size": "7d",
    "calculations": [["cpu_usage", "avg"], ["memory_usage", "max"], ["disk_usage", "avg"]],
    "backfill_start": "2024-01-01T00:00:00Z",
    "backfill_end": "2024-01-31T00:00:00Z",
    "max_retries": 3
  }'

代码概述

文件

  • downsampler.py:包含计划性和 HTTP 触发降采样处理程序的 मुख्य 插件代码
  • downsampling_config_scheduler.toml:计划性触发器的示例 TOML 配置文件

日志记录

日志存储在 _internal 数据库(或创建触发器的数据库)的 system.processing_engine_logs 表中。要查看日志

influxdb3 query --database _internal "SELECT * FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name'"

日志列

  • event_time:日志事件的时间戳(具有纳秒精度)
  • trigger_name:生成日志的触发器名称
  • log_level:严重性级别(INFO、WARN、ERROR)
  • log_text:描述操作或错误的与唯一 task_id 相关的消息,用于可追溯性

主函数

process_scheduled_call(influxdb3_local, call_time, args)

处理计划性降采样任务。查询指定窗口内的历史数据并应用聚合函数。

关键操作

  1. 从参数或 TOML 文件解析配置
  2. 使用可选的标签过滤器查询源测量
  3. 使用指定的函数应用基于时间的聚合
  4. 写入带有元数据列的降采样数据

process_http_request(influxdb3_local, request_body, args)

处理 HTTP 触发的按需降采样。处理批量降采样,并为回填场景配置可配置的时间范围。

关键操作

  1. 解析 JSON 请求正文参数
  2. 处理可配置时间批次中的数据
  3. 对历史数据应用聚合函数
  4. 返回处理统计信息和结果

aggregate_data(data, interval, calculations)

核心聚合引擎,将统计函数应用于时间序列数据。

支持的聚合函数

  • avg:平均值
  • sum:值之和
  • min:最小值
  • max:最大值
  • derivative:变化率
  • median:中位数

故障排除

常见问题

问题:目标测量中没有数据

解决方案:检查源测量是否存在并在指定的时间窗口内包含数据

influxdb3 query --database mydb "SELECT COUNT(*) FROM source_measurement WHERE time >= now() - 1h"

问题:聚合函数不起作用

解决方案:验证字段名称和聚合语法。使用 SHOW FIELD KEYS 检查可用字段

influxdb3 query --database mydb "SHOW FIELD KEYS FROM source_measurement"

问题:标签过滤器未应用

解决方案:检查标签值格式。使用 @ 分隔符表示多个值

--trigger-arguments 'tag_values=host:server1@server2@server3'

问题:HTTP 端点无法访问

解决方案:验证触发器是否使用正确的请求规范创建

influxdb3 list triggers --database mydb

调试技巧

  1. 检查执行日志(通过 task ID 过滤)

    influxdb3 query --database _internal \
      "SELECT * FROM system.processing_engine_logs WHERE log_text LIKE '%task_id%' ORDER BY event_time DESC LIMIT 10"
  2. 使用较小的时间窗口进行测试以进行调试

    --trigger-arguments 'window=5min,interval=1min'
  3. 在聚合之前验证字段类型

    influxdb3 query --database mydb "SELECT * FROM source_measurement LIMIT 1"

性能注意事项

  • 批量处理:对 HTTP 请求使用适当的 batch_size 以平衡内存使用和性能
  • 字段过滤:使用 specific_fields 仅处理必要的数据
  • 重试逻辑:根据网络可靠性配置 max_retries
  • 元数据开销:元数据列会增加约 20% 的存储开销,但提供有用的调试信息
  • 索引优化:对于大型数据集,标签过滤器比字段过滤器更有效

报告问题

有关插件问题,请参阅插件存储库的 issues 页面

查找 InfluxDB 3 Core 的支持

加入 InfluxDB Discord 服务器 是获取 InfluxDB 3 Core 和 InfluxDB 3 Enterprise 支持的最佳途径。对于其他 InfluxDB 版本,请参阅 支持和反馈 选项。


此页面是否有帮助?

感谢您的反馈!


InfluxDB 3.8 新特性

InfluxDB 3.8 和 InfluxDB 3 Explorer 1.6 的主要增强功能。

查看博客文章

InfluxDB 3.8 现已适用于 Core 和 Enterprise 版本,同时发布了 InfluxDB 3 Explorer UI 的 1.6 版本。本次发布着重于操作成熟度,以及如何更轻松地部署、管理和可靠地运行 InfluxDB。

更多信息,请查看

InfluxDB Docker 的 latest 标签将指向 InfluxDB 3 Core

在 **2026 年 2 月 3 日**,InfluxDB Docker 镜像的 latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。

如果使用 Docker 来安装和运行 InfluxDB,latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。例如,如果使用 Docker 运行 InfluxDB v2,请将 latest 版本标签替换为 Docker pull 命令中的特定版本标签 — 例如

docker pull influxdb:2