使用客户端库对数据进行降采样
查询和降采样存储在 InfluxDB 中的时间序列数据,并将降采样数据写回 InfluxDB。
本指南使用 Python 和 InfluxDB 3 Python 客户端库,但您可以使用您选择的运行时和任何可用的 InfluxDB 3 客户端库。本指南还假设您已经 设置了您的 Python 项目和虚拟环境。
安装依赖项
使用 pip
安装以下依赖项
influxdb_client_3
pandas
pip install influxdb3-python pandas
准备 InfluxDB 数据库
降采样过程涉及两个 InfluxDB 数据库。每个数据库都有一个 保留期,用于指定数据在数据库中持续存在多长时间,然后过期并被删除。通过使用两个数据库,您可以将未修改的高分辨率数据存储在保留期较短的数据库中,然后将降采样后的低分辨率数据存储在保留期较长的数据库中。
确保您为以下各项准备了数据库
- 一个用于查询未修改的数据
- 另一个用于写入降采样数据
有关创建数据库的信息,请参阅 创建数据库。
创建 InfluxDB 客户端
使用 influxdb_client_3
模块中的 InfluxDBClient3
函数实例化两个 InfluxDB 客户端
- 一个配置为连接到包含未修改数据的 InfluxDB 数据库。
- 另一个配置为连接到您要写入降采样数据的 InfluxDB 数据库。
为每个客户端提供以下凭据
from influxdb_client_3 import InfluxDBClient3
import pandas
# Instantiate an InfluxDBClient3 client configured for your unmodified database
influxdb_raw = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='RAW_DATABASE_NAME'
)
# Instantiate an InfluxDBClient3 client configured for your downsampled database.
# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='DOWNSAMPLED_DATABASE_NAME',
org=''
)
查询 InfluxDB
定义执行基于时间的聚合的查询
用于降采样时间序列数据最常用的方法是对时间间隔执行聚合或选择器操作。例如,返回查询时间范围内每小时的平均值。
使用 SQL 或 InfluxQL 通过将聚合或选择器函数应用于时间间隔来降采样数据。
在
SELECT
子句中- 使用
DATE_BIN
根据行的时间戳将每行分配到一个间隔,并使用分配的间隔时间戳更新time
列。您还可以使用DATE_BIN_GAPFILL
来填充由没有数据的间隔创建的任何空白(请参阅 使用 SQL 填充数据中的空白)。 - 将 聚合 或 选择器 函数应用于每个查询字段。
- 使用
包含一个
GROUP BY
子句,该子句按SELECT
子句中DATE_BIN
函数返回的间隔以及任何其他查询的标签进行分组。以下示例使用GROUP BY 1
按SELECT
子句中的第一列进行分组。包含一个
ORDER BY
子句,该子句按time
对数据进行排序。
有关更多信息,请参阅 使用 SQL 聚合数据 - 通过应用基于间隔的聚合来降采样数据。
SELECT
DATE_BIN(INTERVAL '1 hour', time) AS time,
room,
AVG(temp) AS temp,
AVG(hum) AS hum,
AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY time
SELECT
MEAN(temp) AS temp,
MEAN(hum) AS hum,
MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)
执行查询
将查询字符串分配给变量。
使用 实例化的客户端 的
query
方法从 InfluxDB 查询原始数据。提供以下参数。- query:要执行的查询字符串
- language:
sql
或influxql
使用
to_pandas
方法将返回的 Arrow 表格转换为 Pandas DataFrame。
# ...
query = '''
SELECT
DATE_BIN(INTERVAL '1 hour', time) AS time,
room,
AVG(temp) AS temp,
AVG(hum) AS hum,
AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY 1
'''
table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
# ...
query = '''
SELECT
MEAN(temp) AS temp,
MEAN(hum) AS hum,
MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)
'''
table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()
将降采样数据写回 InfluxDB
对于 InfluxQL 查询结果,在将数据写回 InfluxDB 之前,删除(
drop
)iox::measurement
列。这样可以避免稍后查询降采样数据时出现 measurement 名称冲突。使用
sort_values
方法按time
对 Pandas DataFrame 中的数据进行排序,以确保尽可能高效地写回 InfluxDB。使用 实例化的降采样客户端 的
write
方法将查询结果写回到 InfluxDB 数据库中以用于降采样数据。包括以下参数- record:包含降采样数据的 Pandas DataFrame
- data_frame_measurement_name:目标 measurement 名称
- data_frame_timestamp_column:包含每个数据点时间戳的列
- data_frame_tag_columns:标签 列列表
未在 data_frame_tag_columns 或 data_frame_timestamp_column 参数中列出的列将作为 字段 写入 InfluxDB。
# ...
data_frame = data_frame.sort_values(by="time")
influxdb_downsampled.write(
record=data_frame,
data_frame_measurement_name="home_ds",
data_frame_timestamp_column="time",
data_frame_tag_columns=['room']
)
完整降采样脚本
from influxdb_client_3 import InfluxDBClient3
import pandas
influxdb_raw = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='RAW_DATABASE_NAME'
)
# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='DOWNSAMPLED_DATABASE_NAME',
org=''
)
query = '''
SELECT
DATE_BIN(INTERVAL '1 hour', time) AS time,
room,
AVG(temp) AS temp,
AVG(hum) AS hum,
AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY 1
'''
table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
data_frame = data_frame.sort_values(by="time")
influxdb_downsampled.write(
record=data_frame,
data_frame_measurement_name="home_ds",
data_frame_timestamp_column="time",
data_frame_tag_columns=['room']
)
from influxdb_client_3 import InfluxDBClient3
import pandas
influxdb_raw = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='RAW_DATABASE_NAME'
)
# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='DOWNSAMPLED_DATABASE_NAME',
org=''
)
query = '''
SELECT
MEAN(temp) AS temp,
MEAN(hum) AS hum,
MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)
'''
# To prevent naming conflicts when querying downsampled data,
# drop the iox::measurement column before writing the data
# with the new measurement.
data_frame = data_frame.drop(columns=['iox::measurement'])
table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()
data_frame = data_frame.sort_values(by="time")
influxdb_downsampled.write(
record=data_frame,
data_frame_measurement_name="home_ds",
data_frame_timestamp_column="time",
data_frame_tag_columns=['room']
)
此页面是否对您有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一份子!我们欢迎并鼓励您提供关于 InfluxDB 集群版和本文档的反馈和错误报告。如需获得支持,请使用以下资源
拥有年度合同或支持合同的客户 可以 联系 InfluxData 支持。