使用Quix Streams降采样数据
使用Quix Streams查询存储在InfluxDB中并定期写入Kafka的时间序列数据,持续降采样,然后将降采样后的数据写回到InfluxDB。Quix Streams是一个开源的Python库,用于构建与Apache Kafka集成的容器化流处理应用程序。它被设计为运行在作为服务连续处理数据流并将结果流式传输到Kafka主题的服务。您可以在本地使用,配合本地Kafka安装,或使用Quix Cloud进行免费试用。
本指南使用Python和InfluxDB v3 Python客户端库,但您可以使用您选择的运行时和任何可用的InfluxDB v3客户端库。本指南还假设您已经设置好您的Python项目和虚拟环境。
管道架构
以下图示展示了数据在降采样过程中如何在不同进程间传递
InfluxDB v3源生产者
降采样过程
InfluxDB v3目标消费者
通常直接将原始数据写入Kafka比先写入InfluxDB(本质上是从“raw-data”主题开始Quix Streams管道)更有效。然而,本指南假设您已经有在InfluxDB中的原始数据,您想要对其进行降采样。
设置先决条件
本指南中描述的过程需要以下条件
- 一个已准备好的InfluxDB集群,其中包含可用于降采样的数据。
- 一个Quix Cloud账户或本地Apache Kafka或Red Panda安装。
- 熟悉基本的Python和Docker概念。
安装依赖项
使用pip
安装以下依赖项
influxdb_client_3
quixstreams<2.5
pandas
pip install influxdb3-python pandas quixstreams<2.5
准备InfluxDB存储桶
下采样过程涉及两个InfluxDB数据库。每个数据库都有一个保留期,它指定数据在过期和删除之前保持多长时间。通过使用两个数据库,您可以在保留期较短的数据库中存储未修改的高分辨率数据,然后在保留期较长的数据库中存储下采样的低分辨率数据。
确保您为以下每个项目都有一个数据库
- 一个用于查询未修改数据的
- 另一个用于将下采样数据写入的
有关创建数据库的信息,请参阅创建一个桶。
创建降采样逻辑
此过程从存储从InfluxDB流式传输的数据的输入Kafka主题中读取原始数据,对其进行下采样,然后将它发送到用于将数据写回InfluxDB的输出主题。
使用Quix Streams库的
Application
类初始化到Apache Kafka的连接。from quixstreams import Application app = Application(consumer_group='downsampling-process', auto_offset_reset='earliest') input_topic = app.topic('raw-data') output_topic = app.topic('downsampled-data') # ...
配置Quix Streams内置的窗口函数以创建一个连续下采样数据到1分钟桶的滑动窗口。
# ... target_field = 'temperature' # The field that you want to downsample. def custom_ts_extractor(value): # ... # truncated for brevity - custom code that defines the 'time_recorded' # field as the timestamp to use for windowing... topic = app.topic(input_topic, timestamp_extractor=custom_ts_extractor) sdf = ( sdf.apply(lambda value: value[target_field]) # Extract temperature values .tumbling_window(timedelta(minutes=1)) # 1-minute tumbling windows .mean() # Calculate average temperature .final() # Emit results at window completion ) sdf = sdf.apply( lambda value: { 'time': value['end'], # End of the window 'temperature_avg': value['value'], # Average temperature } ) sdf.to_topic(output_topic) # Output results to the 'downsampled-data' topic # ...
结果被流式传输到Kafka主题downsampled-data
。
注意:“sdf”代表“Streaming Dataframe”。
您可以在Quix GitHub存储库中找到此过程的完整代码。
创建生产者和消费者客户端
使用influxdb_client_3
和quixstreams
模块实例化两个与InfluxDB和Apache Kafka交互的客户端
- 生产者客户端配置为从您的InfluxDB数据库中读取未修改的数据并将其生产到Kafka。
- 消费者客户端配置为从Kafka中消费数据并将下采样数据写入相应的InfluxDB数据库。
创建生产者客户端
为生产者提供以下凭据
- host:InfluxDB集群URL(不包括协议)
- org:任意字符串。InfluxDB Clustered忽略组织。
- token:具有读取和写入权限的InfluxDB数据库令牌,您想查询和写入的数据库。
- database:InfluxDB数据库名称
生产者以特定间隔从InfluxDB查询新鲜数据。它配置为查找变量中定义的特定度量。它将原始数据写入名为“raw-data”的Kafka主题。
from influxdb_client_3 import InfluxDBClient3
from quixstreams import Application
import pandas
# Instantiate an InfluxDBClient3 client configured for your unmodified database
influxdb_raw = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='RAW_DATABASE_NAME'
)
# os.environ['localdev'] = 'true' # Uncomment if you're using local Kafka rather than Quix Cloud
# Create a Quix Streams producer application that connects to a local Kafka installation
app = Application(
broker_address=os.environ.get('BROKER_ADDRESS','localhost:9092'),
consumer_group=consumer_group_name,
auto_create_topics=True
)
# Override the app variable if the local development env var is set to false or is not present.
# This causes Quix Streams to use an application configured for Quix Cloud
localdev = os.environ.get('localdev', 'false')
if localdev == 'false':
# Create a Quix platform-specific application instead (broker address is in-built)
app = Application(consumer_group=consumer_group_name, auto_create_topics=True)
topic = app.topic(name='raw-data')
## ... remaining code trunctated for brevity ...
# Query InfluxDB for the raw data and store it in a Dataframe
def get_data():
# Run in a loop until the main thread is terminated
while run:
try:
myquery = f'SELECT * FROM "{measurement_name}" WHERE time >= {interval}'
print(f'sending query {myquery}')
# Query InfluxDB 3.0 using influxql or sql
table = influxdb_raw.query(
query=myquery,
mode='pandas',
language='influxql')
#... remaining code trunctated for brevity ...
# Send the data to a Kafka topic for the downsampling process to consumer
def main():
"""
Read data from the Query and publish it to Kafka
"""
#... remaining code trunctated for brevity ...
for index, obj in enumerate(records):
print(obj) # Obj contains each row in the table includimng temperature
# Generate a unique message_key for each row
message_key = obj['machineId']
logger.info(f'Produced message with key:{message_key}, value:{obj}')
serialized = topic.serialize(
key=message_key, value=obj, headers={'uuid': str(uuid.uuid4())}
)
# publish each row returned in the query to the topic 'raw-data'
producer.produce(
topic=topic.name,
headers=serialized.headers,
key=serialized.key,
value=serialized.value,
)
您可以在Quix GitHub存储库中找到此过程的完整代码。
创建消费者
如前所述,为消费者提供以下凭据
- host:InfluxDB集群URL(不包括协议)
- org:任意字符串。InfluxDB Clustered忽略组织。
- token:具有读取和写入权限的InfluxDB数据库令牌,您想查询和写入的数据库。
- database:InfluxDB数据库名称
此过程从Kafka主题downsampled-data
读取消息,并将每个消息作为点字典写回到InfluxDB。
# Instantiate an InfluxDBClient3 client configured for your downsampled database.
# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='DOWNSAMPLED_DATABASE_NAME',
org=''
)
# os.environ['localdev'] = 'true' # Uncomment if you're using local Kafka rather than Quix Cloud
# Create a Quix Streams consumer application that connects to a local Kafka installation
app = Application(
broker_address=os.environ.get('BROKER_ADDRESS','localhost:9092'),
consumer_group=consumer_group_name,
auto_create_topics=True
)
# Override the app variable if the local development env var is set to false or is not present.
# This causes Quix Streams to use an application configured for Quix Cloud
localdev = os.environ.get('localdev', 'false')
if localdev == 'false':
# Create a Quix platform-specific application instead (broker address is in-built)
app = Application(consumer_group=consumer_group_name, auto_create_topics=True)
input_topic = app.topic('downsampled-data')
## ... remaining code trunctated for brevity ...
def send_data_to_influx(message):
logger.info(f'Processing message: {message}')
try:
## ... remaining code trunctated for brevity ...
# Construct the points dictionary
points = {
'measurement': measurement_name,
'tags': tags,
'fields': fields,
'time': message['time']
}
influxdb_downsampled.write(record=points, write_precision='ms')
sdf = app.dataframe(input_topic)
sdf = sdf.update(send_data_to_influx) # Continuously apply the 'send_data' function to each message in the incoming stream
## ... remaining code trunctated for brevity ...
您可以在Quix GitHub存储库中找到此过程的完整代码。
获取完整的降采样代码文件
要获取本教程中引用的完整文件集,克隆Quix“下采样模板”存储库或使用保存为Jupyter笔记本的此教程的交互式版本。
克隆下采样模板存储库
要克隆下采样模板,请在命令行中输入以下命令
git clone https://github.com/quixio/template-influxdbv3-downsampling.git
此存储库包含以下文件夹,它们存储整个管道的不同部分
从机器数据到InfluxDB:一个生成合成机器数据并将其写入InfluxDB的脚本。如果您还没有自己的数据,或者只是想先使用测试数据,这很有用。
- 它每250毫秒产生一个读取。
- 此脚本最初来自InfluxCommunity存储库,但已修改为直接写入InfluxDB而不是使用MQTT代理。
InfluxDB V3 数据源:一种服务,会在特定时间间隔从 InfluxDB 查询新鲜数据。它被配置为寻找之前提到的合成机器数据生成器产生的测量值。它将原始数据写入名为“raw-data”的 Kafka 主题。
降采样器:一种服务,对来自 InfluxDB 的数据进行 1 分钟滑动窗口操作,并每分钟输出“温度”读数的平均值。它将输出写入名为“downsampled-data”的 Kafka 主题。
InfluxDB V3 数据汇:一种服务,从“downsampled-data”主题读取数据,并将降采样记录作为点写回到 InfluxDB。
使用降采样 Jupyter Notebook
您可以使用交互式笔记本“使用 InfluxDB 和 Quix Streams 持续降采样数据”来尝试自己编写降采样代码。它配置为在运行时环境(如 Google Colab)内安装 Apache Kafka。
每个进程也都设置为在后台运行,以确保正在运行的单元不会阻塞整个教程。
这个页面有帮助吗?
感谢您的反馈!