使用 InfluxDB 中的数据发送警报
使用存储在 InfluxDB 中的时间序列数据进行查询、分析和发送警报。
本指南使用 Python、InfluxDB 3 Python 客户端库 和 Python Slack SDK 来演示如何从 InfluxDB 查询数据并将警报发送到 Slack,但您可以将您选择的运行时和警报平台与任何可用的 InfluxDB 3 客户端库 一起使用。无论您选择使用什么客户端和平台,流程都是相同的
警报流程
- 使用外部运行时和 InfluxDB 客户端从 InfluxDB 查询数据。
- 使用查询到的数据和运行时中可用的工具发送警报。
创建 Slack 应用
要将警报发送到 Slack,首先创建一个 Slack 应用并收集所需的连接凭据以与您的应用进行交互。更多信息请参阅 Slack 基本应用设置文档。
安装依赖项
本指南假设您已经 设置了您的 Python 项目和虚拟环境。
使用 pip
安装以下依赖项
influxdb_client_3
pandas
slack_sdk
pip install influxdb3-python pandas slack_sdk
创建 InfluxDB 客户端
使用 influxdb_client_3
模块中的 InfluxDBClient3
函数来实例化 InfluxDB 客户端。提供以下凭据
- host: 您的 InfluxDB 集群 URL (不带协议)
- org: 您的 InfluxDB 组织名称
- token: 具有对您要查询的数据库的读取权限的 数据库令牌
- database: 您的 数据库 名称
from influxdb_client_3 import InfluxDBClient3
import pandas
# Instantiate an InfluxDBClient3 client configured for your database
influxdb = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='DATABASE_NAME'
)
创建 Slack 客户端
从
slack.sdk
模块导入WebClient
函数,并从slack_sdk.errors
模块导入SlackApiError
函数。使用
WebClient
函数实例化 Slack 客户端。提供以下凭据- token: Slack 机器人令牌
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
slack = WebClient(token='SLACK_BOT_TOKEN')
查询 InfluxDB
定义 SQL 或 InfluxQL 查询以检索要发出警报的数据。根据您要警报的数据,您可以
- 在查询中包含逻辑,使其仅返回应发出警报的结果。
- 查询进一步处理所需的数据,然后根据在运行时中执行的处理发送警报。
以下示例查询仅返回高于应触发警报的阈值的值。
SELECT
selector_last(co, time)['time'] AS time,
selector_last(co, time)['value'] AS co,
room
FROM home
WHERE co > 10
GROUP BY room
SELECT
LAST(co) AS co,
room
FROM home
WHERE co > 10
GROUP BY room
执行查询
将查询字符串分配给变量。
使用您的 实例化的客户端 的
query
方法从 InfluxDB 查询原始数据。提供以下参数。- query: 要执行的查询字符串
- language:
sql
或influxql
使用
to_pandas
方法将返回的 Arrow 表转换为 Pandas DataFrame。
# ...
query = '''
SELECT
selector_last(co, time)['time'] AS time,
selector_last(co, time)['value'] AS co,
room
FROM home
WHERE co > 10
GROUP BY room
'''
table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
# ...
query = '''
SELECT
LAST(co) AS co,
room
FROM home
WHERE co > 10
GROUP BY room
'''
table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()
发送警报
遍历 DataFrame 并为每一行向 Slack 发送警报。
在数据帧上使用
reset_index
函数以确保索引与 DataFrame 中的行数对齐。遍历每一行,并使用您的 Slack 客户端 的
chat_postMessage
方法向 Slack 发送消息(每行一条)。提供以下参数- channel: 要向其发送警报的 Slack 频道。
- text: 要发送的消息文本。使用字符串插值将每行中的列值插入到消息文本中。
# ...
data_frame = data_frame.reset_index()
for index, row in data_frame.iterrows():
slack.chat_postMessage(
channel="#SLACK_CHANNEL",
text=f'Carbon monoxide (co) high in {row.room}: {row.co} ppm at {row.time}'
)
完整警报脚本
from influxdb_client_3 import InfluxDBClient3
import pandas
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
influxdb = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='DATABASE_NAME'
)
slack = WebClient(token='SLACK_BOT_TOKEN')
query = '''
SELECT
selector_last(co, time)['time'] AS time,
selector_last(co, time)['value'] AS co,
room
FROM home
WHERE co > 10
GROUP BY room
'''
table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
data_frame = data_frame.reset_index()
for index, row in data_frame.iterrows():
slack.chat_postMessage(
channel="#SLACK_CHANNEL",
text=f'Carbon monoxide (co) high in {row.room}: {row.co} ppm at {row.time}'
)
from influxdb_client_3 import InfluxDBClient3
import pandas
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
influxdb = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='DATABASE_NAME'
)
slack = WebClient(token='SLACK_BOT_TOKEN')
query = '''
SELECT
LAST(co) AS co,
room
FROM home
WHERE co > 10
GROUP BY room
'''
table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()
data_frame = data_frame.reset_index()
for index, row in data_frame.iterrows():
slack.chat_postMessage(
channel="#SLACK_CHANNEL",
text=f'Carbon monoxide (co) high in {row.room}: {row.co} ppm at {row.time}'
)
此页是否对您有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一份子!我们欢迎并鼓励您对 InfluxDB Clustered 和本文档提出反馈和错误报告。要寻求支持,请使用以下资源
拥有年度合同或支持合同的客户 可以 联系 InfluxData 支持。