使用 Quix Streams 对数据进行降采样
处理高容量数据时,常见的做法是在将数据提交到 InfluxDB 之前对其进行降采样,以减少随着时间推移数据累积造成的总体磁盘使用量。
本指南将逐步介绍如何创建一系列 Python 服务,这些服务从 InfluxDB v2 存储桶中摄取数据,对其进行降采样,并将数据发布到另一个 InfluxDB v2 存储桶。通过在时间窗口内聚合数据,然后将聚合值存储回 InfluxDB,您可以随着时间的推移减少磁盘使用量和成本。
本指南使用 InfluxDB v2 和 Quix Streams Python 客户端库,可以在本地运行,也可以使用免费试用版部署在 Quix Cloud 中。它假设您已经设置了一个 Python 项目和虚拟环境。
管道架构
下图说明了数据在降采样过程中如何在进程之间传递
InfluxDB v2 源生产者
降采样过程
InfluxDB v2 Sink 消费者
通常,将原始数据直接写入 Kafka 比先将原始数据写入 InfluxDB 更有效(本质上是从 “influxv2-data” 主题开始 Quix Streams 管道)。但是,本指南假设您已经在 InfluxDB 中有要降采样的原始数据。
设置先决条件
本指南中描述的过程需要以下内容
- InfluxDB v2,其中包含准备好进行降采样的数据。 使用下面的机器数据生成器代码。
- Quix Cloud 帐户或本地 Apache Kafka 或 Red Panda 安装。
- 熟悉基本的 Python 和 Docker 概念。
安装依赖项
使用 pip
安装以下依赖项
influxdb-client
(InfluxDB v2 客户端库)quixstreams<2.5
(Quix Streams 客户端库)pandas
(数据分析和操作工具)
pip install influxdb-client pandas quixstreams<2.5
准备 InfluxDB 存储桶
降采样过程涉及两个 InfluxDB 存储桶。每个存储桶都有一个 保留期,指定数据在过期和删除之前持续存在的时间。通过使用两个存储桶,您可以将未修改的高分辨率数据存储在保留期较短的存储桶中,然后将降采样的低分辨率数据存储在保留期较长的存储桶中。
确保您为以下各项都准备了一个存储桶
- 一个用于从您的 InfluxDB v2 集群查询未修改的数据
- 另一个用于写入降采样数据
创建降采样逻辑
此过程从输入 Kafka 主题读取原始数据,该主题存储从 InfluxDB v2 存储桶流式传输的数据,对其进行降采样,然后将其发送到输出主题,该主题稍后写回另一个存储桶。
使用 Quix Streams 库的
Application
类来初始化与 Kafka 主题的连接。from quixstreams import Application app = Application(consumer_group="downsampling-process", auto_offset_reset="earliest") input_topic = app.topic("input") output_topic = app.topic("output") # ...
配置 Quix Streams 内置的窗口函数,以创建一个滚动窗口,该窗口持续将数据降采样到 1 分钟的存储桶中。
# ... target_field = "temperature" # The field that you want to downsample. def custom_ts_extractor(value): # ... # truncated for brevity - custom code that defines the "time_recorded" # field as the timestamp to use for windowing... topic = app.topic(input_topic, timestamp_extractor=custom_ts_extractor) sdf = ( sdf.apply(lambda value: value[target_field]) # Extract temperature values .tumbling_window(timedelta(minutes=1)) # 1-minute tumbling windows .mean() # Calculate average temperature .final() # Emit results at window completion ) sdf = sdf.apply( lambda value: { "time": value["end"], # End of the window "temperature_avg": value["value"], # Average temperature } ) sdf.to_topic(output_topic) # Output results to the "downsampled" topic # ...
结果将流式传输到 Kafka 主题 downsampled
。
注意:“sdf” 代表 “流式 Dataframe”。
您可以在 Quix GitHub 存储库 中找到此过程的完整代码。
创建生产者和消费者客户端
使用 influxdb_client
和 quixstreams
模块实例化两个客户端,它们与 InfluxDB 和 Kafka 交互
- 一个 生产者 客户端,配置为从您的 InfluxDB 存储桶读取 未修改的 数据,并将该数据 生产 到 Kafka。
- 一个 消费者 客户端,配置为从 Kafka 消费 数据,并将 降采样 的数据写入到相应的 InfluxDB 存储桶。
创建生产者
为生产者提供以下凭据
- INFLUXDB_HOST: InfluxDB URL (不带协议)
- INFLUXDB_ORG: InfluxDB 组织名称
- INFLUXDB_TOKEN: InfluxDB API 令牌,具有对您要查询和写入的存储桶的读取和写入权限。
- INFLUXDB_BUCKET: InfluxDB 存储桶名称
生产者以特定间隔从 InfluxDB 查询新数据。它将原始数据写入名为 influxv2-data
的 Kafka 主题。
from quixstreams import Application
import influxdb_client
# Create a Quix Application
app = Application(consumer_group="influxdbv2_migrate", auto_create_topics=True)
# Define the topic using the "output" environment variable
topic = app.topic(os.getenv("output", "influxv2-data"))
# Create an InfluxDB v2 client
influxdb2_client = influxdb_client.InfluxDBClient(token=os.environ["INFLUXDB_TOKEN"],
org=os.environ["INFLUXDB_ORG"],
url=os.environ["INFLUXDB_HOST"])
## ... remaining code trunctated for brevity ...
# Function to fetch data from InfluxDB
# It runs in a continuous loop, periodically fetching data based on the interval.
def get_data():
# Run in a loop until the main thread is terminated
while run:
try:
# Query InfluxDB 2.0 using flux
flux_query = f'''
from(bucket: "{bucket}")
|> range(start: -{interval})
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
'''
logger.info(f"Sending query: {flux_query}")
## ... remaining code trunctated for brevity ...
# Create a pre-configured Producer object.
with app.get_producer() as producer:
for res in get_data():
# Get the data from InfluxDB
records = json.loads(res)
for index, obj in enumerate(records):
logger.info(f"Produced message with key:{message_key}, value:{obj}")
# Publish the data to the Kafka topic
producer.produce(
topic=topic.name,
key=message_key,
value=obj,
)
您可以在 Quix GitHub 存储库 中找到此过程的完整代码。
创建消费者
与之前一样,为消费者提供以下凭据
- INFLUXDB_HOST: InfluxDB URL (不带协议)
- INFLUXDB_ORG: InfluxDB 组织名称
- INFLUXDB_TOKEN: InfluxDB API 令牌,具有对您要查询和写入的存储桶的读取和写入权限。
- INFLUXDB_BUCKET: InfluxDB 存储桶名称
注意:这些将是您的 InfluxDB v2 凭据。
此过程从 Kafka 主题 downsampled-data
读取消息,并将每条消息作为点字典写回 InfluxDB。
from quixstreams import Application, State
from influxdb_client import InfluxDBClient, Point
# Create a Quix platform-specific application instead
app = Application(consumer_group=consumer_group_name, auto_offset_reset="earliest", use_changelog_topics=False)
input_topic = app.topic(os.getenv("input", "input-data"))
# Initialize InfluxDB v2 client
influx2_client = InfluxDBClient(url=localhost:8086,
token=API_TOKEN,
org=ORG_NAME)
## ... remaining code trunctated for brevity ...
def send_data_to_influx(message: dict, state: State):
global last_write_time_ns, points_buffer, service_start_state
try:
## ... code trunctated for brevity ...
# Check if it's time to write the batch
# 10k records have accumulated or 15 seconds have passed
if len(points_buffer) >= 10000 or int(time() * 1e9) - last_write_time_ns >= 15e9:
with influx2_client.write_api() as write_api:
logger.info(f"Writing batch of {len(points_buffer)} points written to InfluxDB.")
write_api.write(influx_bucket, influx_org, points_buffer)
# Clear the buffer and update the last write time
points_buffer = []
last_write_time_ns = int(time() * 1e9)
## ... code trunctated for brevity ...
except Exception as e:
logger.info(f"{str(datetime.utcnow())}: Write failed")
logger.info(e)
## ... code trunctated for brevity ...
# We use Quix Streams StreamingDataframe (SDF) to handle every message
# in the Kafka topic by writing it to InfluxDB
sdf = app.dataframe(input_topic)
sdf = sdf.update(send_data_to_influx, stateful=True)
if __name__ == "__main__":
logger.info("Starting application")
app.run(sdf)
您可以在 Quix GitHub 存储库 中找到此过程的完整代码。
运行机器数据生成器
现在是时候运行机器数据生成器代码了,它将用数据填充您的源存储桶,这些数据将由 生产者 读取。
从 GitHub 存储库中的 Machine data to InfluxDB
文件夹运行 main.py
。
获取完整的降采样代码文件
要获取本教程中引用的完整文件集,请克隆 Quix “降采样” 存储库。
克隆降采样模板存储库
要克隆降采样模板,请在命令行中输入以下命令
git clone https://github.com/quixio/template-invluxdbv2-tsm-downsampling.git
此存储库包含以下文件夹,这些文件夹存储整个管道的不同部分
Machine Data to InfluxDB:一个生成合成机器数据并将其写入 InfluxDB 的脚本。如果您还没有自己的数据,或者只想首先使用测试数据,这将非常有用。
- 它每 250 毫秒生成一次读数。
- 此脚本最初来自 InfluxCommunity 存储库,但已进行调整以直接写入 InfluxDB,而不是使用 MQTT 代理。
InfluxDB v2 数据源:一种服务,用于以特定间隔从 InfluxDB 查询新数据。它配置为查找先前提到的合成机器数据生成器生成的测量值。它将原始数据写入名为 “influxv2-data” 的 Kafka 主题。
Downsampler:一种服务,对来自 InfluxDB 的数据执行 1 分钟的滚动窗口操作,并每分钟发出 “temperature” 读数的平均值。它将输出写入 “downsampled” Kafka 主题。
InfluxDB v2 数据 Sink:一种服务,从 “downsampled” 主题读取数据,并将降采样的记录作为点写回 InfluxDB。
此页面是否对您有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一份子!我们欢迎并鼓励您提供关于 InfluxDB 和本文档的反馈和错误报告。要查找支持,请使用以下资源
拥有年度合同或支持合同的客户 可以 联系 InfluxData 支持。