文档文档

跨任务执行跟踪状态更改

问题

通常使用 InfluxDB 任务 来评估和分配状态给您的时间序列数据,然后检测这些状态的变化。任务分批处理数据,但是如果跨批次边界发生状态更改会怎么样?任务在不知道先前任务执行的最终状态的情况下将无法识别它。本指南将引导您创建一个任务,该任务为行分配状态,然后使用先前任务执行的结果来检测跨批次边界的任何状态更改,以便您不会错过任何状态更改。

解决方案

根据阈值显式分配数据级别。

解决方案优势

如果您从未编写过带有 monitor的任务,那么这是最容易理解的解决方案。

解决方案缺点

您必须显式定义阈值,这可能需要更多代码。

解决方案概述

创建一个任务,您可以在其中

  1. 样板代码。导入包并定义任务选项。
  2. 查询您的数据。
  3. 根据阈值将状态分配给您的数据。将此数据存储在变量中,例如“states”。
  4. 将“states”写入存储桶。
  5. 查找先前任务运行中的最新值,并将其存储在变量“last_state_previous_task”中。
  6. 联合“states”和“last_state_previous_task”。将此数据存储在变量“unioned_states”中。
  7. 在“unioned_states”中发现状态更改。将此数据存储在变量“state_changes”中。
  8. 通知跨越最后两个任务的状态更改,以捕获跨任务执行发生的任何状态更改。

解决方案详解

  1. 导入包并定义任务选项和密钥。导入以下包
  • Flux Telegram 包:此包

  • Flux InfluxDB 密钥包:此包包含 secrets.get() 函数,该函数允许您从 InfluxDB 密钥存储中检索密钥。了解如何在 InfluxDB 中管理密钥以使用此包。

  • Flux InfluxDB 监控包:此包包含用于监控您的数据的函数和工具。

    import "contrib/sranka/telegram"
    import "influxdata/influxdb/secrets"
    import "influxdata/influxdb/monitor"
    
    option task = {name: "State changes across tasks", every: 30m, offset: 5m}
    
    telegram_token = secrets.get(key: "telegram_token")
    telegram_channel_ID = secrets.get(key: "telegram_channel_ID")
    
  1. 查询您要监控的数据。

    data = from(bucket: "example-bucket")
        // Query for data from the last successful task run or from the 1 every duration ago.
        // This ensures that you won’t miss any data.
        |> range(start: tasks.lastSuccess(orTime: -task.every))
        |> filter(fn: (r) => r._measurement == "example-measurement")
        |> filter(fn: (r) => r.tagKey1 == "example-tag-value")
        |> filter(fn: (r) => r._field == "example-field")
    

    其中 data 可能看起来像

    _measurementtagKey1_field_value_time
    example-measurementexample-tag-valueexample-field30.02022-01-01T00:00:00Z
    example-measurementexample-tag-valueexample-field50.02022-01-01T00:00:00Z
  2. 根据阈值将状态分配给您的数据。将此数据存储在变量中,例如“states”。为了简化此示例,只有两个状态:“ok”和“crit”。将状态存储在 _level 列中(monitor 包需要)。

    states =
        data
            |> map(fn: (r) => ({r with _level: if r._value > 40.0 then "crit" else "ok"}))
    

    其中 states 可能看起来像

    _measurementtagKey1_field_value_level_time
    example-measurementexample-tag-valueexample-field30.0ok2022-01-01T00:00:00Z
    example-measurementexample-tag-valueexample-field50.0crit2022-01-01T00:01:00Z
  3. 将“states”写回 InfluxDB。您可以将数据写入新的测量或新的存储桶。要将数据写入新的测量,请使用 set() 来更新“states”数据中 _measurement 列的值。

    states
        // (Optional) Change the measurement name to write the data to a new measurement
        |> set(key: "_measurement", value: "new-measurement")
        |> to(bucket : "example-bucket") 
    
  4. 查找先前任务运行中的最新值,并将其存储在变量“last_state_previous_task”中,

    last_state_previous_task =
        from(bucket: "example-bucket")
            |> range(start: date.sub(d: task.every, from: tasks.lastSuccess(orTime: -task.every))
            |> filter(fn: (r) => r._measurement == "example-measurement")
            |> filter(fn: (r) => r.tagKey == "example-tag-value")
            |> filter(fn: (r) => r._field == "example-field")
            |> last() 
    

    其中 last_state_previous_task 可能看起来像

    _measurementtagKey1_field_value_level_time
    example-measurementexample-tag-valueexample-field55.0crit2021-12-31T23:59:00Z
  5. 联合“states”和“last_state_previous_task”。将此数据存储在变量“unioned_states”中。使用 sort() 以确保行按时间排序。

    unioned_states =
        union(tables: [states, last_state_previous_task])
            |> sort(columns: ["_time"], desc: true)
    

    其中 unioned_states 可能看起来像

    _measurementtagKey1_field_value_level_time
    example-measurementexample-tag-valueexample-field55.0crit2021-12-31T23:59:00Z
    example-measurementexample-tag-valueexample-field30.0ok2022-01-01T00:00:00Z
    example-measurementexample-tag-valueexample-field50.0crit2022-01-01T00:01:00Z
  6. 使用 monitor.stateChangesOnly() 仅返回“unioned_states”中状态更改的行。将此数据存储在变量“state_changes”中。

    state_changes =
        unioned_states 
            |> monitor.stateChangesOnly()
    

    其中 state_changes 可能看起来像

    _measurementtagKey1_field_value_level_time
    example-measurementexample-tag-valueexample-field30.0ok2022-01-01T00:00:00Z
    example-measurementexample-tag-valueexample-field50.0crit2022-01-01T00:01:00Z
  7. 通知跨越最后两个任务的状态更改,以捕获跨任务执行发生的任何状态更改。

    state_changes =
        data
            |> map(
                fn: (r) =>
                    ({
                        _value:
                            telegram.message(
                                token: telegram_token,
                                channel: telegram_channel_ID,
                                text: "state change at ${r._value} at ${r._time}",
                            ),
                    }),
            )
    

    使用联合数据,以下警报将发送到 Telegram

    • 状态在 30.0 处更改,时间为 2022-01-01T00:00:00Z
    • 状态在 50.0 处更改,时间为 2022-01-01T00:01:00Z

此页对您有帮助吗?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在一样使用它,而无需对代码进行任何更改。

阅读更多

InfluxDB 3 开源现在处于公开 Alpha 阶段

InfluxDB 3 开源现在可用于 alpha 测试,根据 MIT 或 Apache 2 许可获得许可。

我们正在发布两个产品作为 alpha 的一部分。

InfluxDB 3 Core,是我们的新开源产品。它是用于时间序列和事件数据的最新数据引擎。InfluxDB 3 Enterprise 是一个商业版本,它建立在 Core 的基础上,增加了历史查询功能、读取副本、高可用性、可伸缩性和细粒度的安全性。

有关如何入门的更多信息,请查看