既然您已经了解了从 InfluxDB 查询数据的基本知识,让我们超越基本查询,开始处理查询到的数据。“处理”数据可能意味着转换、聚合、降采样或对数据发出警报。本教程涵盖以下数据处理用例
大多数数据处理操作都需要手动编辑 Flux 查询。如果您正在使用 InfluxDB 数据浏览器,请切换到 脚本编辑器 而不是使用 查询构建器。
重映射或分配数据中的值
使用 map()
函数 迭代数据中的每一行,并更新该行中的值。map()
是 Flux 中最有用的函数之一,它将帮助您完成许多需要执行的数据处理操作。
了解更多关于 map()
如何工作的信息
map()
接受一个参数 fn
。fn
接受一个匿名函数,该函数将每一行读取为名为 r
的 记录。在 r
记录中,每个键值对代表一列及其值。例如
r = {
_time: 2020-01-01T00:00:00Z,
_measurement: "home",
room: "Kitchen",
_field: "temp",
_value: 21.0,
}
_time | _measurement | room | _field | _value |
---|
2020-01-01T00:00:00Z | home | Kitchen | temp | 21.0 |
fn
函数以您需要的任何方式修改 r
记录,并为该行返回一个新的记录。例如,使用上面的记录
(r) => ({ _time: r._time, _field: "temp_F", _value: (r._value * 1.8) + 32.0})
// Returns: {_time: 2020-01-01T00:00:00Z, _field: "temp_F", _value: 69.8}
_time | _field | _value |
---|
2020-01-01T00:00:00Z | temp_F | 69.8 |
请注意,某些列已从原始行记录中删除。这是因为 fn
函数显式映射了 _time
、_field
和 _value
列。要保留现有列并仅更新或添加特定列,请使用 with
运算符扩展您的行记录。例如,使用上面的记录
(r) => ({r with _value: (r._value * 1.8) + 32.0, degrees: "F"})
// Returns:
// {
// _time: 2020-01-01T00:00:00Z,
// _measurement: "home",
// room: "Kitchen",
// _field: "temp",
// _value: 69.8,
// degrees: "F",
// }
_time | _measurement | room | _field | _value | degrees |
---|
2020-01-01T00:00:00Z | home | Kitchen | temp | 69.8 | F |
from(bucket: "get-started")
|> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "hum")
|> map(fn: (r) => ({r with _value: r._value / 100.0}))
Map 示例
有条件地分配状态
在 map()
函数中,您可以使用 条件表达式 (if/then/else) 有条件地分配值。例如,使用 “开始写入 InfluxDB” 中写入的数据
查询 co
字段以返回每个房间的一氧化碳百万分比 (ppm) 读数。
使用 map()
迭代每一行,评估 _value
列中的值,然后有条件地分配状态
- 如果一氧化碳小于 10 ppm,则分配状态:ok。
- 否则,分配状态:warning。
将状态存储在 state 列中。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "co")
|> map(fn: (r) => ({r with state: if r._value < 10 then "ok" else "warning"}))
_time | _measurement | room | _field | _value |
---|
2022-01-01T14:00:00Z | home | Kitchen | co | 1 |
2022-01-01T15:00:00Z | home | Kitchen | co | 3 |
2022-01-01T16:00:00Z | home | Kitchen | co | 7 |
2022-01-01T17:00:00Z | home | Kitchen | co | 9 |
2022-01-01T18:00:00Z | home | Kitchen | co | 18 |
2022-01-01T19:00:00Z | home | Kitchen | co | 22 |
2022-01-01T20:00:00Z | home | Kitchen | co | 26 |
_time | _measurement | room | _field | _value |
---|
2022-01-01T14:00:00Z | home | Living Room | co | 1 |
2022-01-01T15:00:00Z | home | Living Room | co | 1 |
2022-01-01T16:00:00Z | home | Living Room | co | 4 |
2022-01-01T17:00:00Z | home | Living Room | co | 5 |
2022-01-01T18:00:00Z | home | Living Room | co | 9 |
2022-01-01T19:00:00Z | home | Living Room | co | 14 |
2022-01-01T20:00:00Z | home | Living Room | co | 17 |
_time | _measurement | room | _field | _value | state |
---|
2022-01-01T14:00:00Z | home | Kitchen | co | 1 | ok |
2022-01-01T15:00:00Z | home | Kitchen | co | 3 | ok |
2022-01-01T16:00:00Z | home | Kitchen | co | 7 | ok |
2022-01-01T17:00:00Z | home | Kitchen | co | 9 | ok |
2022-01-01T18:00:00Z | home | Kitchen | co | 18 | warning |
2022-01-01T19:00:00Z | home | Kitchen | co | 22 | warning |
2022-01-01T20:00:00Z | home | Kitchen | co | 26 | warning |
_time | _measurement | room | _field | _value | state |
---|
2022-01-01T14:00:00Z | home | Living Room | co | 1 | ok |
2022-01-01T15:00:00Z | home | Living Room | co | 1 | ok |
2022-01-01T16:00:00Z | home | Living Room | co | 4 | ok |
2022-01-01T17:00:00Z | home | Living Room | co | 5 | ok |
2022-01-01T18:00:00Z | home | Living Room | co | 9 | ok |
2022-01-01T19:00:00Z | home | Living Room | co | 14 | warning |
2022-01-01T20:00:00Z | home | Living Room | co | 17 | warning |
数据警报
map()
允许您在每行的基础上执行更复杂的操作。在 fn
函数中使用 Flux 代码块 ({}
),您可以在每个行的上下文中创建作用域变量并执行其他函数。例如,您可以向 Slack 发送消息。
例如,使用 “开始写入 InfluxDB” 中写入的数据
导入 slack
包。
查询 co
字段以返回每个房间的一氧化碳百万分比 (ppm) 读数。
使用 map()
迭代每一行,评估 _value
列中的值,然后有条件地分配状态
- 如果一氧化碳小于 10 ppm,则分配状态:ok。
- 否则,分配状态:warning。
将状态存储在 state 列中。
使用 filter()
仅返回 state 列中包含 warning 的行。
使用 map()
迭代每一行。在您的 fn
函数中,使用 Flux 代码块 ({}
) 来
- 创建一个
responseCode
变量,该变量使用 slack.message()
使用来自输入行的数据向 Slack 发送消息。slack.message()
将 Slack API 请求的响应代码作为整数返回。 - 使用
return
语句返回新的行记录。新行应使用一个新列 sent 扩展输入行,该列的值为布尔值,由 responseCode
变量确定。
map()
为管道传输到该函数中的每一行向 Slack 发送一条消息。
import "slack"
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "co")
|> map(fn: (r) => ({r with state: if r._value < 10 then "ok" else "warning"}))
|> filter(fn: (r) => r.state == "warning")
|> map(
fn: (r) => {
responseCode =
slack.message(
token: "mYSlacK70k3n",
color: "#ff0000",
channel: "#alerts",
text: "Carbon monoxide is at dangerous levels in the ${r.room}: ${r._value} ppm.",
)
return {r with sent: responseCode == 200}
},
)
以下输入表示按 warning 状态筛选的数据。
_time | _measurement | room | _field | _value | state |
---|
2022-01-01T18:00:00Z | home | Kitchen | co | 18 | warning |
2022-01-01T19:00:00Z | home | Kitchen | co | 22 | warning |
2022-01-01T20:00:00Z | home | Kitchen | co | 26 | warning |
_time | _measurement | room | _field | _value | state |
---|
2022-01-01T19:00:00Z | home | Living Room | co | 14 | warning |
2022-01-01T20:00:00Z | home | Living Room | co | 17 | warning |
输出包括一个 sent 列,指示消息是否已发送。
_time | _measurement | room | _field | _value | state | sent |
---|
2022-01-01T18:00:00Z | home | Kitchen | co | 18 | warning | true |
2022-01-01T19:00:00Z | home | Kitchen | co | 22 | warning | true |
2022-01-01T20:00:00Z | home | Kitchen | co | 26 | warning | true |
_time | _measurement | room | _field | _value | state | sent |
---|
2022-01-01T19:00:00Z | home | Living Room | co | 14 | warning | true |
2022-01-01T20:00:00Z | home | Living Room | co | 17 | warning | true |
使用上面的结果,您将在 Slack 中收到以下消息
厨房的一氧化碳浓度处于危险水平:18 ppm。
厨房的一氧化碳浓度处于危险水平:22 ppm。
客厅的一氧化碳浓度处于危险水平:14 ppm。
厨房的一氧化碳浓度处于危险水平:26 ppm。
客厅的一氧化碳浓度处于危险水平:17 ppm。
分组数据
使用 group()
函数 按特定列值重新分组数据,以便进一步处理。
from(bucket: "get-started")
|> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> group(columns: ["room", "_field"])
理解数据分组及其重要性很重要,但这对于本“入门”教程来说可能太多了。有关数据如何分组以及为什么重要的更多信息,请参阅 Flux 数据模型 文档。
默认情况下,from()
返回从 InfluxDB 查询的数据,这些数据按序列(measurement、标签和字段键)分组。返回的表流中的每个表代表一个组。每个表都包含相同的值,这些值用于对数据进行分组的列。当您聚合数据时,此分组非常重要。
Group 示例
按特定列分组数据
使用 “开始写入 InfluxDB” 中写入的数据
- 查询
temp
和 hum
字段。 - 使用
group()
仅按 _field
列分组。
from(bucket: "get-started")
|> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T10:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp" or r._field == "hum")
|> group(columns: ["_field"])
以下数据是从上次 filter()
输出并管道传输到 group()
的数据
组键实例 = [_measurement=home, room=Kitchen, _field=hum]
_time | _measurement | room | _field | _value |
---|
2022-01-01T08:00:00Z | home | Kitchen | hum | 35.9 |
2022-01-01T09:00:00Z | home | Kitchen | hum | 36.2 |
2022-01-01T10:00:00Z | home | Kitchen | hum | 36.1 |
组键实例 = [_measurement=home, room=Living Room, _field=hum]
_time | _measurement | room | _field | _value |
---|
2022-01-01T08:00:00Z | home | Living Room | hum | 35.9 |
2022-01-01T09:00:00Z | home | Living Room | hum | 35.9 |
2022-01-01T10:00:00Z | home | Living Room | hum | 36 |
组键实例 = [_measurement=home, room=Kitchen, _field=temp]
_time | _measurement | room | _field | _value |
---|
2022-01-01T08:00:00Z | home | Kitchen | temp | 21 |
2022-01-01T09:00:00Z | home | Kitchen | temp | 23 |
2022-01-01T10:00:00Z | home | Kitchen | temp | 22.7 |
组键实例 = [_measurement=home, room=Living Room, _field=temp]
_time | _measurement | room | _field | _value |
---|
2022-01-01T08:00:00Z | home | Living Room | temp | 21.1 |
2022-01-01T09:00:00Z | home | Living Room | temp | 21.4 |
2022-01-01T10:00:00Z | home | Living Room | temp | 21.8 |
当按 _field
分组时,所有具有 temp
字段的行将位于一个表中,而所有具有 hum
字段的行将位于另一个表中。_measurement
和 room
列不再影响行的分组方式。
组键实例 = [_field=hum]
_time | _measurement | room | _field | _value |
---|
2022-01-01T08:00:00Z | home | Kitchen | hum | 35.9 |
2022-01-01T09:00:00Z | home | Kitchen | hum | 36.2 |
2022-01-01T10:00:00Z | home | Kitchen | hum | 36.1 |
2022-01-01T08:00:00Z | home | Living Room | hum | 35.9 |
2022-01-01T09:00:00Z | home | Living Room | hum | 35.9 |
2022-01-01T10:00:00Z | home | Living Room | hum | 36 |
组键实例 = [_field=temp]
_time | _measurement | room | _field | _value |
---|
2022-01-01T08:00:00Z | home | Kitchen | temp | 21 |
2022-01-01T09:00:00Z | home | Kitchen | temp | 23 |
2022-01-01T10:00:00Z | home | Kitchen | temp | 22.7 |
2022-01-01T08:00:00Z | home | Living Room | temp | 21.1 |
2022-01-01T09:00:00Z | home | Living Room | temp | 21.4 |
2022-01-01T10:00:00Z | home | Living Room | temp | 21.8 |
取消分组数据
使用 “开始写入 InfluxDB” 中写入的数据
- 查询
temp
和 hum
字段。 - 使用不带任何参数的
group()
来“取消分组”数据,或按无列分组。columns
参数的默认值是一个空数组 ([]
)。
from(bucket: "get-started")
|> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T10:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp" or r._field == "hum")
|> group()
以下数据是从上次 filter()
输出并管道传输到 group()
的数据
组键实例 = [_measurement=home, room=Kitchen, _field=hum]
_time | _measurement | room | _field | _value |
---|
2022-01-01T08:00:00Z | home | Kitchen | hum | 35.9 |
2022-01-01T09:00:00Z | home | Kitchen | hum | 36.2 |
2022-01-01T10:00:00Z | home | Kitchen | hum | 36.1 |
组键实例 = [_measurement=home, room=Living Room, _field=hum]
_time | _measurement | room | _field | _value |
---|
2022-01-01T08:00:00Z | home | Living Room | hum | 35.9 |
2022-01-01T09:00:00Z | home | Living Room | hum | 35.9 |
2022-01-01T10:00:00Z | home | Living Room | hum | 36 |
组键实例 = [_measurement=home, room=Kitchen, _field=temp]
_time | _measurement | room | _field | _value |
---|
2022-01-01T08:00:00Z | home | Kitchen | temp | 21 |
2022-01-01T09:00:00Z | home | Kitchen | temp | 23 |
2022-01-01T10:00:00Z | home | Kitchen | temp | 22.7 |
组键实例 = [_measurement=home, room=Living Room, _field=temp]
_time | _measurement | room | _field | _value |
---|
2022-01-01T08:00:00Z | home | Living Room | temp | 21.1 |
2022-01-01T09:00:00Z | home | Living Room | temp | 21.4 |
2022-01-01T10:00:00Z | home | Living Room | temp | 21.8 |
取消分组后,数据将在单个表中返回。
组键实例 = []
_time | _measurement | room | _field | _value |
---|
2022-01-01T08:00:00Z | home | Kitchen | hum | 35.9 |
2022-01-01T09:00:00Z | home | Kitchen | hum | 36.2 |
2022-01-01T10:00:00Z | home | Kitchen | hum | 36.1 |
2022-01-01T08:00:00Z | home | Kitchen | temp | 21 |
2022-01-01T09:00:00Z | home | Kitchen | temp | 23 |
2022-01-01T10:00:00Z | home | Kitchen | temp | 22.7 |
2022-01-01T08:00:00Z | home | Living Room | hum | 35.9 |
2022-01-01T09:00:00Z | home | Living Room | hum | 35.9 |
2022-01-01T10:00:00Z | home | Living Room | hum | 36 |
2022-01-01T08:00:00Z | home | Living Room | temp | 21.1 |
2022-01-01T09:00:00Z | home | Living Room | temp | 21.4 |
2022-01-01T10:00:00Z | home | Living Room | temp | 21.8 |
聚合或选择特定数据
使用 Flux 聚合 或 选择器 函数从 每个 输入表中返回聚合值或选定值。
from(bucket: "get-started")
|> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "co" or r._field == "hum" or r._field == "temp")
|> mean()
按时间聚合
如果您想查询随时间变化的聚合值,这是一种 降采样 形式。
聚合函数
聚合函数 删除 不 在 组键 中的列,并为每个输入表返回一个单行,其中包含该表的聚合值。
聚合示例
计算每个房间的平均温度
使用 “开始写入 InfluxDB” 中写入的数据
- 查询
temp
字段。默认情况下,from()
返回按 _measurement
、room
和 _field
分组的数据,因此每个表代表一个房间。 - 使用
mean()
返回每个房间的平均温度。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp")
|> mean()
_time | _measurement | room | _field | _value |
---|
2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
2022-01-01T15:00:00Z | home | Kitchen | temp | 22.7 |
2022-01-01T16:00:00Z | home | Kitchen | temp | 22.4 |
2022-01-01T17:00:00Z | home | Kitchen | temp | 22.7 |
2022-01-01T18:00:00Z | home | Kitchen | temp | 23.3 |
2022-01-01T19:00:00Z | home | Kitchen | temp | 23.1 |
2022-01-01T20:00:00Z | home | Kitchen | temp | 22.7 |
_time | _measurement | room | _field | _value |
---|
2022-01-01T14:00:00Z | home | Living Room | temp | 22.3 |
2022-01-01T15:00:00Z | home | Living Room | temp | 22.3 |
2022-01-01T16:00:00Z | home | Living Room | temp | 22.4 |
2022-01-01T17:00:00Z | home | Living Room | temp | 22.6 |
2022-01-01T18:00:00Z | home | Living Room | temp | 22.8 |
2022-01-01T19:00:00Z | home | Living Room | temp | 22.5 |
2022-01-01T20:00:00Z | home | Living Room | temp | 22.2 |
_measurement | room | _field | _value |
---|
home | Kitchen | temp | 22.814285714285713 |
_measurement | room | _field | _value |
---|
home | Living Room | temp | 22.44285714285714 |
计算所有房间的总体平均温度
使用 “开始写入 InfluxDB” 中写入的数据
- 查询
temp
字段。 - 使用
group()
将数据 取消分组 到单个表中。默认情况下,from()
返回按 _measurement
、room
和 _field
分组的数据。要获得总体平均值,您需要将所有结果构造为单个表。 - 使用
mean()
返回平均温度。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp")
|> group()
|> mean()
以下输入数据表示管道传输到 mean()
中的取消分组数据。
_time | _measurement | room | _field | _value |
---|
2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
2022-01-01T15:00:00Z | home | Kitchen | temp | 22.7 |
2022-01-01T16:00:00Z | home | Kitchen | temp | 22.4 |
2022-01-01T17:00:00Z | home | Kitchen | temp | 22.7 |
2022-01-01T18:00:00Z | home | Kitchen | temp | 23.3 |
2022-01-01T19:00:00Z | home | Kitchen | temp | 23.1 |
2022-01-01T20:00:00Z | home | Kitchen | temp | 22.7 |
2022-01-01T14:00:00Z | home | Living Room | temp | 22.3 |
2022-01-01T15:00:00Z | home | Living Room | temp | 22.3 |
2022-01-01T16:00:00Z | home | Living Room | temp | 22.4 |
2022-01-01T17:00:00Z | home | Living Room | temp | 22.6 |
2022-01-01T18:00:00Z | home | Living Room | temp | 22.8 |
2022-01-01T19:00:00Z | home | Living Room | temp | 22.5 |
2022-01-01T20:00:00Z | home | Living Room | temp | 22.2 |
计算每个房间在所有字段中报告的点数
使用 “开始写入 InfluxDB” 中写入的数据
- 通过简单地按
home
measurement 进行筛选来查询所有字段。 home
measurement 中的字段是不同的类型。使用 toFloat()
将所有字段值强制转换为浮点数。- 使用
group()
按 room
对数据进行分组。 - 使用
count()
返回每个输入表中的行数。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> toFloat()
|> group(columns: ["room"])
|> count()
输出
分配新的聚合时间戳
_time
通常不是组键的一部分,并且在使用聚合函数时将被删除。要为聚合点分配新的时间戳,请将代表查询边界的 _start
或 _stop
列复制为新的 _time
列。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp")
|> mean()
|> duplicate(column: "_stop", as: "_time")
选择器函数
选择器函数 从每个输入表返回一个或多个列,并保留所有列及其值。
选择器示例
返回每个房间的第一个温度
使用 “开始写入 InfluxDB” 中写入的数据
- 查询
temp
字段。 - 使用
first()
返回每个表的第一行。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp")
|> first()
_time | _measurement | room | _field | _value |
---|
2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
2022-01-01T15:00:00Z | home | Kitchen | temp | 22.7 |
2022-01-01T16:00:00Z | home | Kitchen | temp | 22.4 |
2022-01-01T17:00:00Z | home | Kitchen | temp | 22.7 |
2022-01-01T18:00:00Z | home | Kitchen | temp | 23.3 |
2022-01-01T19:00:00Z | home | Kitchen | temp | 23.1 |
2022-01-01T20:00:00Z | home | Kitchen | temp | 22.7 |
_time | _measurement | room | _field | _value |
---|
2022-01-01T14:00:00Z | home | Living Room | temp | 22.3 |
2022-01-01T15:00:00Z | home | Living Room | temp | 22.3 |
2022-01-01T16:00:00Z | home | Living Room | temp | 22.4 |
2022-01-01T17:00:00Z | home | Living Room | temp | 22.6 |
2022-01-01T18:00:00Z | home | Living Room | temp | 22.8 |
2022-01-01T19:00:00Z | home | Living Room | temp | 22.5 |
2022-01-01T20:00:00Z | home | Living Room | temp | 22.2 |
_time | _measurement | room | _field | _value |
---|
2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
_time | _measurement | room | _field | _value |
---|
2022-01-01T14:00:00Z | home | Living Room | temp | 22.3 |
返回每个房间的最后一个温度
使用 “开始写入 InfluxDB” 中写入的数据
- 查询
temp
字段。 - 使用
last()
返回每个表的最后一行。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp")
|> last()
_time | _measurement | room | _field | _value |
---|
2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
2022-01-01T15:00:00Z | home | Kitchen | temp | 22.7 |
2022-01-01T16:00:00Z | home | Kitchen | temp | 22.4 |
2022-01-01T17:00:00Z | home | Kitchen | temp | 22.7 |
2022-01-01T18:00:00Z | home | Kitchen | temp | 23.3 |
2022-01-01T19:00:00Z | home | Kitchen | temp | 23.1 |
2022-01-01T20:00:00Z | home | Kitchen | temp | 22.7 |
_time | _measurement | room | _field | _value |
---|
2022-01-01T14:00:00Z | home | Living Room | temp | 22.3 |
2022-01-01T15:00:00Z | home | Living Room | temp | 22.3 |
2022-01-01T16:00:00Z | home | Living Room | temp | 22.4 |
2022-01-01T17:00:00Z | home | Living Room | temp | 22.6 |
2022-01-01T18:00:00Z | home | Living Room | temp | 22.8 |
2022-01-01T19:00:00Z | home | Living Room | temp | 22.5 |
2022-01-01T20:00:00Z | home | Living Room | temp | 22.2 |
_time | _measurement | room | _field | _value |
---|
2022-01-01T20:00:00Z | home | Kitchen | temp | 22.7 |
_time | _measurement | room | _field | _value |
---|
2022-01-01T20:00:00Z | home | Living Room | temp | 22.2 |
返回每个房间的最高温度
使用 “开始写入 InfluxDB” 中写入的数据
- 查询
temp
字段。 - 使用
max()
返回每个表中 _value
列中值最高的行。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp")
|> max()
_time | _measurement | room | _field | _value |
---|
2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
2022-01-01T15:00:00Z | home | Kitchen | temp | 22.7 |
2022-01-01T16:00:00Z | home | Kitchen | temp | 22.4 |
2022-01-01T17:00:00Z | home | Kitchen | temp | 22.7 |
2022-01-01T18:00:00Z | home | Kitchen | temp | 23.3 |
2022-01-01T19:00:00Z | home | Kitchen | temp | 23.1 |
2022-01-01T20:00:00Z | home | Kitchen | temp | 22.7 |
_time | _measurement | room | _field | _value |
---|
2022-01-01T14:00:00Z | home | Living Room | temp | 22.3 |
2022-01-01T15:00:00Z | home | Living Room | temp | 22.3 |
2022-01-01T16:00:00Z | home | Living Room | temp | 22.4 |
2022-01-01T17:00:00Z | home | Living Room | temp | 22.6 |
2022-01-01T18:00:00Z | home | Living Room | temp | 22.8 |
2022-01-01T19:00:00Z | home | Living Room | temp | 22.5 |
2022-01-01T20:00:00Z | home | Living Room | temp | 22.2 |
_time | _measurement | room | _field | _value |
---|
2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
_time | _measurement | room | _field | _value |
---|
2022-01-01T18:00:00Z | home | Living Room | temp | 22.8 |
将数据透视到关系模式中
如果来自关系型 SQL 或类似 SQL 的查询语言(例如 InfluxQL),则 Flux 使用的数据模型与您习惯的模型不同。Flux 返回多个表,其中每个表包含不同的字段。“关系”模式将每个字段构造为每行中的一列。
使用 pivot()
函数 将数据透视到基于时间戳的“关系”模式中。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "co" or r._field == "hum" or r._field == "temp")
|> filter(fn: (r) => r.room == "Kitchen")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
数据降采样
数据降采样是一种策略,可以提高查询时的性能,并优化长期数据存储。简而言之,降采样减少了查询返回的点数,而不会丢失数据中的总体趋势。
有关降采样数据的更多信息,请参阅 降采样数据。
降采样数据最常见的方法是按时间间隔或“窗口”。例如,您可能想要查询最后一小时的数据,并返回每五分钟窗口的平均值。
使用 aggregateWindow()
按指定的时间间隔降采样数据
- 使用
every
参数指定每个窗口的持续时间。 - 使用
fn
参数指定要应用于每个窗口的 聚合 或 选择器 函数。 - (可选) 使用
timeSrc
参数指定要使用哪个列值来为每个窗口创建新的聚合时间戳。默认值为 _stop
。
from(bucket: "get-started")
|> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
|> filter(fn: (r) => r._measurement == "home")
|> filter(fn: (r) => r._field == "temp")
|> aggregateWindow(every: 2h, fn: mean)
使用 InfluxDB 任务自动化处理
InfluxDB 任务是计划查询,可以执行上述任何数据处理操作。通常,任务然后使用 to()
函数 将处理后的结果写回 InfluxDB。
有关创建和配置任务的更多信息,请参阅 InfluxDB 任务入门。
降采样任务示例
option task = {
name: "Example task"
every: 1d,
}
from(bucket: "get-started-downsampled")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "home")
|> aggregateWindow(every: 2h, fn: mean)
支持与反馈
感谢您成为我们社区的一份子!我们欢迎并鼓励您提供关于 InfluxDB 和本文档的反馈和错误报告。要获得支持,请使用以下资源
拥有年度合同或支持合同的客户 可以联系 InfluxData 支持。