文档

使用客户端库降采样数据

查询并降采样存储在 InfluxDB 中的时间序列数据,然后将降采样数据写回 InfluxDB。

本指南使用 PythonInfluxDB v3 Python 客户端库,但您可以使用您选择的运行时和任何可用的 InfluxDB v3 客户端库。本指南还假设您已经 设置了您的 Python 项目和虚拟环境

安装依赖项

使用 pip 安装以下依赖项

  • influxdb_client_3
  • pandas
pip install influxdb3-python pandas

准备 InfluxDB 数据库

降采样过程涉及两个 InfluxDB 数据库。每个数据库都有一个 保留期,指定数据在数据库中持续多久后过期并删除。通过使用两个数据库,您可以将未修改的高分辨率数据存储在保留期较短的数据库中,然后将降采样、低分辨率数据存储在保留期较长的数据库中。

确保您为以下各项都拥有数据库

  • 一个用于查询未修改的数据
  • 另一个用于写入降采样数据

有关创建数据库的信息,请参阅 创建数据库

创建 InfluxDB 客户端

使用 influxdb_client_3 模块中的 InfluxDBClient3 函数实例化两个 InfluxDB 客户端。

  • 一个配置为连接到你的 InfluxDB 数据库,用于存储未修改的数据。
  • 另一个配置为连接到你想要写入 下采样 数据的 InfluxDB 数据库。

为每个客户端提供以下凭据:

  • 主机:你的 InfluxDB 集群 URL(不带协议)
  • 令牌:具有查询和写入你想要查询和写入的数据库权限的 数据库令牌
  • 数据库名:你的 数据库名
from influxdb_client_3 import InfluxDBClient3
import pandas

# Instantiate an InfluxDBClient3 client configured for your unmodified database
influxdb_raw = InfluxDBClient3(
    host='cluster-host.com',
    token='
DATABASE_TOKEN
'
,
database='
RAW_DATABASE_NAME
'
) # Instantiate an InfluxDBClient3 client configured for your downsampled database. # When writing, the org= argument is required by the client (but ignored by InfluxDB). influxdb_downsampled = InfluxDBClient3( host='cluster-host.com', token='
DATABASE_TOKEN
'
,
database='
DOWNSAMPLED_DATABASE_NAME
'
,
org='' )

查询 InfluxDB

定义一个执行时间聚合查询的查询

最常用的下采样时间序列数据的方法是对时间间隔进行聚合或选择操作。例如,返回查询时间范围内每小时的平均值。

使用 SQL 或 InfluxQL 通过对时间间隔应用聚合或选择函数来下采样数据。

  1. SELECT 子句中

  2. SELECT 子句中包含一个 GROUP BY 子句,该子句根据来自 DATE_BIN 函数的间隔和任何其他查询标签对结果进行分组。下面的示例使用 GROUP BY 1SELECT 子句中的第一列进行分组。

  3. 包含一个按 time 排序的 ORDER BY 子句。

有关更多信息,请参阅 使用 SQL 聚合数据 - 通过应用基于间隔的聚合进行下采样

SELECT
  DATE_BIN(INTERVAL '1 hour', time) AS time,
  room,
  AVG(temp) AS temp,
  AVG(hum) AS hum,
  AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY time
  1. SELECT 子句中,对查询字段应用 聚合选择 函数。

  2. 包含一个按指定间隔分组的 GROUP BY 子句。

SELECT
  MEAN(temp) AS temp,
  MEAN(hum) AS hum,
  MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)

执行查询

  1. 将查询字符串分配给变量。

  2. 使用你已实例化的客户端的 query 方法从 InfluxDB 查询原始数据。提供以下参数。

    • 查询:要执行的查询字符串
    • 语言sqlinfluxql
  3. 使用 to_pandas 方法将返回的 Arrow 表转换为 Pandas DataFrame。

# ...

query = '''
SELECT
  DATE_BIN(INTERVAL '1 hour', time) AS time,
  room,
  AVG(temp) AS temp,
  AVG(hum) AS hum,
  AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY 1
'''

table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
# ...

query = '''
SELECT
  MEAN(temp) AS temp,
  MEAN(hum) AS hum,
  MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)
'''

table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()
\

将降采样数据写回 InfluxDB

  1. 对于 InfluxQL 查询结果,在将数据写回 InfluxDB 之前删除(dropiox::measurement 列。您将避免在以后查询下采样数据时出现测量名称冲突。

  2. 使用 sort_values 方法按 time 对 Pandas DataFrame 中的数据进行排序,以确保将数据写回 InfluxDB 的性能尽可能高效。

  3. 使用您的已实例化的降采样客户端write方法,将查询结果写回您的InfluxDB数据库中的降采样数据。包括以下参数:

    • record:包含降采样数据的Pandas DataFrame
    • data_frame_measurement_name:目标度量名称
    • data_frame_timestamp_column:包含每个数据点时间戳的列
    • data_frame_tag_columns:标签列的列表

    未在data_frame_tag_columnsdata_frame_timestamp_column参数中列出的列将作为字段写入InfluxDB。

# ...

data_frame = data_frame.sort_values(by="time")

influxdb_downsampled.write(
    record=data_frame,
    data_frame_measurement_name="home_ds",
    data_frame_timestamp_column="time",
    data_frame_tag_columns=['room']
)

完整的降采样脚本

from influxdb_client_3 import InfluxDBClient3
import pandas

influxdb_raw = InfluxDBClient3(
    host='cluster-host.com',
    token='
DATABASE_TOKEN
'
,
database='
RAW_DATABASE_NAME
'
) # When writing, the org= argument is required by the client (but ignored by InfluxDB). influxdb_downsampled = InfluxDBClient3( host='cluster-host.com', token='
DATABASE_TOKEN
'
,
database='
DOWNSAMPLED_DATABASE_NAME
'
,
org='' ) query = ''' SELECT DATE_BIN(INTERVAL '1 hour', time) AS time, room, AVG(temp) AS temp, AVG(hum) AS hum, AVG(co) AS co FROM home --In WHERE, time refers to <source_table>.time WHERE time >= now() - INTERVAL '24 hours' --1 refers to the DATE_BIN column GROUP BY 1, room ORDER BY 1 ''' table = influxdb_raw.query(query=query, language="sql") data_frame = table.to_pandas() data_frame = data_frame.sort_values(by="time") influxdb_downsampled.write( record=data_frame, data_frame_measurement_name="home_ds", data_frame_timestamp_column="time", data_frame_tag_columns=['room'] )
from influxdb_client_3 import InfluxDBClient3
import pandas

influxdb_raw = InfluxDBClient3(
    host='cluster-host.com',
    token='
DATABASE_TOKEN
'
,
database='
RAW_DATABASE_NAME
'
) # When writing, the org= argument is required by the client (but ignored by InfluxDB). influxdb_downsampled = InfluxDBClient3( host='cluster-host.com', token='
DATABASE_TOKEN
'
,
database='
DOWNSAMPLED_DATABASE_NAME
'
,
org='' ) query = ''' SELECT MEAN(temp) AS temp, MEAN(hum) AS hum, MEAN(co) AS co FROM home WHERE time >= now() - 24h GROUP BY time(1h) ''' # To prevent naming conflicts when querying downsampled data, # drop the iox::measurement column before writing the data # with the new measurement. data_frame = data_frame.drop(columns=['iox::measurement']) table = influxdb_raw.query(query=query, language="influxql") data_frame = table.to_pandas() data_frame = data_frame.sort_values(by="time") influxdb_downsampled.write( record=data_frame, data_frame_measurement_name="home_ds", data_frame_timestamp_column="time", data_frame_tag_columns=['room'] )

这个页面有帮助吗?

感谢您的反馈!


Flux的未来

Flux将进入维护模式。您可以继续按照目前的方式使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB v3增强功能和InfluxDB Clustered现已上市

新功能包括更快的查询性能和管理工具,这些工具推进了InfluxDB v3产品线。InfluxDB Clustered现已上市。

InfluxDB v3性能和功能

InfluxDB v3产品线在查询性能方面进行了重大改进,并提供了新的管理工具。这些改进包括用于监控您的InfluxDB集群健康情况的操作仪表板、InfluxDB Cloud Dedicated中的单一登录(SSO)支持和用于令牌和数据库的新管理API。

了解新的v3增强功能


InfluxDB Clustered上市

InfluxDB Clustered现已上市,并为您的自管理堆栈带来了InfluxDB v3的力量。

与我们谈谈InfluxDB Clustered