文档文档

join.tables() 函数

join.tables() 使用指定的方法、谓词和一个函数来连接两个输入流,以连接来自每个输入流的两个对应记录。

join.tables() 仅比较具有相同分组键的记录。输出表与输入表具有相同的分组。

函数类型签名
(
    <-left: stream[A],
    as: (l: A, r: B) => C,
    method: string,
    on: (l: A, r: B) => bool,
    right: stream[B],
) => stream[C] where A: Record, B: Record, C: Record

有关更多信息,请参阅 函数类型签名

参数

left

左侧输入流。默认为管道转发数据 (<-)。

(必需) 右侧输入流。

on

(必需) 接受左侧和右侧记录(分别为 lr)并返回布尔值的函数。

函数体必须是单个布尔表达式,由 l 的属性和 r 的属性之间的一个或多个相等比较组成,每个比较都由 and 运算符链接在一起。

as

(必需) 接受左侧和右侧记录(分别为 lr)并返回记录的函数。返回的记录包含在最终输出中。

method

(必需) 指定连接方法的字符串。

支持的方法

  • inner
  • left
  • right
  • full

示例

执行内连接

import "sampledata"
import "join"

ints = sampledata.int()
strings = sampledata.string()

join.tables(
    method: "inner",
    left: ints,
    right: strings,
    on: (l, r) => l._time == r._time,
    as: (l, r) => ({l with label: r._value}),
)

查看示例输出

执行左外连接

如果连接方法不是 inner,请特别注意如何在 as 函数中构造输出记录。

由于 Flux 处理外连接的方式,lr 可能为默认记录。这意味着非分组键列中的任何值都可能为空。

有关外连接行为的更多信息,请参阅 join 包文档中的 外连接 部分。

在左外连接的情况下,保证 l 不是默认记录。为确保输出记录对于任何非分组键列都具有非空值,请使用来自 l 的值。使用来自 r 的非分组键值可能会导致该值为空。

下面的示例几乎完全从 l 的属性构造输出记录。唯一的例外是 v_right 列,它从 r._value 获取其值。在这种情况下,请理解并预期 v_right 有时会为空。

import "array"
import "join"

left =
    array.from(
        rows: [
            {_time: 2022-01-01T00:00:00Z, _value: 1, label: "a"},
            {_time: 2022-01-01T00:00:00Z, _value: 2, label: "b"},
            {_time: 2022-01-01T00:00:00Z, _value: 3, label: "d"},
        ],
    )
right =
    array.from(
        rows: [
            {_time: 2022-01-01T00:00:00Z, _value: 0.4, id: "a"},
            {_time: 2022-01-01T00:00:00Z, _value: 0.5, id: "c"},
            {_time: 2022-01-01T00:00:00Z, _value: 0.6, id: "d"},
        ],
    )

join.tables(
    method: "left",
    left: left,
    right: right,
    on: (l, r) => l.label == r.id and l._time == r._time,
    as: (l, r) => ({_time: l._time, label: l.label, v_left: l._value, v_right: r._value}),
)

查看示例输出

执行右外连接

下一个示例与上一个示例几乎相同,但使用 right 连接方法。使用此方法,保证 r 不是默认记录,但 l 可能是默认记录。由于 l 更可能包含空值,因此输出记录几乎完全从 r 的属性构建,但 v_left 除外,我们预计它有时会为空。

import "array"
import "join"

left =
    array.from(
        rows: [
            {_time: 2022-01-01T00:00:00Z, _value: 1, label: "a"},
            {_time: 2022-01-01T00:00:00Z, _value: 2, label: "b"},
            {_time: 2022-01-01T00:00:00Z, _value: 3, label: "d"},
        ],
    )
right =
    array.from(
        rows: [
            {_time: 2022-01-01T00:00:00Z, _value: 0.4, id: "a"},
            {_time: 2022-01-01T00:00:00Z, _value: 0.5, id: "c"},
            {_time: 2022-01-01T00:00:00Z, _value: 0.6, id: "d"},
        ],
    )

join.tables(
    method: "right",
    left: left,
    right: right,
    on: (l, r) => l.label == r.id and l._time == r._time,
    as: (l, r) => ({_time: r._time, label: r.id, v_left: l._value, v_right: r._value}),
)

查看示例输出

执行全外连接

在全外连接中,无法保证 lr。它们中的任何一个都可能是默认记录,但它们永远不会同时成为默认记录。

要获取输出记录的非空值,请检查 lr,以查看哪个包含所需的值。

下面的示例为 as 参数定义了一个函数,该函数适当地处理了全外连接的不确定性。

v_leftv_right 仍然直接使用来自 lr 的值,因为我们预计它们有时在输出表中为空。

import "array"
import "join"

left =
    array.from(
        rows: [
            {_time: 2022-01-01T00:00:00Z, _value: 1, label: "a"},
            {_time: 2022-01-01T00:00:00Z, _value: 2, label: "b"},
            {_time: 2022-01-01T00:00:00Z, _value: 3, label: "d"},
        ],
    )
right =
    array.from(
        rows: [
            {_time: 2022-01-01T00:00:00Z, _value: 0.4, id: "a"},
            {_time: 2022-01-01T00:00:00Z, _value: 0.5, id: "c"},
            {_time: 2022-01-01T00:00:00Z, _value: 0.6, id: "d"},
        ],
    )

join.tables(
    method: "full",
    left: left,
    right: right,
    on: (l, r) => l.label == r.id and l._time == r._time,
    as: (l, r) => {
        time = if exists l._time then l._time else r._time
        label = if exists l.label then l.label else r.id

        return {_time: time, label: label, v_left: l._value, v_right: r._value}
    },
)

查看示例输出


此页面是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 将进入维护模式。您可以继续像现在一样使用它,而无需对代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一款开源、高速、最近数据引擎,可实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 构建在 Core 的基础上,增加了高可用性、读取副本、增强的安全性以及数据压缩,从而实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看