既然您已经了解了从 InfluxDB 查询数据的基本知识,让我们超越基本查询,开始处理查询到的数据。“处理”数据可能意味着转换、聚合、降采样或对数据发出警报。本教程涵盖以下数据处理用例
大多数数据处理操作都需要手动编辑 Flux 查询。如果您正在使用 InfluxDB 数据浏览器,请切换到 脚本编辑器 而不是使用 查询构建器。
重映射或分配数据中的值
使用 map() 函数 迭代数据中的每一行,并更新该行中的值。map() 是 Flux 中最有用的函数之一,它将帮助您完成许多需要执行的数据处理操作。
了解更多关于 map() 如何工作的信息
map() 接受一个参数 fn。fn 接受一个匿名函数,该函数将每一行读取为名为 r 的 记录。在 r 记录中,每个键值对代表一列及其值。例如
r = {
_time: 2020-01-01T00:00:00Z,
_measurement: "home",
room: "Kitchen",
_field: "temp",
_value: 21.0,
}
| _time | _measurement | room | _field | _value |
|---|
| 2020-01-01T00:00:00Z | home | Kitchen | temp | 21.0 |
fn 函数以您需要的任何方式修改 r 记录,并为该行返回一个新的记录。例如,使用上面的记录
(r) => ({ _time: r._time, _field: "temp_F", _value: (r._value * 1.8) + 32.0})
// Returns: {_time: 2020-01-01T00:00:00Z, _field: "temp_F", _value: 69.8}
| _time | _field | _value |
|---|
| 2020-01-01T00:00:00Z | temp_F | 69.8 |
请注意,某些列已从原始行记录中删除。这是因为 fn 函数显式映射了 _time、_field 和 _value 列。要保留现有列并仅更新或添加特定列,请使用 with 运算符扩展您的行记录。例如,使用上面的记录
(r) => ({r with _value: (r._value * 1.8) + 32.0, degrees: "F"})
// Returns:
// {
// _time: 2020-01-01T00:00:00Z,
// _measurement: "home",
// room: "Kitchen",
// _field: "temp",
// _value: 69.8,
// degrees: "F",
// }
| _time | _measurement | room | _field | _value | degrees |
|---|
| 2020-01-01T00:00:00Z | home | Kitchen | temp | 69.8 | F |
from(bucket: "get-started")
|> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "hum")
|> map(fn: (r) => ({r with _value: r._value / 100.0}))
Map 示例
有条件地分配状态
在 map() 函数中,您可以使用 条件表达式 (if/then/else) 有条件地分配值。例如,使用 “开始写入 InfluxDB” 中写入的数据
查询 co 字段以返回每个房间的一氧化碳百万分比 (ppm) 读数。
使用 map() 迭代每一行,评估 _value 列中的值,然后有条件地分配状态
- 如果一氧化碳小于 10 ppm,则分配状态:ok。
- 否则,分配状态:warning。
将状态存储在 state 列中。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "co")
|> map(fn: (r) => ({r with state: if r._value < 10 then "ok" else "warning"}))
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T14:00:00Z | home | Kitchen | co | 1 |
| 2022-01-01T15:00:00Z | home | Kitchen | co | 3 |
| 2022-01-01T16:00:00Z | home | Kitchen | co | 7 |
| 2022-01-01T17:00:00Z | home | Kitchen | co | 9 |
| 2022-01-01T18:00:00Z | home | Kitchen | co | 18 |
| 2022-01-01T19:00:00Z | home | Kitchen | co | 22 |
| 2022-01-01T20:00:00Z | home | Kitchen | co | 26 |
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T14:00:00Z | home | Living Room | co | 1 |
| 2022-01-01T15:00:00Z | home | Living Room | co | 1 |
| 2022-01-01T16:00:00Z | home | Living Room | co | 4 |
| 2022-01-01T17:00:00Z | home | Living Room | co | 5 |
| 2022-01-01T18:00:00Z | home | Living Room | co | 9 |
| 2022-01-01T19:00:00Z | home | Living Room | co | 14 |
| 2022-01-01T20:00:00Z | home | Living Room | co | 17 |
| _time | _measurement | room | _field | _value | state |
|---|
| 2022-01-01T14:00:00Z | home | Kitchen | co | 1 | ok |
| 2022-01-01T15:00:00Z | home | Kitchen | co | 3 | ok |
| 2022-01-01T16:00:00Z | home | Kitchen | co | 7 | ok |
| 2022-01-01T17:00:00Z | home | Kitchen | co | 9 | ok |
| 2022-01-01T18:00:00Z | home | Kitchen | co | 18 | warning |
| 2022-01-01T19:00:00Z | home | Kitchen | co | 22 | warning |
| 2022-01-01T20:00:00Z | home | Kitchen | co | 26 | warning |
| _time | _measurement | room | _field | _value | state |
|---|
| 2022-01-01T14:00:00Z | home | Living Room | co | 1 | ok |
| 2022-01-01T15:00:00Z | home | Living Room | co | 1 | ok |
| 2022-01-01T16:00:00Z | home | Living Room | co | 4 | ok |
| 2022-01-01T17:00:00Z | home | Living Room | co | 5 | ok |
| 2022-01-01T18:00:00Z | home | Living Room | co | 9 | ok |
| 2022-01-01T19:00:00Z | home | Living Room | co | 14 | warning |
| 2022-01-01T20:00:00Z | home | Living Room | co | 17 | warning |
数据警报
map() 允许您在每行的基础上执行更复杂的操作。在 fn 函数中使用 Flux 代码块 ({}),您可以在每个行的上下文中创建作用域变量并执行其他函数。例如,您可以向 Slack 发送消息。
例如,使用 “开始写入 InfluxDB” 中写入的数据
导入 slack 包。
查询 co 字段以返回每个房间的一氧化碳百万分比 (ppm) 读数。
使用 map() 迭代每一行,评估 _value 列中的值,然后有条件地分配状态
- 如果一氧化碳小于 10 ppm,则分配状态:ok。
- 否则,分配状态:warning。
将状态存储在 state 列中。
使用 filter() 仅返回 state 列中包含 warning 的行。
使用 map() 迭代每一行。在您的 fn 函数中,使用 Flux 代码块 ({}) 来
- 创建一个
responseCode 变量,该变量使用 slack.message() 使用来自输入行的数据向 Slack 发送消息。slack.message() 将 Slack API 请求的响应代码作为整数返回。 - 使用
return 语句返回新的行记录。新行应使用一个新列 sent 扩展输入行,该列的值为布尔值,由 responseCode 变量确定。
map() 为管道传输到该函数中的每一行向 Slack 发送一条消息。
import "slack"
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "co")
|> map(fn: (r) => ({r with state: if r._value < 10 then "ok" else "warning"}))
|> filter(fn: (r) => r.state == "warning")
|> map(
fn: (r) => {
responseCode =
slack.message(
token: "mYSlacK70k3n",
color: "#ff0000",
channel: "#alerts",
text: "Carbon monoxide is at dangerous levels in the ${r.room}: ${r._value} ppm.",
)
return {r with sent: responseCode == 200}
},
)
以下输入表示按 warning 状态筛选的数据。
| _time | _measurement | room | _field | _value | state |
|---|
| 2022-01-01T18:00:00Z | home | Kitchen | co | 18 | warning |
| 2022-01-01T19:00:00Z | home | Kitchen | co | 22 | warning |
| 2022-01-01T20:00:00Z | home | Kitchen | co | 26 | warning |
| _time | _measurement | room | _field | _value | state |
|---|
| 2022-01-01T19:00:00Z | home | Living Room | co | 14 | warning |
| 2022-01-01T20:00:00Z | home | Living Room | co | 17 | warning |
输出包括一个 sent 列,指示消息是否已发送。
| _time | _measurement | room | _field | _value | state | sent |
|---|
| 2022-01-01T18:00:00Z | home | Kitchen | co | 18 | warning | true |
| 2022-01-01T19:00:00Z | home | Kitchen | co | 22 | warning | true |
| 2022-01-01T20:00:00Z | home | Kitchen | co | 26 | warning | true |
| _time | _measurement | room | _field | _value | state | sent |
|---|
| 2022-01-01T19:00:00Z | home | Living Room | co | 14 | warning | true |
| 2022-01-01T20:00:00Z | home | Living Room | co | 17 | warning | true |
使用上面的结果,您将在 Slack 中收到以下消息
厨房的一氧化碳浓度处于危险水平:18 ppm。
厨房的一氧化碳浓度处于危险水平:22 ppm。
客厅的一氧化碳浓度处于危险水平:14 ppm。
厨房的一氧化碳浓度处于危险水平:26 ppm。
客厅的一氧化碳浓度处于危险水平:17 ppm。
分组数据
使用 group() 函数 按特定列值重新分组数据,以便进一步处理。
from(bucket: "get-started")
|> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> group(columns: ["room", "_field"])
理解数据分组及其重要性很重要,但这对于本“入门”教程来说可能太多了。有关数据如何分组以及为什么重要的更多信息,请参阅 Flux 数据模型 文档。
默认情况下,from() 返回从 InfluxDB 查询的数据,这些数据按序列(measurement、标签和字段键)分组。返回的表流中的每个表代表一个组。每个表都包含相同的值,这些值用于对数据进行分组的列。当您聚合数据时,此分组非常重要。
Group 示例
按特定列分组数据
使用 “开始写入 InfluxDB” 中写入的数据
- 查询
temp 和 hum 字段。 - 使用
group() 仅按 _field 列分组。
from(bucket: "get-started")
|> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T10:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp" or r._field == "hum")
|> group(columns: ["_field"])
以下数据是从上次 filter() 输出并管道传输到 group() 的数据
组键实例 = [_measurement=home, room=Kitchen, _field=hum]
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T08:00:00Z | home | Kitchen | hum | 35.9 |
| 2022-01-01T09:00:00Z | home | Kitchen | hum | 36.2 |
| 2022-01-01T10:00:00Z | home | Kitchen | hum | 36.1 |
组键实例 = [_measurement=home, room=Living Room, _field=hum]
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T08:00:00Z | home | Living Room | hum | 35.9 |
| 2022-01-01T09:00:00Z | home | Living Room | hum | 35.9 |
| 2022-01-01T10:00:00Z | home | Living Room | hum | 36 |
组键实例 = [_measurement=home, room=Kitchen, _field=temp]
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T08:00:00Z | home | Kitchen | temp | 21 |
| 2022-01-01T09:00:00Z | home | Kitchen | temp | 23 |
| 2022-01-01T10:00:00Z | home | Kitchen | temp | 22.7 |
组键实例 = [_measurement=home, room=Living Room, _field=temp]
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T08:00:00Z | home | Living Room | temp | 21.1 |
| 2022-01-01T09:00:00Z | home | Living Room | temp | 21.4 |
| 2022-01-01T10:00:00Z | home | Living Room | temp | 21.8 |
当按 _field 分组时,所有具有 temp 字段的行将位于一个表中,而所有具有 hum 字段的行将位于另一个表中。_measurement 和 room 列不再影响行的分组方式。
组键实例 = [_field=hum]
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T08:00:00Z | home | Kitchen | hum | 35.9 |
| 2022-01-01T09:00:00Z | home | Kitchen | hum | 36.2 |
| 2022-01-01T10:00:00Z | home | Kitchen | hum | 36.1 |
| 2022-01-01T08:00:00Z | home | Living Room | hum | 35.9 |
| 2022-01-01T09:00:00Z | home | Living Room | hum | 35.9 |
| 2022-01-01T10:00:00Z | home | Living Room | hum | 36 |
组键实例 = [_field=temp]
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T08:00:00Z | home | Kitchen | temp | 21 |
| 2022-01-01T09:00:00Z | home | Kitchen | temp | 23 |
| 2022-01-01T10:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T08:00:00Z | home | Living Room | temp | 21.1 |
| 2022-01-01T09:00:00Z | home | Living Room | temp | 21.4 |
| 2022-01-01T10:00:00Z | home | Living Room | temp | 21.8 |
取消分组数据
使用 “开始写入 InfluxDB” 中写入的数据
- 查询
temp 和 hum 字段。 - 使用不带任何参数的
group() 来“取消分组”数据,或按无列分组。columns 参数的默认值是一个空数组 ([])。
from(bucket: "get-started")
|> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T10:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp" or r._field == "hum")
|> group()
以下数据是从上次 filter() 输出并管道传输到 group() 的数据
组键实例 = [_measurement=home, room=Kitchen, _field=hum]
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T08:00:00Z | home | Kitchen | hum | 35.9 |
| 2022-01-01T09:00:00Z | home | Kitchen | hum | 36.2 |
| 2022-01-01T10:00:00Z | home | Kitchen | hum | 36.1 |
组键实例 = [_measurement=home, room=Living Room, _field=hum]
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T08:00:00Z | home | Living Room | hum | 35.9 |
| 2022-01-01T09:00:00Z | home | Living Room | hum | 35.9 |
| 2022-01-01T10:00:00Z | home | Living Room | hum | 36 |
组键实例 = [_measurement=home, room=Kitchen, _field=temp]
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T08:00:00Z | home | Kitchen | temp | 21 |
| 2022-01-01T09:00:00Z | home | Kitchen | temp | 23 |
| 2022-01-01T10:00:00Z | home | Kitchen | temp | 22.7 |
组键实例 = [_measurement=home, room=Living Room, _field=temp]
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T08:00:00Z | home | Living Room | temp | 21.1 |
| 2022-01-01T09:00:00Z | home | Living Room | temp | 21.4 |
| 2022-01-01T10:00:00Z | home | Living Room | temp | 21.8 |
取消分组后,数据将在单个表中返回。
组键实例 = []
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T08:00:00Z | home | Kitchen | hum | 35.9 |
| 2022-01-01T09:00:00Z | home | Kitchen | hum | 36.2 |
| 2022-01-01T10:00:00Z | home | Kitchen | hum | 36.1 |
| 2022-01-01T08:00:00Z | home | Kitchen | temp | 21 |
| 2022-01-01T09:00:00Z | home | Kitchen | temp | 23 |
| 2022-01-01T10:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T08:00:00Z | home | Living Room | hum | 35.9 |
| 2022-01-01T09:00:00Z | home | Living Room | hum | 35.9 |
| 2022-01-01T10:00:00Z | home | Living Room | hum | 36 |
| 2022-01-01T08:00:00Z | home | Living Room | temp | 21.1 |
| 2022-01-01T09:00:00Z | home | Living Room | temp | 21.4 |
| 2022-01-01T10:00:00Z | home | Living Room | temp | 21.8 |
聚合或选择特定数据
使用 Flux 聚合 或 选择器 函数从 每个 输入表中返回聚合值或选定值。
from(bucket: "get-started")
|> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "co" or r._field == "hum" or r._field == "temp")
|> mean()
按时间聚合
如果您想查询随时间变化的聚合值,这是一种 降采样 形式。
聚合函数
聚合函数 删除 不 在 组键 中的列,并为每个输入表返回一个单行,其中包含该表的聚合值。
聚合示例
计算每个房间的平均温度
使用 “开始写入 InfluxDB” 中写入的数据
- 查询
temp 字段。默认情况下,from() 返回按 _measurement、room 和 _field 分组的数据,因此每个表代表一个房间。 - 使用
mean() 返回每个房间的平均温度。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp")
|> mean()
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
| 2022-01-01T15:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T16:00:00Z | home | Kitchen | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T18:00:00Z | home | Kitchen | temp | 23.3 |
| 2022-01-01T19:00:00Z | home | Kitchen | temp | 23.1 |
| 2022-01-01T20:00:00Z | home | Kitchen | temp | 22.7 |
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T14:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T15:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T16:00:00Z | home | Living Room | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Living Room | temp | 22.6 |
| 2022-01-01T18:00:00Z | home | Living Room | temp | 22.8 |
| 2022-01-01T19:00:00Z | home | Living Room | temp | 22.5 |
| 2022-01-01T20:00:00Z | home | Living Room | temp | 22.2 |
| _measurement | room | _field | _value |
|---|
| home | Kitchen | temp | 22.814285714285713 |
| _measurement | room | _field | _value |
|---|
| home | Living Room | temp | 22.44285714285714 |
计算所有房间的总体平均温度
使用 “开始写入 InfluxDB” 中写入的数据
- 查询
temp 字段。 - 使用
group() 将数据 取消分组 到单个表中。默认情况下,from() 返回按 _measurement、room 和 _field 分组的数据。要获得总体平均值,您需要将所有结果构造为单个表。 - 使用
mean() 返回平均温度。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp")
|> group()
|> mean()
以下输入数据表示管道传输到 mean() 中的取消分组数据。
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
| 2022-01-01T15:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T16:00:00Z | home | Kitchen | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T18:00:00Z | home | Kitchen | temp | 23.3 |
| 2022-01-01T19:00:00Z | home | Kitchen | temp | 23.1 |
| 2022-01-01T20:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T14:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T15:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T16:00:00Z | home | Living Room | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Living Room | temp | 22.6 |
| 2022-01-01T18:00:00Z | home | Living Room | temp | 22.8 |
| 2022-01-01T19:00:00Z | home | Living Room | temp | 22.5 |
| 2022-01-01T20:00:00Z | home | Living Room | temp | 22.2 |
计算每个房间在所有字段中报告的点数
使用 “开始写入 InfluxDB” 中写入的数据
- 通过简单地按
home measurement 进行筛选来查询所有字段。 home measurement 中的字段是不同的类型。使用 toFloat() 将所有字段值强制转换为浮点数。- 使用
group() 按 room 对数据进行分组。 - 使用
count() 返回每个输入表中的行数。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> toFloat()
|> group(columns: ["room"])
|> count()
输出
分配新的聚合时间戳
_time 通常不是组键的一部分,并且在使用聚合函数时将被删除。要为聚合点分配新的时间戳,请将代表查询边界的 _start 或 _stop 列复制为新的 _time 列。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp")
|> mean()
|> duplicate(column: "_stop", as: "_time")
选择器函数
选择器函数 从每个输入表返回一个或多个列,并保留所有列及其值。
选择器示例
返回每个房间的第一个温度
使用 “开始写入 InfluxDB” 中写入的数据
- 查询
temp 字段。 - 使用
first() 返回每个表的第一行。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp")
|> first()
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
| 2022-01-01T15:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T16:00:00Z | home | Kitchen | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T18:00:00Z | home | Kitchen | temp | 23.3 |
| 2022-01-01T19:00:00Z | home | Kitchen | temp | 23.1 |
| 2022-01-01T20:00:00Z | home | Kitchen | temp | 22.7 |
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T14:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T15:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T16:00:00Z | home | Living Room | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Living Room | temp | 22.6 |
| 2022-01-01T18:00:00Z | home | Living Room | temp | 22.8 |
| 2022-01-01T19:00:00Z | home | Living Room | temp | 22.5 |
| 2022-01-01T20:00:00Z | home | Living Room | temp | 22.2 |
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T14:00:00Z | home | Living Room | temp | 22.3 |
返回每个房间的最后一个温度
使用 “开始写入 InfluxDB” 中写入的数据
- 查询
temp 字段。 - 使用
last() 返回每个表的最后一行。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp")
|> last()
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
| 2022-01-01T15:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T16:00:00Z | home | Kitchen | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T18:00:00Z | home | Kitchen | temp | 23.3 |
| 2022-01-01T19:00:00Z | home | Kitchen | temp | 23.1 |
| 2022-01-01T20:00:00Z | home | Kitchen | temp | 22.7 |
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T14:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T15:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T16:00:00Z | home | Living Room | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Living Room | temp | 22.6 |
| 2022-01-01T18:00:00Z | home | Living Room | temp | 22.8 |
| 2022-01-01T19:00:00Z | home | Living Room | temp | 22.5 |
| 2022-01-01T20:00:00Z | home | Living Room | temp | 22.2 |
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T20:00:00Z | home | Kitchen | temp | 22.7 |
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T20:00:00Z | home | Living Room | temp | 22.2 |
返回每个房间的最高温度
使用 “开始写入 InfluxDB” 中写入的数据
- 查询
temp 字段。 - 使用
max() 返回每个表中 _value 列中值最高的行。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp")
|> max()
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
| 2022-01-01T15:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T16:00:00Z | home | Kitchen | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T18:00:00Z | home | Kitchen | temp | 23.3 |
| 2022-01-01T19:00:00Z | home | Kitchen | temp | 23.1 |
| 2022-01-01T20:00:00Z | home | Kitchen | temp | 22.7 |
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T14:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T15:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T16:00:00Z | home | Living Room | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Living Room | temp | 22.6 |
| 2022-01-01T18:00:00Z | home | Living Room | temp | 22.8 |
| 2022-01-01T19:00:00Z | home | Living Room | temp | 22.5 |
| 2022-01-01T20:00:00Z | home | Living Room | temp | 22.2 |
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
| _time | _measurement | room | _field | _value |
|---|
| 2022-01-01T18:00:00Z | home | Living Room | temp | 22.8 |
将数据透视到关系模式中
如果来自关系型 SQL 或类似 SQL 的查询语言(例如 InfluxQL),则 Flux 使用的数据模型与您习惯的模型不同。Flux 返回多个表,其中每个表包含不同的字段。“关系”模式将每个字段构造为每行中的一列。
使用 pivot() 函数 将数据透视到基于时间戳的“关系”模式中。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "co" or r._field == "hum" or r._field == "temp")
|> filter(fn: (r) => r.room == "Kitchen")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
数据降采样
数据降采样是一种策略,可以提高查询时的性能,并优化长期数据存储。简而言之,降采样减少了查询返回的点数,而不会丢失数据中的总体趋势。
有关降采样数据的更多信息,请参阅 降采样数据。
降采样数据最常见的方法是按时间间隔或“窗口”。例如,您可能想要查询最后一小时的数据,并返回每五分钟窗口的平均值。
使用 aggregateWindow() 按指定的时间间隔降采样数据
- 使用
every 参数指定每个窗口的持续时间。 - 使用
fn 参数指定要应用于每个窗口的 聚合 或 选择器 函数。 - (可选) 使用
timeSrc 参数指定要使用哪个列值来为每个窗口创建新的聚合时间戳。默认值为 _stop。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp")
|> aggregateWindow(every: 2h, fn: mean)
使用 InfluxDB 任务自动化处理
InfluxDB 任务是计划查询,可以执行上述任何数据处理操作。通常,任务然后使用 to() 函数 将处理后的结果写回 InfluxDB。
有关创建和配置任务的更多信息,请参阅 InfluxDB 任务入门。
降采样任务示例
option task = {
name: "Example task"
every: 1d,
}
from(bucket: "get-started-downsampled")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "home")
|> aggregateWindow(every: 2h, fn: mean)
支持与反馈
感谢您成为我们社区的一份子!我们欢迎并鼓励您提供关于 InfluxDB 和本文档的反馈和错误报告。要获得支持,请使用以下资源
拥有年度合同或支持合同的客户 可以联系 InfluxData 支持。