文档文档

Flux 与 InfluxQL

Flux 是 InfluxQL 和其他类似 SQL 的查询语言的替代方案,用于查询和分析数据。Flux 使用函数式语言模式,克服了 InfluxQL 的许多限制。查看以下 Flux 和 InfluxQL 之间的区别

Flux 可以完成的任务

连接

InfluxQL 从不支持连接。虽然您可以在 TICKscript 中使用连接,但 TICKscript 的连接功能有限。Flux 的 join() 函数 允许您从任何 Bucket、任何 measurement 以及任何列连接数据,只要每个数据集都包含要连接的列。

dataStream1 = from(bucket: "example-bucket1")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "network" and r._field == "bytes-transferred")

dataStream2 = from(bucket: "example-bucket2")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "httpd" and r._field == "requests-per-sec")

join(tables: {d1: dataStream1, d2: dataStream2}, on: ["_time", "_stop", "_start", "host"])

有关使用 join() 函数的深入演练,请参阅如何使用 Flux 连接数据

跨 measurements 的数学运算

能够跨 measurements 执行连接使您能够计算来自不同 measurements 的数据。下面的示例从两个 measurements (memprocesses) 中获取数据,将它们连接起来,然后计算每个运行进程使用的平均内存量

// Memory used (in bytes)
memUsed = from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "mem" and r._field == "used")

// Total processes running
procTotal = from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "processes" and r._field == "total")

// Join memory used with total processes to calculate
// the average memory (in MB) used for running processes.
join(tables: {mem: memUsed, proc: procTotal}, on: ["_time", "_stop", "_start", "host"])
    |> map(fn: (r) => ({_time: r._time, _value: r._value_mem / r._value_proc / 1000000}))

按标签排序

InfluxQL 的排序功能仅允许您使用 ORDER BY time 子句控制 time 的排序顺序。Flux sort() 函数 根据列列表对记录进行排序。根据列类型,Flux 按字典顺序、数值顺序或时间顺序对记录进行排序。

from(bucket: "example-bucket")
    |> range(start: -12h)
    |> filter(fn: (r) => r._measurement == "system" and r._field == "uptime")
    |> sort(columns: ["region", "host", "_value"])

按任何列分组

InfluxQL 仅允许您按标签或时间间隔分组。Flux 允许您按任何列(包括 _value)对数据进行分组。使用 Flux group() 函数 定义按哪些列对数据进行分组。

from(bucket:"example-bucket")
    |> range(start: -12h)
    |> filter(fn: (r) => r._measurement == "system" and r._field == "uptime" )
    |> group(columns:["host", "_value"])

按日历月和年进行窗口化

InfluxQL 不支持按日历月和年对数据进行窗口化,因为它们的长度各不相同。Flux 支持日历月和年持续时间单位(1mo1y),并允许您按日历月和年对数据进行窗口化和聚合。

from(bucket:"example-bucket")
    |> range(start:-1y)
    |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent" )
    |> aggregateWindow(every: 1mo, fn: mean)

使用多个数据源

InfluxQL 只能查询存储在 InfluxDB 中的数据。Flux 可以查询来自其他数据源的数据,例如 CSV、PostgreSQL、MySQL、Google BigTable 等。将该数据与 InfluxDB 中的数据连接起来,以丰富查询结果。

import "csv"
import "sql"

csvData = csv.from(csv: rawCSV)

sqlData = sql.from(
    driverName: "postgres",
    dataSourceName: "postgresql://user:password@localhost",
    query: "SELECT * FROM example_table",
)

data = from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "sensor")

auxData = join(tables: {csv: csvData, sql: sqlData}, on: ["sensor_id"])
enrichedData = join(tables: {data: data, aux: auxData}, on: ["sensor_id"])

enrichedData
    |> yield(name: "enriched_data")

有关查询 SQL 数据的深入演练,请参阅查询 SQL 数据源

类似 DatePart 的查询

InfluxQL 不支持类似 DatePart 的查询,该查询仅返回一天中指定小时内的结果。Flux hourSelection 函数 仅返回时间值在指定小时范围内的数据。

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r.cpu == "cpu-total")
    |> hourSelection(start: 9, stop: 17)

透视

InfluxQL 不支持透视数据表。使用 Flux pivot() 函数 通过 rowKeycolumnKeyvalueColumn 参数透视数据表。

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r.cpu == "cpu-total")
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

直方图

InfluxQL 不支持生成直方图。使用 Flux histogram() 函数 生成累积直方图。

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")
    |> histogram(buckets: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100])

有关更多示例,请参阅如何使用 Flux 创建直方图

协方差

Flux 提供了用于简单协方差计算的函数。使用 covariance() 函数 计算两列之间的协方差,并使用 cov() 函数 计算两个数据流之间的协方差。

两列之间的协方差
from(bucket: "example-bucket")
    |> range(start:-5m)
    |> covariance(columns: ["x", "y"])
两个数据流之间的协方差
table1 = from(bucket: "example-bucket")
    |> range(start: -15m)
    |> filter(fn: (r) => r._measurement == "measurement_1")

table2 = from(bucket: "example-bucket")
    |> range(start: -15m)
    |> filter(fn: (r) => r._measurement == "measurement_2")

cov(x: table1, y: table2, on: ["_time", "_field"])

将布尔值转换为整数

InfluxQL 仅支持数值数据类型(浮点数到整数以及反之亦然)的类型转换。使用 Flux 类型转换函数 执行更多类型转换,包括将布尔值转换为整数。

将布尔字段值转换为整数
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "m" and r._field == "bool_field")
    |> toInt()

字符串操作和数据整形

InfluxQL 不支持在查询数据时进行字符串操作。使用 Flux Strings 包 中的函数来操作字符串数据。将此包中的函数与 map() 函数 结合使用,可以执行诸如清理和规范化字符串之类的操作。

import "strings"

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "weather" and r._field == "temp")
    |> map(
        fn: (r) => ({
            r with
            location: strings.toTitle(v: r.location),
            sensor: strings.replaceAll(v: r.sensor, t: " ", u: "-"),
            status: strings.substring(v: r.status, start: 0, end: 8)
        })
    )

使用地理时序数据

InfluxQL 不支持使用地理时序数据。Flux Geo 包 是一个函数集合,可让您塑造、过滤和分组地理时序数据。

import "experimental/geo"

from(bucket: "geo/autogen")
    |> range(start: -1w)
    |> filter(fn: (r) => r._measurement == "taxi")
    |> geo.shapeData(latField: "latitude", lonField: "longitude", level: 20)
    |> geo.filterRows(region: {lat: 40.69335938, lon: -73.30078125, radius: 20.0}, strict: true)
    |> geo.asTracks(groupBy: ["fare-id"])

InfluxQL 和 Flux 对等性

我们正在继续添加函数以完成 Flux 和 InfluxQL 之间的对等性。下表显示了 InfluxQL 语句、子句和函数以及它们等效的 Flux 函数。

有关 Flux 函数的完整列表,请查看所有 Flux 函数

InfluxQLFlux 函数
SELECTfilter()
WHEREfilter(), range()
GROUP BYgroup()
INTOto()
ORDER BYsort()
LIMITlimit()
SLIMIT
OFFSET
SOFFSET
SHOW DATABASESbuckets()
SHOW MEASUREMENTSschema.measurements
SHOW FIELD KEYSkeys()
SHOW RETENTION POLICIESbuckets()
SHOW TAG KEYSschema.tagKeys(), schema.measurementTagKeys()
SHOW TAG VALUESschema.tagValues(), schema.measurementTagValues()
SHOW SERIES
CREATE DATABASEN/A
DROP DATABASEN/A
DROP SERIESN/A
DELETEN/A
DROP MEASUREMENTN/A
DROP SHARDN/A
CREATE RETENTION POLICYN/A
ALTER RETENTION POLICYN/A
DROP RETENTION POLICYN/A
COUNTcount()
DISTINCTdistinct()
INTEGRALintegral()
MEANmean()
MEDIANmedian()
MODEmode()
SPREADspread()
STDDEVstddev()
SUMsum()
BOTTOMbottom()
FIRSTfirst()
LASTlast()
MAXmax()
MINmin()
PERCENTILEquantile()
SAMPLEsample()
TOPtop()
ABSmath.abs()
ACOSmath.acos()
ASINmath.asin()
ATANmath.atan()
ATAN2math.atan2()
CEILmath.ceil()
COSmath.cos()
CUMULATIVE_SUMcumulativeSum()
DERIVATIVEderivative()
DIFFERENCEdifference()
ELAPSEDelapsed()
EXPmath.exp()
FLOORmath.floor()
HISTOGRAMhistogram()
LNmath.log()
LOGmath.logb()
LOG2math.log2()
LOG10math.log10()
MOVING_AVERAGEmovingAverage()
NON_NEGATIVE_DERIVATIVEderivative(nonNegative:true)
NON_NEGATIVE_DIFFERENCEdifference(nonNegative:true)
POWmath.pow()
ROUNDmath.round()
SINmath.sin()
SQRTmath.sqrt()
TANmath.tan()
HOLT_WINTERSholtWinters()
CHANDE_MOMENTUM_OSCILLATORchandeMomentumOscillator()
EXPONENTIAL_MOVING_AVERAGEexponentialMovingAverage()
DOUBLE_EXPONENTIAL_MOVING_AVERAGEdoubleEMA()
KAUFMANS_EFFICIENCY_RATIOkaufmansER()
KAUFMANS_ADAPTIVE_MOVING_AVERAGEkaufmansAMA()
TRIPLE_EXPONENTIAL_MOVING_AVERAGEtripleEMA()
TRIPLE_EXPONENTIAL_DERIVATIVEtripleExponentialDerivative()
RELATIVE_STRENGTH_INDEXrelativeStrengthIndex()

此页是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像往常一样使用,无需更改任何代码。

阅读更多

现已正式发布

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速的最新数据引擎,可实时收集和处理数据,并将其存储到本地磁盘或对象存储。InfluxDB 3 Enterprise 在 Core 的基础上构建,增加了高可用性、读取副本、增强的安全性和数据压缩,以实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费版本可用于非商业用途,例如家庭或业余爱好。

欲了解更多信息,请参阅