文档文档

跨任务执行跟踪状态更改

问题

通常使用 InfluxDB 任务 来评估和分配时间序列数据的状态,然后检测这些状态的变化。任务以批处理方式处理数据,但是如果跨批处理边界发生状态更改会怎样?如果不知道上一次任务执行的最终状态,任务将无法识别它。本指南将引导您创建一个任务,该任务为行分配状态,然后使用上一次任务执行的结果来检测跨批处理边界的任何状态更改,这样您就不会错过任何状态更改。

解决方案

根据阈值显式分配数据的级别。

解决方案优势

如果您从未使用 monitor编写过任务,那么这是最容易理解的解决方案。

解决方案缺点

您必须显式定义阈值,这可能需要更多代码。

解决方案概述

创建一个任务,您可以在其中

  1. 样板代码。导入包并定义任务选项。
  2. 查询您的数据。
  3. 根据阈值将状态分配给您的数据。将此数据存储在变量中,例如“states”。
  4. 将“states”写入存储桶。
  5. 查找上一次任务运行的最新值,并将其存储在变量“last_state_previous_task”中。
  6. 联合“states”和“last_state_previous_task”。将此数据存储在变量“unioned_states”中。
  7. 在“unioned_states”中发现状态更改。将此数据存储在变量“state_changes”中。
  8. 通知跨越最后两个任务的状态更改,以捕获在任务执行之间发生的任何状态更改。

解决方案详解

  1. 导入包并定义任务选项和密钥。导入以下包
  • Flux Telegram 包:此包

  • Flux InfluxDB 密钥包:此包包含 secrets.get() 函数,该函数允许您从 InfluxDB 密钥存储中检索密钥。了解如何在 InfluxDB 中 管理密钥 以使用此包。

  • Flux InfluxDB 监控包:此包包含用于监控数据的功能和工具。

    import "contrib/sranka/telegram"
    import "influxdata/influxdb/secrets"
    import "influxdata/influxdb/monitor"
    
    option task = {name: "State changes across tasks", every: 30m, offset: 5m}
    
    telegram_token = secrets.get(key: "telegram_token")
    telegram_channel_ID = secrets.get(key: "telegram_channel_ID")
    
  1. 查询您要监控的数据。

    data = from(bucket: "example-bucket")
        // Query for data from the last successful task run or from the 1 every duration ago.
        // This ensures that you won’t miss any data.
        |> range(start: tasks.lastSuccess(orTime: -task.every))
        |> filter(fn: (r) => r._measurement == "example-measurement")
        |> filter(fn: (r) => r.tagKey1 == "example-tag-value")
        |> filter(fn: (r) => r._field == "example-field")
    

    其中 data 可能看起来像

    _measurementtagKey1_field_value_time
    example-measurementexample-tag-valueexample-field30.02022-01-01T00:00:00Z
    example-measurementexample-tag-valueexample-field50.02022-01-01T00:00:00Z
  2. 根据阈值将状态分配给您的数据。将此数据存储在变量中,例如“states”。为了简化此示例,只有两个状态:“ok”和“crit”。将状态存储在 _level 列中(monitor 包要求)。

    states =
        data
            |> map(fn: (r) => ({r with _level: if r._value > 40.0 then "crit" else "ok"}))
    

    其中 states 可能看起来像

    _measurementtagKey1_field_value_level_time
    example-measurementexample-tag-valueexample-field30.0ok2022-01-01T00:00:00Z
    example-measurementexample-tag-valueexample-field50.0crit2022-01-01T00:01:00Z
  3. 将“states”写回 InfluxDB。您可以将数据写入新测量或新存储桶。要将数据写入新测量,请使用 set() 来更新“states”数据中 _measurement 列的值。

    states
        // (Optional) Change the measurement name to write the data to a new measurement
        |> set(key: "_measurement", value: "new-measurement")
        |> to(bucket : "example-bucket") 
    
  4. 查找上一次任务运行的最新值,并将其存储在变量“last_state_previous_task”中,

    last_state_previous_task =
        from(bucket: "example-bucket")
            |> range(start: date.sub(d: task.every, from: tasks.lastSuccess(orTime: -task.every))
            |> filter(fn: (r) => r._measurement == "example-measurement")
            |> filter(fn: (r) => r.tagKey == "example-tag-value")
            |> filter(fn: (r) => r._field == "example-field")
            |> last() 
    

    其中 last_state_previous_task 可能看起来像

    _measurementtagKey1_field_value_level_time
    example-measurementexample-tag-valueexample-field55.0crit2021-12-31T23:59:00Z
  5. 联合“states”和“last_state_previous_task”。将此数据存储在变量“unioned_states”中。使用 sort() 确保行按时间排序。

    unioned_states =
        union(tables: [states, last_state_previous_task])
            |> sort(columns: ["_time"], desc: true)
    

    其中 unioned_states 可能看起来像

    _measurementtagKey1_field_value_level_time
    example-measurementexample-tag-valueexample-field55.0crit2021-12-31T23:59:00Z
    example-measurementexample-tag-valueexample-field30.0ok2022-01-01T00:00:00Z
    example-measurementexample-tag-valueexample-field50.0crit2022-01-01T00:01:00Z
  6. 使用 monitor.stateChangesOnly() 仅返回“unioned_states”中状态已更改的行。将此数据存储在变量“state_changes”中。

    state_changes =
        unioned_states 
            |> monitor.stateChangesOnly()
    

    其中 state_changes 可能看起来像

    _measurementtagKey1_field_value_level_time
    example-measurementexample-tag-valueexample-field30.0ok2022-01-01T00:00:00Z
    example-measurementexample-tag-valueexample-field50.0crit2022-01-01T00:01:00Z
  7. 通知跨越最后两个任务的状态更改,以捕获在任务执行之间发生的任何状态更改。

    state_changes =
        data
            |> map(
                fn: (r) =>
                    ({
                        _value:
                            telegram.message(
                                token: telegram_token,
                                channel: telegram_channel_ID,
                                text: "state change at ${r._value} at ${r._time}",
                            ),
                    }),
            )
    

    使用联合数据,以下警报将发送到 Telegram

    • 状态在 2022-01-01T00:00:00Z 时更改为 30.0
    • 状态在 2022-01-01T00:01:00Z 时更改为 50.0

此页内容是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在这样使用它,而无需对代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速、近实时数据引擎,可实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 构建在 Core 的基础上,增加了高可用性、读取副本、增强的安全性以及数据压缩功能,从而实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看