降采样插件
Downsampler 插件支持 InfluxDB 3 中的按时间进行数据聚合和降采样。通过使用 avg、sum、min、max、derivative 或 median 等函数,在指定的时间间隔内聚合测量数据,从而减少数据量。该插件同时支持对历史数据进行计划性批量处理和通过 HTTP 请求进行按需降采样。每个降采样后的记录都包含有关压缩的原始数据点的元数据。
配置
必需参数
| 参数 | 类型 | 默认 | 描述 |
|---|---|---|---|
source_measurement | string | 必需 | 包含要降采样数据的源测量 |
target_measurement | string | 必需 | 降采样数据的目标测量 |
window | string | required (仅限计划性) | 每个降采样作业的时间窗口。格式:<数字><单位>(例如,"1h"、"1d") |
聚合参数
| 参数 | 类型 | 默认 | 描述 |
|---|---|---|---|
interval | string | "10min" | 降采样的时间间隔。格式:<数字><单位>(例如,"10min"、"2h"、"1d") |
calculations | string | “avg” | 聚合函数。单个函数或以点分隔的字段:聚合对 |
specific_fields | string | 所有字段 | 要降采样的字段的点分隔列表(例如,"co.temperature") |
excluded_fields | string | none | 要从降采样中排除的字段的点分隔列表 |
过滤参数
| 参数 | 类型 | 默认 | 描述 |
|---|---|---|---|
tag_values | string | none | 标签过滤器。格式:tag:value1@value2@value3 表示多个值 |
offset | string | “0” | 应用于窗口的时间偏移量 |
高级参数
| 参数 | 类型 | 默认 | 描述 |
|---|---|---|---|
target_database | string | “default” | 存储降采样数据的数据库 |
max_retries | integer | 5 | 写入操作的最大重试次数 |
batch_size | string | “30d” | 批量处理的时间间隔(仅限 HTTP 模式) |
TOML 配置
| 参数 | 类型 | 默认 | 描述 |
|---|---|---|---|
config_file_path | string | none | 相对于 PLUGIN_DIR 的 TOML 配置文件路径 (TOML 配置必需) |
要使用 TOML 配置文件,请设置 PLUGIN_DIR 环境变量并在触发器参数中指定 config_file_path。 这与启动 InfluxDB 3 时的 --plugin-dir 标志是附加的。
示例 TOML 配置
downsampling_config_scheduler.toml
有关使用 TOML 配置文件,请参阅 influxdb3_plugins /README.md 中的“使用 TOML 配置文件”部分。
模式管理
每个降采样记录包含三个额外的元数据列
record_count—压缩到此单个降采样行中的原始点数time_from—时间间隔内原始点中的最小时间戳time_to—时间间隔内原始点中的最大时间戳
安装步骤
启用处理引擎来启动 InfluxDB 3 核心(
--plugin-dir /path/to/plugins)influxdb3 serve \ --node-id node0 \ --object-store file \ --data-dir ~/.influxdb3 \ --plugin-dir ~/.plugins此插件无需额外的 Python 包。
触发器设置
计划性降采样
定期对历史数据运行降采样
influxdb3 create trigger \
--database mydb \
--plugin-filename gh:influxdata/downsampler/downsampler.py \
--trigger-spec "every:1h" \
--trigger-arguments 'source_measurement=cpu_metrics,target_measurement=cpu_hourly,interval=1h,window=6h,calculations=avg,specific_fields=usage_user.usage_system' \
cpu_hourly_downsample按需降采样
通过 HTTP 请求触发降采样
influxdb3 create trigger \
--database mydb \
--plugin-filename gh:influxdata/downsampler/downsampler.py \
--trigger-spec "request:downsample" \
downsample_api示例用法
示例 1:CPU 指标每小时聚合
将 CPU 使用率数据从 1 分钟间隔降采样到每小时平均值
# Create the trigger
influxdb3 create trigger \
--database system_metrics \
--plugin-filename gh:influxdata/downsampler/downsampler.py \
--trigger-spec "every:1h" \
--trigger-arguments 'source_measurement=cpu,target_measurement=cpu_hourly,interval=1h,window=6h,calculations=avg,specific_fields=usage_user.usage_system.usage_idle' \
cpu_hourly_downsample
# Write test data
influxdb3 write \
--database system_metrics \
"cpu,host=server1 usage_user=45.2,usage_system=12.1,usage_idle=42.7"
# Query downsampled data (after trigger runs)
influxdb3 query \
--database system_metrics \
"SELECT * FROM cpu_hourly WHERE time >= now() - 1d"预期输出
host | usage_user | usage_system | usage_idle | record_count | time_from | time_to | time
--------|------------|--------------|------------|--------------|---------------------|---------------------|-----
server1 | 44.8 | 11.9 | 43.3 | 60 | 2024-01-01T00:00:00Z| 2024-01-01T00:59:59Z| 2024-01-01T01:00:00Z聚合详情
- 之前:1 小时内的 60 个独立 CPU 测量
- 之后:1 个聚合测量,包含平均值和元数据
- 元数据显示原始记录数和时间范围
示例 2:具有不同函数的多个字段聚合
对不同字段应用不同的聚合函数
# Create trigger with field-specific aggregations
influxdb3 create trigger \
--database sensors \
--plugin-filename gh:influxdata/downsampler/downsampler.py \
--trigger-spec "every:10min" \
--trigger-arguments 'source_measurement=environment,target_measurement=environment_10min,interval=10min,window=30min,calculations=temperature:avg.humidity:avg.pressure:max' \
env_multi_agg
# Write data with various sensor readings
influxdb3 write \
--database sensors \
"environment,location=office temperature=22.5,humidity=45.2,pressure=1013.25"
# Query aggregated data
influxdb3 query \
--database sensors \
"SELECT * FROM environment_10min WHERE time >= now() - 1h"预期输出
location | temperature | humidity | pressure | record_count | time
---------|-------------|----------|----------|--------------|-----
office | 22.3 | 44.8 | 1015.1 | 10 | 2024-01-01T00:10:00Z示例 3:HTTP API 降采样与回填
使用 HTTP API 对历史数据进行按需降采样
# Send HTTP request for backfill downsampling
curl -X POST https://:8181/api/v3/engine/downsample \
--header "Authorization: Bearer YOUR_TOKEN" \
--data '{
"source_measurement": "metrics",
"target_measurement": "metrics_daily",
"target_database": "analytics",
"interval": "1d",
"batch_size": "7d",
"calculations": [["cpu_usage", "avg"], ["memory_usage", "max"], ["disk_usage", "avg"]],
"backfill_start": "2024-01-01T00:00:00Z",
"backfill_end": "2024-01-31T00:00:00Z",
"max_retries": 3
}'代码概述
文件
downsampler.py:包含计划性和 HTTP 触发降采样处理程序的 मुख्य 插件代码downsampling_config_scheduler.toml:计划性触发器的示例 TOML 配置文件
日志记录
日志存储在 _internal 数据库(或创建触发器的数据库)的 system.processing_engine_logs 表中。要查看日志
influxdb3 query --database _internal "SELECT * FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name'"日志列
- event_time:日志事件的时间戳(具有纳秒精度)
- trigger_name:生成日志的触发器名称
- log_level:严重性级别(INFO、WARN、ERROR)
- log_text:描述操作或错误的与唯一 task_id 相关的消息,用于可追溯性
主函数
process_scheduled_call(influxdb3_local, call_time, args)
处理计划性降采样任务。查询指定窗口内的历史数据并应用聚合函数。
关键操作
- 从参数或 TOML 文件解析配置
- 使用可选的标签过滤器查询源测量
- 使用指定的函数应用基于时间的聚合
- 写入带有元数据列的降采样数据
process_http_request(influxdb3_local, request_body, args)
处理 HTTP 触发的按需降采样。处理批量降采样,并为回填场景配置可配置的时间范围。
关键操作
- 解析 JSON 请求正文参数
- 处理可配置时间批次中的数据
- 对历史数据应用聚合函数
- 返回处理统计信息和结果
aggregate_data(data, interval, calculations)
核心聚合引擎,将统计函数应用于时间序列数据。
支持的聚合函数
avg:平均值sum:值之和min:最小值max:最大值derivative:变化率median:中位数
故障排除
常见问题
问题:目标测量中没有数据
解决方案:检查源测量是否存在并在指定的时间窗口内包含数据
influxdb3 query --database mydb "SELECT COUNT(*) FROM source_measurement WHERE time >= now() - 1h"问题:聚合函数不起作用
解决方案:验证字段名称和聚合语法。使用 SHOW FIELD KEYS 检查可用字段
influxdb3 query --database mydb "SHOW FIELD KEYS FROM source_measurement"问题:标签过滤器未应用
解决方案:检查标签值格式。使用 @ 分隔符表示多个值
--trigger-arguments 'tag_values=host:server1@server2@server3'问题:HTTP 端点无法访问
解决方案:验证触发器是否使用正确的请求规范创建
influxdb3 list triggers --database mydb调试技巧
检查执行日志(通过 task ID 过滤)
influxdb3 query --database _internal \ "SELECT * FROM system.processing_engine_logs WHERE log_text LIKE '%task_id%' ORDER BY event_time DESC LIMIT 10"使用较小的时间窗口进行测试以进行调试
--trigger-arguments 'window=5min,interval=1min'在聚合之前验证字段类型
influxdb3 query --database mydb "SELECT * FROM source_measurement LIMIT 1"
性能注意事项
- 批量处理:对 HTTP 请求使用适当的 batch_size 以平衡内存使用和性能
- 字段过滤:使用 specific_fields 仅处理必要的数据
- 重试逻辑:根据网络可靠性配置 max_retries
- 元数据开销:元数据列会增加约 20% 的存储开销,但提供有用的调试信息
- 索引优化:对于大型数据集,标签过滤器比字段过滤器更有效
报告问题
有关插件问题,请参阅插件存储库的 issues 页面。
查找 InfluxDB 3 Core 的支持
加入 InfluxDB Discord 服务器 是获取 InfluxDB 3 Core 和 InfluxDB 3 Enterprise 支持的最佳途径。对于其他 InfluxDB 版本,请参阅 支持和反馈 选项。
此页面是否有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一员!我们欢迎并鼓励您对 InfluxDB 3 Core 和本文档提供反馈和错误报告。要获得支持,请使用以下资源
具有年度合同或支持合同的客户可以 联系 InfluxData 支持。