使用 Quix Streams 对数据进行降采样
使用 Quix Streams 查询存储在 InfluxDB 中并定期写入 Kafka 的时间序列数据,持续对其进行降采样,然后将降采样后的数据写回 InfluxDB。Quix Streams 是一个开源 Python 库,用于构建使用 Apache Kafka 的容器化流处理应用程序。它被设计为作为服务运行,持续处理数据流,同时将结果流式传输到 Kafka 主题。您可以在本地使用本地 Kafka 安装进行尝试,或者在 Quix Cloud 中使用免费试用版运行。
本指南使用 Python 和 InfluxDB 3 Python 客户端库,但您可以使用您选择的运行时和任何可用的 InfluxDB 3 客户端库。本指南还假设您已经 设置了您的 Python 项目和虚拟环境。
管道架构
下图说明了数据在降采样过程中如何在进程之间传递
InfluxDB v3 源生产者
降采样过程
InfluxDB v3 Sink 消费者
通常,将原始数据直接写入 Kafka 比首先将原始数据写入 InfluxDB(本质上是从“raw-data”主题开始 Quix Streams 管道)更有效。但是,本指南假设您已经在 InfluxDB 中拥有要降采样的原始数据。
设置先决条件
本指南中描述的过程需要以下各项
- 一个 InfluxDB 集群,其中包含准备好进行降采样的数据。
- 一个 Quix Cloud 帐户或本地 Apache Kafka 或 Red Panda 安装。
- 熟悉基本的 Python 和 Docker 概念。
安装依赖项
使用 pip
安装以下依赖项
influxdb_client_3
quixstreams<2.5
pandas
pip install influxdb3-python pandas quixstreams<2.5
准备 InfluxDB Bucket
降采样过程涉及两个 InfluxDB 数据库。每个数据库都有一个 保留期,指定数据在过期和删除之前持续存在的时间。通过使用两个数据库,您可以将未修改的高分辨率数据存储在保留期较短的数据库中,然后将降采样的低分辨率数据存储在保留期较长的数据库中。
确保您为以下各项准备了一个数据库
- 一个用于查询未修改的数据
- 另一个用于写入降采样后的数据
有关创建数据库的信息,请参阅 创建 Bucket。
创建降采样逻辑
此过程从输入 Kafka 主题读取原始数据,该主题存储从 InfluxDB 流式传输的数据,对其进行降采样,然后将其发送到输出主题,该主题用于写回 InfluxDB。
使用 Quix Streams 库的
Application
类初始化与 Apache Kafka 的连接。from quixstreams import Application app = Application(consumer_group='downsampling-process', auto_offset_reset='earliest') input_topic = app.topic('raw-data') output_topic = app.topic('downsampled-data') # ...
配置 Quix Streams 内置的窗口函数以创建翻滚窗口,该窗口将数据持续降采样到 1 分钟的 Bucket 中。
# ... target_field = 'temperature' # The field that you want to downsample. def custom_ts_extractor(value): # ... # truncated for brevity - custom code that defines the 'time_recorded' # field as the timestamp to use for windowing... topic = app.topic(input_topic, timestamp_extractor=custom_ts_extractor) sdf = ( sdf.apply(lambda value: value[target_field]) # Extract temperature values .tumbling_window(timedelta(minutes=1)) # 1-minute tumbling windows .mean() # Calculate average temperature .final() # Emit results at window completion ) sdf = sdf.apply( lambda value: { 'time': value['end'], # End of the window 'temperature_avg': value['value'], # Average temperature } ) sdf.to_topic(output_topic) # Output results to the 'downsampled-data' topic # ...
结果将流式传输到 Kafka 主题 downsampled-data
。
注意:“sdf”代表“流式 Dataframe”。
您可以在 Quix GitHub 存储库 中找到此过程的完整代码。
创建生产者和消费者客户端
使用 influxdb_client_3
和 quixstreams
模块实例化两个客户端,它们与 InfluxDB 和 Apache Kafka 交互
- 一个生产者客户端,配置为从您的 InfluxDB 数据库读取未修改的数据,并将该数据生成到 Kafka。
- 一个消费者客户端,配置为从 Kafka 消费数据,并将降采样后的数据写入相应的 InfluxDB 数据库。
创建生产者客户端
为生产者提供以下凭据
- host: InfluxDB 集群 URL (不带协议)
- org: 任意字符串。InfluxDB Clustered 忽略组织。
- token: InfluxDB 数据库令牌,具有对您要查询和写入的数据库的读取和写入权限。
- database: InfluxDB 数据库名称
生产者以特定间隔从 InfluxDB 查询新数据。它被配置为查找变量中定义的特定测量。它将原始数据写入名为“raw-data”的 Kafka 主题
from influxdb_client_3 import InfluxDBClient3
from quixstreams import Application
import pandas
# Instantiate an InfluxDBClient3 client configured for your unmodified database
influxdb_raw = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='RAW_DATABASE_NAME'
)
# os.environ['localdev'] = 'true' # Uncomment if you're using local Kafka rather than Quix Cloud
# Create a Quix Streams producer application that connects to a local Kafka installation
app = Application(
broker_address=os.environ.get('BROKER_ADDRESS','localhost:9092'),
consumer_group=consumer_group_name,
auto_create_topics=True
)
# Override the app variable if the local development env var is set to false or is not present.
# This causes Quix Streams to use an application configured for Quix Cloud
localdev = os.environ.get('localdev', 'false')
if localdev == 'false':
# Create a Quix platform-specific application instead (broker address is in-built)
app = Application(consumer_group=consumer_group_name, auto_create_topics=True)
topic = app.topic(name='raw-data')
## ... remaining code trunctated for brevity ...
# Query InfluxDB for the raw data and store it in a Dataframe
def get_data():
# Run in a loop until the main thread is terminated
while run:
try:
myquery = f'SELECT * FROM "{measurement_name}" WHERE time >= {interval}'
print(f'sending query {myquery}')
# Query InfluxDB 3 using influxql or sql
table = influxdb_raw.query(
query=myquery,
mode='pandas',
language='influxql')
#... remaining code trunctated for brevity ...
# Send the data to a Kafka topic for the downsampling process to consumer
def main():
"""
Read data from the Query and publish it to Kafka
"""
#... remaining code trunctated for brevity ...
for index, obj in enumerate(records):
print(obj) # Obj contains each row in the table includimng temperature
# Generate a unique message_key for each row
message_key = obj['machineId']
logger.info(f'Produced message with key:{message_key}, value:{obj}')
serialized = topic.serialize(
key=message_key, value=obj, headers={'uuid': str(uuid.uuid4())}
)
# publish each row returned in the query to the topic 'raw-data'
producer.produce(
topic=topic.name,
headers=serialized.headers,
key=serialized.key,
value=serialized.value,
)
您可以在 Quix GitHub 存储库 中找到此过程的完整代码。
创建消费者
与之前一样,为消费者提供以下凭据
- host: InfluxDB 集群 URL (不带协议)
- org: 任意字符串。InfluxDB Clustered 忽略组织。
- token: InfluxDB 数据库令牌,具有对您要查询和写入的数据库的读取和写入权限。
- database: InfluxDB 数据库名称
此过程从 Kafka 主题 downsampled-data
读取消息,并将每条消息作为点字典写回 InfluxDB。
# Instantiate an InfluxDBClient3 client configured for your downsampled database.
# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='DOWNSAMPLED_DATABASE_NAME',
org=''
)
# os.environ['localdev'] = 'true' # Uncomment if you're using local Kafka rather than Quix Cloud
# Create a Quix Streams consumer application that connects to a local Kafka installation
app = Application(
broker_address=os.environ.get('BROKER_ADDRESS','localhost:9092'),
consumer_group=consumer_group_name,
auto_create_topics=True
)
# Override the app variable if the local development env var is set to false or is not present.
# This causes Quix Streams to use an application configured for Quix Cloud
localdev = os.environ.get('localdev', 'false')
if localdev == 'false':
# Create a Quix platform-specific application instead (broker address is in-built)
app = Application(consumer_group=consumer_group_name, auto_create_topics=True)
input_topic = app.topic('downsampled-data')
## ... remaining code trunctated for brevity ...
def send_data_to_influx(message):
logger.info(f'Processing message: {message}')
try:
## ... remaining code trunctated for brevity ...
# Construct the points dictionary
points = {
'measurement': measurement_name,
'tags': tags,
'fields': fields,
'time': message['time']
}
influxdb_downsampled.write(record=points, write_precision='ms')
sdf = app.dataframe(input_topic)
sdf = sdf.update(send_data_to_influx) # Continuously apply the 'send_data' function to each message in the incoming stream
## ... remaining code trunctated for brevity ...
您可以在 Quix GitHub 存储库 中找到此过程的完整代码。
获取完整的降采样代码文件
要获取本教程中引用的完整文件集,请克隆 Quix “降采样模板”存储库或使用保存为 Jupyter Notebook 的本教程的交互式版本。
克隆降采样模板存储库
要克隆降采样模板,请在命令行中输入以下命令
git clone https://github.com/quixio/template-influxdbv3-downsampling.git
此存储库包含以下文件夹,这些文件夹存储整个管道的不同部分
Machine Data to InfluxDB:一个脚本,用于生成合成机器数据并将其写入 InfluxDB。如果您还没有自己的数据,或者只是想先使用测试数据,这将非常有用。
- 它每 250 毫秒生成一个读数。
- 此脚本最初来自 InfluxCommunity 存储库,但已进行调整以直接写入 InfluxDB,而不是使用 MQTT 代理。
InfluxDB V3 Data Source:一种服务,用于以特定间隔从 InfluxDB 查询新数据。它被配置为查找先前提到的合成机器数据生成器生成的测量。它将原始数据写入名为“raw-data”的 Kafka 主题。
Downsampler:一种服务,对来自 InfluxDB 的数据执行 1 分钟翻滚窗口操作,并每分钟发出“temperature”读数的平均值。它将输出写入“downsampled-data”Kafka 主题。
InfluxDB V3 Data Sink:一种服务,从“downsampled-data”主题读取数据,并将降采样记录作为点写回 InfluxDB。
使用降采样 Jupyter Notebook
您可以使用交互式 Notebook “使用 InfluxDB 和 Quix Streams 持续降采样数据” 来自己尝试降采样代码。它被配置为在运行时环境(例如 Google Colab)中安装 Apache Kafka。
每个进程也设置为在后台运行,以便正在运行的单元格不会阻止教程的其余部分。
此页内容对您有帮助吗?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一份子!我们欢迎并鼓励您提供关于 InfluxDB Clustered 和本文档的反馈和错误报告。要寻求支持,请使用以下资源
拥有年度合同或支持合同的客户可以联系 InfluxData 支持。