文档文档

reduce() 函数

reduce() 使用 reducer 函数 (fn) 聚合每个输入表中的行。

每个表的输出都是表的组键,列对应于 reducer 记录中的每个字段。 如果 reducer 记录包含与组键列同名的列,则组键列的值将被覆盖,并且传出的组键将被更改。 但是,如果两个缩减的表写入相同的目标组键,则该函数将返回错误。

丢弃的列

reduce() 丢弃任何列,这些列

  • 不是输入表的组键的一部分。
  • 未在 identity 记录或 reducer 函数 (fn) 中显式映射。
函数类型签名
(<-tables: stream[B], fn: (accumulator: A, r: B) => A, identity: A) => stream[C] where A: Record, B: Record, C: Record

有关更多信息,请参阅 函数类型签名

参数

fn

(必需) 应用于每个行记录 (r) 的 Reducer 函数。

Reducer 函数接受两个参数

  • r: 表示当前行的记录。
  • accumulator: 从 reducer 函数对上一行的操作返回的记录。

identity

(必需) 定义 reducer 记录并为 reducer 函数在第一行上的操作提供初始值的记录。

在异步处理用例中可以多次使用。 标识记录中值的数据类型决定了输出值的数据类型。

tables

输入数据。 默认为管道转发数据 (<-)。

示例

计算 value 列的总和

import "sampledata"

sampledata.int()
    |> reduce(fn: (r, accumulator) => ({sum: r._value + accumulator.sum}), identity: {sum: 0})

查看示例输入和输出

在单个 reducer 中计算总和和计数

import "sampledata"

sampledata.int()
    |> reduce(
        fn: (r, accumulator) => ({sum: r._value + accumulator.sum, count: accumulator.count + 1}),
        identity: {sum: 0, count: 0},
    )

查看示例输入和输出

计算所有值的乘积

import "sampledata"

sampledata.int()
    |> reduce(fn: (r, accumulator) => ({prod: r._value * accumulator.prod}), identity: {prod: 1})

查看示例输入和输出

计算所有值的平均值

import "sampledata"

sampledata.int()
    |> reduce(
        fn: (r, accumulator) =>
            ({
                count: accumulator.count + 1,
                total: accumulator.total + r._value,
                avg: float(v: accumulator.total + r._value) / float(v: accumulator.count + 1),
            }),
        identity: {count: 0, total: 0, avg: 0.0},
    )

查看示例输入和输出


此页面是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。 您可以像当前一样继续使用它,而无需更改任何代码。

阅读更多

InfluxDB 3 开源版现已进入公共 Alpha 测试阶段

InfluxDB 3 开源版现已可用于 alpha 测试,根据 MIT 或 Apache 2 许可进行授权。

我们将发布两款产品作为 Alpha 测试的一部分。

InfluxDB 3 Core 是我们新的开源产品。 它是用于时间序列和事件数据的最新数据引擎。 InfluxDB 3 Enterprise 是一个商业版本,它建立在 Core 的基础上,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度的安全性。

有关如何开始使用的更多信息,请查看