将连续查询迁移到任务
InfluxDB OSS 2.7 使用 InfluxDB 任务 替换 1.x 连续查询 (CQs)。要将连续查询迁移到 InfluxDB 2.7 任务,请执行以下操作
输出所有 InfluxDB 1.x 连续查询
如果使用 influxd upgrade
命令,默认情况下,所有连续查询将在升级过程中输出到 ~/continuous_queries.txt
。要自定义连续查询文件的目标路径,请将 --continuous-query-export-path
标志与 influxd upgrade
命令一起使用。
influxd upgrade --continuous-query-export-path /path/to/continuous_queries.txt
手动输出连续查询
使用 InfluxDB 1.x
influx
交互式 shell 运行SHOW CONTINUOUS QUERIES
$ influx Connected to http://localhost:8086 version InfluxDB shell version: > SHOW CONTINUOUS QUERIES
复制并保存显示的连续查询。
将连续查询转换为 Flux 查询
要将 InfluxDB 1.x 连续查询迁移到 InfluxDB 2.7 任务,请将 InfluxQL 查询语法转换为 Flux。 大多数连续查询都是简单的降采样查询,可以使用 aggregateWindow()
函数 快速转换。 例如
连续查询示例
CREATE CONTINUOUS QUERY "downsample-daily" ON "my-db"
BEGIN
SELECT mean("example-field")
INTO "my-db"."example-rp"."average-example-measurement"
FROM "example-measurement"
GROUP BY time(1h)
END
等效的 Flux 任务
option task = {name: "downsample-daily", every: 1d}
from(bucket: "my-db/")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "example-measurement")
|> filter(fn: (r) => r._field == "example-field")
|> aggregateWindow(every: 1h, fn: mean)
|> set(key: "_measurement", value: "average-example-measurement")
|> to(org: "example-org", bucket: "my-db/example-rp")
将 InfluxQL 连续查询转换为 Flux
查看以下语句和子句,了解如何将 CQ 转换为 Flux
ON 子句
ON
子句定义要查询的数据库。在 InfluxDB OSS 2.7 中,数据库和保留策略组合映射到特定的存储桶(有关详细信息,请参阅 数据库和保留策略映射)。
使用 from()
函数 指定要查询的存储桶
InfluxQL
CREATE CONTINUOUS QUERY "downsample-daily" ON "my-db"
-- ...
Flux
from(bucket: "my-db/")
// ...
SELECT 语句
SELECT
语句从特定的测量中按字段、标签和时间查询数据。 SELECT
语句可以采用多种不同的形式,将其转换为 Flux 取决于您的用例。有关 Flux 和 InfluxQL 函数对等性的信息,请参阅 Flux 与 InfluxQL。 请参阅 可用的其他帮助资源。
INTO 子句
INTO
子句定义要将结果写入的测量。 INTO
也支持完全限定的测量,其中包括数据库和保留策略。 在 InfluxDB OSS 2.7 中,数据库和保留策略组合映射到特定的存储桶(有关详细信息,请参阅 数据库和保留策略映射)。
要写入与查询测量不同的测量,请使用 set()
或 map()
更改测量名称。 使用 to()
函数指定要将结果写入的存储桶。
InfluxQL
-- ...
INTO "example-db"."example-rp"."example-measurement"
-- ...
Flux
// ...
|> set(key: "_measurement", value: "example-measurement")
|> to(bucket: "example-db/example-rp")
// ...
|> map(fn: (r) => ({ r with _measurement: "example-measurement"}))
|> to(bucket: "example-db/example-rp")
将透视数据写入 InfluxDB
InfluxDB 1.x 查询结果包含每个字段的列。 InfluxDB 2.7 默认情况下不执行此操作,但可以使用 pivot()
或 schema.fieldsAsCols()
实现。
如果您使用 to()
将透视数据写回 InfluxDB 2.7,则每个字段列都将存储为标签。 要将透视字段作为字段写回 InfluxDB,请导入 experimental
包并使用 experimental.to()
函数。
InfluxQL
CREATE CONTINUOUS QUERY "downsample-daily" ON "my-db"
BEGIN
SELECT mean("example-field-1"), mean("example-field-2")
INTO "example-db"."example-rp"."example-measurement"
FROM "example-measurement"
GROUP BY time(1h)
END
Flux
// ...
from(bucket: "my-db/")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "example-measurement")
|> filter(fn: (r) => r._field == "example-field-1" or r._field == "example-field-2")
|> aggregateWindow(every: task.every, fn: mean)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> experimental.to(bucket: "example-db/example-rp")
FROM 子句
from 子句定义要查询的测量。 使用 filter()
函数 指定要查询的测量。
InfluxQL
-- ...
FROM "example-measurement"
-- ...
Flux
// ...
|> filter(fn: (r) => r._measurement == "example-measurement")
AS 子句
AS
子句在将数据写回 InfluxDB 时更改字段的名称。 使用 set()
或 map()
更改字段名称。
InfluxQL
-- ...
AS newfield
-- ...
Flux
// ...
|> set(key: "_field", value: "newfield")
// ...
|> map(fn: (r) => ({ r with _field: "newfield"}))
WHERE 子句
WHERE
子句使用谓词逻辑根据字段、标签或时间戳过滤结果。 使用 filter()
函数 和 Flux 比较运算符 根据字段和标签过滤结果。 使用 range()
函数 根据时间戳过滤结果。
InfluxQL
-- ...
WHERE "example-tag" = "foo" AND time > now() - 7d
Flux
// ...
|> range(start: -7d)
|> filter(fn: (r) => r["example-tag"] == "foo")
GROUP BY 子句
InfluxQL GROUP BY
子句按特定标签或按时间对数据进行分组(通常是为了计算时间窗口的聚合值)。
按标签分组
使用 group()
函数 修改 分组键 并更改数据分组方式。
InfluxQL
-- ...
GROUP BY "location"
Flux
// ...
|> group(columns: ["location"])
按时间分组
使用 aggregateWindow()
函数 将数据分组到时间窗口中,并对每个窗口执行聚合。 在 CQ 中,GROUP BY time()
子句中指定的间隔确定 CQ 执行间隔。 使用 GROUP BY time()
间隔设置 every
任务选项。
InfluxQL
-- ...
SELECT MEAN("example-field")
FROM "example-measurement"
GROUP BY time(1h)
Flux
option task = {name: "task-name", every: 1h}
// ...
|> filter(fn: (r) => r._measurement == "example-measurement" and r._field == "example-field")
|> aggregateWindow(every: task.every, fn: mean)
RESAMPLE 子句
CQ RESAMPLE
子句使用上次指定的持续时间的数据来计算新的聚合点。 RESAMPLE
中的 EVERY
间隔定义了 CQ 的运行频率。 FOR
间隔定义了 CQ 查询的总时间范围。
要在 Flux 任务中实现相同的功能,请将 range()
函数中的 start
参数设置为负 FOR
持续时间。 在 task
选项中定义任务执行间隔。 例如
InfluxQL
CREATE CONTINUOUS QUERY "resample-example" ON "my-db"
RESAMPLE EVERY 1m FOR 30m
BEGIN
SELECT exponential_moving_average(mean("example-field"), 30)
INTO "resample-average-example-measurement"
FROM "example-measurement"
WHERE region = 'example-region'
GROUP BY time(1m)
END
Flux
option task = {name: "resample-example", every: 1m}
from(bucket: "my-db/")
|> range(start: -30m)
|> filter(fn: (r) => r._measurement == "example-measurement" and r._field == "example-field" and r.region == "example-region")
|> aggregateWindow(every: 1m, fn: mean)
|> exponentialMovingAverage(n: 30)
|> set(key: "_measurement", value: "resample-average-example-measurement")
|> to(bucket: "my-db/")
创建新的 InfluxDB 任务
将连续查询转换为 Flux 后,使用 Flux 查询 创建一个新任务。
其他有用的资源
以下资源可用,可能在将连续查询转换为 Flux 任务时有所帮助。
文档
社区
- 在 InfluxData 社区 中发帖
- 在 InfluxDB 社区 Slack 中提问
此页是否对您有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一份子! 我们欢迎并鼓励您提供关于 InfluxDB 和本文档的反馈和错误报告。 要寻求支持,请使用以下资源
拥有年度合同或支持合同的客户 可以联系 InfluxData 支持。