文档文档

使用客户端库对数据进行降采样

查询和降采样存储在 InfluxDB 中的时间序列数据,并将降采样数据写回 InfluxDB。

本指南使用 PythonInfluxDB 3 Python 客户端库,但您可以使用您选择的运行时和任何可用的 InfluxDB 3 客户端库。本指南还假设您已经 设置了您的 Python 项目和虚拟环境

安装依赖项

使用 pip 安装以下依赖项

  • influxdb_client_3
  • pandas
pip install influxdb3-python pandas

准备 InfluxDB 数据库

降采样过程涉及两个 InfluxDB 数据库。每个数据库都有一个 保留期,用于指定数据在数据库中持续存在多长时间,然后过期并被删除。通过使用两个数据库,您可以将未修改的高分辨率数据存储在保留期较短的数据库中,然后将降采样后的低分辨率数据存储在保留期较长的数据库中。

确保您为以下各项准备了数据库

  • 一个用于查询未修改的数据
  • 另一个用于写入降采样数据

有关创建数据库的信息,请参阅 创建数据库

创建 InfluxDB 客户端

使用 influxdb_client_3 模块中的 InfluxDBClient3 函数实例化两个 InfluxDB 客户端

  • 一个配置为连接到包含未修改数据的 InfluxDB 数据库。
  • 另一个配置为连接到您要写入降采样数据的 InfluxDB 数据库。

为每个客户端提供以下凭据

  • host:您的 InfluxDB 集群 URL (不带协议)
  • token:一个 数据库令牌,具有对您要查询和写入的数据库的读取和写入权限。
  • database:您的 数据库 名称
from influxdb_client_3 import InfluxDBClient3
import pandas

# Instantiate an InfluxDBClient3 client configured for your unmodified database
influxdb_raw = InfluxDBClient3(
    host='cluster-host.com',
    token='
DATABASE_TOKEN
'
,
database='
RAW_DATABASE_NAME
'
) # Instantiate an InfluxDBClient3 client configured for your downsampled database. # When writing, the org= argument is required by the client (but ignored by InfluxDB). influxdb_downsampled = InfluxDBClient3( host='cluster-host.com', token='
DATABASE_TOKEN
'
,
database='
DOWNSAMPLED_DATABASE_NAME
'
,
org='' )

查询 InfluxDB

定义执行基于时间的聚合的查询

用于降采样时间序列数据最常用的方法是对时间间隔执行聚合或选择器操作。例如,返回查询时间范围内每小时的平均值。

使用 SQL 或 InfluxQL 通过将聚合或选择器函数应用于时间间隔来降采样数据。

  1. SELECT 子句中

  2. 包含一个 GROUP BY 子句,该子句按 SELECT 子句中 DATE_BIN 函数返回的间隔以及任何其他查询的标签进行分组。以下示例使用 GROUP BY 1SELECT 子句中的第一列进行分组。

  3. 包含一个 ORDER BY 子句,该子句按 time 对数据进行排序。

有关更多信息,请参阅 使用 SQL 聚合数据 - 通过应用基于间隔的聚合来降采样数据

SELECT
  DATE_BIN(INTERVAL '1 hour', time) AS time,
  room,
  AVG(temp) AS temp,
  AVG(hum) AS hum,
  AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY time
  1. SELECT 子句中,将 聚合选择器 函数应用于查询字段。

  2. 包含一个 GROUP BY 子句,该子句按指定间隔的 time() 进行分组。

SELECT
  MEAN(temp) AS temp,
  MEAN(hum) AS hum,
  MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)

执行查询

  1. 将查询字符串分配给变量。

  2. 使用 实例化的客户端query 方法从 InfluxDB 查询原始数据。提供以下参数。

    • query:要执行的查询字符串
    • languagesqlinfluxql
  3. 使用 to_pandas 方法将返回的 Arrow 表格转换为 Pandas DataFrame。

# ...

query = '''
SELECT
  DATE_BIN(INTERVAL '1 hour', time) AS time,
  room,
  AVG(temp) AS temp,
  AVG(hum) AS hum,
  AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY 1
'''

table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
# ...

query = '''
SELECT
  MEAN(temp) AS temp,
  MEAN(hum) AS hum,
  MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)
'''

table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()
\

将降采样数据写回 InfluxDB

  1. 对于 InfluxQL 查询结果,在将数据写回 InfluxDB 之前,删除(dropiox::measurement 列。这样可以避免稍后查询降采样数据时出现 measurement 名称冲突。

  2. 使用 sort_values 方法按 time 对 Pandas DataFrame 中的数据进行排序,以确保尽可能高效地写回 InfluxDB。

  3. 使用 实例化的降采样客户端write 方法将查询结果写回到 InfluxDB 数据库中以用于降采样数据。包括以下参数

    • record:包含降采样数据的 Pandas DataFrame
    • data_frame_measurement_name:目标 measurement 名称
    • data_frame_timestamp_column:包含每个数据点时间戳的列
    • data_frame_tag_columns标签 列列表

    未在 data_frame_tag_columnsdata_frame_timestamp_column 参数中列出的列将作为 字段 写入 InfluxDB。

# ...

data_frame = data_frame.sort_values(by="time")

influxdb_downsampled.write(
    record=data_frame,
    data_frame_measurement_name="home_ds",
    data_frame_timestamp_column="time",
    data_frame_tag_columns=['room']
)

完整降采样脚本

from influxdb_client_3 import InfluxDBClient3
import pandas

influxdb_raw = InfluxDBClient3(
    host='cluster-host.com',
    token='
DATABASE_TOKEN
'
,
database='
RAW_DATABASE_NAME
'
) # When writing, the org= argument is required by the client (but ignored by InfluxDB). influxdb_downsampled = InfluxDBClient3( host='cluster-host.com', token='
DATABASE_TOKEN
'
,
database='
DOWNSAMPLED_DATABASE_NAME
'
,
org='' ) query = ''' SELECT DATE_BIN(INTERVAL '1 hour', time) AS time, room, AVG(temp) AS temp, AVG(hum) AS hum, AVG(co) AS co FROM home --In WHERE, time refers to <source_table>.time WHERE time >= now() - INTERVAL '24 hours' --1 refers to the DATE_BIN column GROUP BY 1, room ORDER BY 1 ''' table = influxdb_raw.query(query=query, language="sql") data_frame = table.to_pandas() data_frame = data_frame.sort_values(by="time") influxdb_downsampled.write( record=data_frame, data_frame_measurement_name="home_ds", data_frame_timestamp_column="time", data_frame_tag_columns=['room'] )
from influxdb_client_3 import InfluxDBClient3
import pandas

influxdb_raw = InfluxDBClient3(
    host='cluster-host.com',
    token='
DATABASE_TOKEN
'
,
database='
RAW_DATABASE_NAME
'
) # When writing, the org= argument is required by the client (but ignored by InfluxDB). influxdb_downsampled = InfluxDBClient3( host='cluster-host.com', token='
DATABASE_TOKEN
'
,
database='
DOWNSAMPLED_DATABASE_NAME
'
,
org='' ) query = ''' SELECT MEAN(temp) AS temp, MEAN(hum) AS hum, MEAN(co) AS co FROM home WHERE time >= now() - 24h GROUP BY time(1h) ''' # To prevent naming conflicts when querying downsampled data, # drop the iox::measurement column before writing the data # with the new measurement. data_frame = data_frame.drop(columns=['iox::measurement']) table = influxdb_raw.query(query=query, language="influxql") data_frame = table.to_pandas() data_frame = data_frame.sort_values(by="time") influxdb_downsampled.write( record=data_frame, data_frame_measurement_name="home_ds", data_frame_timestamp_column="time", data_frame_tag_columns=['room'] )

此页面是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在这样使用它,而无需对代码进行任何更改。

阅读更多

InfluxDB 3 开源版现已公开发布 Alpha 版本

InfluxDB 3 开源版现已可用于 Alpha 测试,根据 MIT 或 Apache 2 许可获得许可。

我们正在发布两个作为 Alpha 版本一部分的产品。

InfluxDB 3 Core 是我们新的开源产品。它是用于时间序列和事件数据的最新数据引擎。InfluxDB 3 Enterprise 是一个商业版本,它建立在 Core 的基础上,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度的安全性。

有关如何开始使用的更多信息,请查看