使用客户端库降采样数据
查询并降采样存储在 InfluxDB 中的时间序列数据,然后将降采样数据写回 InfluxDB。
本指南使用 Python 和 InfluxDB v3 Python 客户端库,但您可以使用您选择的运行时和任何可用的 InfluxDB v3 客户端库。本指南还假设您已经 设置了您的 Python 项目和虚拟环境。
安装依赖项
使用 pip
安装以下依赖项
influxdb_client_3
pandas
pip install influxdb3-python pandas
准备 InfluxDB 数据库
降采样过程涉及两个 InfluxDB 数据库。每个数据库都有一个 保留期,指定数据在数据库中持续多久后过期并删除。通过使用两个数据库,您可以将未修改的高分辨率数据存储在保留期较短的数据库中,然后将降采样、低分辨率数据存储在保留期较长的数据库中。
确保您为以下各项都拥有数据库
- 一个用于查询未修改的数据
- 另一个用于写入降采样数据
有关创建数据库的信息,请参阅 创建数据库。
创建 InfluxDB 客户端
使用 influxdb_client_3
模块中的 InfluxDBClient3
函数实例化两个 InfluxDB 客户端。
- 一个配置为连接到你的 InfluxDB 数据库,用于存储未修改的数据。
- 另一个配置为连接到你想要写入 下采样 数据的 InfluxDB 数据库。
为每个客户端提供以下凭据:
from influxdb_client_3 import InfluxDBClient3
import pandas
# Instantiate an InfluxDBClient3 client configured for your unmodified database
influxdb_raw = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='RAW_DATABASE_NAME'
)
# Instantiate an InfluxDBClient3 client configured for your downsampled database.
# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='DOWNSAMPLED_DATABASE_NAME',
org=''
)
查询 InfluxDB
定义一个执行时间聚合查询的查询
最常用的下采样时间序列数据的方法是对时间间隔进行聚合或选择操作。例如,返回查询时间范围内每小时的平均值。
使用 SQL 或 InfluxQL 通过对时间间隔应用聚合或选择函数来下采样数据。
在
SELECT
子句中- 使用
DATE_BIN
将每一行分配到基于行时间戳的间隔,并使用分配的间隔时间戳更新time
列。您还可以使用DATE_BIN_GAPFILL
用没有数据的间隔填充任何间隔(请参阅使用 SQL 填充数据中的间隙)。 - 对每个查询的字段应用 聚合 或 选择 函数。
- 使用
在
SELECT
子句中包含一个GROUP BY
子句,该子句根据来自DATE_BIN
函数的间隔和任何其他查询标签对结果进行分组。下面的示例使用GROUP BY 1
对SELECT
子句中的第一列进行分组。包含一个按
time
排序的ORDER BY
子句。
有关更多信息,请参阅 使用 SQL 聚合数据 - 通过应用基于间隔的聚合进行下采样。
SELECT
DATE_BIN(INTERVAL '1 hour', time) AS time,
room,
AVG(temp) AS temp,
AVG(hum) AS hum,
AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY time
SELECT
MEAN(temp) AS temp,
MEAN(hum) AS hum,
MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)
执行查询
将查询字符串分配给变量。
使用你已实例化的客户端的
query
方法从 InfluxDB 查询原始数据。提供以下参数。- 查询:要执行的查询字符串
- 语言:
sql
或influxql
使用
to_pandas
方法将返回的 Arrow 表转换为 Pandas DataFrame。
# ...
query = '''
SELECT
DATE_BIN(INTERVAL '1 hour', time) AS time,
room,
AVG(temp) AS temp,
AVG(hum) AS hum,
AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY 1
'''
table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
# ...
query = '''
SELECT
MEAN(temp) AS temp,
MEAN(hum) AS hum,
MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)
'''
table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()
将降采样数据写回 InfluxDB
对于 InfluxQL 查询结果,在将数据写回 InfluxDB 之前删除(
drop
)iox::measurement
列。您将避免在以后查询下采样数据时出现测量名称冲突。使用
sort_values
方法按time
对 Pandas DataFrame 中的数据进行排序,以确保将数据写回 InfluxDB 的性能尽可能高效。使用您的已实例化的降采样客户端的
write
方法,将查询结果写回您的InfluxDB数据库中的降采样数据。包括以下参数:- record:包含降采样数据的Pandas DataFrame
- data_frame_measurement_name:目标度量名称
- data_frame_timestamp_column:包含每个数据点时间戳的列
- data_frame_tag_columns:标签列的列表
未在data_frame_tag_columns或data_frame_timestamp_column参数中列出的列将作为字段写入InfluxDB。
# ...
data_frame = data_frame.sort_values(by="time")
influxdb_downsampled.write(
record=data_frame,
data_frame_measurement_name="home_ds",
data_frame_timestamp_column="time",
data_frame_tag_columns=['room']
)
完整的降采样脚本
from influxdb_client_3 import InfluxDBClient3
import pandas
influxdb_raw = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='RAW_DATABASE_NAME'
)
# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='DOWNSAMPLED_DATABASE_NAME',
org=''
)
query = '''
SELECT
DATE_BIN(INTERVAL '1 hour', time) AS time,
room,
AVG(temp) AS temp,
AVG(hum) AS hum,
AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY 1
'''
table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
data_frame = data_frame.sort_values(by="time")
influxdb_downsampled.write(
record=data_frame,
data_frame_measurement_name="home_ds",
data_frame_timestamp_column="time",
data_frame_tag_columns=['room']
)
from influxdb_client_3 import InfluxDBClient3
import pandas
influxdb_raw = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='RAW_DATABASE_NAME'
)
# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='DOWNSAMPLED_DATABASE_NAME',
org=''
)
query = '''
SELECT
MEAN(temp) AS temp,
MEAN(hum) AS hum,
MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)
'''
# To prevent naming conflicts when querying downsampled data,
# drop the iox::measurement column before writing the data
# with the new measurement.
data_frame = data_frame.drop(columns=['iox::measurement'])
table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()
data_frame = data_frame.sort_values(by="time")
influxdb_downsampled.write(
record=data_frame,
data_frame_measurement_name="home_ds",
data_frame_timestamp_column="time",
data_frame_tag_columns=['room']
)
这个页面有帮助吗?
感谢您的反馈!