文档文档

使用 Flux 连接数据

使用 Flux join,通过以下连接方法,基于公共值连接两个数据集

内连接

左外连接

右外连接

全外连接

join 包允许你连接来自不同数据源的数据,例如 InfluxDBSQL 数据库CSV其他

使用 join 函数连接你的数据

  1. 导入 join 包。

  2. 定义要连接的 左侧右侧 数据流

    • 每个流必须有一个或多个具有公共值的列。列标签不需要匹配,但列值需要。
    • 每个流应具有相同的 分组键

    有关更多信息,请参阅 连接数据要求

  3. 使用 join.inner() 将两个流连接在一起。提供以下必需的参数

    • left: 代表连接左侧的数据流。
    • right: 代表连接右侧的数据流。
    • on: 连接谓词。例如:(l, r) => l.column == r.column
    • as: 连接输出函数,返回包含来自每个输入流的值的记录。例如:(l, r) => ({l with column1: r.column1, column2: r.column2})
import "join"
import "sql"

left =
    from(bucket: "example-bucket-1")
        |> range(start: "-1h")
        |> filter(fn: (r) => r._measurement == "example-measurement")
        |> filter(fn: (r) => r._field == "example-field")

right =
    sql.from(
        driverName: "postgres",
        dataSourceName: "postgresql://username:password@localhost:5432",
        query: "SELECT * FROM example_table",
    )

join.inner(
    left: left,
    right: right,
    on: (l, r) => l.column == r.column,
    as: (l, r) => ({l with name: r.name, location: r.location}),
)

有关更多信息和详细示例,请参阅 Flux 文档中的 执行内连接

  1. 导入 join 包。

  2. 定义要连接的 左侧右侧 数据流

    • 每个流必须有一个或多个具有公共值的列。列标签不需要匹配,但列值需要。
    • 每个流应具有相同的 分组键

    有关更多信息,请参阅 连接数据要求

  3. 使用 join.left() 将两个流连接在一起。提供以下必需的参数

    • left: 代表连接左侧的数据流。
    • right: 代表连接右侧的数据流。
    • on: 连接谓词。例如:(l, r) => l.column == r.column
    • as: 连接输出函数,返回包含来自每个输入流的值的记录。例如:(l, r) => ({l with column1: r.column1, column2: r.column2})
import "join"
import "sql"

left =
    from(bucket: "example-bucket-1")
        |> range(start: "-1h")
        |> filter(fn: (r) => r._measurement == "example-measurement")
        |> filter(fn: (r) => r._field == "example-field")

right =
    sql.from(
        driverName: "postgres",
        dataSourceName: "postgresql://username:password@localhost:5432",
        query: "SELECT * FROM example_table",
    )

join.left(
    left: left,
    right: right,
    on: (l, r) => l.column == r.column,
    as: (l, r) => ({l with name: r.name, location: r.location}),
)

有关更多信息和详细示例,请参阅 Flux 文档中的 执行左外连接

  1. 导入 join 包。

  2. 定义要连接的 左侧右侧 数据流

    • 每个流必须有一个或多个具有公共值的列。列标签不需要匹配,但列值需要。
    • 每个流应具有相同的 分组键

    有关更多信息,请参阅 连接数据要求

  3. 使用 join.right() 将两个流连接在一起。提供以下必需的参数

    • left: 代表连接左侧的数据流。
    • right: 代表连接右侧的数据流。
    • on: 连接谓词。例如:(l, r) => l.column == r.column
    • as: 连接输出函数,返回包含来自每个输入流的值的记录。例如:(l, r) => ({l with column1: r.column1, column2: r.column2})
import "join"
import "sql"

left =
    from(bucket: "example-bucket-1")
        |> range(start: "-1h")
        |> filter(fn: (r) => r._measurement == "example-measurement")
        |> filter(fn: (r) => r._field == "example-field")

right =
    sql.from(
        driverName: "postgres",
        dataSourceName: "postgresql://username:password@localhost:5432",
        query: "SELECT * FROM example_table",
    )

join.right(
    left: left,
    right: right,
    on: (l, r) => l.column == r.column,
    as: (l, r) => ({l with name: r.name, location: r.location}),
)

有关更多信息和详细示例,请参阅 Flux 文档中的 执行右外连接

  1. 导入 join 包。

  2. 定义要连接的 左侧右侧 数据流

    • 每个流必须有一个或多个具有公共值的列。列标签不需要匹配,但列值需要。
    • 每个流应具有相同的 分组键

    有关更多信息,请参阅 连接数据要求

  3. 使用 join.full() 将两个流连接在一起。提供以下必需的参数

    • left: 代表连接左侧的数据流。
    • right: 代表连接右侧的数据流。
    • on: 连接谓词。例如:(l, r) => l.column == r.column
    • as: 连接输出函数,返回包含来自每个输入流的值的记录。例如:(l, r) => ({l with column1: r.column1, column2: r.column2})

全外连接必须考虑 lr 记录中非分组键列可能为空的情况。使用条件逻辑检查哪个记录包含非分组键列的非空值。 有关更多信息,请参阅 考虑缺失的非分组键值

import "join"
import "sql"

left =
    from(bucket: "example-bucket-1")
        |> range(start: "-1h")
        |> filter(fn: (r) => r._measurement == "example-measurement")
        |> filter(fn: (r) => r._field == "example-field")

right =
    sql.from(
        driverName: "postgres",
        dataSourceName: "postgresql://username:password@localhost:5432",
        query: "SELECT * FROM example_table",
    )

join.full(
    left: left,
    right: right,
    on: (l, r) => l.id== r.id,
    as: (l, r) => {
        id = if exists l.id then l.id else r.id
        
        return {name: l.name, location: r.location, id: id}
    },
)

有关更多信息和详细示例,请参阅 Flux 文档中的 执行全外连接

  1. 导入 join 包。

  2. 定义要连接的 左侧右侧 数据流

    • 每个流还必须有一个 _time 列。
    • 每个流必须有一个或多个具有公共值的列。列标签不需要匹配,但列值需要。
    • 每个流应具有相同的 分组键

    有关更多信息,请参阅 连接数据要求

  3. 使用 join.time() 基于时间值将两个流连接在一起。提供以下参数

    • left: (必需) 代表连接左侧的数据流。
    • right: (必需) 代表连接右侧的数据流。
    • as: (必需) 连接输出函数,返回包含来自每个输入流的值的记录。例如:(l, r) => ({r with column1: l.column1, column2: l.column2})
    • method: 要使用的连接方法。默认为 inner
import "join"
import "sql"

left =
    from(bucket: "example-bucket-1")
        |> range(start: "-1h")
        |> filter(fn: (r) => r._measurement == "example-m1")
        |> filter(fn: (r) => r._field == "example-f1")

right =
    from(bucket: "example-bucket-2")
        |> range(start: "-1h")
        |> filter(fn: (r) => r._measurement == "example-m2")
        |> filter(fn: (r) => r._field == "example-f2")

join.time(method: "left", left: left, right: right, as: (l, r) => ({l with f2: r._value}))

有关更多信息和详细示例,请参阅 Flux 文档中的 按时间连接


何时使用 union 和 pivot 而不是 join 函数

我们建议使用 join 包来连接模式大多不同或来自两个不同数据源的流。 如果你要连接从 InfluxDB 查询的两个数据集,使用 union()pivot() 组合数据可能会更高效。

例如,如果你需要从不同的 InfluxDB bucket 查询字段,并根据时间对齐每行中的字段值

f1 =
    from(bucket: "example-bucket-1")
        |> range(start: "-1h")
        |> filter(fn: (r) => r._field == "f1")
        |> drop(columns: "_measurement")

f2 =
    from(bucket: "example-bucket-2")
        |> range(start: "-1h")
        |> filter(fn: (r) => r._field == "f2")
        |> drop(columns: "_measurement")

union(tables: [f1, f2])
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

查看示例输入和输出数据


此页面是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。 您可以继续像当前一样使用它,而无需对代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。 更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速、最近数据引擎,可实时收集和处理数据,并将其持久保存到本地磁盘或对象存储。 InfluxDB 3 Enterprise 以 Core 的基础为构建,增加了高可用性、读取副本、增强的安全性以及数据压缩,从而实现更快的查询和优化的存储。 InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看