Flux 查询基础知识
大多数 Flux 查询遵循相同的基本结构。熟悉使用 Flux 查询数据时的基本概念和步骤。
基本查询结构
大多数基本 Flux 查询包括以下步骤
from(bucket: "example-bucket") // ── Source
|> range(start: -1d) // ── Filter on time
|> filter(fn: (r) => r._field == "foo") // ── Filter on column values
|> group(columns: ["sensorID"]) // ── Shape
|> mean() // ── Process
数据源
Flux 输入函数 从数据源检索数据。所有输入函数都返回 表流。
Flux 支持多种数据源,包括时间序列数据库(如 InfluxDB 和 Prometheus)、关系数据库(如 MySQL 和 PostgreSQL)、CSV 等。
- 有关支持的数据源的更多信息,请参阅查询数据源。
- 有关输入函数的列表,请参阅函数类型和类别 – 输入。
筛选
筛选函数迭代并评估每个输入行,以查看它是否与指定的条件匹配。满足条件的行包含在函数输出中。不满足指定条件的行将被删除。
Flux 提供了以下主要筛选函数
range()
:根据时间筛选数据。filter()
:根据列值筛选数据。filter()
使用在fn
参数中定义的谓词函数来评估输入行。每行都作为记录r
传递到谓词函数中,其中包含行中每列的键值对。
还有其他筛选函数可用。有关更多信息,请参阅函数类型和类别 – 筛选器。
塑形数据
许多查询需要修改数据的结构,以便为处理做好准备。常见的数据塑形任务包括按列值或按时间重新分组数据或将列值透视为行。
重塑数据的函数包括以下内容
group()
:修改组键window()
:修改行的_start
和_stop
值,以按时间对数据进行分组pivot()
:将列值透视为行drop()
:删除特定列keep()
:保留特定列并删除所有其他列
处理
处理数据可以采用多种形式,包括以下类型的操作
- 聚合数据:将输入表的所有行聚合为单行。有关信息,请参阅函数类型和类别 - 聚合。
- 选择特定数据点:从每个输入表中返回特定行。例如,返回第一行或最后一行、具有最高值或最低值的行等等。有关信息,请参阅函数类型和类别 - 选择器。
- 重写行:使用
map()
重写每个输入行。使用数学运算转换值、处理字符串、动态添加新列等等。 - 发送通知:评估数据并使用 Flux 通知端点函数向外部服务发送通知。有关信息,请参阅函数类型和类别 - 通知端点。
aggregateWindow 辅助函数
aggregateWindow()
是一个辅助函数,它既可以塑形数据,又可以处理数据。该函数按时间对数据进行窗口化和分组,然后将聚合或选择器函数应用于重构的表。
编写基本查询
使用 InfluxDB 示例数据 编写基本的 Flux 查询,该查询查询数据,按时间和列值筛选数据,然后应用聚合。
使用 InfluxDB 数据浏览器 或 Flux REPL 构建并执行以下基本查询。
导入
influxdata/influxdb/sample
包 并使用sample.data()
函数 加载airSensor
示例数据集。import "influxdata/influxdb/sample" sample.data(set: "airSensor")
sample.data()
返回数据,就像从 InfluxDB 查询数据一样。要实际从 InfluxDB 查询数据,请将sample.data()
替换为from()
函数。将返回的数据通过管道传递到
range()
中,以按时间筛选数据。返回最近一小时的数据。import "influxdata/influxdb/sample" sample.data(set: "airSensor") |> range(start: -1h)
使用
filter()
根据列值筛选行。在此示例中,仅返回包含co
字段值的行。字段名称存储在_field
列中。import "influxdata/influxdb/sample" sample.data(set: "airSensor") |> range(start: -1h) |> filter(fn: (r) => r._field == "co")
使用
mean()
计算每个输入表中的平均值。由于 InfluxDB 按序列对数据进行分组,因此mean()
会为每个唯一的sensor_id
返回一个表,其中包含一行,该行在_value
列中包含平均值。import "influxdata/influxdb/sample" sample.data(set: "airSensor") |> range(start: -1h) |> filter(fn: (r) => r._field == "co") |> mean()
import "influxdata/influxdb/sample" sample.data(set: "airSensor") |> range(start: -1h) |> filter(fn: (r) => r._field == "co") |> mean() |> group()
此基本查询的结果应类似于以下内容
_start
和 _stop
列已被省略。
_field | _measurement | sensor_id | _value |
---|---|---|---|
co | airSensors | TLM0100 | 0.42338714381053716 |
co | airSensors | TLM0101 | 0.4223251339463061 |
co | airSensors | TLM0102 | 0.8543452859060252 |
co | airSensors | TLM0103 | 0.2782783780205422 |
co | airSensors | TLM0200 | 4.612143110484339 |
co | airSensors | TLM0201 | 0.297474366047375 |
co | airSensors | TLM0202 | 0.3336370208486757 |
co | airSensors | TLM0203 | 0.4948166816959906 |
此页面是否对您有帮助?
感谢您的反馈!