文档文档

Flux 与 InfluxQL

此页面记录了早期版本的 InfluxDB OSS。 InfluxDB OSS v2 是最新的稳定版本。请参阅 InfluxDB v2 文档

Flux 是 InfluxQL 和其他类 SQL 查询语言的替代品,用于查询和分析数据。 Flux 使用函数式语言模式,使其功能非常强大、灵活,并且能够克服 InfluxQL 的许多限制。本文概述了 Flux 可以实现但 InfluxQL 无法实现的许多任务,并提供了有关 Flux 和 InfluxQL 对等性的信息。

Flux 可以实现

连接

InfluxQL 从未支持连接。可以使用 TICKscript 完成连接,但即使是 TICKscript 的连接功能也有限。 Flux 的 join() 函数 允许您从任何存储桶、任何测量和任何列连接数据,只要每个数据集都包含要连接的列即可。这为真正强大且有用的操作打开了大门。

dataStream1 = from(bucket: "bucket1")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "network" and
    r._field == "bytes-transferred"
  )

dataStream2 = from(bucket: "bucket1")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "httpd" and
    r._field == "requests-per-sec"
    )

join(
    tables: {d1:dataStream1, d2:dataStream2},
    on: ["_time", "_stop", "_start", "host"]
  )

有关使用 join() 函数的深入演练,请参阅 如何使用 Flux 连接数据


跨测量进行数学运算

能够执行跨测量连接还允许您使用来自不同测量的数据运行计算——这是 InfluxData 社区强烈要求的功能。下面的示例从不同的测量 memprocesses 中获取两个数据流,将它们连接起来,然后计算每个正在运行的进程使用的平均内存量

// Memory used (in bytes)
memUsed = from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "mem" and
    r._field == "used"
  )

// Total processes running
procTotal = from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "processes" and
    r._field == "total"
    )

// Join memory used with total processes and calculate
// the average memory (in MB) used for running processes.
join(
    tables: {mem:memUsed, proc:procTotal},
    on: ["_time", "_stop", "_start", "host"]
  )
  |> map(fn: (r) => ({
    _time: r._time,
    _value: (r._value_mem / r._value_proc) / 1000000
  })
)

按标签排序

InfluxQL 的排序功能非常有限,仅允许您使用 ORDER BY time 子句控制 time 的排序顺序。 Flux 的 sort() 函数 根据列列表对记录进行排序。根据列类型,记录按字典顺序、数字顺序或时间顺序排序。

from(bucket:"telegraf/autogen")
  |> range(start:-12h)
  |> filter(fn: (r) =>
    r._measurement == "system" and
    r._field == "uptime"
  )
  |> sort(columns:["region", "host", "_value"])

按任何列分组

InfluxQL 允许您按标签或时间间隔分组,但仅此而已。 Flux 允许您按数据集中的任何列(包括 _value)分组。使用 Flux group() 函数 定义要按哪些列对数据进行分组。

from(bucket:"telegraf/autogen")
  |> range(start:-12h)
  |> filter(fn: (r) => r._measurement == "system" and r._field == "uptime" )
  |> group(columns:["host", "_value"])

按日历月和年进行窗口化

InfluxQL 不支持按日历月和年对数据进行窗口化,因为它们的长度各不相同。 Flux 支持日历月和年持续时间单位(1mo1y),并允许您按日历月和年对数据进行窗口化和聚合。

from(bucket:"telegraf/autogen")
  |> range(start:-1y)
  |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent" )
  |> aggregateWindow(every: 1mo, fn: mean)

使用多个数据源

InfluxQL 只能查询存储在 InfluxDB 中的数据。 Flux 可以查询来自其他数据源的数据,例如 CSV、PostgreSQL、MySQL、Google BigTable 等。将该数据与 InfluxDB 中的数据连接起来以丰富查询结果。

import "csv"
import "sql"

csvData = csv.from(csv: rawCSV)
sqlData = sql.from(
  driverName: "postgres",
  dataSourceName: "postgresql://user:password@localhost",
  query:"SELECT * FROM example_table"
)
data = from(bucket: "telegraf/autogen")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "sensor")

auxData = join(tables: {csv: csvData, sql: sqlData}, on: ["sensor_id"])
enrichedData = join(tables: {data: data, aux: auxData}, on: ["sensor_id"])

enrichedData
  |> yield(name: "enriched_data")

有关查询 SQL 数据的深入演练,请参阅 查询 SQL 数据源


类似 DatePart 的查询

InfluxQL 不支持类似 DatePart 的查询,此类查询仅在一天中的指定小时内返回结果。 Flux hourSelection 函数 仅返回时间值在指定小时范围内的数据。

from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r.cpu == "cpu-total"
  )
  |> hourSelection(start: 9, stop: 17)

透视

InfluxQL 从未支持数据表透视。 Flux pivot() 函数 提供了通过指定 rowKeycolumnKeyvalueColumn 参数来透视数据表的功能。

from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r.cpu == "cpu-total"
  )
  |> pivot(
    rowKey:["_time"],
    columnKey: ["_field"],
    valueColumn: "_value"
  )

直方图

生成直方图的功能一直是 InfluxQL 的一项强烈要求的功能,但从未得到支持。 Flux 的 histogram() 函数 使用输入数据生成累积直方图,并支持未来推出的其他直方图类型。

from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "mem" and
    r._field == "used_percent"
  )
  |> histogram(
    buckets: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
  )

有关使用 Flux 创建累积直方图的示例,请参阅 创建直方图


协方差

Flux 提供了用于简单协方差计算的函数。 covariance() 函数 计算两列之间的协方差,cov() 函数 计算两个数据流之间的协方差。

两列之间的协方差
from(bucket: "telegraf/autogen")
  |> range(start:-5m)
  |> covariance(columns: ["x", "y"])
两个数据流之间的协方差
table1 = from(bucket: "telegraf/autogen")
  |> range(start: -15m)
  |> filter(fn: (r) =>
    r._measurement == "measurement_1"
  )

table2 = from(bucket: "telegraf/autogen")
  |> range(start: -15m)
  |> filter(fn: (r) =>
    r._measurement == "measurement_2"
  )

cov(x: table1, y: table2, on: ["_time", "_field"])

将布尔值转换为整数

InfluxQL 支持类型转换,但仅适用于数值数据类型(浮点数到整数,反之亦然)。 Flux 类型转换函数 为类型转换提供了更广泛的支持,并允许您执行一些长期请求的操作,例如将布尔值转换为整数。

将布尔字段值转换为整数
from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "m" and
    r._field == "bool_field"
  )
  |> toInt()

字符串操作和数据整形

InfluxQL 在查询数据时不支持字符串操作。 Flux Strings 包 是一系列操作字符串数据的函数集合。当与 map() 函数 结合使用时,字符串包中的函数允许执行字符串清理和规范化等操作。

import "strings"

from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "weather" and
    r._field == "temp"
  )
  |> map(fn: (r) => ({
    r with
    location: strings.toTitle(v: r.location),
    sensor: strings.replaceAll(v: r.sensor, t: " ", u: "-"),
    status: strings.substring(v: r.status, start: 0, end: 8)
  }))

处理地理时空数据

InfluxQL 没有提供处理地理时空数据的功能。 Flux Geo 包 是一系列函数,可让您塑造、过滤和分组地理时空数据。

import "experimental/geo"

from(bucket: "geo/autogen")
  |> range(start: -1w)
  |> filter(fn: (r) => r._measurement == "taxi")
  |> geo.shapeData(latField: "latitude", lonField: "longitude", level: 20)
  |> geo.filterRows(
    region: {lat: 40.69335938, lon: -73.30078125, radius: 20.0},
    strict: true
  )
  |> geo.asTracks(groupBy: ["fare-id"])

InfluxQL 和 Flux 对等性

Flux 正在努力实现与 InfluxQL 的完全对等,并且为此目的正在添加新函数。下表显示了 InfluxQL 语句、子句和函数及其等效的 Flux 函数。

有关 Flux 函数的完整列表,请查看所有 Flux 函数

InfluxQL 和 Flux 对等性

InfluxQLFlux 函数
SELECTfilter()
WHEREfilter(), range()
GROUP BYgroup()
INTOto() *
ORDER BYsort()
LIMITlimit()
SLIMIT
OFFSET
SOFFSET
SHOW DATABASESbuckets()
SHOW MEASUREMENTSv1.measurements
SHOW FIELD KEYSkeys()
SHOW RETENTION POLICIESbuckets()
SHOW TAG KEYSv1.tagKeys(), v1.measurementTagKeys()
SHOW TAG VALUESv1.tagValues(), v1.measurementTagValues()
SHOW SERIES
CREATE DATABASE
DROP DATABASE
DROP SERIES
DELETE
DROP MEASUREMENT
DROP SHARD
CREATE RETENTION POLICY
ALTER RETENTION POLICY
DROP RETENTION POLICY
COUNTcount()
DISTINCTdistinct()
INTEGRALintegral()
MEANmean()
MEDIANmedian()
MODEmode()
SPREADspread()
STDDEVstddev()
SUMsum()
BOTTOMbottom()
FIRSTfirst()
LASTlast()
MAXmax()
MINmin()
PERCENTILEquantile()
SAMPLEsample()
TOPtop()
ABSmath.abs()
ACOSmath.acos()
ASINmath.asin()
ATANmath.atan()
ATAN2math.atan2()
CEILmath.ceil()
COSmath.cos()
CUMULATIVE_SUMcumulativeSum()
DERIVATIVEderivative()
DIFFERENCEdifference()
ELAPSEDelapsed()
EXPmath.exp()
FLOORmath.floor()
HISTOGRAMhistogram()
LNmath.log()
LOGmath.logb()
LOG2math.log2()
LOG10math.log10()
MOVING_AVERAGEmovingAverage()
NON_NEGATIVE_DERIVATIVEderivative(nonNegative:true)
NON_NEGATIVE_DIFFERENCEdifference(nonNegative:true)
POWmath.pow()
ROUNDmath.round()
SINmath.sin()
SQRTmath.sqrt()
TANmath.tan()
HOLT_WINTERSholtWinters()
CHANDE_MOMENTUM_OSCILLATORchandeMomentumOscillator()
EXPONENTIAL_MOVING_AVERAGEexponentialMovingAverage()
DOUBLE_EXPONENTIAL_MOVING_AVERAGEdoubleEMA()
KAUFMANS_EFFICIENCY_RATIOkaufmansER()
KAUFMANS_ADAPTIVE_MOVING_AVERAGEkaufmansAMA()
TRIPLE_EXPONENTIAL_MOVING_AVERAGEtripleEMA()
TRIPLE_EXPONENTIAL_DERIVATIVEtripleExponentialDerivative()
RELATIVE_STRENGTH_INDEXrelativeStrengthIndex()

* to() 函数仅写入 InfluxDB 2.0。


此页面对您有帮助吗?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在这样使用它,而无需对您的代码进行任何更改。

阅读更多

现已正式发布

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一款开源、高速、最近数据引擎,可实时收集和处理数据,并将其持久保存到本地磁盘或对象存储。 InfluxDB 3 Enterprise 以 Core 的基础为构建,增加了高可用性、读取副本、增强的安全性以及数据压缩,从而实现更快的查询和优化的存储。 InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看