Flux 与 InfluxQL
此页面记录了早期版本的 InfluxDB OSS。InfluxDB OSS v2 是最新的稳定版本。请参阅 InfluxDB v2 文档。
Flux 是 InfluxQL 和其他类似 SQL 的查询语言的替代方案,用于查询和分析数据。Flux 使用函数式语言模式,使其功能非常强大、灵活,并且能够克服 InfluxQL 的许多限制。本文概述了许多 Flux 可以实现但 InfluxQL 无法实现的任务,并提供了有关 Flux 和 InfluxQL 对等性的信息。
Flux 可能实现的功能
连接
InfluxQL 从未支持连接。它们可以使用 TICKscript 完成,但即使是 TICKscript 的连接功能也受到限制。Flux 的 join()
函数 允许您从任何存储桶、任何测量和任何列连接数据,只要每个数据集都包含要连接的列。这为真正强大且有用的操作打开了大门。
dataStream1 = from(bucket: "bucket1")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "network" and
r._field == "bytes-transferred"
)
dataStream2 = from(bucket: "bucket1")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "httpd" and
r._field == "requests-per-sec"
)
join(
tables: {d1:dataStream1, d2:dataStream2},
on: ["_time", "_stop", "_start", "host"]
)
有关使用 join()
函数的深入演练,请参阅如何使用 Flux 连接数据。
跨测量进行数学运算
能够执行跨测量连接还允许您使用来自不同测量的数据运行计算——这是 InfluxData 社区强烈要求的功能。下面的示例从不同的测量 mem
和 processes
中获取两个数据流,将它们连接起来,然后计算每个正在运行的进程使用的平均内存量
// Memory used (in bytes)
memUsed = from(bucket: "telegraf/autogen")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "mem" and
r._field == "used"
)
// Total processes running
procTotal = from(bucket: "telegraf/autogen")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "processes" and
r._field == "total"
)
// Join memory used with total processes and calculate
// the average memory (in MB) used for running processes.
join(
tables: {mem:memUsed, proc:procTotal},
on: ["_time", "_stop", "_start", "host"]
)
|> map(fn: (r) => ({
_time: r._time,
_value: (r._value_mem / r._value_proc) / 1000000
})
)
按标签排序
InfluxQL 的排序功能非常有限,只允许您使用 ORDER BY time
子句控制 time
的排序顺序。Flux 的 sort()
函数 根据列列表对记录进行排序。根据列类型,记录按字典顺序、数字顺序或时间顺序排序。
from(bucket:"telegraf/autogen")
|> range(start:-12h)
|> filter(fn: (r) =>
r._measurement == "system" and
r._field == "uptime"
)
|> sort(columns:["region", "host", "_value"])
按任何列分组
InfluxQL 允许您按标签或时间间隔分组,但仅此而已。Flux 允许您按数据集中的任何列(包括 _value
)分组。使用 Flux group()
函数 来定义按哪些列对数据进行分组。
from(bucket:"telegraf/autogen")
|> range(start:-12h)
|> filter(fn: (r) => r._measurement == "system" and r._field == "uptime" )
|> group(columns:["host", "_value"])
按日历月和年进行窗口化
InfluxQL 不支持按日历月和年对数据进行窗口化,因为它们的长度各不相同。Flux 支持日历月和年持续时间单位(1mo
、1y
),并允许您按日历月和年对数据进行窗口化和聚合。
from(bucket:"telegraf/autogen")
|> range(start:-1y)
|> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent" )
|> aggregateWindow(every: 1mo, fn: mean)
使用多个数据源
InfluxQL 只能查询存储在 InfluxDB 中的数据。Flux 可以查询来自其他数据源的数据,例如 CSV、PostgreSQL、MySQL、Google BigTable 等。将这些数据与 InfluxDB 中的数据连接起来,以丰富查询结果。
import "csv"
import "sql"
csvData = csv.from(csv: rawCSV)
sqlData = sql.from(
driverName: "postgres",
dataSourceName: "postgresql://user:password@localhost",
query:"SELECT * FROM example_table"
)
data = from(bucket: "telegraf/autogen")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "sensor")
auxData = join(tables: {csv: csvData, sql: sqlData}, on: ["sensor_id"])
enrichedData = join(tables: {data: data, aux: auxData}, on: ["sensor_id"])
enrichedData
|> yield(name: "enriched_data")
有关查询 SQL 数据的深入演练,请参阅查询 SQL 数据源。
类似 DatePart 的查询
InfluxQL 不支持类似 DatePart 的查询,该查询仅返回指定时段内的结果。Flux hourSelection
函数 仅返回时间值在指定小时范围内的数据。
from(bucket: "telegraf/autogen")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r.cpu == "cpu-total"
)
|> hourSelection(start: 9, stop: 17)
透视
在 InfluxQL 中从未支持过数据表透视。Flux pivot()
函数 提供了通过指定 rowKey
、columnKey
和 valueColumn
参数来透视数据表的能力。
from(bucket: "telegraf/autogen")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r.cpu == "cpu-total"
)
|> pivot(
rowKey:["_time"],
columnKey: ["_field"],
valueColumn: "_value"
)
直方图
生成直方图的能力一直是 InfluxQL 的一项高需求功能,但从未得到支持。Flux 的 histogram()
函数 使用输入数据生成累积直方图,并支持未来出现的其他直方图类型。
from(bucket: "telegraf/autogen")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "mem" and
r._field == "used_percent"
)
|> histogram(
buckets: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
)
有关使用 Flux 创建累积直方图的示例,请参阅创建直方图。
协方差
Flux 提供了用于简单协方差计算的函数。covariance()
函数 计算两列之间的协方差,而 cov()
函数 计算两个数据流之间的协方差。
两列之间的协方差
from(bucket: "telegraf/autogen")
|> range(start:-5m)
|> covariance(columns: ["x", "y"])
两个数据流之间的协方差
table1 = from(bucket: "telegraf/autogen")
|> range(start: -15m)
|> filter(fn: (r) =>
r._measurement == "measurement_1"
)
table2 = from(bucket: "telegraf/autogen")
|> range(start: -15m)
|> filter(fn: (r) =>
r._measurement == "measurement_2"
)
cov(x: table1, y: table2, on: ["_time", "_field"])
将布尔值转换为整数
InfluxQL 支持类型转换,但仅适用于数字数据类型(浮点数到整数,反之亦然)。Flux 类型转换函数 为类型转换提供了更广泛的支持,并让您执行一些长期请求的操作,例如将布尔值转换为整数。
将布尔字段值转换为整数
from(bucket: "telegraf/autogen")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "m" and
r._field == "bool_field"
)
|> toInt()
字符串操作和数据塑造
InfluxQL 在查询数据时不支持字符串操作。Flux Strings 包 是一组操作字符串数据的函数。当与 map()
函数 结合使用时,字符串包中的函数允许进行字符串清理和规范化等操作。
import "strings"
from(bucket: "telegraf/autogen")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "weather" and
r._field == "temp"
)
|> map(fn: (r) => ({
r with
location: strings.toTitle(v: r.location),
sensor: strings.replaceAll(v: r.sensor, t: " ", u: "-"),
status: strings.substring(v: r.status, start: 0, end: 8)
}))
使用地理时序数据
InfluxQL 不提供用于处理地理时序数据的功能。Flux Geo 包 是一组函数,可让您塑造、过滤和分组地理时序数据。
import "experimental/geo"
from(bucket: "geo/autogen")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "taxi")
|> geo.shapeData(latField: "latitude", lonField: "longitude", level: 20)
|> geo.filterRows(
region: {lat: 40.69335938, lon: -73.30078125, radius: 20.0},
strict: true
)
|> geo.asTracks(groupBy: ["fare-id"])
InfluxQL 和 Flux 的对等性
Flux 正在努力实现与 InfluxQL 的完全对等,并且正在为此目的添加新函数。下表显示了 InfluxQL 语句、子句和函数以及它们等效的 Flux 函数。
有关 Flux 函数的完整列表,查看所有 Flux 函数。
InfluxQL 和 Flux 的对等性
* to()
函数仅写入 InfluxDB 2.0。
此页是否对您有帮助?
感谢您的反馈!