创建自定义聚合函数
要聚合您的数据,请使用 Flux 聚合函数 或使用 reduce()
函数 创建自定义聚合函数。
聚合函数特性
聚合函数都具有相同的基本特性
- 它们对单个输入表进行操作,并将所有记录转换为单个记录。
- 输出表具有与输入表相同的 分组键。
reduce()
函数的工作原理
reduce()
函数使用 fn
参数 中定义的函数,一次操作一行。fn
函数使用以下参数指定的两个 记录 将键映射到特定值
参数 | 描述 |
---|---|
r | 表示行或记录的记录。 |
accumulator | 包含每行聚合计算中使用的值的记录。 |
reduce()
函数的 identity
参数 定义了初始 accumulator
记录。
reduce()
函数示例
以下 reduce()
函数示例生成输入表中所有值的总和与乘积。
|> reduce(
fn: (r, accumulator) => ({
sum: r._value + accumulator.sum,
product: r._value * accumulator.product
}),
identity: {sum: 0.0, product: 1.0},
)
为了说明此函数的工作原理,以以下简化表为例
_time | _value |
---|---|
2019-04-23T16:10:49Z | 1.6 |
2019-04-23T16:10:59Z | 2.3 |
2019-04-23T16:11:09Z | 0.7 |
2019-04-23T16:11:19Z | 1.2 |
2019-04-23T16:11:29Z | 3.8 |
输入记录
fn
函数使用第一行中的数据来定义 r
记录。它使用 identity
参数定义 accumulator
记录。
r = { _time: 2019-04-23T16:10:49.00Z, _value: 1.6 }
accumulator = { sum : 0.0, product : 1.0 }
键映射
然后,它使用 r
和 accumulator
记录来填充键映射中的值
// sum: r._value + accumulator.sum
sum: 1.6 + 0.0
// product: r._value * accumulator.product
product: 1.6 * 1.0
输出记录
这将生成一个输出记录,其中包含以下键值对
{ sum: 1.6, product: 1.6 }
然后,该函数处理下一行,使用此输出记录作为 accumulator
。
由于 reduce()
在处理下一行时将输出记录用作 accumulator
,因此在 fn
函数中映射的键必须与 identity
和 accumulator
记录中的键匹配。
处理下一行
// Input records for the second row
r = { _time: 2019-04-23T16:10:59.00Z, _value: 2.3 }
accumulator = { sum : 1.6, product : 1.6 }
// Key mappings for the second row
sum: 2.3 + 1.6
product: 2.3 * 1.6
// Output record of the second row
{ sum: 3.9, product: 3.68 }
然后,它将新的输出记录用作下一行的 accumulator
。此循环持续进行,直到处理完表中的所有行。
最终输出记录和表
处理完表中的所有记录后,reduce()
使用最终输出记录创建一个转换后的表,其中包含一行和每映射键的列。
最终输出记录
{ sum: 9.6, product: 11.74656 }
输出表
sum | product |
---|---|
9.6 | 11.74656 |
_time 列发生了什么?
reduce()
函数仅保留以下列
- 输入表的 分组键 的一部分。
- 在
fn
函数中显式映射。
它会删除所有其他列。由于 _time
不是分组键的一部分,并且未在 fn
函数中映射,因此它不包含在输出表中。
自定义聚合函数示例
要创建自定义聚合函数,请使用 创建自定义函数 中概述的原则和 reduce()
函数来聚合每个输入表中的行。
创建自定义平均值函数
此示例说明了如何创建计算表中值的平均值的函数。这仅用于演示目的。内置的 mean()
函数 执行相同的操作,并且性能更高。
average = (tables=<-, outputField="average") => tables
|> reduce(
// Define the initial accumulator record
identity: {count: 0.0, sum: 0.0, avg: 0.0},
fn: (r, accumulator) => ({
// Increment the counter on each reduce loop
count: accumulator.count + 1.0,
// Add the _value to the existing sum
sum: accumulator.sum + r._value,
// Divide the existing sum by the existing count for a new average
avg: (accumulator.sum + r._value) / (accumulator.count + 1.0),
}),
)
// Drop the sum and the count columns since they are no longer needed
|> drop(columns: ["sum", "count"])
// Set the _field column of the output table to to the value
// provided in the outputField parameter
|> set(key: "_field", value: outputField)
// Rename avg column to _value
|> rename(columns: {avg: "_value"})
average = (tables=<-, outputField="average") => tables
|> reduce(
identity: {count: 0.0, sum: 0.0, avg: 0.0},
fn: (r, accumulator) => ({
count: accumulator.count + 1.0,
sum: accumulator.sum + r._value,
avg: (accumulator.sum + r._value) / (accumulator.count + 1.0),
}),
)
|> drop(columns: ["sum", "count"])
|> set(key: "_field", value: outputField)
|> rename(columns: {avg: "_value"})
聚合多列
内置聚合函数仅对一列进行操作。使用 reduce()
创建一个自定义聚合函数,该函数聚合多列。
以下函数期望输入表具有 c1_value
和 c2_value
列,并为每一列生成平均值。
multiAvg = (tables=<-) => tables
|> reduce(
identity: {
count: 1.0,
c1_sum: 0.0,
c1_avg: 0.0,
c2_sum: 0.0,
c2_avg: 0.0,
},
fn: (r, accumulator) => ({
count: accumulator.count + 1.0,
c1_sum: accumulator.c1_sum + r.c1_value,
c1_avg: accumulator.c1_sum / accumulator.count,
c2_sum: accumulator.c2_sum + r.c2_value,
c2_avg: accumulator.c2_sum / accumulator.count,
}),
)
聚合毛利润和净利润
使用 reduce()
创建一个聚合毛利润和净利润的函数。此示例期望输入表中有 profit
和 expenses
列。
profitSummary = (tables=<-) => tables
|> reduce(
identity: {gross: 0.0, net: 0.0},
fn: (r, accumulator) => ({
gross: accumulator.gross + r.profit,
net: accumulator.net + r.profit - r.expenses
}
)
)
此页面是否对您有帮助?
感谢您的反馈!