reduce()函数
reduce()
函数使用一个缩减函数(fn
)对每个输入表中的行进行聚合。
每个表的输出是表的组键,列对应于缩减记录中的每个字段。如果缩减记录包含与组键列同名的列,则覆盖组键列的值,并且输出的组键会更改。然而,如果两个缩减表写入相同的目标组键,则函数返回错误。
删除的列
reduce()
删除以下列:
- 不是输入表组键的一部分。
- 没有在
identity
记录或缩减函数(fn
)中显式映射。
函数类型签名
(<-tables: stream[B], fn: (accumulator: A, r: B) => A, identity: A) => stream[C] where A: Record, B: Record, C: Record
参数
fn
(必需) 应用到每个行记录(r
)的缩减函数。
缩减函数接受两个参数
- r:表示当前行的记录。
- accumulator:缩减函数对前一行的操作返回的记录。
identity
(必需) 定义缩减记录并为缩减操作的第一行提供初始值的记录。
在异步处理用例中可能使用多次。身份记录中值的类型决定了输出值的类型。
tables
输入数据。默认是管道前数据(<-
)。
示例
计算值列的总和
import "sampledata"
sampledata.int()
|> reduce(fn: (r, accumulator) => ({sum: r._value + accumulator.sum}), identity: {sum: 0})
在单个缩减器中计算总和和计数
import "sampledata"
sampledata.int()
|> reduce(
fn: (r, accumulator) => ({sum: r._value + accumulator.sum, count: accumulator.count + 1}),
identity: {sum: 0, count: 0},
)
计算所有值的乘积
import "sampledata"
sampledata.int()
|> reduce(fn: (r, accumulator) => ({prod: r._value * accumulator.prod}), identity: {prod: 1})
计算所有值的平均值
import "sampledata"
sampledata.int()
|> reduce(
fn: (r, accumulator) =>
({
count: accumulator.count + 1,
total: accumulator.total + r._value,
avg: float(v: accumulator.total + r._value) / float(v: accumulator.count + 1),
}),
identity: {count: 0, total: 0, avg: 0.0},
)
这个页面有帮助吗?
感谢您的反馈!