使用InfluxDB中的数据发送警报
查询、分析和使用存储在InfluxDB中的时间序列数据发送警报。
本指南使用Python、InfluxDB v3 Python客户端库和Python Slack SDK来演示如何从InfluxDB查询数据并向Slack发送警报,但您可以使用任何可用的InfluxDB v3客户端库以及您选择的运行时和警报平台。无论您选择哪种客户端和平台,过程都是相同的
警报流程
- 使用外部运行时和InfluxDB客户端从InfluxDB查询数据。
- 使用运行时中可用的查询数据和工具发送警报。
创建Slack应用
要将警报发送到Slack,首先创建一个Slack应用并收集所需的连接凭据以与您的应用交互。更多详细信息请参阅Slack基本应用设置文档。
安装依赖项
本指南假设您已经设置您的Python项目和虚拟环境。
使用pip
安装以下依赖项
influxdb_client_3
pandas
slack_sdk
pip install influxdb3-python pandas slack_sdk
创建InfluxDB客户端
使用influxdb_client_3
模块中的InfluxDBClient3
函数来创建一个InfluxDB客户端实例。提供以下凭据
from influxdb_client_3 import InfluxDBClient3
import pandas
# Instantiate an InfluxDBClient3 client configured for your database
influxdb = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='DATABASE_NAME'
)
创建Slack客户端
从
slack.sdk
模块导入WebClient
函数,从slack_sdk.errors
模块导入SlackApiError
函数。使用
WebClient
函数来创建一个Slack客户端实例。提供以下凭据- token:Slack机器人令牌
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
slack = WebClient(token='SLACK_BOT_TOKEN')
查询InfluxDB
定义一个SQL或InfluxQL查询以检索要警报的数据。根据您想要警报的数据,您可以
- 在查询中包含逻辑,使其仅返回应警报的结果。
- 查询用于进一步处理的数据,然后在您的运行时根据处理结果发送警报。
以下示例查询仅返回超过阈值的值,从而触发警报。
SELECT
selector_last(co, time)['time'] AS time,
selector_last(co, time)['value'] AS co,
room
FROM home
WHERE co > 10
GROUP BY room
SELECT
LAST(co) AS co,
room
FROM home
WHERE co > 10
GROUP BY room
执行查询
将查询字符串分配给变量。
使用您的已实例化的客户端的
query
方法从InfluxDB查询原始数据。提供以下参数。- query:要执行的查询字符串
- language:
sql
或influxql
使用
to_pandas
方法将返回的Arrow表转换为Pandas DataFrame。
# ...
query = '''
SELECT
selector_last(co, time)['time'] AS time,
selector_last(co, time)['value'] AS co,
room
FROM home
WHERE co > 10
GROUP BY room
'''
table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
# ...
query = '''
SELECT
LAST(co) AS co,
room
FROM home
WHERE co > 10
GROUP BY room
'''
table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()
发送警报
遍历DataFrame并针对每行发送Slack警报。
使用数据帧上的
reset_index
函数确保索引与DataFrame中的行数对齐。遍历每一行,并使用您的
chat_postMessage
方法发送消息(每行)到Slack。提供以下参数- channel:发送警报的Slack频道。
- text:要发送的消息文本。使用字符串插值将每行的列值插入到消息文本中。
# ...
data_frame = data_frame.reset_index()
for index, row in data_frame.iterrows():
slack.chat_postMessage(
channel="#SLACK_CHANNEL",
text=f'Carbon monoxide (co) high in {row.room}: {row.co} ppm at {row.time}'
)
完整的警报脚本
from influxdb_client_3 import InfluxDBClient3
import pandas
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
influxdb = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='DATABASE_NAME'
)
slack = WebClient(token='SLACK_BOT_TOKEN')
query = '''
SELECT
selector_last(co, time)['time'] AS time,
selector_last(co, time)['value'] AS co,
room
FROM home
WHERE co > 10
GROUP BY room
'''
table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
data_frame = data_frame.reset_index()
for index, row in data_frame.iterrows():
slack.chat_postMessage(
channel="#SLACK_CHANNEL",
text=f'Carbon monoxide (co) high in {row.room}: {row.co} ppm at {row.time}'
)
from influxdb_client_3 import InfluxDBClient3
import pandas
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
influxdb = InfluxDBClient3(
host='cluster-host.com',
token='DATABASE_TOKEN',
database='DATABASE_NAME'
)
slack = WebClient(token='SLACK_BOT_TOKEN')
query = '''
SELECT
LAST(co) AS co,
room
FROM home
WHERE co > 10
GROUP BY room
'''
table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()
data_frame = data_frame.reset_index()
for index, row in data_frame.iterrows():
slack.chat_postMessage(
channel="#SLACK_CHANNEL",
text=f'Carbon monoxide (co) high in {row.room}: {row.co} ppm at {row.time}'
)
本页面有帮助吗?
感谢您的反馈!