文档

使用Quix Streams降采样数据

使用Quix Streams查询存储在InfluxDB中并定期写入Kafka的时间序列数据,持续降采样,然后将降采样后的数据写回到InfluxDB。Quix Streams是一个开源的Python库,用于构建与Apache Kafka集成的容器化流处理应用程序。它被设计为运行在作为服务连续处理数据流并将结果流式传输到Kafka主题的服务。您可以在本地使用,配合本地Kafka安装,或使用Quix Cloud进行免费试用。

本指南使用PythonInfluxDB v3 Python客户端库,但您可以使用您选择的运行时和任何可用的InfluxDB v3客户端库。本指南还假设您已经设置好您的Python项目和虚拟环境

管道架构

以下图示展示了数据在降采样过程中如何在不同进程间传递

InfluxDB v3源生产者

降采样过程

InfluxDB v3目标消费者

通常直接将原始数据写入Kafka比先写入InfluxDB(本质上是从“raw-data”主题开始Quix Streams管道)更有效。然而,本指南假设您已经有在InfluxDB中的原始数据,您想要对其进行降采样。


  1. 设置先决条件
  2. 安装依赖项
  3. 准备InfluxDB存储桶
  4. 创建降采样逻辑
  5. 创建生产者和消费者客户端
    1. 创建生产者
    2. 创建消费者
  6. 获取完整的降采样代码文件

设置先决条件

本指南中描述的过程需要以下条件

  • 一个已准备好的InfluxDB集群,其中包含可用于降采样的数据。
  • 一个Quix Cloud账户或本地Apache Kafka或Red Panda安装。
  • 熟悉基本的Python和Docker概念。

安装依赖项

使用pip安装以下依赖项

  • influxdb_client_3
  • quixstreams<2.5
  • pandas
pip install influxdb3-python pandas quixstreams<2.5

准备InfluxDB存储桶

下采样过程涉及两个InfluxDB数据库。每个数据库都有一个保留期,它指定数据在过期和删除之前保持多长时间。通过使用两个数据库,您可以在保留期较短的数据库中存储未修改的高分辨率数据,然后在保留期较长的数据库中存储下采样的低分辨率数据。

确保您为以下每个项目都有一个数据库

  • 一个用于查询未修改数据的
  • 另一个用于将下采样数据写入的

有关创建数据库的信息,请参阅创建一个桶

创建降采样逻辑

此过程从存储从InfluxDB流式传输的数据的输入Kafka主题中读取原始数据,对其进行下采样,然后将它发送到用于将数据写回InfluxDB的输出主题。

  1. 使用Quix Streams库的Application类初始化到Apache Kafka的连接。

    from quixstreams import Application
    
    app = Application(consumer_group='downsampling-process', auto_offset_reset='earliest')
    input_topic = app.topic('raw-data')
    output_topic = app.topic('downsampled-data')
    
    # ...
    
  2. 配置Quix Streams内置的窗口函数以创建一个连续下采样数据到1分钟桶的滑动窗口。

    # ...
    target_field = 'temperature' # The field that you want to downsample.
    
    def custom_ts_extractor(value):
        # ...
        # truncated for brevity - custom code that defines the 'time_recorded'
        # field as the timestamp to use for windowing...
    
    topic = app.topic(input_topic, timestamp_extractor=custom_ts_extractor)
    
    sdf = (
        sdf.apply(lambda value: value[target_field])  # Extract temperature values
        .tumbling_window(timedelta(minutes=1))   # 1-minute tumbling windows
        .mean()                                  # Calculate average temperature
        .final()                                 # Emit results at window completion
    )
    
    sdf = sdf.apply(
        lambda value: {
            'time': value['end'],                  # End of the window
            'temperature_avg': value['value'],     # Average temperature
        }
    )
    
    sdf.to_topic(output_topic) # Output results to the 'downsampled-data' topic
    # ...
    

结果被流式传输到Kafka主题downsampled-data

注意:“sdf”代表“Streaming Dataframe”。

您可以在Quix GitHub存储库中找到此过程的完整代码。

创建生产者和消费者客户端

使用influxdb_client_3quixstreams模块实例化两个与InfluxDB和Apache Kafka交互的客户端

  • 生产者客户端配置为从您的InfluxDB数据库中读取未修改的数据并将其生产到Kafka。
  • 消费者客户端配置为从Kafka中消费数据并将下采样数据写入相应的InfluxDB数据库。

创建生产者客户端

为生产者提供以下凭据

  • host:InfluxDB集群URL(不包括协议)
  • org:任意字符串。InfluxDB Clustered忽略组织。
  • token:具有读取和写入权限的InfluxDB数据库令牌,您想查询和写入的数据库。
  • database:InfluxDB数据库名称

生产者以特定间隔从InfluxDB查询新鲜数据。它配置为查找变量中定义的特定度量。它将原始数据写入名为“raw-data”的Kafka主题。

from influxdb_client_3 import InfluxDBClient3
from quixstreams import Application
import pandas

# Instantiate an InfluxDBClient3 client configured for your unmodified database
influxdb_raw = InfluxDBClient3(
    host='cluster-host.com',
    token='DATABASE_TOKEN',
    database='
RAW_DATABASE_NAME
'
) # os.environ['localdev'] = 'true' # Uncomment if you're using local Kafka rather than Quix Cloud # Create a Quix Streams producer application that connects to a local Kafka installation app = Application( broker_address=os.environ.get('BROKER_ADDRESS','localhost:9092'), consumer_group=consumer_group_name, auto_create_topics=True ) # Override the app variable if the local development env var is set to false or is not present. # This causes Quix Streams to use an application configured for Quix Cloud localdev = os.environ.get('localdev', 'false') if localdev == 'false': # Create a Quix platform-specific application instead (broker address is in-built) app = Application(consumer_group=consumer_group_name, auto_create_topics=True) topic = app.topic(name='raw-data') ## ... remaining code trunctated for brevity ... # Query InfluxDB for the raw data and store it in a Dataframe def get_data(): # Run in a loop until the main thread is terminated while run: try: myquery = f'SELECT * FROM "{measurement_name}" WHERE time >= {interval}' print(f'sending query {myquery}') # Query InfluxDB 3.0 using influxql or sql table = influxdb_raw.query( query=myquery, mode='pandas', language='influxql') #... remaining code trunctated for brevity ... # Send the data to a Kafka topic for the downsampling process to consumer def main(): """ Read data from the Query and publish it to Kafka """ #... remaining code trunctated for brevity ... for index, obj in enumerate(records): print(obj) # Obj contains each row in the table includimng temperature # Generate a unique message_key for each row message_key = obj['machineId'] logger.info(f'Produced message with key:{message_key}, value:{obj}') serialized = topic.serialize( key=message_key, value=obj, headers={'uuid': str(uuid.uuid4())} ) # publish each row returned in the query to the topic 'raw-data' producer.produce( topic=topic.name, headers=serialized.headers, key=serialized.key, value=serialized.value, )

您可以在Quix GitHub存储库中找到此过程的完整代码。

创建消费者

如前所述,为消费者提供以下凭据

  • host:InfluxDB集群URL(不包括协议)
  • org:任意字符串。InfluxDB Clustered忽略组织。
  • token:具有读取和写入权限的InfluxDB数据库令牌,您想查询和写入的数据库。
  • database:InfluxDB数据库名称

此过程从Kafka主题downsampled-data读取消息,并将每个消息作为点字典写回到InfluxDB。

# Instantiate an InfluxDBClient3 client configured for your downsampled database.
# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
    host='cluster-host.com',
    token='DATABASE_TOKEN',
    database='
DOWNSAMPLED_DATABASE_NAME
'
,
org='' ) # os.environ['localdev'] = 'true' # Uncomment if you're using local Kafka rather than Quix Cloud # Create a Quix Streams consumer application that connects to a local Kafka installation app = Application( broker_address=os.environ.get('BROKER_ADDRESS','localhost:9092'), consumer_group=consumer_group_name, auto_create_topics=True ) # Override the app variable if the local development env var is set to false or is not present. # This causes Quix Streams to use an application configured for Quix Cloud localdev = os.environ.get('localdev', 'false') if localdev == 'false': # Create a Quix platform-specific application instead (broker address is in-built) app = Application(consumer_group=consumer_group_name, auto_create_topics=True) input_topic = app.topic('downsampled-data') ## ... remaining code trunctated for brevity ... def send_data_to_influx(message): logger.info(f'Processing message: {message}') try: ## ... remaining code trunctated for brevity ... # Construct the points dictionary points = { 'measurement': measurement_name, 'tags': tags, 'fields': fields, 'time': message['time'] } influxdb_downsampled.write(record=points, write_precision='ms') sdf = app.dataframe(input_topic) sdf = sdf.update(send_data_to_influx) # Continuously apply the 'send_data' function to each message in the incoming stream ## ... remaining code trunctated for brevity ...

您可以在Quix GitHub存储库中找到此过程的完整代码。

获取完整的降采样代码文件

要获取本教程中引用的完整文件集,克隆Quix“下采样模板”存储库或使用保存为Jupyter笔记本的此教程的交互式版本。

克隆下采样模板存储库

要克隆下采样模板,请在命令行中输入以下命令

git clone https://github.com/quixio/template-influxdbv3-downsampling.git

此存储库包含以下文件夹,它们存储整个管道的不同部分

  • 从机器数据到InfluxDB:一个生成合成机器数据并将其写入InfluxDB的脚本。如果您还没有自己的数据,或者只是想先使用测试数据,这很有用。

    • 它每250毫秒产生一个读取。
    • 此脚本最初来自InfluxCommunity存储库,但已修改为直接写入InfluxDB而不是使用MQTT代理。
  • InfluxDB V3 数据源:一种服务,会在特定时间间隔从 InfluxDB 查询新鲜数据。它被配置为寻找之前提到的合成机器数据生成器产生的测量值。它将原始数据写入名为“raw-data”的 Kafka 主题。

  • 降采样器:一种服务,对来自 InfluxDB 的数据进行 1 分钟滑动窗口操作,并每分钟输出“温度”读数的平均值。它将输出写入名为“downsampled-data”的 Kafka 主题。

  • InfluxDB V3 数据汇:一种服务,从“downsampled-data”主题读取数据,并将降采样记录作为点写回到 InfluxDB。

使用降采样 Jupyter Notebook

您可以使用交互式笔记本“使用 InfluxDB 和 Quix Streams 持续降采样数据”来尝试自己编写降采样代码。它配置为在运行时环境(如 Google Colab)内安装 Apache Kafka。

每个进程也都设置为在后台运行,以确保正在运行的单元不会阻塞整个教程。

Open In Colab


这个页面有帮助吗?

感谢您的反馈!


Flux 的未来

Flux 正在进入维护模式。您可以在不更改代码的情况下继续按当前方式使用它。

阅读更多

InfluxDB v3 增强功能和 InfluxDB 集群现在已普遍可用

新功能,包括更快的查询性能和管理工具,推进了 InfluxDB v3 产品线。InfluxDB 集群现在已普遍可用。

InfluxDB v3 性能和功能

InfluxDB v3 产品线在查询性能方面取得了显著提升,并提供了新的管理工具。这些增强包括一个操作仪表板,用于监控 InfluxDB 集群的健康状况,InfluxDB Cloud Dedicated 中的单点登录(SSO)支持,以及针对令牌和数据库的新管理 API。

了解新的 v3 增强功能


InfluxDB 集群普遍可用

InfluxDB 集群现在已普遍可用,为您在自管理堆栈中提供 InfluxDB v3 的功能。

与我们讨论 InfluxDB 集群