文档文档

reduce() 函数

reduce() 使用 reducer 函数 (fn) 聚合每个输入表中的行。

每个表的输出是表的组键,列对应于 reducer 记录中的每个字段。如果 reducer 记录包含与组键列同名的列,则组键列的值将被覆盖,并且传出的组键将更改。但是,如果两个 reduce 后的表写入相同的目标组键,则该函数将返回错误。

删除的列

reduce() 删除任何列,这些列:

  • 不是输入表的组键的一部分。
  • 未在 identity 记录或 reducer 函数 (fn) 中显式映射。
函数类型签名
(<-tables: stream[B], fn: (accumulator: A, r: B) => A, identity: A) => stream[C] where A: Record, B: Record, C: Record

有关更多信息,请参阅 函数类型签名

参数

fn

(必需) 要应用于每个行记录 (r) 的 Reducer 函数。

Reducer 函数接受两个参数:

  • r: 表示当前行的记录。
  • accumulator: 从 reducer 函数对上一行执行操作后返回的记录。

identity

(必需) 定义 reducer 记录并为第一行上的 reducer 操作提供初始值的记录。

在异步处理用例中可以多次使用。identity 记录中值的数据类型决定了输出值的数据类型。

tables

输入数据。默认为管道转发数据 (<-)。

示例

计算 value 列的总和

import "sampledata"

sampledata.int()
    |> reduce(fn: (r, accumulator) => ({sum: r._value + accumulator.sum}), identity: {sum: 0})

查看示例输入和输出

在单个 reducer 中计算总和与计数

import "sampledata"

sampledata.int()
    |> reduce(
        fn: (r, accumulator) => ({sum: r._value + accumulator.sum, count: accumulator.count + 1}),
        identity: {sum: 0, count: 0},
    )

查看示例输入和输出

计算所有值的乘积

import "sampledata"

sampledata.int()
    |> reduce(fn: (r, accumulator) => ({prod: r._value * accumulator.prod}), identity: {prod: 1})

查看示例输入和输出

计算所有值的平均值

import "sampledata"

sampledata.int()
    |> reduce(
        fn: (r, accumulator) =>
            ({
                count: accumulator.count + 1,
                total: accumulator.total + r._value,
                avg: float(v: accumulator.total + r._value) / float(v: accumulator.count + 1),
            }),
        identity: {count: 0, total: 0, avg: 0.0},
    )

查看示例输入和输出


此页内容是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像当前一样使用它,而无需对代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速的最新数据引擎,可在实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 构建在 Core 的基础上,增加了高可用性、读取副本、增强的安全性以及数据压缩,以实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看: