文档文档

Flux 查询基础知识

大多数 Flux 查询遵循相同的基本结构。熟悉使用 Flux 查询数据时的基本概念和步骤。

基本查询结构

大多数基本 Flux 查询包括以下步骤

from(bucket: "example-bucket")              // ── Source
    |> range(start: -1d)                    // ── Filter on time
    |> filter(fn: (r) => r._field == "foo") // ── Filter on column values
    |> group(columns: ["sensorID"])         // ── Shape
    |> mean()                               // ── Process

数据源

Flux 输入函数 从数据源检索数据。所有输入函数都返回 表流

Flux 支持多种数据源,包括时间序列数据库(如 InfluxDBPrometheus)、关系数据库(如 MySQLPostgreSQL)、CSV 等。

筛选

筛选函数迭代并评估每个输入行,以查看它是否与指定的条件匹配。满足条件的行包含在函数输出中。不满足指定条件的行将被删除。

Flux 提供了以下主要筛选函数

  • range():根据时间筛选数据。
  • filter():根据列值筛选数据。filter() 使用在 fn 参数中定义的谓词函数来评估输入行。每行都作为记录 r 传递到谓词函数中,其中包含行中每列的键值对。

还有其他筛选函数可用。有关更多信息,请参阅函数类型和类别 – 筛选器

塑形数据

许多查询需要修改数据的结构,以便为处理做好准备。常见的数据塑形任务包括按列值或按时间重新分组数据或将列值透视为行。

重塑数据的函数包括以下内容

  • group():修改组键
  • window():修改行的 _start_stop 值,以按时间对数据进行分组
  • pivot():将列值透视为行
  • drop():删除特定列
  • keep():保留特定列并删除所有其他列

处理

处理数据可以采用多种形式,包括以下类型的操作

  • 聚合数据:将输入表的所有行聚合为单行。有关信息,请参阅函数类型和类别 - 聚合
  • 选择特定数据点:从每个输入表中返回特定行。例如,返回第一行或最后一行、具有最高值或最低值的行等等。有关信息,请参阅函数类型和类别 - 选择器
  • 重写行:使用 map() 重写每个输入行。使用数学运算转换值、处理字符串、动态添加新列等等。
  • 发送通知:评估数据并使用 Flux 通知端点函数向外部服务发送通知。有关信息,请参阅函数类型和类别 - 通知端点

aggregateWindow 辅助函数

aggregateWindow() 是一个辅助函数,它既可以塑形数据,又可以处理数据。该函数按时间对数据进行窗口化和分组,然后将聚合选择器函数应用于重构的表。


编写基本查询

使用 InfluxDB 示例数据 编写基本的 Flux 查询,该查询查询数据,按时间和列值筛选数据,然后应用聚合

使用 InfluxDB 数据浏览器Flux REPL 构建并执行以下基本查询。

  1. 导入 influxdata/influxdb/sample 并使用 sample.data() 函数 加载 airSensor 示例数据集。

    import "influxdata/influxdb/sample"
    
    sample.data(set: "airSensor")
    

    sample.data() 返回数据,就像从 InfluxDB 查询数据一样。要实际从 InfluxDB 查询数据,请将 sample.data() 替换为 from() 函数

  2. 将返回的数据通过管道传递到 range() 中,以按时间筛选数据。返回最近一小时的数据。

    import "influxdata/influxdb/sample"
    
    sample.data(set: "airSensor")
        |> range(start: -1h)
    
  3. 使用 filter() 根据列值筛选行。在此示例中,仅返回包含 co 字段值的行。字段名称存储在 _field 列中。

    import "influxdata/influxdb/sample"
    
    sample.data(set: "airSensor")
        |> range(start: -1h)
        |> filter(fn: (r) => r._field == "co")
    
  4. 使用 mean() 计算每个输入表中的平均值。由于 InfluxDB 按序列对数据进行分组,因此 mean() 会为每个唯一的 sensor_id 返回一个表,其中包含一行,该行在 _value 列中包含平均值。

    import "influxdata/influxdb/sample"
    
    sample.data(set: "airSensor")
        |> range(start: -1h)
        |> filter(fn: (r) => r._field == "co")
        |> mean()
    
  5. 使用 group()表重构为单个表

    import "influxdata/influxdb/sample"
    
    sample.data(set: "airSensor")
        |> range(start: -1h)
        |> filter(fn: (r) => r._field == "co")
        |> mean()
        |> group()
    

此基本查询的结果应类似于以下内容

_start_stop 列已被省略。

_field_measurementsensor_id_value
coairSensorsTLM01000.42338714381053716
coairSensorsTLM01010.4223251339463061
coairSensorsTLM01020.8543452859060252
coairSensorsTLM01030.2782783780205422
coairSensorsTLM02004.612143110484339
coairSensorsTLM02010.297474366047375
coairSensorsTLM02020.3336370208486757
coairSensorsTLM02030.4948166816959906

此页面是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像目前一样使用它,而无需对代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速的最近数据引擎,可实时收集和处理数据,并将其持久保存到本地磁盘或对象存储。InfluxDB 3 Enterprise 以 Core 的基础为构建,增加了高可用性、读取副本、增强的安全性以及数据压缩,以实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看