文档文档

使用 Quix Streams 对数据进行降采样

处理高容量数据时,常见的做法是在将数据提交到 InfluxDB 之前对其进行降采样,以减少随着时间推移数据累积造成的总体磁盘使用量。

本指南将逐步介绍如何创建一系列 Python 服务,这些服务从 InfluxDB v2 存储桶中摄取数据,对其进行降采样,并将数据发布到另一个 InfluxDB v2 存储桶。通过在时间窗口内聚合数据,然后将聚合值存储回 InfluxDB,您可以随着时间的推移减少磁盘使用量和成本。

本指南使用 InfluxDB v2 和 Quix Streams Python 客户端库,可以在本地运行,也可以使用免费试用版部署在 Quix Cloud 中。它假设您已经设置了一个 Python 项目和虚拟环境。

管道架构

下图说明了数据在降采样过程中如何在进程之间传递

InfluxDB v2 源生产者

降采样过程

InfluxDB v2 Sink 消费者

通常,将原始数据直接写入 Kafka 比先将原始数据写入 InfluxDB 更有效(本质上是从 “influxv2-data” 主题开始 Quix Streams 管道)。但是,本指南假设您已经在 InfluxDB 中有要降采样的原始数据。


  1. 设置先决条件
  2. 安装依赖项
  3. 准备 InfluxDB 存储桶
  4. 创建降采样逻辑
  5. 创建生产者和消费者客户端
    1. 创建生产者
    2. 创建消费者
  6. 运行机器数据生成器
  7. 获取完整的降采样代码文件

设置先决条件

本指南中描述的过程需要以下内容

安装依赖项

使用 pip 安装以下依赖项

  • influxdb-client (InfluxDB v2 客户端库)
  • quixstreams<2.5 (Quix Streams 客户端库)
  • pandas (数据分析和操作工具)
pip install influxdb-client pandas quixstreams<2.5 

准备 InfluxDB 存储桶

降采样过程涉及两个 InfluxDB 存储桶。每个存储桶都有一个 保留期,指定数据在过期和删除之前持续存在的时间。通过使用两个存储桶,您可以将未修改的高分辨率数据存储在保留期较短的存储桶中,然后将降采样的低分辨率数据存储在保留期较长的存储桶中。

确保您为以下各项都准备了一个存储桶

  • 一个用于从您的 InfluxDB v2 集群查询未修改的数据
  • 另一个用于写入降采样数据

创建降采样逻辑

此过程从输入 Kafka 主题读取原始数据,该主题存储从 InfluxDB v2 存储桶流式传输的数据,对其进行降采样,然后将其发送到输出主题,该主题稍后写回另一个存储桶。

  1. 使用 Quix Streams 库的 Application 类来初始化与 Kafka 主题的连接。

    from quixstreams import Application
    
    app = Application(consumer_group="downsampling-process", auto_offset_reset="earliest")
    input_topic = app.topic("input")
    output_topic = app.topic("output")
    # ...
    
  2. 配置 Quix Streams 内置的窗口函数,以创建一个滚动窗口,该窗口持续将数据降采样到 1 分钟的存储桶中。

    # ...
    target_field = "temperature" # The field that you want to downsample.
    
    def custom_ts_extractor(value):
        # ...
        # truncated for brevity - custom code that defines the "time_recorded"
        # field as the timestamp to use for windowing...
    
    topic = app.topic(input_topic, timestamp_extractor=custom_ts_extractor)
    
    sdf = (
        sdf.apply(lambda value: value[target_field])  # Extract temperature values
        .tumbling_window(timedelta(minutes=1))        # 1-minute tumbling windows
        .mean()                                       # Calculate average temperature
        .final()                                      # Emit results at window completion
    )
    
    sdf = sdf.apply(
        lambda value: {
            "time": value["end"],                  # End of the window
            "temperature_avg": value["value"],     # Average temperature
        }
    )
    
    sdf.to_topic(output_topic) # Output results to the "downsampled" topic
    # ...
    

结果将流式传输到 Kafka 主题 downsampled

注意:“sdf” 代表 “流式 Dataframe”。

您可以在 Quix GitHub 存储库 中找到此过程的完整代码。

创建生产者和消费者客户端

使用 influxdb_clientquixstreams 模块实例化两个客户端,它们与 InfluxDB 和 Kafka 交互

  • 一个 生产者 客户端,配置为从您的 InfluxDB 存储桶读取 未修改的 数据,并将该数据 生产 到 Kafka。
  • 一个 消费者 客户端,配置为从 Kafka 消费 数据,并将 降采样 的数据写入到相应的 InfluxDB 存储桶。

创建生产者

为生产者提供以下凭据

  • INFLUXDB_HOST: InfluxDB URL (不带协议)
  • INFLUXDB_ORG: InfluxDB 组织名称
  • INFLUXDB_TOKEN: InfluxDB API 令牌,具有对您要查询和写入的存储桶的读取和写入权限。
  • INFLUXDB_BUCKET: InfluxDB 存储桶名称

生产者以特定间隔从 InfluxDB 查询新数据。它将原始数据写入名为 influxv2-data 的 Kafka 主题。

from quixstreams import Application
import influxdb_client
# Create a Quix Application
app = Application(consumer_group="influxdbv2_migrate", auto_create_topics=True)
# Define the topic using the "output" environment variable
topic = app.topic(os.getenv("output", "influxv2-data"))
# Create an InfluxDB v2 client
influxdb2_client = influxdb_client.InfluxDBClient(token=os.environ["INFLUXDB_TOKEN"],
                        org=os.environ["INFLUXDB_ORG"],
                        url=os.environ["INFLUXDB_HOST"])

## ... remaining code trunctated for brevity ...

# Function to fetch data from InfluxDB
# It runs in a continuous loop, periodically fetching data based on the interval.
def get_data():
    # Run in a loop until the main thread is terminated
    while run:
        try:            
            # Query InfluxDB 2.0 using flux
            flux_query = f'''
            from(bucket: "{bucket}")
                |> range(start: -{interval})
                |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
            '''
            logger.info(f"Sending query: {flux_query}")

## ... remaining code trunctated for brevity ...

# Create a pre-configured Producer object.
with app.get_producer() as producer:
    for res in get_data():
        # Get the data from InfluxDB
        records = json.loads(res)
        for index, obj in enumerate(records):
            logger.info(f"Produced message with key:{message_key}, value:{obj}")
            # Publish the data to the Kafka topic
            producer.produce(
                topic=topic.name,
                key=message_key,
                value=obj,
            ) 

您可以在 Quix GitHub 存储库 中找到此过程的完整代码。

创建消费者

与之前一样,为消费者提供以下凭据

  • INFLUXDB_HOST: InfluxDB URL (不带协议)
  • INFLUXDB_ORG: InfluxDB 组织名称
  • INFLUXDB_TOKEN: InfluxDB API 令牌,具有对您要查询和写入的存储桶的读取和写入权限。
  • INFLUXDB_BUCKET: InfluxDB 存储桶名称

注意:这些将是您的 InfluxDB v2 凭据。

此过程从 Kafka 主题 downsampled-data 读取消息,并将每条消息作为点字典写回 InfluxDB。

from quixstreams import Application, State
from influxdb_client import InfluxDBClient, Point

# Create a Quix platform-specific application instead
app = Application(consumer_group=consumer_group_name, auto_offset_reset="earliest", use_changelog_topics=False)

input_topic = app.topic(os.getenv("input", "input-data"))

# Initialize InfluxDB v2 client
influx2_client = InfluxDBClient(url=localhost:8086,
                                token=
API_TOKEN
,
org=
ORG_NAME
)
## ... remaining code trunctated for brevity ... def send_data_to_influx(message: dict, state: State): global last_write_time_ns, points_buffer, service_start_state try: ## ... code trunctated for brevity ... # Check if it's time to write the batch # 10k records have accumulated or 15 seconds have passed if len(points_buffer) >= 10000 or int(time() * 1e9) - last_write_time_ns >= 15e9: with influx2_client.write_api() as write_api: logger.info(f"Writing batch of {len(points_buffer)} points written to InfluxDB.") write_api.write(influx_bucket, influx_org, points_buffer) # Clear the buffer and update the last write time points_buffer = [] last_write_time_ns = int(time() * 1e9) ## ... code trunctated for brevity ... except Exception as e: logger.info(f"{str(datetime.utcnow())}: Write failed") logger.info(e) ## ... code trunctated for brevity ... # We use Quix Streams StreamingDataframe (SDF) to handle every message # in the Kafka topic by writing it to InfluxDB sdf = app.dataframe(input_topic) sdf = sdf.update(send_data_to_influx, stateful=True) if __name__ == "__main__": logger.info("Starting application") app.run(sdf)

您可以在 Quix GitHub 存储库 中找到此过程的完整代码。

运行机器数据生成器

现在是时候运行机器数据生成器代码了,它将用数据填充您的源存储桶,这些数据将由 生产者 读取。

从 GitHub 存储库中的 Machine data to InfluxDB 文件夹运行 main.py

获取完整的降采样代码文件

要获取本教程中引用的完整文件集,请克隆 Quix “降采样” 存储库。

克隆降采样模板存储库

要克隆降采样模板,请在命令行中输入以下命令

git clone https://github.com/quixio/template-invluxdbv2-tsm-downsampling.git

此存储库包含以下文件夹,这些文件夹存储整个管道的不同部分

  • Machine Data to InfluxDB:一个生成合成机器数据并将其写入 InfluxDB 的脚本。如果您还没有自己的数据,或者只想首先使用测试数据,这将非常有用。

    • 它每 250 毫秒生成一次读数。
    • 此脚本最初来自 InfluxCommunity 存储库,但已进行调整以直接写入 InfluxDB,而不是使用 MQTT 代理。
  • InfluxDB v2 数据源:一种服务,用于以特定间隔从 InfluxDB 查询新数据。它配置为查找先前提到的合成机器数据生成器生成的测量值。它将原始数据写入名为 “influxv2-data” 的 Kafka 主题。

  • Downsampler:一种服务,对来自 InfluxDB 的数据执行 1 分钟的滚动窗口操作,并每分钟发出 “temperature” 读数的平均值。它将输出写入 “downsampled” Kafka 主题。

  • InfluxDB v2 数据 Sink:一种服务,从 “downsampled” 主题读取数据,并将降采样的记录作为点写回 InfluxDB。


此页面是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在这样使用它,而无需对您的代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速的最新数据引擎,可实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 在 Core 的基础上构建,增加了高可用性、读取副本、增强的安全性以及数据压缩,从而实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看