文档文档

使用 InfluxDB 中的数据发送警报

使用存储在 InfluxDB 中的时间序列数据进行查询、分析和发送警报。

本指南使用 PythonInfluxDB 3 Python 客户端库Python Slack SDK 来演示如何从 InfluxDB 查询数据并将警报发送到 Slack,但您可以将您选择的运行时和警报平台与任何可用的 InfluxDB 3 客户端库 一起使用。无论您选择使用什么客户端和平台,流程都是相同的

警报流程

  1. 使用外部运行时和 InfluxDB 客户端从 InfluxDB 查询数据。
  2. 使用查询到的数据和运行时中可用的工具发送警报。

创建 Slack 应用

要将警报发送到 Slack,首先创建一个 Slack 应用并收集所需的连接凭据以与您的应用进行交互。更多信息请参阅 Slack 基本应用设置文档

安装依赖项

使用 pip 安装以下依赖项

  • influxdb_client_3
  • pandas
  • slack_sdk
pip install influxdb3-python pandas slack_sdk

创建 InfluxDB 客户端

使用 influxdb_client_3 模块中的 InfluxDBClient3 函数来实例化 InfluxDB 客户端。提供以下凭据

  • host: 您的 InfluxDB 集群 URL (不带协议)
  • org: 您的 InfluxDB 组织名称
  • token: 具有对您要查询的数据库的读取权限的 数据库令牌
  • database: 您的 数据库 名称
from influxdb_client_3 import InfluxDBClient3
import pandas

# Instantiate an InfluxDBClient3 client configured for your database
influxdb = InfluxDBClient3(
    host='cluster-host.com',
    token='
DATABASE_TOKEN
'
,
database='
DATABASE_NAME
'
)

创建 Slack 客户端

  1. slack.sdk 模块导入 WebClient 函数,并从 slack_sdk.errors 模块导入 SlackApiError 函数。

  2. 使用 WebClient 函数实例化 Slack 客户端。提供以下凭据

from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

slack = WebClient(token='
SLACK_BOT_TOKEN
'
)

查询 InfluxDB

定义 SQL 或 InfluxQL 查询以检索要发出警报的数据。根据您要警报的数据,您可以

  • 在查询中包含逻辑,使其仅返回应发出警报的结果。
  • 查询进一步处理所需的数据,然后根据在运行时中执行的处理发送警报。

以下示例查询仅返回高于应触发警报的阈值的值。

SELECT
  selector_last(co, time)['time'] AS time,
  selector_last(co, time)['value'] AS co,
  room
FROM home
WHERE co > 10
GROUP BY room
SELECT
  LAST(co) AS co,
  room
FROM home
WHERE co > 10
GROUP BY room

执行查询

  1. 将查询字符串分配给变量。

  2. 使用您的 实例化的客户端query 方法从 InfluxDB 查询原始数据。提供以下参数。

    • query: 要执行的查询字符串
    • language: sqlinfluxql
  3. 使用 to_pandas 方法将返回的 Arrow 表转换为 Pandas DataFrame。

# ...

query = '''
SELECT
  selector_last(co, time)['time'] AS time,
  selector_last(co, time)['value'] AS co,
  room
FROM home
WHERE co > 10
GROUP BY room
'''

table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
# ...

query = '''
SELECT
  LAST(co) AS co,
  room
FROM home
WHERE co > 10
GROUP BY room
'''

table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()

发送警报

遍历 DataFrame 并为每一行向 Slack 发送警报。

  1. 在数据帧上使用 reset_index 函数以确保索引与 DataFrame 中的行数对齐。

  2. 遍历每一行,并使用您的 Slack 客户端chat_postMessage 方法向 Slack 发送消息(每行一条)。提供以下参数

    • channel: 要向其发送警报的 Slack 频道。
    • text: 要发送的消息文本。使用字符串插值将每行中的列值插入到消息文本中。
# ...

data_frame = data_frame.reset_index()

for index, row in data_frame.iterrows():
    slack.chat_postMessage(
        channel="#
SLACK_CHANNEL
"
,
text=f'Carbon monoxide (co) high in {row.room}: {row.co} ppm at {row.time}' )

完整警报脚本

from influxdb_client_3 import InfluxDBClient3
import pandas
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

influxdb = InfluxDBClient3(
    host='cluster-host.com',
    token='
DATABASE_TOKEN
'
,
database='
DATABASE_NAME
'
) slack = WebClient(token='
SLACK_BOT_TOKEN
'
)
query = ''' SELECT selector_last(co, time)['time'] AS time, selector_last(co, time)['value'] AS co, room FROM home WHERE co > 10 GROUP BY room ''' table = influxdb_raw.query(query=query, language="sql") data_frame = table.to_pandas() data_frame = data_frame.reset_index() for index, row in data_frame.iterrows(): slack.chat_postMessage( channel="#
SLACK_CHANNEL
"
,
text=f'Carbon monoxide (co) high in {row.room}: {row.co} ppm at {row.time}' )
from influxdb_client_3 import InfluxDBClient3
import pandas
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

influxdb = InfluxDBClient3(
    host='cluster-host.com',
    token='
DATABASE_TOKEN
'
,
database='
DATABASE_NAME
'
) slack = WebClient(token='
SLACK_BOT_TOKEN
'
)
query = ''' SELECT LAST(co) AS co, room FROM home WHERE co > 10 GROUP BY room ''' table = influxdb_raw.query(query=query, language="influxql") data_frame = table.to_pandas() data_frame = data_frame.reset_index() for index, row in data_frame.iterrows(): slack.chat_postMessage( channel="#
SLACK_CHANNEL
"
,
text=f'Carbon monoxide (co) high in {row.room}: {row.co} ppm at {row.time}' )

此页是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 将进入维护模式。您可以继续像现在一样使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开发布 Alpha 版

InfluxDB 3 开源版本现已可用于 Alpha 测试,根据 MIT 或 Apache 2 许可授权。

我们正在发布两个产品作为 Alpha 版本的一部分。

InfluxDB 3 Core 是我们新的开源产品。它是一个用于时间序列和事件数据的最新数据引擎。InfluxDB 3 Enterprise 是一个商业版本,它建立在 Core 的基础上,增加了历史查询功能、读取副本、高可用性、可扩展性和细粒度的安全性。

有关如何开始使用的更多信息,请查看