文档文档

将连续查询迁移到任务

InfluxDB OSS 2.7 使用 InfluxDB 任务 替换 1.x 连续查询 (CQs)。要将连续查询迁移到 InfluxDB 2.7 任务,请执行以下操作

  1. 输出所有 InfluxDB 1.x 连续查询
  2. 将连续查询转换为 Flux 查询
  3. 创建新的 InfluxDB 任务

输出所有 InfluxDB 1.x 连续查询

如果使用 influxd upgrade 命令,默认情况下,所有连续查询将在升级过程中输出到 ~/continuous_queries.txt。要自定义连续查询文件的目标路径,请将 --continuous-query-export-path 标志与 influxd upgrade 命令一起使用。

influxd upgrade --continuous-query-export-path /path/to/continuous_queries.txt

手动输出连续查询

  1. 使用 InfluxDB 1.x influx 交互式 shell 运行 SHOW CONTINUOUS QUERIES

    $ influx
    Connected to http://localhost:8086 version 
    InfluxDB shell version: 
    > SHOW CONTINUOUS QUERIES
    
  2. 复制并保存显示的连续查询。

将连续查询转换为 Flux 查询

要将 InfluxDB 1.x 连续查询迁移到 InfluxDB 2.7 任务,请将 InfluxQL 查询语法转换为 Flux。 大多数连续查询都是简单的降采样查询,可以使用 aggregateWindow() 函数 快速转换。 例如

连续查询示例
CREATE CONTINUOUS QUERY "downsample-daily" ON "my-db"
BEGIN
  SELECT mean("example-field")
  INTO "my-db"."example-rp"."average-example-measurement"
  FROM "example-measurement"
  GROUP BY time(1h)
END
等效的 Flux 任务
option task = {name: "downsample-daily", every: 1d}

from(bucket: "my-db/")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "example-measurement")
    |> filter(fn: (r) => r._field == "example-field")
    |> aggregateWindow(every: 1h, fn: mean)
    |> set(key: "_measurement", value: "average-example-measurement")
    |> to(org: "example-org", bucket: "my-db/example-rp")

将 InfluxQL 连续查询转换为 Flux

查看以下语句和子句,了解如何将 CQ 转换为 Flux

ON 子句

ON 子句定义要查询的数据库。在 InfluxDB OSS 2.7 中,数据库和保留策略组合映射到特定的存储桶(有关详细信息,请参阅 数据库和保留策略映射)。

使用 from() 函数 指定要查询的存储桶

InfluxQL
CREATE CONTINUOUS QUERY "downsample-daily" ON "my-db"
-- ...
Flux
from(bucket: "my-db/")
// ...

SELECT 语句

SELECT 语句从特定的测量中按字段、标签和时间查询数据。 SELECT 语句可以采用多种不同的形式,将其转换为 Flux 取决于您的用例。有关 Flux 和 InfluxQL 函数对等性的信息,请参阅 Flux 与 InfluxQL。 请参阅 可用的其他帮助资源

INTO 子句

INTO 子句定义要将结果写入的测量。 INTO 也支持完全限定的测量,其中包括数据库和保留策略。 在 InfluxDB OSS 2.7 中,数据库和保留策略组合映射到特定的存储桶(有关详细信息,请参阅 数据库和保留策略映射)。

要写入与查询测量不同的测量,请使用 set()map() 更改测量名称。 使用 to() 函数指定要将结果写入的存储桶。

InfluxQL
-- ...
INTO "example-db"."example-rp"."example-measurement"
-- ...
Flux
// ...
    |> set(key: "_measurement", value: "example-measurement")
    |> to(bucket: "example-db/example-rp")
// ...
    |> map(fn: (r) => ({ r with _measurement: "example-measurement"}))
    |> to(bucket: "example-db/example-rp")
将透视数据写入 InfluxDB

InfluxDB 1.x 查询结果包含每个字段的列。 InfluxDB 2.7 默认情况下不执行此操作,但可以使用 pivot()schema.fieldsAsCols() 实现。

如果您使用 to()透视数据写回 InfluxDB 2.7,则每个字段列都将存储为标签。 要将透视字段作为字段写回 InfluxDB,请导入 experimental 包并使用 experimental.to() 函数

InfluxQL
CREATE CONTINUOUS QUERY "downsample-daily" ON "my-db"
BEGIN
  SELECT mean("example-field-1"), mean("example-field-2")
  INTO "example-db"."example-rp"."example-measurement"
  FROM "example-measurement"
  GROUP BY time(1h)
END
Flux
// ...

from(bucket: "my-db/")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "example-measurement")
    |> filter(fn: (r) => r._field == "example-field-1" or r._field == "example-field-2")
    |> aggregateWindow(every: task.every, fn: mean)
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> experimental.to(bucket: "example-db/example-rp")

FROM 子句

from 子句定义要查询的测量。 使用 filter() 函数 指定要查询的测量。

InfluxQL
-- ...
FROM "example-measurement"
-- ...
Flux
// ...
    |> filter(fn: (r) => r._measurement == "example-measurement")

AS 子句

AS 子句在将数据写回 InfluxDB 时更改字段的名称。 使用 set()map() 更改字段名称。

InfluxQL
-- ...
AS newfield
-- ...
Flux
// ...
    |> set(key: "_field", value: "newfield")
// ...
    |> map(fn: (r) => ({ r with _field: "newfield"}))

WHERE 子句

WHERE 子句使用谓词逻辑根据字段、标签或时间戳过滤结果。 使用 filter() 函数 和 Flux 比较运算符 根据字段和标签过滤结果。 使用 range() 函数 根据时间戳过滤结果。

InfluxQL
-- ...
WHERE "example-tag" = "foo" AND time > now() - 7d
Flux
// ...
    |> range(start: -7d)
    |> filter(fn: (r) => r["example-tag"] == "foo")

GROUP BY 子句

InfluxQL GROUP BY 子句按特定标签或按时间对数据进行分组(通常是为了计算时间窗口的聚合值)。

按标签分组

使用 group() 函数 修改 分组键 并更改数据分组方式。

InfluxQL
-- ...
GROUP BY "location"
Flux
// ...
    |> group(columns: ["location"])
按时间分组

使用 aggregateWindow() 函数 将数据分组到时间窗口中,并对每个窗口执行聚合。 在 CQ 中,GROUP BY time() 子句中指定的间隔确定 CQ 执行间隔。 使用 GROUP BY time() 间隔设置 every 任务选项。

InfluxQL
-- ...
SELECT MEAN("example-field")
FROM "example-measurement"
GROUP BY time(1h)
Flux
option task = {name: "task-name", every: 1h}

// ...
    |> filter(fn: (r) => r._measurement == "example-measurement" and r._field == "example-field")
    |> aggregateWindow(every: task.every, fn: mean)

RESAMPLE 子句

CQ RESAMPLE 子句使用上次指定的持续时间的数据来计算新的聚合点。 RESAMPLE 中的 EVERY 间隔定义了 CQ 的运行频率。 FOR 间隔定义了 CQ 查询的总时间范围。

要在 Flux 任务中实现相同的功能,请将 range() 函数中的 start 参数设置为负 FOR 持续时间。 在 task 选项中定义任务执行间隔。 例如

InfluxQL
CREATE CONTINUOUS QUERY "resample-example" ON "my-db"
RESAMPLE EVERY 1m FOR 30m
BEGIN
  SELECT exponential_moving_average(mean("example-field"), 30)
  INTO "resample-average-example-measurement"
  FROM "example-measurement"
  WHERE region = 'example-region'
  GROUP BY time(1m)
END
Flux
option task = {name: "resample-example", every: 1m}

from(bucket: "my-db/")
    |> range(start: -30m)
    |> filter(fn: (r) => r._measurement == "example-measurement" and r._field == "example-field" and r.region == "example-region")
    |> aggregateWindow(every: 1m, fn: mean)
    |> exponentialMovingAverage(n: 30)
    |> set(key: "_measurement", value: "resample-average-example-measurement")
    |> to(bucket: "my-db/")

创建新的 InfluxDB 任务

将连续查询转换为 Flux 后,使用 Flux 查询 创建一个新任务

其他有用的资源

以下资源可用,可能在将连续查询转换为 Flux 任务时有所帮助。

文档
社区

此页是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。 您可以继续像当前一样使用它,而无需对代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。 加速扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速、近实时数据引擎,可实时收集和处理数据,并将其持久保存到本地磁盘或对象存储。 InfluxDB 3 Enterprise 构建在 Core 的基础上,增加了高可用性、读取副本、增强的安全性以及数据压缩功能,从而实现更快的查询和优化的存储。 InfluxDB 3 Enterprise 的免费层级可供非商业家庭或业余爱好者使用。

有关更多信息,请查看