文档文档

StreamNode

stream 节点代表通过 Kapacitor 的任何输入传入数据的源。stream 任务中的 stream 变量是 StreamNode 的一个实例。 StreamNode.From 是此节点的 方法/属性。

构造函数

链式方法描述
stream没有构造函数签名。

属性方法

Setter 方法描述
quiet ( )抑制此节点的所有错误日志事件。

链式方法

Deadman, From, Stats


属性

属性方法会修改调用节点的状态。它们不会向管道添加新节点,并且始终返回对调用节点的引用。属性方法使用 . 运算符标记。

Quiet

抑制此节点的所有错误日志事件。

stream.quiet()

链式方法

链式方法在管道中创建一个新的节点作为调用节点的子节点。它们不会修改调用节点。链式方法使用 | 运算符标记。

死人开关(Deadman)

用于创建低吞吐量警报(即死人开关)的辅助函数。

  • 阈值:如果吞吐量低于每 points/interval 的点数,则触发警报。
  • 间隔:检查吞吐量的频率。
  • 表达式:可选的表达式列表,也用于评估。对于按时警报很有用。

示例

    var data = stream
        |from()...
    // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
    data
        |deadman(100.0, 10s)
    //Do normal processing of data
    data...

以上等同于以下示例

    var data = stream
        |from()...
    // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
    data
        |stats(10s)
            .align()
        |derivative('emitted')
            .unit(10s)
            .nonNegative()
        |alert()
            .id('node \'stream0\' in task \'{{ .TaskName }}\'')
            .message('{{ .ID }} is {{ if eq .Level "OK" }}alive{{ else }}dead{{ end }}: {{ index .Fields "emitted" | printf "%0.3f" }} points/10s.')
            .crit(lambda: "emitted" <= 100.0)
    //Do normal processing of data
    data...

idmessage 警报属性可以通过 'deadman' 配置部分全局配置。

由于 AlertNode 是最后一个组件,因此可以像往常一样对其进行进一步修改。示例

    var data = stream
        |from()...
    // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
    data
        |deadman(100.0, 10s)
            .slack()
            .channel('#dead_tasks')
    //Do normal processing of data
    data...

您可以指定额外的 lambda 表达式来进一步限制死人开关的触发时间。示例

    var data = stream
        |from()...
    // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
    // Only trigger the alert if the time of day is between 8am-5pm.
    data
        |deadman(100.0, 10s, lambda: hour("time") >= 8 AND hour("time") <= 17)
    //Do normal processing of data
    data...
stream|deadman(threshold float64, interval time.Duration, expr ...ast.LambdaNode)

返回:AlertNode

From

创建一个新的 FromNode,然后可以使用 Database、RetentionPolicy、Measurement 和 Where 属性对其进行进一步过滤。可以多次调用 From 来创建数据流的多个独立分支。

示例

    // Select the 'cpu' measurement from just the database 'mydb'
    // and retention policy 'myrp'.
    var cpu = stream
        |from()
            .database('mydb')
            .retentionPolicy('myrp')
            .measurement('cpu')
    // Select the 'load' measurement from any database and retention policy.
    var load = stream
        |from()
            .measurement('load')
    // Join cpu and load streams and do further processing.
    cpu
        |join(load)
            .as('cpu', 'load')
        ...
stream|from()

返回: FromNode

Stats

创建一个新的数据流,其中包含节点的内部统计信息。interval 表示基于实际时间发出统计信息的频率。这意味着 interval 时间与源节点接收的数据点的时间无关。

stream|stats(interval time.Duration)

返回:StatsNode


此页面是否有帮助?

感谢您的反馈!


InfluxDB 3.8 新特性

InfluxDB 3.8 和 InfluxDB 3 Explorer 1.6 的主要增强功能。

查看博客文章

InfluxDB 3.8 现已适用于 Core 和 Enterprise 版本,同时发布了 InfluxDB 3 Explorer UI 的 1.6 版本。本次发布着重于操作成熟度,以及如何更轻松地部署、管理和可靠地运行 InfluxDB。

更多信息,请查看

InfluxDB Docker 的 latest 标签将指向 InfluxDB 3 Core

在 **2026 年 2 月 3 日**,InfluxDB Docker 镜像的 latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。

如果使用 Docker 来安装和运行 InfluxDB,latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。例如,如果使用 Docker 运行 InfluxDB v2,请将 latest 版本标签替换为 Docker pull 命令中的特定版本标签 — 例如

docker pull influxdb:2