文档文档

Flux 与 InfluxQL

此页面记录了早期版本的 InfluxDB OSS。InfluxDB OSS v2 是最新的稳定版本。请参阅 InfluxDB v2 文档

Flux 是 InfluxQL 和其他类似 SQL 的查询语言的替代方案,用于查询和分析数据。Flux 使用函数式语言模式,使其功能非常强大、灵活,并且能够克服 InfluxQL 的许多限制。本文概述了许多 Flux 可以实现但 InfluxQL 无法实现的任务,并提供了有关 Flux 和 InfluxQL 对等性的信息。

Flux 可能实现的功能

连接

InfluxQL 从未支持连接。它们可以使用 TICKscript 完成,但即使是 TICKscript 的连接功能也受到限制。Flux 的 join() 函数 允许您从任何存储桶、任何测量和任何列连接数据,只要每个数据集都包含要连接的列。这为真正强大且有用的操作打开了大门。

dataStream1 = from(bucket: "bucket1")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "network" and
    r._field == "bytes-transferred"
  )

dataStream2 = from(bucket: "bucket1")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "httpd" and
    r._field == "requests-per-sec"
    )

join(
    tables: {d1:dataStream1, d2:dataStream2},
    on: ["_time", "_stop", "_start", "host"]
  )

有关使用 join() 函数的深入演练,请参阅如何使用 Flux 连接数据


跨测量进行数学运算

能够执行跨测量连接还允许您使用来自不同测量的数据运行计算——这是 InfluxData 社区强烈要求的功能。下面的示例从不同的测量 memprocesses 中获取两个数据流,将它们连接起来,然后计算每个正在运行的进程使用的平均内存量

// Memory used (in bytes)
memUsed = from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "mem" and
    r._field == "used"
  )

// Total processes running
procTotal = from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "processes" and
    r._field == "total"
    )

// Join memory used with total processes and calculate
// the average memory (in MB) used for running processes.
join(
    tables: {mem:memUsed, proc:procTotal},
    on: ["_time", "_stop", "_start", "host"]
  )
  |> map(fn: (r) => ({
    _time: r._time,
    _value: (r._value_mem / r._value_proc) / 1000000
  })
)

按标签排序

InfluxQL 的排序功能非常有限,只允许您使用 ORDER BY time 子句控制 time 的排序顺序。Flux 的 sort() 函数 根据列列表对记录进行排序。根据列类型,记录按字典顺序、数字顺序或时间顺序排序。

from(bucket:"telegraf/autogen")
  |> range(start:-12h)
  |> filter(fn: (r) =>
    r._measurement == "system" and
    r._field == "uptime"
  )
  |> sort(columns:["region", "host", "_value"])

按任何列分组

InfluxQL 允许您按标签或时间间隔分组,但仅此而已。Flux 允许您按数据集中的任何列(包括 _value)分组。使用 Flux group() 函数 来定义按哪些列对数据进行分组。

from(bucket:"telegraf/autogen")
  |> range(start:-12h)
  |> filter(fn: (r) => r._measurement == "system" and r._field == "uptime" )
  |> group(columns:["host", "_value"])

按日历月和年进行窗口化

InfluxQL 不支持按日历月和年对数据进行窗口化,因为它们的长度各不相同。Flux 支持日历月和年持续时间单位(1mo1y),并允许您按日历月和年对数据进行窗口化和聚合。

from(bucket:"telegraf/autogen")
  |> range(start:-1y)
  |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent" )
  |> aggregateWindow(every: 1mo, fn: mean)

使用多个数据源

InfluxQL 只能查询存储在 InfluxDB 中的数据。Flux 可以查询来自其他数据源的数据,例如 CSV、PostgreSQL、MySQL、Google BigTable 等。将这些数据与 InfluxDB 中的数据连接起来,以丰富查询结果。

import "csv"
import "sql"

csvData = csv.from(csv: rawCSV)
sqlData = sql.from(
  driverName: "postgres",
  dataSourceName: "postgresql://user:password@localhost",
  query:"SELECT * FROM example_table"
)
data = from(bucket: "telegraf/autogen")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "sensor")

auxData = join(tables: {csv: csvData, sql: sqlData}, on: ["sensor_id"])
enrichedData = join(tables: {data: data, aux: auxData}, on: ["sensor_id"])

enrichedData
  |> yield(name: "enriched_data")

有关查询 SQL 数据的深入演练,请参阅查询 SQL 数据源


类似 DatePart 的查询

InfluxQL 不支持类似 DatePart 的查询,该查询仅返回指定时段内的结果。Flux hourSelection 函数 仅返回时间值在指定小时范围内的数据。

from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r.cpu == "cpu-total"
  )
  |> hourSelection(start: 9, stop: 17)

透视

在 InfluxQL 中从未支持过数据表透视。Flux pivot() 函数 提供了通过指定 rowKeycolumnKeyvalueColumn 参数来透视数据表的能力。

from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r.cpu == "cpu-total"
  )
  |> pivot(
    rowKey:["_time"],
    columnKey: ["_field"],
    valueColumn: "_value"
  )

直方图

生成直方图的能力一直是 InfluxQL 的一项高需求功能,但从未得到支持。Flux 的 histogram() 函数 使用输入数据生成累积直方图,并支持未来出现的其他直方图类型。

from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "mem" and
    r._field == "used_percent"
  )
  |> histogram(
    buckets: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
  )

有关使用 Flux 创建累积直方图的示例,请参阅创建直方图


协方差

Flux 提供了用于简单协方差计算的函数。covariance() 函数 计算两列之间的协方差,而 cov() 函数 计算两个数据流之间的协方差。

两列之间的协方差
from(bucket: "telegraf/autogen")
  |> range(start:-5m)
  |> covariance(columns: ["x", "y"])
两个数据流之间的协方差
table1 = from(bucket: "telegraf/autogen")
  |> range(start: -15m)
  |> filter(fn: (r) =>
    r._measurement == "measurement_1"
  )

table2 = from(bucket: "telegraf/autogen")
  |> range(start: -15m)
  |> filter(fn: (r) =>
    r._measurement == "measurement_2"
  )

cov(x: table1, y: table2, on: ["_time", "_field"])

将布尔值转换为整数

InfluxQL 支持类型转换,但仅适用于数字数据类型(浮点数到整数,反之亦然)。Flux 类型转换函数 为类型转换提供了更广泛的支持,并让您执行一些长期请求的操作,例如将布尔值转换为整数。

将布尔字段值转换为整数
from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "m" and
    r._field == "bool_field"
  )
  |> toInt()

字符串操作和数据塑造

InfluxQL 在查询数据时不支持字符串操作。Flux Strings 包 是一组操作字符串数据的函数。当与 map() 函数 结合使用时,字符串包中的函数允许进行字符串清理和规范化等操作。

import "strings"

from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "weather" and
    r._field == "temp"
  )
  |> map(fn: (r) => ({
    r with
    location: strings.toTitle(v: r.location),
    sensor: strings.replaceAll(v: r.sensor, t: " ", u: "-"),
    status: strings.substring(v: r.status, start: 0, end: 8)
  }))

使用地理时序数据

InfluxQL 不提供用于处理地理时序数据的功能。Flux Geo 包 是一组函数,可让您塑造、过滤和分组地理时序数据。

import "experimental/geo"

from(bucket: "geo/autogen")
  |> range(start: -1w)
  |> filter(fn: (r) => r._measurement == "taxi")
  |> geo.shapeData(latField: "latitude", lonField: "longitude", level: 20)
  |> geo.filterRows(
    region: {lat: 40.69335938, lon: -73.30078125, radius: 20.0},
    strict: true
  )
  |> geo.asTracks(groupBy: ["fare-id"])

InfluxQL 和 Flux 的对等性

Flux 正在努力实现与 InfluxQL 的完全对等,并且正在为此目的添加新函数。下表显示了 InfluxQL 语句、子句和函数以及它们等效的 Flux 函数。

有关 Flux 函数的完整列表,查看所有 Flux 函数

InfluxQL 和 Flux 的对等性

InfluxQLFlux 函数
SELECTfilter()
WHEREfilter(), range()
GROUP BYgroup()
INTOto() *
ORDER BYsort()
LIMITlimit()
SLIMIT
OFFSET
SOFFSET
SHOW DATABASESbuckets()
SHOW MEASUREMENTSv1.measurements
SHOW FIELD KEYSkeys()
SHOW RETENTION POLICIESbuckets()
SHOW TAG KEYSv1.tagKeys(), v1.measurementTagKeys()
SHOW TAG VALUESv1.tagValues(), v1.measurementTagValues()
SHOW SERIES
CREATE DATABASE
DROP DATABASE
DROP SERIES
DELETE
DROP MEASUREMENT
DROP SHARD
CREATE RETENTION POLICY
ALTER RETENTION POLICY
DROP RETENTION POLICY
COUNTcount()
DISTINCTdistinct()
INTEGRALintegral()
MEANmean()
MEDIANmedian()
MODEmode()
SPREADspread()
STDDEVstddev()
SUMsum()
BOTTOMbottom()
FIRSTfirst()
LASTlast()
MAXmax()
MINmin()
PERCENTILEquantile()
SAMPLEsample()
TOPtop()
ABSmath.abs()
ACOSmath.acos()
ASINmath.asin()
ATANmath.atan()
ATAN2math.atan2()
CEILmath.ceil()
COSmath.cos()
CUMULATIVE_SUMcumulativeSum()
DERIVATIVEderivative()
DIFFERENCEdifference()
ELAPSEDelapsed()
EXPmath.exp()
FLOORmath.floor()
HISTOGRAMhistogram()
LNmath.log()
LOGmath.logb()
LOG2math.log2()
LOG10math.log10()
MOVING_AVERAGEmovingAverage()
NON_NEGATIVE_DERIVATIVEderivative(nonNegative:true)
NON_NEGATIVE_DIFFERENCEdifference(nonNegative:true)
POWmath.pow()
ROUNDmath.round()
SINmath.sin()
SQRTmath.sqrt()
TANmath.tan()
HOLT_WINTERSholtWinters()
CHANDE_MOMENTUM_OSCILLATORchandeMomentumOscillator()
EXPONENTIAL_MOVING_AVERAGEexponentialMovingAverage()
DOUBLE_EXPONENTIAL_MOVING_AVERAGEdoubleEMA()
KAUFMANS_EFFICIENCY_RATIOkaufmansER()
KAUFMANS_ADAPTIVE_MOVING_AVERAGEkaufmansAMA()
TRIPLE_EXPONENTIAL_MOVING_AVERAGEtripleEMA()
TRIPLE_EXPONENTIAL_DERIVATIVEtripleExponentialDerivative()
RELATIVE_STRENGTH_INDEXrelativeStrengthIndex()

* to() 函数仅写入 InfluxDB 2.0。


此页是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像目前一样使用它,而无需对代码进行任何更改。

阅读更多

InfluxDB 3 开源版现已公开发布 Alpha 版本

InfluxDB 3 开源版现已可用于 Alpha 测试,根据 MIT 或 Apache 2 许可授权。

我们正在发布两个产品作为 Alpha 版本的一部分。

InfluxDB 3 Core 是我们新的开源产品。它是一个用于时间序列和事件数据的最新数据引擎。InfluxDB 3 Enterprise 是一个商业版本,它建立在 Core 的基础上,增加了历史查询能力、读取副本、高可用性、可伸缩性和细粒度的安全性。

有关如何开始使用的更多信息,请查看: