Flux 与 InfluxQL
Flux 是 InfluxQL 和其他类似 SQL 的查询语言的替代方案,用于查询和分析数据。Flux 使用函数式语言模式,克服了 InfluxQL 的许多限制。查看以下 Flux 和 InfluxQL 之间的区别
Flux 可以完成的任务
- 连接
- 跨 measurements 的数学运算
- 按标签排序
- 按任何列分组
- 按日历月和年进行窗口化
- 使用多个数据源
- 类似 DatePart 的查询
- 透视
- 直方图
- 协方差
- 将布尔值转换为整数
- 字符串操作和数据整形
- 使用地理时序数据
连接
InfluxQL 从不支持连接。虽然您可以在 TICKscript 中使用连接,但 TICKscript 的连接功能有限。Flux 的 join()
函数 允许您从任何 Bucket、任何 measurement 以及任何列连接数据,只要每个数据集都包含要连接的列。
dataStream1 = from(bucket: "example-bucket1")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "network" and r._field == "bytes-transferred")
dataStream2 = from(bucket: "example-bucket2")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "httpd" and r._field == "requests-per-sec")
join(tables: {d1: dataStream1, d2: dataStream2}, on: ["_time", "_stop", "_start", "host"])
有关使用 join()
函数的深入演练,请参阅如何使用 Flux 连接数据。
跨 measurements 的数学运算
能够跨 measurements 执行连接使您能够计算来自不同 measurements 的数据。下面的示例从两个 measurements (mem
和 processes
) 中获取数据,将它们连接起来,然后计算每个运行进程使用的平均内存量
// Memory used (in bytes)
memUsed = from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "mem" and r._field == "used")
// Total processes running
procTotal = from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "processes" and r._field == "total")
// Join memory used with total processes to calculate
// the average memory (in MB) used for running processes.
join(tables: {mem: memUsed, proc: procTotal}, on: ["_time", "_stop", "_start", "host"])
|> map(fn: (r) => ({_time: r._time, _value: r._value_mem / r._value_proc / 1000000}))
按标签排序
InfluxQL 的排序功能仅允许您使用 ORDER BY time
子句控制 time
的排序顺序。Flux sort()
函数 根据列列表对记录进行排序。根据列类型,Flux 按字典顺序、数值顺序或时间顺序对记录进行排序。
from(bucket: "example-bucket")
|> range(start: -12h)
|> filter(fn: (r) => r._measurement == "system" and r._field == "uptime")
|> sort(columns: ["region", "host", "_value"])
按任何列分组
InfluxQL 仅允许您按标签或时间间隔分组。Flux 允许您按任何列(包括 _value
)对数据进行分组。使用 Flux group()
函数 定义按哪些列对数据进行分组。
from(bucket:"example-bucket")
|> range(start: -12h)
|> filter(fn: (r) => r._measurement == "system" and r._field == "uptime" )
|> group(columns:["host", "_value"])
按日历月和年进行窗口化
InfluxQL 不支持按日历月和年对数据进行窗口化,因为它们的长度各不相同。Flux 支持日历月和年持续时间单位(1mo
、1y
),并允许您按日历月和年对数据进行窗口化和聚合。
from(bucket:"example-bucket")
|> range(start:-1y)
|> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent" )
|> aggregateWindow(every: 1mo, fn: mean)
使用多个数据源
InfluxQL 只能查询存储在 InfluxDB 中的数据。Flux 可以查询来自其他数据源的数据,例如 CSV、PostgreSQL、MySQL、Google BigTable 等。将该数据与 InfluxDB 中的数据连接起来,以丰富查询结果。
import "csv"
import "sql"
csvData = csv.from(csv: rawCSV)
sqlData = sql.from(
driverName: "postgres",
dataSourceName: "postgresql://user:password@localhost",
query: "SELECT * FROM example_table",
)
data = from(bucket: "example-bucket")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "sensor")
auxData = join(tables: {csv: csvData, sql: sqlData}, on: ["sensor_id"])
enrichedData = join(tables: {data: data, aux: auxData}, on: ["sensor_id"])
enrichedData
|> yield(name: "enriched_data")
有关查询 SQL 数据的深入演练,请参阅查询 SQL 数据源。
类似 DatePart 的查询
InfluxQL 不支持类似 DatePart 的查询,该查询仅返回一天中指定小时内的结果。Flux hourSelection
函数 仅返回时间值在指定小时范围内的数据。
from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu" and r.cpu == "cpu-total")
|> hourSelection(start: 9, stop: 17)
透视
InfluxQL 不支持透视数据表。使用 Flux pivot()
函数 通过 rowKey
、columnKey
和 valueColumn
参数透视数据表。
from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu" and r.cpu == "cpu-total")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
直方图
InfluxQL 不支持生成直方图。使用 Flux histogram()
函数 生成累积直方图。
from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")
|> histogram(buckets: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100])
有关更多示例,请参阅如何使用 Flux 创建直方图。
协方差
Flux 提供了用于简单协方差计算的函数。使用 covariance()
函数 计算两列之间的协方差,并使用 cov()
函数 计算两个数据流之间的协方差。
两列之间的协方差
from(bucket: "example-bucket")
|> range(start:-5m)
|> covariance(columns: ["x", "y"])
两个数据流之间的协方差
table1 = from(bucket: "example-bucket")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "measurement_1")
table2 = from(bucket: "example-bucket")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "measurement_2")
cov(x: table1, y: table2, on: ["_time", "_field"])
将布尔值转换为整数
InfluxQL 仅支持数值数据类型(浮点数到整数以及反之亦然)的类型转换。使用 Flux 类型转换函数 执行更多类型转换,包括将布尔值转换为整数。
将布尔字段值转换为整数
from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "m" and r._field == "bool_field")
|> toInt()
字符串操作和数据整形
InfluxQL 不支持在查询数据时进行字符串操作。使用 Flux Strings 包 中的函数来操作字符串数据。将此包中的函数与 map()
函数 结合使用,可以执行诸如清理和规范化字符串之类的操作。
import "strings"
from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "weather" and r._field == "temp")
|> map(
fn: (r) => ({
r with
location: strings.toTitle(v: r.location),
sensor: strings.replaceAll(v: r.sensor, t: " ", u: "-"),
status: strings.substring(v: r.status, start: 0, end: 8)
})
)
使用地理时序数据
InfluxQL 不支持使用地理时序数据。Flux Geo 包 是一个函数集合,可让您塑造、过滤和分组地理时序数据。
import "experimental/geo"
from(bucket: "geo/autogen")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "taxi")
|> geo.shapeData(latField: "latitude", lonField: "longitude", level: 20)
|> geo.filterRows(region: {lat: 40.69335938, lon: -73.30078125, radius: 20.0}, strict: true)
|> geo.asTracks(groupBy: ["fare-id"])
InfluxQL 和 Flux 对等性
我们正在继续添加函数以完成 Flux 和 InfluxQL 之间的对等性。下表显示了 InfluxQL 语句、子句和函数以及它们等效的 Flux 函数。
有关 Flux 函数的完整列表,请查看所有 Flux 函数。
此页是否对您有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一份子!我们欢迎并鼓励您提供关于 InfluxDB 和本文档的反馈和错误报告。如需获得支持,请使用以下资源
拥有年度合同或支持合同的客户可以联系 InfluxData 支持。