文档说明

使用InfluxDB中的数据发送警报

查询、分析和使用存储在InfluxDB中的时间序列数据发送警报。

本指南使用PythonInfluxDB v3 Python客户端库Python Slack SDK来演示如何从InfluxDB查询数据并向Slack发送警报,但您可以使用任何可用的InfluxDB v3客户端库以及您选择的运行时和警报平台。无论您选择哪种客户端和平台,过程都是相同的

警报流程

  1. 使用外部运行时和InfluxDB客户端从InfluxDB查询数据。
  2. 使用运行时中可用的查询数据和工具发送警报。

创建Slack应用

要将警报发送到Slack,首先创建一个Slack应用并收集所需的连接凭据以与您的应用交互。更多详细信息请参阅Slack基本应用设置文档

安装依赖项

本指南假设您已经设置您的Python项目和虚拟环境

使用pip安装以下依赖项

  • influxdb_client_3
  • pandas
  • slack_sdk
pip install influxdb3-python pandas slack_sdk

创建InfluxDB客户端

使用influxdb_client_3模块中的InfluxDBClient3函数来创建一个InfluxDB客户端实例。提供以下凭据

  • host:您的InfluxDB集群URL (不包括协议)
  • org:您的InfluxDB组织名称
  • token:一个具有查询数据库权限的数据库令牌
  • database:您的数据库名称
from influxdb_client_3 import InfluxDBClient3
import pandas

# Instantiate an InfluxDBClient3 client configured for your database
influxdb = InfluxDBClient3(
    host='cluster-host.com',
    token='
DATABASE_TOKEN
'
,
database='
DATABASE_NAME
'
)

创建Slack客户端

  1. slack.sdk模块导入WebClient函数,从slack_sdk.errors模块导入SlackApiError函数。

  2. 使用WebClient函数来创建一个Slack客户端实例。提供以下凭据

from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

slack = WebClient(token='
SLACK_BOT_TOKEN
'
)

查询InfluxDB

定义一个SQL或InfluxQL查询以检索要警报的数据。根据您想要警报的数据,您可以

  • 在查询中包含逻辑,使其仅返回应警报的结果。
  • 查询用于进一步处理的数据,然后在您的运行时根据处理结果发送警报。

以下示例查询仅返回超过阈值的值,从而触发警报。

SELECT
  selector_last(co, time)['time'] AS time,
  selector_last(co, time)['value'] AS co,
  room
FROM home
WHERE co > 10
GROUP BY room
SELECT
  LAST(co) AS co,
  room
FROM home
WHERE co > 10
GROUP BY room

执行查询

  1. 将查询字符串分配给变量。

  2. 使用您的已实例化的客户端的query方法从InfluxDB查询原始数据。提供以下参数。

    • query:要执行的查询字符串
    • languagesqlinfluxql
  3. 使用to_pandas方法将返回的Arrow表转换为Pandas DataFrame。

# ...

query = '''
SELECT
  selector_last(co, time)['time'] AS time,
  selector_last(co, time)['value'] AS co,
  room
FROM home
WHERE co > 10
GROUP BY room
'''

table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
# ...

query = '''
SELECT
  LAST(co) AS co,
  room
FROM home
WHERE co > 10
GROUP BY room
'''

table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()

发送警报

遍历DataFrame并针对每行发送Slack警报。

  1. 使用数据帧上的reset_index函数确保索引与DataFrame中的行数对齐。

  2. 遍历每一行,并使用您的chat_postMessage方法发送消息(每行)到Slack。提供以下参数

    • channel:发送警报的Slack频道。
    • text:要发送的消息文本。使用字符串插值将每行的列值插入到消息文本中。
# ...

data_frame = data_frame.reset_index()

for index, row in data_frame.iterrows():
    slack.chat_postMessage(
        channel="#
SLACK_CHANNEL
"
,
text=f'Carbon monoxide (co) high in {row.room}: {row.co} ppm at {row.time}' )

完整的警报脚本

from influxdb_client_3 import InfluxDBClient3
import pandas
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

influxdb = InfluxDBClient3(
    host='cluster-host.com',
    token='
DATABASE_TOKEN
'
,
database='
DATABASE_NAME
'
) slack = WebClient(token='
SLACK_BOT_TOKEN
'
)
query = ''' SELECT selector_last(co, time)['time'] AS time, selector_last(co, time)['value'] AS co, room FROM home WHERE co > 10 GROUP BY room ''' table = influxdb_raw.query(query=query, language="sql") data_frame = table.to_pandas() data_frame = data_frame.reset_index() for index, row in data_frame.iterrows(): slack.chat_postMessage( channel="#
SLACK_CHANNEL
"
,
text=f'Carbon monoxide (co) high in {row.room}: {row.co} ppm at {row.time}' )
from influxdb_client_3 import InfluxDBClient3
import pandas
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

influxdb = InfluxDBClient3(
    host='cluster-host.com',
    token='
DATABASE_TOKEN
'
,
database='
DATABASE_NAME
'
) slack = WebClient(token='
SLACK_BOT_TOKEN
'
)
query = ''' SELECT LAST(co) AS co, room FROM home WHERE co > 10 GROUP BY room ''' table = influxdb_raw.query(query=query, language="influxql") data_frame = table.to_pandas() data_frame = data_frame.reset_index() for index, row in data_frame.iterrows(): slack.chat_postMessage( channel="#
SLACK_CHANNEL
"
,
text=f'Carbon monoxide (co) high in {row.room}: {row.co} ppm at {row.time}' )

本页面有帮助吗?

感谢您的反馈!


Flux的未来

Flux 正在进入维护模式。您可以继续像现在这样使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB v3增强功能和InfluxDB集群版现已全面上市

新功能包括更快的查询性能和管理工具,推动InfluxDB v3产品线的进步。InfluxDB集群版现已全面上市。

InfluxDB v3性能和特性

InfluxDB v3产品线在查询性能方面取得了显著提升,并提供了新的管理工具。这些增强包括一个操作仪表板,用于监控InfluxDB集群的健康状况,InfluxDB云专用版中的单点登录(SSO)支持,以及用于令牌和数据库的新管理API。

了解v3增强功能


InfluxDB集群版全面上市

InfluxDB集群版现已全面上市,为您在自行管理的堆栈中提供InfluxDB v3的力量。

与我们谈论InfluxDB集群版