文档文档

创建自定义聚合函数

要聚合您的数据,请使用 Flux 聚合函数 或使用 reduce() 函数 创建自定义聚合函数。

聚合函数特性

聚合函数都具有相同的基本特性

  • 它们对单个输入表进行操作,并将所有记录转换为单个记录。
  • 输出表具有与输入表相同的 分组键

reduce() 函数的工作原理

reduce() 函数使用 fn 参数 中定义的函数,一次操作一行。fn 函数使用以下参数指定的两个 记录 将键映射到特定值

参数描述
r表示行或记录的记录。
accumulator包含每行聚合计算中使用的值的记录。

reduce() 函数的 identity 参数 定义了初始 accumulator 记录。

reduce() 函数示例

以下 reduce() 函数示例生成输入表中所有值的总和与乘积。

|> reduce(
    fn: (r, accumulator) => ({
        sum: r._value + accumulator.sum,
        product: r._value * accumulator.product
    }),
    identity: {sum: 0.0, product: 1.0},
)

为了说明此函数的工作原理,以以下简化表为例

_time_value
2019-04-23T16:10:49Z1.6
2019-04-23T16:10:59Z2.3
2019-04-23T16:11:09Z0.7
2019-04-23T16:11:19Z1.2
2019-04-23T16:11:29Z3.8
输入记录

fn 函数使用第一行中的数据来定义 r 记录。它使用 identity 参数定义 accumulator 记录。

r           = { _time: 2019-04-23T16:10:49.00Z, _value: 1.6 }
accumulator = { sum  : 0.0, product : 1.0 }
键映射

然后,它使用 raccumulator 记录来填充键映射中的值

// sum: r._value + accumulator.sum
sum: 1.6 + 0.0

// product: r._value * accumulator.product
product: 1.6 * 1.0
输出记录

这将生成一个输出记录,其中包含以下键值对

{ sum: 1.6, product: 1.6 }

然后,该函数处理下一行,使用此输出记录作为 accumulator

由于 reduce() 在处理下一行时将输出记录用作 accumulator,因此在 fn 函数中映射的键必须与 identityaccumulator 记录中的键匹配。

处理下一行
// Input records for the second row
r           = { _time: 2019-04-23T16:10:59.00Z, _value: 2.3 }
accumulator = { sum  : 1.6, product : 1.6 }

// Key mappings for the second row
sum: 2.3 + 1.6
product: 2.3 * 1.6

// Output record of the second row
{ sum: 3.9, product: 3.68 }

然后,它将新的输出记录用作下一行的 accumulator。此循环持续进行,直到处理完表中的所有行。

最终输出记录和表

处理完表中的所有记录后,reduce() 使用最终输出记录创建一个转换后的表,其中包含一行和每映射键的列。

最终输出记录
{ sum: 9.6, product: 11.74656 }
输出表
sumproduct
9.611.74656

_time 列发生了什么?

reduce() 函数仅保留以下列

  1. 输入表的 分组键 的一部分。
  2. fn 函数中显式映射。

它会删除所有其他列。由于 _time 不是分组键的一部分,并且未在 fn 函数中映射,因此它不包含在输出表中。

自定义聚合函数示例

要创建自定义聚合函数,请使用 创建自定义函数 中概述的原则和 reduce() 函数来聚合每个输入表中的行。

创建自定义平均值函数

此示例说明了如何创建计算表中值的平均值的函数。这仅用于演示目的。内置的 mean() 函数 执行相同的操作,并且性能更高。

average = (tables=<-, outputField="average") => tables
    |> reduce(
        // Define the initial accumulator record
        identity: {count: 0.0, sum: 0.0, avg: 0.0},
        fn: (r, accumulator) => ({
            // Increment the counter on each reduce loop
            count: accumulator.count + 1.0,
            // Add the _value to the existing sum
            sum: accumulator.sum + r._value,
            // Divide the existing sum by the existing count for a new average
            avg: (accumulator.sum + r._value) / (accumulator.count + 1.0),
        }),
    )
    // Drop the sum and the count columns since they are no longer needed
    |> drop(columns: ["sum", "count"])
    // Set the _field column of the output table to to the value
    // provided in the outputField parameter
    |> set(key: "_field", value: outputField)
    // Rename avg column to _value
    |> rename(columns: {avg: "_value"})
average = (tables=<-, outputField="average") => tables
    |> reduce(
        identity: {count: 0.0, sum: 0.0, avg: 0.0},
        fn: (r, accumulator) => ({
            count: accumulator.count + 1.0,
            sum: accumulator.sum + r._value,
            avg: (accumulator.sum + r._value) / (accumulator.count + 1.0),
        }),
    )
    |> drop(columns: ["sum", "count"])
    |> set(key: "_field", value: outputField)
    |> rename(columns: {avg: "_value"})

聚合多列

内置聚合函数仅对一列进行操作。使用 reduce() 创建一个自定义聚合函数,该函数聚合多列。

以下函数期望输入表具有 c1_valuec2_value 列,并为每一列生成平均值。

multiAvg = (tables=<-) => tables
    |> reduce(
        identity: {
            count: 1.0,
            c1_sum: 0.0,
            c1_avg: 0.0,
            c2_sum: 0.0,
            c2_avg: 0.0,
        },
        fn: (r, accumulator) => ({
            count: accumulator.count + 1.0,
            c1_sum: accumulator.c1_sum + r.c1_value,
            c1_avg: accumulator.c1_sum / accumulator.count,
            c2_sum: accumulator.c2_sum + r.c2_value,
            c2_avg: accumulator.c2_sum / accumulator.count,
        }),
    )

聚合毛利润和净利润

使用 reduce() 创建一个聚合毛利润和净利润的函数。此示例期望输入表中有 profitexpenses 列。

profitSummary = (tables=<-) => tables
    |> reduce(
        identity: {gross: 0.0, net: 0.0},
        fn: (r, accumulator) => ({
            gross: accumulator.gross + r.profit,
            net: accumulator.net + r.profit - r.expenses
            }
        )
    )

此页面是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在这样使用它,而无需对您的代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速、近实时数据引擎,可实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 构建在 Core 的基础上,增加了高可用性、读取副本、增强的安全性和数据压缩,以实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看