使用 Flux 连接数据
使用 Flux join
包,通过以下连接方法,基于公共值连接两个数据集
内连接
左外连接
右外连接
全外连接
join 包允许你连接来自不同数据源的数据,例如 InfluxDB、SQL 数据库、CSV 和 其他。
使用 join 函数连接你的数据
导入
join
包。定义要连接的 左侧 和 右侧 数据流
- 每个流必须有一个或多个具有公共值的列。列标签不需要匹配,但列值需要。
- 每个流应具有相同的 分组键。
有关更多信息,请参阅 连接数据要求。
使用
join.inner()
将两个流连接在一起。提供以下必需的参数
import "join"
import "sql"
left =
from(bucket: "example-bucket-1")
|> range(start: "-1h")
|> filter(fn: (r) => r._measurement == "example-measurement")
|> filter(fn: (r) => r._field == "example-field")
right =
sql.from(
driverName: "postgres",
dataSourceName: "postgresql://username:password@localhost:5432",
query: "SELECT * FROM example_table",
)
join.inner(
left: left,
right: right,
on: (l, r) => l.column == r.column,
as: (l, r) => ({l with name: r.name, location: r.location}),
)
有关更多信息和详细示例,请参阅 Flux 文档中的 执行内连接。
导入
join
包。定义要连接的 左侧 和 右侧 数据流
- 每个流必须有一个或多个具有公共值的列。列标签不需要匹配,但列值需要。
- 每个流应具有相同的 分组键。
有关更多信息,请参阅 连接数据要求。
使用
join.left()
将两个流连接在一起。提供以下必需的参数
import "join"
import "sql"
left =
from(bucket: "example-bucket-1")
|> range(start: "-1h")
|> filter(fn: (r) => r._measurement == "example-measurement")
|> filter(fn: (r) => r._field == "example-field")
right =
sql.from(
driverName: "postgres",
dataSourceName: "postgresql://username:password@localhost:5432",
query: "SELECT * FROM example_table",
)
join.left(
left: left,
right: right,
on: (l, r) => l.column == r.column,
as: (l, r) => ({l with name: r.name, location: r.location}),
)
有关更多信息和详细示例,请参阅 Flux 文档中的 执行左外连接。
导入
join
包。定义要连接的 左侧 和 右侧 数据流
- 每个流必须有一个或多个具有公共值的列。列标签不需要匹配,但列值需要。
- 每个流应具有相同的 分组键。
有关更多信息,请参阅 连接数据要求。
使用
join.right()
将两个流连接在一起。提供以下必需的参数
import "join"
import "sql"
left =
from(bucket: "example-bucket-1")
|> range(start: "-1h")
|> filter(fn: (r) => r._measurement == "example-measurement")
|> filter(fn: (r) => r._field == "example-field")
right =
sql.from(
driverName: "postgres",
dataSourceName: "postgresql://username:password@localhost:5432",
query: "SELECT * FROM example_table",
)
join.right(
left: left,
right: right,
on: (l, r) => l.column == r.column,
as: (l, r) => ({l with name: r.name, location: r.location}),
)
有关更多信息和详细示例,请参阅 Flux 文档中的 执行右外连接。
导入
join
包。定义要连接的 左侧 和 右侧 数据流
- 每个流必须有一个或多个具有公共值的列。列标签不需要匹配,但列值需要。
- 每个流应具有相同的 分组键。
有关更多信息,请参阅 连接数据要求。
使用
join.full()
将两个流连接在一起。提供以下必需的参数
全外连接必须考虑 l
和 r
记录中非分组键列可能为空的情况。使用条件逻辑检查哪个记录包含非分组键列的非空值。 有关更多信息,请参阅 考虑缺失的非分组键值。
import "join"
import "sql"
left =
from(bucket: "example-bucket-1")
|> range(start: "-1h")
|> filter(fn: (r) => r._measurement == "example-measurement")
|> filter(fn: (r) => r._field == "example-field")
right =
sql.from(
driverName: "postgres",
dataSourceName: "postgresql://username:password@localhost:5432",
query: "SELECT * FROM example_table",
)
join.full(
left: left,
right: right,
on: (l, r) => l.id== r.id,
as: (l, r) => {
id = if exists l.id then l.id else r.id
return {name: l.name, location: r.location, id: id}
},
)
有关更多信息和详细示例,请参阅 Flux 文档中的 执行全外连接。
导入
join
包。定义要连接的 左侧 和 右侧 数据流
- 每个流还必须有一个
_time
列。 - 每个流必须有一个或多个具有公共值的列。列标签不需要匹配,但列值需要。
- 每个流应具有相同的 分组键。
有关更多信息,请参阅 连接数据要求。
- 每个流还必须有一个
使用
join.time()
基于时间值将两个流连接在一起。提供以下参数left
: (必需) 代表连接左侧的数据流。right
: (必需) 代表连接右侧的数据流。as
: (必需) 连接输出函数,返回包含来自每个输入流的值的记录。例如:(l, r) => ({r with column1: l.column1, column2: l.column2})
。method
: 要使用的连接方法。默认为inner
。
import "join"
import "sql"
left =
from(bucket: "example-bucket-1")
|> range(start: "-1h")
|> filter(fn: (r) => r._measurement == "example-m1")
|> filter(fn: (r) => r._field == "example-f1")
right =
from(bucket: "example-bucket-2")
|> range(start: "-1h")
|> filter(fn: (r) => r._measurement == "example-m2")
|> filter(fn: (r) => r._field == "example-f2")
join.time(method: "left", left: left, right: right, as: (l, r) => ({l with f2: r._value}))
有关更多信息和详细示例,请参阅 Flux 文档中的 按时间连接。
何时使用 union 和 pivot 而不是 join 函数
我们建议使用 join
包来连接模式大多不同或来自两个不同数据源的流。 如果你要连接从 InfluxDB 查询的两个数据集,使用 union()
和 pivot()
组合数据可能会更高效。
例如,如果你需要从不同的 InfluxDB bucket 查询字段,并根据时间对齐每行中的字段值
f1 =
from(bucket: "example-bucket-1")
|> range(start: "-1h")
|> filter(fn: (r) => r._field == "f1")
|> drop(columns: "_measurement")
f2 =
from(bucket: "example-bucket-2")
|> range(start: "-1h")
|> filter(fn: (r) => r._field == "f2")
|> drop(columns: "_measurement")
union(tables: [f1, f2])
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
此页面是否对您有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一份子!我们欢迎并鼓励您提供关于 InfluxDB 和本文档的反馈和错误报告。要寻求支持,请使用以下资源
拥有年度合同或支持合同的客户 可以 联系 InfluxData 支持。