连接数据
使用 Flux join
包 基于通用值连接两个数据集。了解如何使用以下连接方法连接两个数据集
何时使用 join 包
我们建议使用 join
包连接具有大部分不同架构或来自两个不同数据源的流。如果您要连接来自同一数据源且具有相同架构的数据,则使用 union()
和 pivot()
来组合数据可能会更高效。
有关更多信息,请参阅何时使用 union 和 pivot 而不是 join 函数。
join 函数的工作原理
join
函数基于每个输入流中的通用值将两个表流连接在一起。
输入流
每个输入流都分配给 left
或 right
参数。输入流可以从任何有效数据源定义。有关更多信息,请参阅
- 查询数据源
- 使用
array.from()
定义临时表
数据要求
要连接数据,每个输入流必须具有以下内容
一个或多个具有要连接的通用值的列.
列不需要相同的标签,但它们确实需要具有可比较的值。相同的 分组键.
join
包中的函数使用分组键来快速确定应将每个输入流中的哪些表配对并评估以进行连接操作。
两个输入流都应具有相同的分组键。 如果没有,则您的连接操作可能找不到任何匹配的表,并将返回意外的输出。如果输入流的分组键不相同,请在使用group()
连接它们之前,先对每个输入流重新分组。仅连接具有相同分组键实例的表。
连接谓词函数 (on)
join
包函数需要 on
参数来比较来自每个输入流(由 l
(左)和 r
(右)表示)的值,并返回 true
或 false
。返回 true
的行将被连接。此参数是一个谓词函数。
(l, r) => l.column == r.column
连接输出函数 (as)
join
包函数(join.time()
除外) 需要 as
参数来定义连接的输出架构。as
参数使用来自连接行的值返回一个新记录 - 左 (l
) 和右 (r
)。
(l, r) => ({l with name: r.name, location: r.location})
请勿修改分组键列
请勿修改分组键列。as
函数必须返回与两个输入流相同的分组键,才能成功执行连接。
执行连接操作
join
包支持以下连接类型和特殊用例
执行内连接
使用 join.inner()
对两个数据流执行内连接。内连接会删除两个输入流中在另一个流中没有匹配行的所有行。
import "join"
left = from(bucket: "example-bucket-1") |> //...
right = from(bucket: "example-bucket-2") |> //...
join.inner(
left: left,
right: right,
on: (l, r) => l.column == r.column,
as: (l, r) => ({l with name: r.name, location: r.location}),
)
执行左外连接
使用 join.left()
对两个数据流执行外左连接。左连接为左侧数据流中的每一行输出一行,其中包含来自右侧数据流的匹配数据。如果右侧数据流中没有匹配的数据,则来自右侧数据流且非分组键的列的值为空值。
import "join"
left = from(bucket: "example-bucket-1") |> //...
right = from(bucket: "example-bucket-2") |> //...
join.left(
left: left,
right: right,
on: (l, r) => l.column == r.column,
as: (l, r) => ({l with name: r.name, location: r.location}),
)
执行右外连接
使用 join.right()
对两个数据流执行外右连接。右连接为右侧数据流中的每一行输出一行,其中包含来自左侧数据流的匹配数据。如果左侧数据流中没有匹配的数据,则来自左侧数据流且非分组键的列的值为空值。
import "join"
left = from(bucket: "example-bucket-1") |> //...
right = from(bucket: "example-bucket-2") |> //...
join.right(
left: left,
right: right,
on: (l, r) => l.column == r.column,
as: (l, r) => ({r with name: l.name, location: l.location}),
)
执行全外连接
使用 join.full()
对两个数据流执行全外连接。全外连接为左侧和右侧输入流中的所有行输出一行,并根据 on
谓词连接匹配的行。
import "join"
left = from(bucket: "example-bucket-1") |> //...
right = from(bucket: "example-bucket-2") |> //...
join.full(
left: left,
right: right,
on: (l, r) => l.id== r.id,
as: (l, r) => {
id = if exists l.id then l.id else r.id
return {name: l.name, location: r.location, id: id}
},
)
按时间连接
使用 join.time()
基于 _time
列中的时间值连接两个数据流。当连接两个时间序列数据流时,这种类型的连接操作很常见。
import "join"
left = from(bucket: "example-bucket-1") |> //...
right = from(bucket: "example-bucket-2") |> //...
join.time(
left: left,
right: right,
as: (l, r) => ({l with field1: l._value, field2: r._value_}),
)
排查连接操作故障
有关使用 join
包时意外行为和错误的信息,请参阅排查连接操作故障。
此页内容是否对您有帮助?
感谢您的反馈!