跨任务执行跟踪状态更改
问题
通常使用 InfluxDB 任务 来评估和分配状态给您的时间序列数据,然后检测这些状态的变化。任务分批处理数据,但是如果跨批次边界发生状态更改会怎么样?任务在不知道先前任务执行的最终状态的情况下将无法识别它。本指南将引导您创建一个任务,该任务为行分配状态,然后使用先前任务执行的结果来检测跨批次边界的任何状态更改,以便您不会错过任何状态更改。
解决方案
根据阈值显式分配数据级别。
解决方案优势
如果您从未编写过带有 monitor
包的任务,那么这是最容易理解的解决方案。
解决方案缺点
您必须显式定义阈值,这可能需要更多代码。
解决方案概述
创建一个任务,您可以在其中
- 样板代码。导入包并定义任务选项。
- 查询您的数据。
- 根据阈值将状态分配给您的数据。将此数据存储在变量中,例如“states”。
- 将“states”写入存储桶。
- 查找先前任务运行中的最新值,并将其存储在变量“last_state_previous_task”中。
- 联合“states”和“last_state_previous_task”。将此数据存储在变量“unioned_states”中。
- 在“unioned_states”中发现状态更改。将此数据存储在变量“state_changes”中。
- 通知跨越最后两个任务的状态更改,以捕获跨任务执行发生的任何状态更改。
解决方案详解
- 导入包并定义任务选项和密钥。导入以下包
Flux InfluxDB 密钥包:此包包含 secrets.get() 函数,该函数允许您从 InfluxDB 密钥存储中检索密钥。了解如何在 InfluxDB 中管理密钥以使用此包。
Flux InfluxDB 监控包:此包包含用于监控您的数据的函数和工具。
import "contrib/sranka/telegram" import "influxdata/influxdb/secrets" import "influxdata/influxdb/monitor" option task = {name: "State changes across tasks", every: 30m, offset: 5m} telegram_token = secrets.get(key: "telegram_token") telegram_channel_ID = secrets.get(key: "telegram_channel_ID")
查询您要监控的数据。
data = from(bucket: "example-bucket") // Query for data from the last successful task run or from the 1 every duration ago. // This ensures that you won’t miss any data. |> range(start: tasks.lastSuccess(orTime: -task.every)) |> filter(fn: (r) => r._measurement == "example-measurement") |> filter(fn: (r) => r.tagKey1 == "example-tag-value") |> filter(fn: (r) => r._field == "example-field")
其中
data
可能看起来像_measurement tagKey1 _field _value _time example-measurement example-tag-value example-field 30.0 2022-01-01T00:00:00Z example-measurement example-tag-value example-field 50.0 2022-01-01T00:00:00Z 根据阈值将状态分配给您的数据。将此数据存储在变量中,例如“states”。为了简化此示例,只有两个状态:“ok”和“crit”。将状态存储在
_level
列中(monitor
包需要)。states = data |> map(fn: (r) => ({r with _level: if r._value > 40.0 then "crit" else "ok"}))
其中
states
可能看起来像_measurement tagKey1 _field _value _level _time example-measurement example-tag-value example-field 30.0 ok 2022-01-01T00:00:00Z example-measurement example-tag-value example-field 50.0 crit 2022-01-01T00:01:00Z 将“states”写回 InfluxDB。您可以将数据写入新的测量或新的存储桶。要将数据写入新的测量,请使用
set()
来更新“states”数据中_measurement
列的值。states // (Optional) Change the measurement name to write the data to a new measurement |> set(key: "_measurement", value: "new-measurement") |> to(bucket : "example-bucket")
查找先前任务运行中的最新值,并将其存储在变量“last_state_previous_task”中,
last_state_previous_task = from(bucket: "example-bucket") |> range(start: date.sub(d: task.every, from: tasks.lastSuccess(orTime: -task.every)) |> filter(fn: (r) => r._measurement == "example-measurement") |> filter(fn: (r) => r.tagKey == "example-tag-value") |> filter(fn: (r) => r._field == "example-field") |> last()
其中
last_state_previous_task
可能看起来像_measurement tagKey1 _field _value _level _time example-measurement example-tag-value example-field 55.0 crit 2021-12-31T23:59:00Z 联合“states”和“last_state_previous_task”。将此数据存储在变量“unioned_states”中。使用
sort()
以确保行按时间排序。unioned_states = union(tables: [states, last_state_previous_task]) |> sort(columns: ["_time"], desc: true)
其中
unioned_states
可能看起来像_measurement tagKey1 _field _value _level _time example-measurement example-tag-value example-field 55.0 crit 2021-12-31T23:59:00Z example-measurement example-tag-value example-field 30.0 ok 2022-01-01T00:00:00Z example-measurement example-tag-value example-field 50.0 crit 2022-01-01T00:01:00Z 使用
monitor.stateChangesOnly()
仅返回“unioned_states”中状态更改的行。将此数据存储在变量“state_changes”中。state_changes = unioned_states |> monitor.stateChangesOnly()
其中
state_changes
可能看起来像_measurement tagKey1 _field _value _level _time example-measurement example-tag-value example-field 30.0 ok 2022-01-01T00:00:00Z example-measurement example-tag-value example-field 50.0 crit 2022-01-01T00:01:00Z 通知跨越最后两个任务的状态更改,以捕获跨任务执行发生的任何状态更改。
state_changes = data |> map( fn: (r) => ({ _value: telegram.message( token: telegram_token, channel: telegram_channel_ID, text: "state change at ${r._value} at ${r._time}", ), }), )
使用联合数据,以下警报将发送到 Telegram
状态在 30.0 处更改,时间为 2022-01-01T00:00:00Z
状态在 50.0 处更改,时间为 2022-01-01T00:01:00Z
此页对您有帮助吗?
感谢您的反馈!