文档文档

使用 Quix Streams 对数据进行降采样

使用 Quix Streams 查询存储在 InfluxDB 中并定期写入 Kafka 的时间序列数据,持续对其进行降采样,然后将降采样后的数据写回 InfluxDB。Quix Streams 是一个开源 Python 库,用于构建使用 Apache Kafka 的容器化流处理应用程序。它被设计为作为服务运行,持续处理数据流,同时将结果流式传输到 Kafka 主题。您可以在本地使用本地 Kafka 安装进行尝试,或者在 Quix Cloud 中使用免费试用版运行。

本指南使用 PythonInfluxDB 3 Python 客户端库,但您可以使用您选择的运行时和任何可用的 InfluxDB 3 客户端库。本指南还假设您已经 设置了您的 Python 项目和虚拟环境

管道架构

下图说明了数据在降采样过程中如何在进程之间传递

InfluxDB v3 源生产者

降采样过程

InfluxDB v3 Sink 消费者

通常,将原始数据直接写入 Kafka 比首先将原始数据写入 InfluxDB(本质上是从“raw-data”主题开始 Quix Streams 管道)更有效。但是,本指南假设您已经在 InfluxDB 中拥有要降采样的原始数据。


  1. 设置先决条件
  2. 安装依赖项
  3. 准备 InfluxDB Bucket
  4. 创建降采样逻辑
  5. 创建生产者和消费者客户端
    1. 创建生产者
    2. 创建消费者
  6. 获取完整的降采样代码文件

设置先决条件

本指南中描述的过程需要以下各项

  • 一个 InfluxDB 集群,其中包含准备好进行降采样的数据。
  • 一个 Quix Cloud 帐户或本地 Apache Kafka 或 Red Panda 安装。
  • 熟悉基本的 Python 和 Docker 概念。

安装依赖项

使用 pip 安装以下依赖项

  • influxdb_client_3
  • quixstreams<2.5
  • pandas
pip install influxdb3-python pandas quixstreams<2.5

准备 InfluxDB Bucket

降采样过程涉及两个 InfluxDB 数据库。每个数据库都有一个 保留期,指定数据在过期和删除之前持续存在的时间。通过使用两个数据库,您可以将未修改的高分辨率数据存储在保留期较短的数据库中,然后将降采样的低分辨率数据存储在保留期较长的数据库中。

确保您为以下各项准备了一个数据库

  • 一个用于查询未修改的数据
  • 另一个用于写入降采样后的数据

有关创建数据库的信息,请参阅 创建 Bucket

创建降采样逻辑

此过程从输入 Kafka 主题读取原始数据,该主题存储从 InfluxDB 流式传输的数据,对其进行降采样,然后将其发送到输出主题,该主题用于写回 InfluxDB。

  1. 使用 Quix Streams 库的 Application 类初始化与 Apache Kafka 的连接。

    from quixstreams import Application
    
    app = Application(consumer_group='downsampling-process', auto_offset_reset='earliest')
    input_topic = app.topic('raw-data')
    output_topic = app.topic('downsampled-data')
    
    # ...
    
  2. 配置 Quix Streams 内置的窗口函数以创建翻滚窗口,该窗口将数据持续降采样到 1 分钟的 Bucket 中。

    # ...
    target_field = 'temperature' # The field that you want to downsample.
    
    def custom_ts_extractor(value):
        # ...
        # truncated for brevity - custom code that defines the 'time_recorded'
        # field as the timestamp to use for windowing...
    
    topic = app.topic(input_topic, timestamp_extractor=custom_ts_extractor)
    
    sdf = (
        sdf.apply(lambda value: value[target_field])  # Extract temperature values
        .tumbling_window(timedelta(minutes=1))   # 1-minute tumbling windows
        .mean()                                  # Calculate average temperature
        .final()                                 # Emit results at window completion
    )
    
    sdf = sdf.apply(
        lambda value: {
            'time': value['end'],                  # End of the window
            'temperature_avg': value['value'],     # Average temperature
        }
    )
    
    sdf.to_topic(output_topic) # Output results to the 'downsampled-data' topic
    # ...
    

结果将流式传输到 Kafka 主题 downsampled-data

注意:“sdf”代表“流式 Dataframe”。

您可以在 Quix GitHub 存储库 中找到此过程的完整代码。

创建生产者和消费者客户端

使用 influxdb_client_3quixstreams 模块实例化两个客户端,它们与 InfluxDB 和 Apache Kafka 交互

  • 一个生产者客户端,配置为从您的 InfluxDB 数据库读取未修改的数据,并将该数据生成到 Kafka。
  • 一个消费者客户端,配置为从 Kafka 消费数据,并将降采样后的数据写入相应的 InfluxDB 数据库。

创建生产者客户端

为生产者提供以下凭据

  • host: InfluxDB 集群 URL (不带协议)
  • org: 任意字符串。InfluxDB Clustered 忽略组织。
  • token: InfluxDB 数据库令牌,具有对您要查询和写入的数据库的读取和写入权限。
  • database: InfluxDB 数据库名称

生产者以特定间隔从 InfluxDB 查询新数据。它被配置为查找变量中定义的特定测量。它将原始数据写入名为“raw-data”的 Kafka 主题

from influxdb_client_3 import InfluxDBClient3
from quixstreams import Application
import pandas

# Instantiate an InfluxDBClient3 client configured for your unmodified database
influxdb_raw = InfluxDBClient3(
    host='cluster-host.com',
    token='DATABASE_TOKEN',
    database='
RAW_DATABASE_NAME
'
) # os.environ['localdev'] = 'true' # Uncomment if you're using local Kafka rather than Quix Cloud # Create a Quix Streams producer application that connects to a local Kafka installation app = Application( broker_address=os.environ.get('BROKER_ADDRESS','localhost:9092'), consumer_group=consumer_group_name, auto_create_topics=True ) # Override the app variable if the local development env var is set to false or is not present. # This causes Quix Streams to use an application configured for Quix Cloud localdev = os.environ.get('localdev', 'false') if localdev == 'false': # Create a Quix platform-specific application instead (broker address is in-built) app = Application(consumer_group=consumer_group_name, auto_create_topics=True) topic = app.topic(name='raw-data') ## ... remaining code trunctated for brevity ... # Query InfluxDB for the raw data and store it in a Dataframe def get_data(): # Run in a loop until the main thread is terminated while run: try: myquery = f'SELECT * FROM "{measurement_name}" WHERE time >= {interval}' print(f'sending query {myquery}') # Query InfluxDB 3 using influxql or sql table = influxdb_raw.query( query=myquery, mode='pandas', language='influxql') #... remaining code trunctated for brevity ... # Send the data to a Kafka topic for the downsampling process to consumer def main(): """ Read data from the Query and publish it to Kafka """ #... remaining code trunctated for brevity ... for index, obj in enumerate(records): print(obj) # Obj contains each row in the table includimng temperature # Generate a unique message_key for each row message_key = obj['machineId'] logger.info(f'Produced message with key:{message_key}, value:{obj}') serialized = topic.serialize( key=message_key, value=obj, headers={'uuid': str(uuid.uuid4())} ) # publish each row returned in the query to the topic 'raw-data' producer.produce( topic=topic.name, headers=serialized.headers, key=serialized.key, value=serialized.value, )

您可以在 Quix GitHub 存储库 中找到此过程的完整代码。

创建消费者

与之前一样,为消费者提供以下凭据

  • host: InfluxDB 集群 URL (不带协议)
  • org: 任意字符串。InfluxDB Clustered 忽略组织。
  • token: InfluxDB 数据库令牌,具有对您要查询和写入的数据库的读取和写入权限。
  • database: InfluxDB 数据库名称

此过程从 Kafka 主题 downsampled-data 读取消息,并将每条消息作为点字典写回 InfluxDB。

# Instantiate an InfluxDBClient3 client configured for your downsampled database.
# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
    host='cluster-host.com',
    token='DATABASE_TOKEN',
    database='
DOWNSAMPLED_DATABASE_NAME
'
,
org='' ) # os.environ['localdev'] = 'true' # Uncomment if you're using local Kafka rather than Quix Cloud # Create a Quix Streams consumer application that connects to a local Kafka installation app = Application( broker_address=os.environ.get('BROKER_ADDRESS','localhost:9092'), consumer_group=consumer_group_name, auto_create_topics=True ) # Override the app variable if the local development env var is set to false or is not present. # This causes Quix Streams to use an application configured for Quix Cloud localdev = os.environ.get('localdev', 'false') if localdev == 'false': # Create a Quix platform-specific application instead (broker address is in-built) app = Application(consumer_group=consumer_group_name, auto_create_topics=True) input_topic = app.topic('downsampled-data') ## ... remaining code trunctated for brevity ... def send_data_to_influx(message): logger.info(f'Processing message: {message}') try: ## ... remaining code trunctated for brevity ... # Construct the points dictionary points = { 'measurement': measurement_name, 'tags': tags, 'fields': fields, 'time': message['time'] } influxdb_downsampled.write(record=points, write_precision='ms') sdf = app.dataframe(input_topic) sdf = sdf.update(send_data_to_influx) # Continuously apply the 'send_data' function to each message in the incoming stream ## ... remaining code trunctated for brevity ...

您可以在 Quix GitHub 存储库 中找到此过程的完整代码。

获取完整的降采样代码文件

要获取本教程中引用的完整文件集,请克隆 Quix “降采样模板”存储库或使用保存为 Jupyter Notebook 的本教程的交互式版本。

克隆降采样模板存储库

要克隆降采样模板,请在命令行中输入以下命令

git clone https://github.com/quixio/template-influxdbv3-downsampling.git

此存储库包含以下文件夹,这些文件夹存储整个管道的不同部分

  • Machine Data to InfluxDB:一个脚本,用于生成合成机器数据并将其写入 InfluxDB。如果您还没有自己的数据,或者只是想先使用测试数据,这将非常有用。

    • 它每 250 毫秒生成一个读数。
    • 此脚本最初来自 InfluxCommunity 存储库,但已进行调整以直接写入 InfluxDB,而不是使用 MQTT 代理。
  • InfluxDB V3 Data Source:一种服务,用于以特定间隔从 InfluxDB 查询新数据。它被配置为查找先前提到的合成机器数据生成器生成的测量。它将原始数据写入名为“raw-data”的 Kafka 主题。

  • Downsampler:一种服务,对来自 InfluxDB 的数据执行 1 分钟翻滚窗口操作,并每分钟发出“temperature”读数的平均值。它将输出写入“downsampled-data”Kafka 主题。

  • InfluxDB V3 Data Sink:一种服务,从“downsampled-data”主题读取数据,并将降采样记录作为点写回 InfluxDB。

使用降采样 Jupyter Notebook

您可以使用交互式 Notebook “使用 InfluxDB 和 Quix Streams 持续降采样数据” 来自己尝试降采样代码。它被配置为在运行时环境(例如 Google Colab)中安装 Apache Kafka。

每个进程也设置为在后台运行,以便正在运行的单元格不会阻止教程的其余部分。

Open In Colab


此页内容对您有帮助吗?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像目前一样使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版现已公开发布 Alpha 版本

InfluxDB 3 开源版现已可用于 Alpha 测试,根据 MIT 或 Apache 2 许可授权。

我们正在发布两个产品作为 Alpha 版本的一部分。

InfluxDB 3 Core 是我们的新开源产品。它是用于时间序列和事件数据的最新数据引擎。InfluxDB 3 Enterprise 是一个商业版本,它建立在 Core 的基础上,增加了历史查询功能、读取副本、高可用性、可扩展性和细粒度的安全性。

有关如何开始使用的更多信息,请查看